#author("2024-03-07T18:34:55+09:00","default:Real2Virtual202111","Real2Virtual202111") #author("2024-08-15T07:19:39+09:00","default:TESLA202407","TESLA202407") [[pi_arduino_uart_ex01.py]] #code(Python){{ # -*- coding:utf-8 -*- # takashi yamanoue, fukuyama university, 2024/2/27 # 20230503 # from tkinter import * import tkinter as tk from tkinter import scrolledtext import serial import threading from tkinter.ttk import Scale from tkinter import colorchooser,filedialog,messagebox import time import math import sys import requests import os import socket from collections import deque import subprocess import copy import re import ipget #import ipget import netifaces import queue class Remote_Command_Reader: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) HOST = 'localhost' PORT = 9998 def __init__(self,pd): print("start Remote_Command_Reader.__init__") self.gui=pd print(self.gui) self.gui.set_Remote_Command_Reader(self) self.my_address='localhost' self.get_my_address() def python2fwbln(self,py2x_message): msg=py2x_message+'\n' self.sock.sendall(msg.encode('utf-8')) def python2fwb(self,py2x_message): msg=py2x_message self.sock.sendall(msg.encode('utf-8')) def get_my_address(self): self.my_address = socket.gethostbyname(socket.gethostname()) #ipx=ipget.ipget() #self.my_address=ipx.ipaddr("wlan0") return self.my_address def parse(self,line): #self.python2fwb(">"+line) self.gui.parse_command(line) def returnFileList(self): cpath=os.getcwd() os.chdir("/home/pi/Pictures") filenames = [f.name for f in os.scandir()] filenames.sort() for fn in filenames: self.python2fwbln(fn) os.chdir(cpath) def shutdown(self): os.system("sudo shutdown -h now") def client_start(self,addr): """クライアントのスタート""" """繧ッ繝ゥ繧、繧「繝ウ繝医・繧ケ繧ソ繝シ繝・"" try: self.sock.connect((addr, self.PORT)) print("pi_arduino_uart_ex01.py connected to the server") except Exception as e: msg1="connect error, addr="+addr+" port="+str(self.PORT) print(msg1) self.gui.write_message(msg1) tb=sys.exec_info()[2] msg2="[message]{0}".format(e.with_traceback(tb)) print(msg2) self.gui.write_message(msg2) return addr=self.sock.getsockname()[0] print("connect, addr="+addr) handle_thread = threading.Thread(target=self.handler, args=(self.sock,), daemon=True) handle_thread.start() def return_message(self,x): self.sock.send((x+'\n').encode('utf-8')) def handler(self,sock): """サーバからメッセージを受信し、表示する""" """繧オ繝シ繝舌°繧峨Γ繝・そ繝シ繧ク繧貞女菫。縺励∬。ィ遉コ縺吶k""" print("start handler from pic_ex04.py") sf=sock.makefile() while True: data=b'' #data=b'' #data=sock.recv(1024) #try: # data = sock.recv(1024) #except Exception as e: # tb=sys.exec_info()[2] # msg2="message]{0}".format(e.with_traceback(tb)) # print(msg2) # self.gui.write_message(msg2) try: data = sock.recv(1024) #print("[client-recvツ。]{}".format(data.decode("utf-8"))) #received=data.decode("utf-8") #lines=received.splitlines() #for line in lines: line=sf.readline() self.parse(line) except Exception as e: tb=sys.exec_info()[2] msg2="message]{0}".format(e.with_traceback(tb)) print(msg2) self.gui.write_message(msg2) try: #print("[client-recv¡]{}".format(data.decode("utf-8"))) received=data.decode("utf-8") lines=received.splitlines() for line in lines: self.parse(line) except Exception as e: tb=sys.exec_info()[2] msg2="message]{0}".format(e.with_traceback(tb)) print(msg2) self.gui.write_message(msg2) class App: arduino_queue=queue.Queue() gui_queue=queue.Queue() init_distance=50.0 allowance_0=4.0 allowance_1=10.0 allowance_2=50.0 waiting=False waiting_lock=threading.Lock() def __init__(self,master): self.root=root self.root.title("command panel") self.root.configure(background="white") self.root.geometry("850x600") self.server_label=Label(self.root, text='server addr') self.server_label.place(x=10,y=10) self.server_field=Entry(self.root, width=40) self.server_field.place(x=100,y=10) self.id_label=Label(self.root, text='id') self.id_label.place(x=10,y=40) self.id_field=Entry(self.root, width=40) self.id_field.place(x=100,y=40) #self.address_field=Entry(self.root, width=40) #self.address_field.place(x=100,y=10) self.command_label=Label(self.root, text='command') self.command_label.place(x=10,y=60) self.command_field=Entry(self.root, width=70) self.command_field.place(x=100,y=60) self.parse_command_button=Button(self.root, text='parse',bd=1,bg='white', command=self.click_command_button,width=8, relief=RIDGE) self.parse_command_button.place(x=700,y=50) self.arduino_command_label=Label(self.root, text='arduino') self.arduino_command_label.place(x=10,y=90) self.arduino_command_field=Entry(self.root, width=70) self.arduino_command_field.place(x=100,y=90) self.arduino_send_button=Button(self.root, text='send',bd=1,bg='white', command=self.arduino_click_button,width=8, relief=RIDGE) self.arduino_send_button.place(x=700,y=85) self.message_frame=tk.Frame(self.root,width=100, height=30) self.message_frame.place(x=30,y=120) self.message_area=scrolledtext.ScrolledText(self.message_frame) self.message_area.pack() self.message_line=0 self.message_clear_button=Button(self.root, text='clear',bd=1,bg='white', command=self.click_message_clear,width=8, relief=RIDGE) self.message_clear_button.place(x=700,y=120) self.id_field.delete(0,END) self.id_field.insert(END,'') self.serial=serial.Serial('/dev/ttyACM0',115200) self.serial.flush() thread=threading.Thread(target=self.read_loop,args=[],name='thread1') ipx=ipget.ipget() self.send_arduino("show \""+str(ipx.ipaddr('wlan0'))+"\"") self.send_arduino("show \""+str(ipx.ipaddr('eth0'))+"\"") #ipx=ipget.ipget() #self.send_arduino("show \""+str(ipx.ipaddr('wlan0'))+"\"") #self.send_arduino("show \""+str(ipx.ipaddr('eth0'))+"\"") interface_data=netifaces.ifaddresses('eth0') ip_v4=interface_data[netifaces.AF_INET][0]['addr'] self.send_arduino("show \""+str(ip_v4)+"\"") thread.start() gui_thread=threading.Thread(target=self.gui_access_loop,args=[],name='gui_thread') gui_thread.start() # Functions are defined here def set_Remote_Command_Reader(self,reader): self.reader=reader def set_tcp_server_address(self,address): self.server_field.delete(0,END) self.server_field.insert(END,address) def connect(self): server=self.server_field.get() self.reader.client_start(server) def shutdown(self): os.system("sudo shutdown -h now") def write_message(self,message): #self.message_area.insert(tk.END,message) #self.message_area.insert(tk.END,'\n') self.gui_queue.put('message '+message) def send_arduino(self,message): self.serial.write(message.encode('utf-8')) self.write_message(">"+message) self.serial.write('\n'.encode('utf-8')) def arduino_click_button(self): inx=self.arduino_command_field.get() print("inx="+inx) self.send_arduino(inx) self.arduino_command_field.delete(0,END) def click_message_clear(self): self.message_area.delete('1.0',END) def click_command_button(self): inx=self.command_field.get() print("inx="+inx) self.parse_command(inx) self.command_field.delete(0,END) def set_my_id(self,id): self.id_field.delete(0,END) self.id_field.insert(END,id) def set_init(self): # while True: # l=get distance # if abs(l-init_distance)<allowance # return # while l>init_distance # back little # while l<init_distance # go little self.write_message("start set init.") print("start set_init.") for i in range(30): #while True print("i="+str(i)) with self.waiting_lock: self.waiting=True print("at init, set waiting=True.") self.send_arduino("get distance.") print("sending get distance.") vs="" #vs=self.arduino_queue.get() for j in range(50): # wait for the distance print("j="+str(j)) #print("arduin_queue.get()...") c=self.arduino_queue.get() #print("arduino_queue.get()="+c) rx=c.split() #print("rx="+str(rx)) lrx=len(rx) #print("lrx="+str(lrx)) if lrx==2: if rx[1]=='mm.': print("get "+rx[0]) vs=rx[0] else: vs="" else: vs="" if vs=="": print("vs=no value") time.sleep(0.2) else: print("vs="+vs) break # if len(self.arduino_queue)>0: # vs=self.arduino_queue.get() # self.write_message("vs="+vs) # break # else: # self.write_message("wait arduino_queue") # time.sleep(0.2) print("at init, set, self.waiting=False") with self.waiting_lock: self.waiting=False time.sleep(0.5) if vs=="": self.reader.return_message("something wrong.") return self.write_message("distance="+vs) print("distance="+vs) v=float(vs) if abs(v-self.init_distance)<self.allowance_0: self.write_message("init done.") print("init done.") self.send_arduino("set current 0.0") self.reader.return_message("init succeded.") return elif v>self.init_distance+self.allowance_2: #self.write_message("v>allowance_2") print("v>allowance_2") dx=((v-self.init_distance)*2.0)/10.0 self.send_arduino("set current "+str(dx)) self.send_arduino("moveTo 0.0") elif v>self.init_distance+self.allowance_1: #self.write_message("v>allowance_1") print("v>allowance_1") dx=((v-self.init_distance)*0.2)/10.0 self.send_arduino("set current "+str(dx)) self.send_arduino("moveTo 0.0") elif v>self.init_distance: self.send_arduino("set currnt 0.02") self.send_arduino("moveTo 0.0") elif v<self.init_distance-self.allowance_1: print("v<allowance_0") self.send_arduino("set current 0.0") self.send_arduino("moveTo 0.1") else: print("else") self.send_arduino("set current 0.0") self.send_arduino("moveTo 0.025") print("init done, i>=30") self.send_arduino("set current 0.0") def is_my_id(self,x): print("is_my_id("+x+")",end="") id=self.id_field.get() print(" id="+id) return re.match(x,id) def read_loop(self): print("start read_loop") while True: c=self.serial.readline().decode('utf-8').rstrip() self.message_line=self.message_line+1 #print("c="+c) self.write_message(c) #addr=self.server_field.get() #rtn=addr+" "+c #print("return "+c) self.reader.return_message(c) #print("at read, self.waiting="+str(self.waiting)) if self.waiting: print("at read, waiting==True") #rx=c.split() #print("rx="+str(rx)) #lrx=len(rx) #print("lrx="+str(lrx)) #if lrx==2: # #self.write_message("lrx==2") # print("lrx==") # if rx[1]=='mm.': # #self.write_message("put..."+rx[0]) # print("put..."+rx[0]) # self.arduino_queue.put(rx[0]) # else: # self.arduino_queue.put("") #else: # self.arduino_queue.put("") self.arduino_queue.put(c) def gui_access_loop(self): print('start gui_access_loop(self)') while True: l=self.gui_queue.get() #print('l='+l) if l.startswith('message '): mx=l[len('message '):] #print('mx='+mx) self.gui_write_message(mx) def gui_write_message(self,message): self.message_area.insert(tk.END,message) self.message_area.insert(tk.END,'\n') # # command # id="<id>" arduino <arduino command> # arduino <arduino command> # set id="<id>" # set init # shutdown # def parse_command(self,command): rest=[""] strc=[""] ix=[""] self.command_field.delete(0,END) self.command_field.insert(END,command) command=self.r_b(command) print("parse_command("+command+")") if self.parse_key("id=",command,rest): print("parse_key(id="+command+","+rest[0]+")") command=self.r_b(rest[0]) if self.parse_String_Constant(command,strc,rest): print("strc="+strc[0]+",rest="+rest[0]) command=self.r_b(rest[0]) strcx=strc[0] if strcx=="*" : if self.parse_arduino(command): return True else: return False elif self.is_my_id(strcx): print("start parse_arduino("+command+",rest)") if self.parse_arduino(command): return True else: return False else: return False return False elif self.parse_key("set ",command,rest): command=self.r_b(rest[0]) if self.parse_key("id=",command,rest): command=self.r_b(rest[0]) if self.parse_String_Constant(command,strc,rest): strcx=strc[0] print("strc="+strcx+",rest="+rest[0]) self.set_my_id(strcx) return True else: return False elif self.parse_key("init",command,rest): self.set_init() else: return False elif self.parse_key("shutdown",command,rest): self.shutdown() return True def parse_arduino(self,command): rest=[""] strc=[""] ix=[""] command=self.r_b(command) if self.parse_key("arduino ",command,rest): print("parse_key(arduino "+command+","+rest[0]+")") command=self.r_b(rest[0]) print("xx "+command) self.arduino_command_field.delete(0,END) self.arduino_command_field.insert(END,command) self.send_arduino(command) return True elif self.parse_key("init",command,rest): print("parse_key(init "+command+","+rest[0]+")") self.set_init() return True else: return False def parse_key(self,key,inx,rest): keylen=len(key) inlen=len(inx) if inx.startswith(key): rest[0]=inx[keylen:inlen] return True rest[0]=inx return False def parse_p_int(self,command,ix,rest): print("parse_p_int("+command+",ix,rest)") i=0 c=command[0] if c in ['0','1','2','3','4','5','6','7','8','9']: clen=len(command) while True: i=i*10+int(c) command=command[1:clen] clen=len(command) if clen==0: ix[0]=i rest[0]="" return True print("parse_p_int command="+command) c=command[0] if not (c in ['0','1','2','3','4','5','6','7','8','9']): ix[0]=i rest[0]=command return True rest[0]=command ix[0]=0 return False def r_b(self,inx): rx=[""] while self.parse_key(" ",inx,rx): inx=rx[0] return rx[0] def parse_String_Constant(self,inx,strc,rest): rx=[""] xstr="" if self.parse_key('"',inx,rx): inx=rx[0] fx=inx[0] xlen=len(inx) while fx!='"': if xlen==0: return False if fx=='\\': inx=inx[1:xlen] else: xstr=xstr+fx xlen=len(inx) inx=inx[1:xlen] fx=inx[0] if self.parse_key('"',inx,rx): strc[0]=xstr rest[0]=rx[0] return True return False if __name__=="__main__": root = Tk() root.wm_title('Scrolling') app = App(root) rcr=Remote_Command_Reader(app) ip=rcr.get_my_address() app.set_tcp_server_address(ip) app.connect() ip=rcr.get_my_address() app.set_tcp_server_address(ip) app.set_my_id(ip) root.mainloop() }} ---- #counter