#author("2023-05-02T01:05:26+09:00","default:TeleportDresser","TeleportDresser") #author("2023-05-06T17:34:53+09:00","default:TeleportDresser","TeleportDresser") [[3D_display_ex01]] #code(python){{ ## License: Apache 2.0. See LICENSE file in root directory. ## Copyright(c) 2017 Intel Corporation. All Rights Reserved. #20230506 ##################################################### ## Align Depth to Color ## ##################################################### # First import the library import pyrealsense2 as rs # Import Numpy for easy array manipulation import numpy as np # Import OpenCV for easy image rendering import cv2 import copy import socket import threading import sys from tkinter import * import tkinter as tk from tkinter import scrolledtext from PIL import Image, ImageTk, ImageOps PORT = 9998 class Com: def __init__(self,host): self.host=host self.connected=False self.soc=None self.handle_thread=None self.message_window=None def set_message_window(self,window): self.message_window=window def is_connected(sekf): return self.connected def connect(self): print("re connect to host "+self.host) try: if self.soc !=None: self.soc.connect((self.host,PORT)) else: #socket.socket ... socket を作成し、socに代入 self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #IPアドレスがhostのserver socketにsocを接続 self.soc.connect((self.host,PORT)) if self.handle_thread==None: #受信担当の関数handlerを物threadでうごかすhandle_threadを生成。 self.handle_thread=threading.Thread(target=self.handler) #handle_threadをstart self.handle_thread.start() self.connected=True if self.message_window: self.message_window.write_message("connected to "+self.host+":"+str(PORT)) except: print("connect error.") if self.message_window: self.message_window.write_message("error when connect to "+self.host+":"+str(PORT)) self.connected=False def send_command(self,command): try: self.soc.send(bytes(command,"utf-8")) except KeyboardInterrupt: if self.message_window: self.message_window.write_message("error when sends "+command + " to "+host+":"+str(PORT)) self.soc.close() exit() if self.soc!=None: try: self.soc.send(bytes(command,"utf-8")) except KeyboardInterrupt: if self.message_window: self.message_window.write_message("error when sends "+command + " to "+self.host+":"+str(PORT)) self.soc.close() exit() else: self.message_window.write_message("no socket when sends "+command + " to "+self.host+":"+str(PORT)) #受信の処理。送信threadとは別に、並行して処理を行う。 def handler(self): print("at client, connected, start handler") while True: try: data = self.soc.recv(1024) line="[receive]- {}".format(data.decode("utf-8")) print(line) if self.message_window: self.message_window.write_message("from "+self.host+":"+str(PORT)+", "+line) except socket.error: if self.message_window: self.message_window.write_message(self.host+":"+str(PORT)+" closed") self.soc.close() self.soc=None break exit() def close(self): self.soc.close() self.soc=None class Coms: def __init__(self): self.coms=[None,None,None,None,None,None,None,None] self.coms[0]=Com("192.168.13.21") self.coms[1]=Com("192.168.13.19") self.coms[2]=Com("192.168.13.20") self.coms[3]=Com("192.168.13.22") self.coms[4]=Com("192.168.13.12") self.coms[5]=Com("192.168.13.14") self.coms[6]=Com("192.168.13.15") self.coms[7]=Com("192.168.13.16") #self.coms[7]=Com("192.168.13.16") self.coms[7]=Com("192.168.13.7") #self.coms[7]=Com("192.168.1.22") def connect_all(self): for i in range(8): self.coms[i].connect() try: if self.coms[i]!=None: print("connect to coms["+str(i)+"].. ") self.coms[i].connect() else: print("coms["+str(i)+"] is None, connect, ") except: print("coms["+str(i)+"] connect error") def set_message_window(self,window): for i in range(8): if not self.coms[i]: if self.coms[i]!=None: self.coms[i].set_message_window(window) else: print("coms["+str(i)+"] is None, set_message_window, ") def send_command_to_i(self,i,command): self.coms[i].send_command(command) if self.coms[i]!=None: self.coms[i].send_command(command) def send_command_to_all(self,command): for i in range(8): self.coms[i].send_command(command) if self.coms[i]!=None: self.coms[i].send_command(command) def close_all(self): for i in range(8): self.coms[i].close() if self.coms[i]!=None: self.coms[i].close() class Command_Panel: def __init__(self,root,coms): self.coms=coms self.root=root self.root.title("command panel") self.root.configure(background="white") self.root.geometry("810x1500") self.root.geometry("810x1300") self.command_field_for_ith_server=Entry(self.root,width=40) self.command_field_for_ith_server.place(x=80,y=50) self.ith_server_field=Entry(self.root,width=5) self.ith_server_field.place(x=400,y=50) self.ith_enter_button=Button(self.root,text="enter",bd=4,bg='white',command=self.enter_ith_command,width=8,relief=RIDGE) self.ith_enter_button.place(x=500,y=50) self.command_field_for_all_server=Entry(self.root,width=50) self.command_field_for_all_server.place(x=80,y=90) self.all_enter_button=Button(self.root,text="enter",bd=4,bg='white',command=self.enter_all_command,width=8,relief=RIDGE) self.all_enter_button.place(x=500,y=90) self.message_frame=tk.Frame(self.root,width=50,height=30) self.message_frame.place(x=80,y=120) self.message_area=scrolledtext.ScrolledText(self.message_frame) self.message_area.pack() self.connect_button=Button(self.root, text="connect",bd=4,bg='white',command=self.click_connect_button, width=8, relief=RIDGE) self.connect_button.place(x=80,y=10) self.near_depth_field=Entry(self.root,width=5) self.near_depth_field.place(x=200,y=10) self.near_depth_field.insert(tk.END,"0.1") self.far_depth_field=Entry(self.root,width=5) self.far_depth_field.place(x=300,y=10) self.far_depth_field.insert(tk.END,"1.0") self.sending=BooleanVar() self.real_sense_sending_check=Checkbutton(self.root,variable=self.sending,text="real sense sending") self.real_sense_sending_check.place(x=400,y=10) self.sending.set(False) self.bg_removed_canvas=tk.Canvas(self.root,width=320,height=240) #self.bg_removed_canvas.bind('<Button-1>',self.bg_removed_canvas_click) self.bg_removed_canvas.place(x=80,y=750) self.bg_removed_canvas.place(x=80,y=600) self.depth_colormap_canvas=tk.Canvas(self.root,width=320,height=240) #self.depth_colormap_canvas.bind('<Button-1>',self.bg_removed_canvas_click) self.depth_colormap_canvas.place(x=400,y=750) self.depth_colormap_canvas.place(x=400,y=600) self.pic_panel_0_canvas=tk.Canvas(self.root,width=160,height=120) self.pic_panel_0_canvas.place(x=80,y=850) self.pic_panel_1_canvas=tk.Canvas(self.root,width=160,height=120) self.pic_panel_1_canvas.place(x=240,y=850) self.pic_panel_2_canvas=tk.Canvas(self.root,width=160,height=120) self.pic_panel_2_canvas.place(x=400,y=850) self.pic_panel_3_canvas=tk.Canvas(self.root,width=160,height=120) self.pic_panel_3_canvas.place(x=560,y=850) self.pic_panel_4_canvas=tk.Canvas(self.root,width=160,height=120) self.pic_panel_4_canvas.place(x=80,y=970) self.pic_panel_5_canvas=tk.Canvas(self.root,width=160,height=120) self.pic_panel_5_canvas.place(x=240,y=970) self.pic_panel_6_canvas=tk.Canvas(self.root,width=160,height=120) self.pic_panel_6_canvas.place(x=400,y=970) self.pic_panel_7_canvas=tk.Canvas(self.root,width=160,height=120) self.pic_panel_7_canvas.place(x=560,y=970) def enter_ith_command(self): command=self.command_field_for_ith_server.get() ith_s=self.ith_server_field.get() ith=int(ith_s) print("send "+command+" to "+str(ith)) self.write_message("send "+command+" to "+str(ith)) self.coms.send_command_to_i(ith,command) self.command_field_for_ith_server.delete(0,END) def enter_all_command(self): command=self.command_field_for_all_server.get() print("send "+command+" to all") self.write_message("send "+command+" to all") self.coms.send_command_to_all(command) self.command_field_for_all_server.delete(0,END) def write_message(self, message): self.message_area.insert(tk.END,message) self.message_area.insert(tk.END,'\n') def click_connect_button(self): self.coms.connect_all() class Get_Cross_Sections: def __init__(self,coms,panel): # Create a pipeline self.pipeline = rs.pipeline() self.coms=coms self.panel=panel # Create a config and configure the pipeline to stream # different resolutions of color and depth streams self.config = rs.config() self.is_quitting=False # Get device product line for setting a supporting resolution self.pipeline_wrapper = rs.pipeline_wrapper(self.pipeline) self.pipeline_profile = self.config.resolve(self.pipeline_wrapper) self.device = self.pipeline_profile.get_device() self.device_product_line = str(self.device.get_info(rs.camera_info.product_line)) self.found_rgb = False for s in self.device.sensors: if s.get_info(rs.camera_info.name) == 'RGB Camera': self.found_rgb = True break self.oh=480 self.ow=640 if not self.found_rgb: print("The demo requires Depth camera with Color sensor") exit(0) self.config.enable_stream(rs.stream.depth, self.ow, self.oh, rs.format.z16, 30) if self.device_product_line == 'L500': self.config.enable_stream(rs.stream.color, 960, 540, rs.format.bgr8, 30) else: self.config.enable_stream(rs.stream.color, self.ow, self.oh, rs.format.bgr8, 30) # Start streaming self.profile = self.pipeline.start(self.config) # Getting the depth sensor's depth scale (see rs-align example for explanation) self.depth_sensor = self.profile.get_device().first_depth_sensor() self.depth_scale = self.depth_sensor.get_depth_scale() print("Depth Scale is: " , self.depth_scale) # We will be removing the background of objects more than # clipping_distance_in_meters meters away self.clipping_distance_in_meters = 1 #1 meter self.clipping_distance = self.clipping_distance_in_meters / self.depth_scale self.streaming_thread=threading.Thread(target=self.streaming_loop) self.streaming_thread.start() def streaming(self): # Create an align object # rs.align allows us to perform alignment of depth frames to others frames # The "align_to" is the stream type to which we plan to align depth frames. self.align_to = rs.stream.color self.align = rs.align(self.align_to) print("depth range near(m):") self.d_near=input() #print("depth range near(m):") #self.d_near=input() self.d_near=float(self.panel.near_depth_field.get()) print("depth range far(m):") self.d_far=input() print("depth range: ("+str(self.d_near)+" - "+str(self.d_far)) #self.d_far=input() self.d_far=float(self.panel.far_depth_field.get()) self.clipping_near = float(self.d_near) / self.depth_scale print("depth range: ("+str(self.d_near)+" - "+str(self.d_far)+")") self.ping_distance_in_meters = float(self.d_far) #1 meter self.ping_distance = self.clipping_distance_in_meters / self.depth_scale self.clipping_near = float(self.d_near) / self.depth_scale self.streaming_thread=threading.Thread(target=self.streaming) self.streaming_thread.start() def streaming(self): self.clipping_distance = self.clipping_distance_in_meters / self.depth_scale # Get frameset of color and depth frames = self.pipeline.wait_for_frames() # frames.get_depth_frame() is a 640(ow)x360(oh) depth image # Align the depth frame to color frame aligned_frames = self.align.process(frames) # Get aligned frames aligned_depth_frame = aligned_frames.get_depth_frame() # aligned_depth_frame is a 640x480 depth image color_frame = aligned_frames.get_color_frame() # Validate that both frames are valid if not aligned_depth_frame or not color_frame: return #continue depth_image = np.asanyarray(aligned_depth_frame.get_data()) color_image = np.asanyarray(color_frame.get_data()) # Remove background - Set pixels further than clipping_distance to grey grey_color = 153 black_color = 0 depth_image_3d = np.dstack((depth_image,depth_image,depth_image)) #depth image is 1 channel, color is 3 channels print("depth_image_3d[100,100]=",end="") print(depth_image_3d[100,100]) self.bg_removed = np.where((depth_image_3d > self.clipping_distance) | (depth_image_3d <= self.clipping_near), black_color, color_image) print("bg_removed[100,100]=",end="") print(self.bg_removed[100,100]) # Render images: # depth align to color on left # depth on right depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET) self.depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET) print("bg_removed[0,0]=",end="") print(self.bg_removed[0,0]) print("bg_removed.shape=",end="") print(self.bg_removed.shape) d_min=0 d_max=0 d_interval=(self.clipping_distance - self.clipping_near)/8 pic_plane=[[],[],[],[],[],[],[],[]] self.d_interval=(self.clipping_distance - self.clipping_near)/8 self.pic_plane=[[],[],[],[],[],[],[],[]] first=0 xii=-1 for ii in range(0,8): wa=copy.deepcopy(self.bg_removed) pic_plane[ii]=np.zeros_like(wa) for iw in range(0,self.ow): for ih in range(0,self.oh): dx=depth_image[ih,iw] if d_min>dx: d_min=dx if d_max<dx: d_max=dx dxx=dx-self.clipping_near pxx=dxx/d_interval ipxx=int(pxx) #for ii in range(0,8): # if ii==ipxx: # #print("dx="+str(dx),end="") # #print(",pxx="+str(pxx),end="") # #print(",ipxx="+str(ipxx),end="") # #print("("+str(ih)+","+str(iw)+")=",end="") # pic_plane[ii][ih,iw]=(int(bg_removed[ih,iw][0]),int(bg_removed[ih,iw][1]),int(bg_removed[ih,iw][2])) # #print(bg_removed[ih,iw]) # if bg_removed[ih,iw][0]!=0: # if first==0: # print("bg_removed["+str(ih)+","+str(iw)+"]="+str(bg_removed[ih,iw][0])) # xih=ih # xiw=iw # xii=ii # first=1 # else: # pic_plane[ii][ih,iw]=(black_color,black_color,black_color) if 0<=ipxx and ipxx<=7: print("pic_plane["+str(ipxx)+"]["+str(ih)+","+str(iw)+"]=",end="") print(pic_plane[ipxx][ih,iw]) #print("self.bg_removed[ih,iw][0]=",end="") #print(self.bg_removed[ih,iw][0]) pic_plane[ipxx][ih,iw][0]=0 #self.bg_removed[ih,iw][0] #pic_plane[ipxx][ih,iw][1]=self.bg_removed[ih,iw][1] #pic_plane[ipxx][ih,iw][2]=self.bg_removed[ih,iw][2] # # sin 3D display # w: 32.5cm, 16 # h: 50.0cm, 75 # z: 16.0cm, 8 # # for i in each ii, for each ih, # send "set4 ix(in ih) iy(in iw) r0,g0,b0 ... r3,g3,b3" # to i's display's p # sending="" for ii in range(0,7): self.pic_plane[ii]=np.zeros_like(wa) #print("pic_plane["+str(ii)+"]=",end="") #print(pic_plane[ii].shape) for iw in range(0,self.ow): for ih in range(0,self.oh): dx=depth_image[ih,iw] if d_min>dx: d_min=dx if d_max<dx: d_max=dx dxx=dx-self.clipping_near pxx=dxx/self.d_interval ipxx=int(pxx) if 0<=ipxx and ipxx<=7: self.pic_plane[ipxx][ih,iw]=self.bg_removed[ih,iw] if self.panel.sending.get(): # # pic_plane's w:640->320, 3d-display's w w':16 # offset 160 # w=160+(320/16)*w' # for iw in range(0,16): # # pic_plane's h:480(oh), 3d-display's h':75 # h=(480/75)*h' # for ih in range(0,75): # sin 3D display # w: 32.5cm, 16 # h: 50.0cm, 75 # z: 16.0cm, 8 # # for i in each ii, for each ih, # send "set4 ix(in ih) iy(in iw) r0,g0,b0 ... r3,g3,b3" # to i's display's p # self.coms.send_command_to_all("clear plane") for ii in range(8): for ih in range(75): # # send "set4 ix iy r0,g0,b0 ... r3,g3,b3" # pic_plane's h:480(oh), 3d-display's h':75 # h=(480/75)*h' # iy=iw % 4 h=(480/75)*(74-ih) for iy in range(4): # # pic_plane's w:640->320, 3d-display's w w':16 # offset 160 # w=160+(320/16)*w' # # # send "setpic4 ix iy r0 g0 b0 ... r3 g3 b3" # sending="setpic4 "+str(ih)+" "+str(iy*4)+" " send_flag=False for ipxx in range(4): w=160+(320/16)*(iy*4+ipxx) dx=depth_image[int(h),int(w)] dxx=dx-self.clipping_near pxx=dxx/self.d_interval if int(pxx)==ii: send_flag=True print("h="+str(h)+",w="+str(w)+",pxx="+str(pxx)) print("color=",end="") print(self.pic_plane[int(pxx)][int(h),int(w)]) print("color2=",end="") print(self.bg_removed[int(h),int(w)]) ccxx=self.bg_removed[int(h),int(w)] ccx=[0,0,0] cw=ccxx[0] ccx[0]=ccxx[2] ccx[1]=ccxx[1] ccx[2]=cw else: ccx=[0,0,0] for cx in range(3): sending=sending+str(ccx[cx])+" " if send_flag: self.panel.write_message("send "+sending+" to "+str(ii)) self.coms.send_command_to_i(ii,sending+'\n') #else: #self.panel.write_message("not send "+sending+" to "+str(ii)) self.coms.send_command_to_all("show plane") print("d_min=",end="") print(d_min) print("d_max=",end="") print(d_max) images2 = np.hstack((pic_plane[0],pic_plane[1],pic_plane[2],pic_plane[3], pic_plane[4])) images = np.hstack((self.bg_removed, depth_colormap)) if xii!=-1: print("pic_plane["+str(xii)+"]["+str(xih)+","+str(xiw)+"]=",end="") print(pic_plane[xii][xih,xiw]) print(self.pic_plane[xii][xih,xiw]) print("bg_removed["+str(xih)+","+str(xiw)+"]=",end="") print(self.bg_removed[xih,xiw]) else: print("xii==-1") cv2.namedWindow('Align Example', cv2.WINDOW_NORMAL) cv2.imshow('Align Example', images) cv2.namedWindow('Align Example2', cv2.WINDOW_NORMAL) cv2.imshow('Align Example2', images2) self.display() key = cv2.waitKey(1) # Press esc or 'q' to close the image window if key & 0xFF == ord('q') or key == 27: cv2.destroyAllWindows() self.is_quitting=True def streaming_loop(self): # Streaming loop try: while True: print("start streaming.") self.streaming() if is_quitting: if self.is_quitting: return finally: print("streaming_thread,finally") self.pipeline.stop() def display(self): def display_panel_to_canvas(self,panel,canvas): '''画像をCanvasに表示する''' # フレーム画像の取得 #ret, frame = self.capture.read() # BGR→RGB変換 cv_image_bg_removed = cv2.cvtColor(self.bg_removed, cv2.COLOR_BGR2RGB) px = cv2.cvtColor(panel, cv2.COLOR_BGR2RGB) # NumPyのndarrayからPillowのImageへ変換 pil_image = Image.fromarray(cv_image_bg_removed) pil_image = Image.fromarray(px) # キャンバスのサイズを取得 canvas0_width = self.panel.bg_removed_canvas.winfo_width() canvas0_height = self.panel.bg_removed_canvas.winfo_height() canvas0_width = canvas.winfo_width() canvas0_height = canvas.winfo_height() # 画像のアスペクト比(縦横比)を崩さずに指定したサイズ(キャンバスのサイズ)全体に画像をリサイズする pil_image = ImageOps.pad(pil_image, (canvas0_width, canvas0_height)) # PIL.ImageからPhotoImageへ変換する self.photo_image = ImageTk.PhotoImage(image=pil_image) photo_image = ImageTk.PhotoImage(image=pil_image) self.imgs.append(photo_image) # 画像の描画 self.self.panel.bg_removed_canvas.create_image( canvas.create_image( canvas0_width / 2, # 画像表示位置(Canvasの中心) canvas0_height / 2, image=self.photo_image # 表示画像データ image=photo_image # 表示画像データ ) def display(self): self.imgs=[] self.display_panel_to_canvas(self.bg_removed,self.panel.bg_removed_canvas) self.display_panel_to_canvas(self.depth_colormap,self.panel.depth_colormap_canvas) self.display_panel_to_canvas(self.pic_plane[0],self.panel.pic_panel_0_canvas) self.display_panel_to_canvas(self.pic_plane[1],self.panel.pic_panel_1_canvas) self.display_panel_to_canvas(self.pic_plane[2],self.panel.pic_panel_2_canvas) self.display_panel_to_canvas(self.pic_plane[3],self.panel.pic_panel_3_canvas) self.display_panel_to_canvas(self.pic_plane[4],self.panel.pic_panel_4_canvas) self.display_panel_to_canvas(self.pic_plane[5],self.panel.pic_panel_5_canvas) self.display_panel_to_canvas(self.pic_plane[6],self.panel.pic_panel_6_canvas) self.display_panel_to_canvas(self.pic_plane[7],self.panel.pic_panel_7_canvas) # disp_image()を10msec後に実行する self.disp_id = self.after(10, self.disp_image) #self.disp_id = self.panel.root.after(10, self.display) if __name__=="__main__": root=Tk() coms=Coms() p=Command_Panel(root,coms) coms.set_message_window(p) cs=Get_Cross_Sections(coms,p) root.mainloop() }}