#author("2026-05-12T17:49:05+09:00","default:TESLA2","TESLA2") #author("2026-05-12T18:26:30+09:00","default:TESLA2","TESLA2") [[tesla2_client_01.py]] #code(Python){{ x # -*- coding:utf-8 -*- # takashi yamanoue, fukuyama university, 2026/5/12 # 20260512 # # two servo # #import serial import threading import time import math import sys import requests import os import socket from collections import deque import subprocess import copy import re import ipget class Remote_Command_Reader: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) HOST = 'localhost' PORT = 9998 def __init__(self,app): print("start chat_client_01.__init__") self.app=app self.my_address='localhost' self.get_my_address() def get_my_address(self): self.my_address = socket.gethostbyname(socket.gethostname()) ipx=ipget.ipget() self.my_address=ipx.ipaddr("wlan0") return self.my_address def parse_remote(self,line): #self.python2fwb(">"+line) #self.gui.parse_command(line) self.app.parse_remote_command(line) #print(line) def shutdown(self): os.system("sudo shutdown -h now") def client_start(self,addr): """クライアントのスタート""" try: self.sock.connect((addr, self.PORT)) print("chat_lient_ex01.py connected to the server") except Exception as e: msg1="connect error, addr="+addr+" port="+str(self.PORT) print(msg1) tb=sys.exec_info()[2] msg2="[message]{0}".format(e.with_traceback(tb)) print(msg2) return addr=self.sock.getsockname()[0] print("connect, addr="+addr) handle_thread = threading.Thread(target=self.handler, args=(self.sock,), daemon=True) handle_thread.start() def return_message(self,x): self.sock.send((x+'\n').encode('utf-8')) def handler(self,sock): """サーバからメッセージを受信し、表示する""" print("start handler from pic_ex04.py") sf=sock.makefile() while True: try: line=sf.readline() self.parse_remote(line) except Exception as e: tb=sys.exec_info()[2] msg2="message]{0}".format(e.with_traceback(tb)) print(msg2) class App: id="a" def __init__(self): self.message_line=0 #self.serial=serial.Serial('/dev/ttyUSB0',115200) #self.serial.flush() sys.path.append('RCB4Lib_for_Python_V100B/Rcb4Lib') #Rcb4Libの検索パスを追加 from Rcb4BaseLib import Rcb4BaseLib #Rcb4BaseLib.pyの中のRcb4BaseLibが使えるように設定 self.rcb4 = Rcb4BaseLib() #rcb4をインスタンス(定義) thread=threading.Thread(target=self.read_loop,args=[],name='thread1') thread.start() # Functions are defined here def set_Remote_Command_Reader(self,reader): self.reader=reader def connect(self, addr): #self.reader.client_start(self.server_address) self.reader.client_start(addr) self.ipx=ipget.ipget() def shutdown(self): os.system("sudo shutdown -h now") def send_rcb4(self,message): line=message+'\n' self.serial.write(line.encode('utf-8')) print("rcb4>"+message) def read_loop(self): print("start read_loop") while True: line=input() if self.reader!=None: print("read_loop self.reader!=None line="+line) self.parse_local_command(line) else: print('no connection to tcp server(reader)') def rcb4_open(self): #ポートをオープン #rcb4.open('/dev/ttyAMA0',115200,1.3) #(portName,bundrate,timeout(s)) self.rcb4.open('/dev/ttyUSB0',115200,1.3) #ACKコマンドを送ってRCB4と接続できるか確認 if self.rcb4.checkAcknowledge() == True: print ('checkAcknowledge OK') print ('Version -->' ,self.rcb4.Version) else: print ('checkAcknowledge error') def rcb4_get_config(self): print('get config =', end="") print(f"{self.rcb4.getConfig():x}") def rcb4_get_pio(self): print('get pio =',end="") print(f"{self.rcb4.getPio():x}") def rcb4_get_all_ad(self): print('get all ad data =',end="") print(f"{self.rcb4.getAllAdData()}") def rcb4_get_voltage(self): print('voltage =',end="") print(self.rcb4.getRcb4Voltage()) ''' len, rtn=Rcb4BaseLib.runConstFrameServoCmd([rcb4.ServoData(1,0,7500)],0x80) if len>0: tf=rcb4.synchronizeAck(rtn) if tf: print('success') else: print('fail') else: print("runConstFrameServoCmod error") ''' def get_pos(self,id,sio): #sio=0x01 (tf,pos)=self.rcb4.getSinglePos(id,sio) #print(tf,end="") #print(' pos=',end="") #print(pos) return pos def set_pos(self,id,sio,position): rtn=self.rcb4.setPos2(id,sio,position) #print(rtn) return rtn def set_free(self,id,sio): rtn=self.rcb4.setFreeSingleServo(id,sio) #print(rtn) return rtn def rcb4_close(self): self.rcb4.close() def start_sync(self): print('start_sync()') self.sync=True thread=threading.Thread(target=self.sync_loop,args=[],name='sync_thread1') thread.start() def start_sync2(self): rcmd="remote self.start_sync()" self.sync=True thread=threading.Thread(target=self.sync_loop,args=[],name='sync_thread1') thread.start() def stop_sync(self): self.sync=False def sync_loop(self): print("start sync_loop") #return servo_list=[(1,1),(2,1)] for id_sio in servo_list: (id,sio)=id_sio self.set_free(id,sio) while self.sync: for id_sio in servo_list: (id,sio)=id_sio pos=self.get_pos(id,sio) cmd="remote self.set_pos("+str(id)+","+str(sio)+","+str(pos)+")" #print(cmd) self.reader.return_message(cmd) cmd="remote self.set_free("+str(id)+","+str(sio)+")" #print(cmd) self.reader.return_message(cmd) time.sleep(0.1) # # command # shutdown # def parse_command(self,command): rest=[""] strc=[""] ix=[""] print("parse_command("+command+")") return True def parse_key(self,key,inx,rest): keylen=len(key) inlen=len(inx) if inx.startswith(key): rest[0]=inx[keylen:inlen] return True rest[0]=inx return False def parse_p_int(self,command,ix,rest): print("parse_p_int("+command+",ix,rest)") i=0 c=command[0] if c in ['0','1','2','3','4','5','6','7','8','9']: clen=len(command) while True: i=i*10+int(c) command=command[1:clen] clen=len(command) if clen==0: ix[0]=i rest[0]="" return True print("parse_p_int command="+command) c=command[0] if not (c in ['0','1','2','3','4','5','6','7','8','9']): ix[0]=i rest[0]=command return True rest[0]=command ix[0]=0 return False def r_b(self,inx): rx=[""] while self.parse_key(" ",inx,rx): inx=rx[0] return rx[0] def parse_String_Constant(self,inx,strc,rest): rx=[""] xstr="" if self.parse_key('"',inx,rx): inx=rx[0] fx=inx[0] xlen=len(inx) while fx!='"': if xlen==0: return False if fx=='\\': inx=inx[1:xlen] else: xstr=xstr+fx xlen=len(inx) inx=inx[1:xlen] fx=inx[0] if self.parse_key('"',inx,rx): strc[0]=xstr rest[0]=rx[0] return True return False def ex(self,cmd): exec_locals={'self':self} try: rtn=eval(cmd,globals(),exec_locals) print(rtn) except Exception as e: print("error in "+cmd) print(e) def set_id(self,id): self.id=id print('set_id('+id+')') def parse_local_command(self,line): rest=[""] strc=[""] ix=[""] print("parse_local_command("+line+")") linex=self.r_b(line) idkey='id='+self.id+' ' if self.parse_key("local ",linex,rest): cmd=rest[0] self.ex(cmd) return True elif self.parse_key("remote ",linex,rest): self.reader.return_message(linex) return True #elif self.parse_key(idkey,linex,rest): # cmd=rest[0] # self.ex(cmd) #cmd=rest[0] return True else: self.reader.return_message(line) return True def parse_remote_command(self,line): rest=[""] strc=[""] ix=[""] print("parse_remote_command("+line+")") linex=self.r_b(line) idkey='id='+self.id+' ' if self.parse_key("remote ",linex,rest): print("remote ...") cmd=rest[0] self.ex(cmd) return True elif self.parse_key(idkey,linex,rest): print(idkey+"...") cmd=rest[0] self.ex(cmd) return True else: print(line) return True if __name__=="__main__": args = sys.argv print(args) alen=len(args) if alen<2: print("usage: python3 chat_client_01.py <server_ip>") exit() ipx= args[1] print('ip='+ipx) app = App() app.set_id("a") rcr=Remote_Command_Reader(app) app.set_Remote_Command_Reader(rcr) app.connect(ipx) ip=rcr.get_my_address() print(ip) app.rcb4_open() }} ---- #counter