tesla2_client_02.py
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
検索
|
最終更新
|
ヘルプ
|
ログイン
]
開始行:
[[tesla2_client_01.py]]
#code(Python){{
# -*- coding:utf-8 -*-
# takashi yamanoue, fukuyama university, 2026/5/12
# 20260512
#
# two servo
#
#import serial
import threading
import time
import math
import sys
import requests
import os
import socket
from collections import deque
import subprocess
import copy
import re
import ipget
class Remote_Command_Reader:
sock = socket.socket(socket.AF_INET, socket.SOCK_STRE...
HOST = 'localhost'
PORT = 9998
def __init__(self,app):
print("start chat_client_01.__init__")
self.app=app
self.my_address='localhost'
self.get_my_address()
def get_my_address(self):
self.my_address = socket.gethostbyname(socket.get...
ipx=ipget.ipget()
self.my_address=ipx.ipaddr("wlan0")
return self.my_address
def parse_remote(self,line):
#self.python2fwb(">"+line)
#self.gui.parse_command(line)
self.app.parse_remote_command(line)
#print(line)
def shutdown(self):
os.system("sudo shutdown -h now")
def client_start(self,addr):
"""クライアントのスタート"""
try:
self.sock.connect((addr, self.PORT))
print("chat_lient_ex01.py connected to the serv...
except Exception as e:
msg1="connect error, addr="+addr+" port="+str(s...
print(msg1)
tb=sys.exec_info()[2]
msg2="[message]{0}".format(e.with_traceback(tb))
print(msg2)
return
addr=self.sock.getsockname()[0]
print("connect, addr="+addr)
handle_thread = threading.Thread(target=self.handle...
handle_thread.start()
def return_message(self,x):
self.sock.send((x+'\n').encode('utf-8'))
def handler(self,sock):
"""サーバからメッセージを受信し、表示する"""
print("start handler from pic_ex04.py")
sf=sock.makefile()
while True:
try:
line=sf.readline()
self.parse_remote(line)
except Exception as e:
tb=sys.exec_info()[2]
msg2="message]{0}".format(e.with_traceback(...
print(msg2)
class App:
id="a"
def __init__(self):
self.message_line=0
#self.serial=serial.Serial('/dev/ttyUSB0',115200)
#self.serial.flush()
sys.path.append('RCB4Lib_for_Python_V100B/Rcb4Lib...
from Rcb4BaseLib import Rcb4BaseLib #R...
self.rcb4 = Rcb4BaseLib() #rcb4をインスタン...
thread=threading.Thread(target=self.read_loop,arg...
thread.start()
# Functions are defined here
def set_Remote_Command_Reader(self,reader):
self.reader=reader
def connect(self, addr):
#self.reader.client_start(self.server_address)
self.reader.client_start(addr)
self.ipx=ipget.ipget()
def shutdown(self):
os.system("sudo shutdown -h now")
def send_rcb4(self,message):
line=message+'\n'
self.serial.write(line.encode('utf-8'))
print("rcb4>"+message)
def read_loop(self):
print("start read_loop")
while True:
line=input()
if self.reader!=None:
print("read_loop self.reader!=None line="...
self.parse_local_command(line)
else:
print('no connection to tcp server(reader...
def rcb4_open(self):
#ポートをオープン
#rcb4.open('/dev/ttyAMA0',115200,1.3) #(portName...
self.rcb4.open('/dev/ttyUSB0',115200,1.3)
#ACKコマンドを送ってRCB4と接続できるか確認
if self.rcb4.checkAcknowledge() == True:
print ('checkAcknowledge OK')
print ('Version -->' ,self.rcb4.Version)
else:
print ('checkAcknowledge error')
def rcb4_get_config(self):
print('get config =', end="")
print(f"{self.rcb4.getConfig():x}")
def rcb4_get_pio(self):
print('get pio =',end="")
print(f"{self.rcb4.getPio():x}")
def rcb4_get_all_ad(self):
print('get all ad data =',end="")
print(f"{self.rcb4.getAllAdData()}")
def rcb4_get_voltage(self):
print('voltage =',end="")
print(self.rcb4.getRcb4Voltage())
'''
len, rtn=Rcb4BaseLib.runConstFrameServoCmd([rcb4....
if len>0:
tf=rcb4.synchronizeAck(rtn)
if tf:
print('success')
else:
print('fail')
else:
print("runConstFrameServoCmod error")
'''
def get_pos(self,id,sio):
#sio=0x01
(tf,pos)=self.rcb4.getSinglePos(id,sio)
#print(tf,end="")
#print(' pos=',end="")
#print(pos)
return pos
def set_pos(self,id,sio,position):
rtn=self.rcb4.setPos2(id,sio,position)
#print(rtn)
return rtn
def set_free(self,id,sio):
rtn=self.rcb4.setFreeSingleServo(id,sio)
#print(rtn)
return rtn
def rcb4_close(self):
self.rcb4.close()
def start_sync(self):
print('start_sync()')
self.sync=True
thread=threading.Thread(target=self.sync_loop,arg...
thread.start()
def start_sync2(self):
rcmd="remote self.start_sync()"
self.sync=True
thread=threading.Thread(target=self.sync_loop,arg...
thread.start()
def stop_sync(self):
self.sync=False
def sync_loop(self):
print("start sync_loop")
#return
servo_list=[(1,1),(2,1)]
for id_sio in servo_list:
(id,sio)=id_sio
self.set_free(id,sio)
while self.sync:
for id_sio in servo_list:
(id,sio)=id_sio
pos=self.get_pos(id,sio)
cmd="remote self.set_pos("+str(id)+","+st...
#print(cmd)
self.reader.return_message(cmd)
cmd="remote self.set_free("+str(id)+","+s...
#print(cmd)
self.reader.return_message(cmd)
time.sleep(0.1)
#
# command
# shutdown
#
def parse_command(self,command):
rest=[""]
strc=[""]
ix=[""]
print("parse_command("+command+")")
return True
def parse_key(self,key,inx,rest):
keylen=len(key)
inlen=len(inx)
if inx.startswith(key):
rest[0]=inx[keylen:inlen]
return True
rest[0]=inx
return False
def parse_p_int(self,command,ix,rest):
print("parse_p_int("+command+",ix,rest)")
i=0
c=command[0]
if c in ['0','1','2','3','4','5','6','7','8','9']:
clen=len(command)
while True:
i=i*10+int(c)
command=command[1:clen]
clen=len(command)
if clen==0:
ix[0]=i
rest[0]=""
return True
print("parse_p_int command="+command)
c=command[0]
if not (c in ['0','1','2','3','4','5','6'...
ix[0]=i
rest[0]=command
return True
rest[0]=command
ix[0]=0
return False
def r_b(self,inx):
rx=[""]
while self.parse_key(" ",inx,rx):
inx=rx[0]
return rx[0]
def parse_String_Constant(self,inx,strc,rest):
rx=[""]
xstr=""
if self.parse_key('"',inx,rx):
inx=rx[0]
fx=inx[0]
xlen=len(inx)
while fx!='"':
if xlen==0:
return False
if fx=='\\':
inx=inx[1:xlen]
else:
xstr=xstr+fx
xlen=len(inx)
inx=inx[1:xlen]
fx=inx[0]
if self.parse_key('"',inx,rx):
strc[0]=xstr
rest[0]=rx[0]
return True
return False
def ex(self,cmd):
exec_locals={'self':self}
try:
rtn=eval(cmd,globals(),exec_locals)
print(rtn)
except Exception as e:
print("error in "+cmd)
print(e)
def set_id(self,id):
self.id=id
print('set_id('+id+')')
def parse_local_command(self,line):
rest=[""]
strc=[""]
ix=[""]
print("parse_local_command("+line+")")
linex=self.r_b(line)
idkey='id='+self.id+' '
if self.parse_key("local ",linex,rest):
cmd=rest[0]
self.ex(cmd)
return True
elif self.parse_key("remote ",linex,rest):
self.reader.return_message(linex)
return True
#elif self.parse_key(idkey,linex,rest):
# cmd=rest[0]
# self.ex(cmd)
#cmd=rest[0]
return True
else:
self.reader.return_message(line)
return True
def parse_remote_command(self,line):
rest=[""]
strc=[""]
ix=[""]
print("parse_remote_command("+line+")")
linex=self.r_b(line)
idkey='id='+self.id+' '
if self.parse_key("remote ",linex,rest):
print("remote ...")
cmd=rest[0]
self.ex(cmd)
return True
elif self.parse_key(idkey,linex,rest):
print(idkey+"...")
cmd=rest[0]
self.ex(cmd)
return True
else:
print(line)
return True
if __name__=="__main__":
args = sys.argv
print(args)
alen=len(args)
if alen<2:
print("usage: python3 chat_client_01.py <server_i...
exit()
ipx= args[1]
print('ip='+ipx)
app = App()
app.set_id("a")
rcr=Remote_Command_Reader(app)
app.set_Remote_Command_Reader(rcr)
app.connect(ipx)
ip=rcr.get_my_address()
print(ip)
app.rcb4_open()
}}
----
#counter
終了行:
[[tesla2_client_01.py]]
#code(Python){{
# -*- coding:utf-8 -*-
# takashi yamanoue, fukuyama university, 2026/5/12
# 20260512
#
# two servo
#
#import serial
import threading
import time
import math
import sys
import requests
import os
import socket
from collections import deque
import subprocess
import copy
import re
import ipget
class Remote_Command_Reader:
sock = socket.socket(socket.AF_INET, socket.SOCK_STRE...
HOST = 'localhost'
PORT = 9998
def __init__(self,app):
print("start chat_client_01.__init__")
self.app=app
self.my_address='localhost'
self.get_my_address()
def get_my_address(self):
self.my_address = socket.gethostbyname(socket.get...
ipx=ipget.ipget()
self.my_address=ipx.ipaddr("wlan0")
return self.my_address
def parse_remote(self,line):
#self.python2fwb(">"+line)
#self.gui.parse_command(line)
self.app.parse_remote_command(line)
#print(line)
def shutdown(self):
os.system("sudo shutdown -h now")
def client_start(self,addr):
"""クライアントのスタート"""
try:
self.sock.connect((addr, self.PORT))
print("chat_lient_ex01.py connected to the serv...
except Exception as e:
msg1="connect error, addr="+addr+" port="+str(s...
print(msg1)
tb=sys.exec_info()[2]
msg2="[message]{0}".format(e.with_traceback(tb))
print(msg2)
return
addr=self.sock.getsockname()[0]
print("connect, addr="+addr)
handle_thread = threading.Thread(target=self.handle...
handle_thread.start()
def return_message(self,x):
self.sock.send((x+'\n').encode('utf-8'))
def handler(self,sock):
"""サーバからメッセージを受信し、表示する"""
print("start handler from pic_ex04.py")
sf=sock.makefile()
while True:
try:
line=sf.readline()
self.parse_remote(line)
except Exception as e:
tb=sys.exec_info()[2]
msg2="message]{0}".format(e.with_traceback(...
print(msg2)
class App:
id="a"
def __init__(self):
self.message_line=0
#self.serial=serial.Serial('/dev/ttyUSB0',115200)
#self.serial.flush()
sys.path.append('RCB4Lib_for_Python_V100B/Rcb4Lib...
from Rcb4BaseLib import Rcb4BaseLib #R...
self.rcb4 = Rcb4BaseLib() #rcb4をインスタン...
thread=threading.Thread(target=self.read_loop,arg...
thread.start()
# Functions are defined here
def set_Remote_Command_Reader(self,reader):
self.reader=reader
def connect(self, addr):
#self.reader.client_start(self.server_address)
self.reader.client_start(addr)
self.ipx=ipget.ipget()
def shutdown(self):
os.system("sudo shutdown -h now")
def send_rcb4(self,message):
line=message+'\n'
self.serial.write(line.encode('utf-8'))
print("rcb4>"+message)
def read_loop(self):
print("start read_loop")
while True:
line=input()
if self.reader!=None:
print("read_loop self.reader!=None line="...
self.parse_local_command(line)
else:
print('no connection to tcp server(reader...
def rcb4_open(self):
#ポートをオープン
#rcb4.open('/dev/ttyAMA0',115200,1.3) #(portName...
self.rcb4.open('/dev/ttyUSB0',115200,1.3)
#ACKコマンドを送ってRCB4と接続できるか確認
if self.rcb4.checkAcknowledge() == True:
print ('checkAcknowledge OK')
print ('Version -->' ,self.rcb4.Version)
else:
print ('checkAcknowledge error')
def rcb4_get_config(self):
print('get config =', end="")
print(f"{self.rcb4.getConfig():x}")
def rcb4_get_pio(self):
print('get pio =',end="")
print(f"{self.rcb4.getPio():x}")
def rcb4_get_all_ad(self):
print('get all ad data =',end="")
print(f"{self.rcb4.getAllAdData()}")
def rcb4_get_voltage(self):
print('voltage =',end="")
print(self.rcb4.getRcb4Voltage())
'''
len, rtn=Rcb4BaseLib.runConstFrameServoCmd([rcb4....
if len>0:
tf=rcb4.synchronizeAck(rtn)
if tf:
print('success')
else:
print('fail')
else:
print("runConstFrameServoCmod error")
'''
def get_pos(self,id,sio):
#sio=0x01
(tf,pos)=self.rcb4.getSinglePos(id,sio)
#print(tf,end="")
#print(' pos=',end="")
#print(pos)
return pos
def set_pos(self,id,sio,position):
rtn=self.rcb4.setPos2(id,sio,position)
#print(rtn)
return rtn
def set_free(self,id,sio):
rtn=self.rcb4.setFreeSingleServo(id,sio)
#print(rtn)
return rtn
def rcb4_close(self):
self.rcb4.close()
def start_sync(self):
print('start_sync()')
self.sync=True
thread=threading.Thread(target=self.sync_loop,arg...
thread.start()
def start_sync2(self):
rcmd="remote self.start_sync()"
self.sync=True
thread=threading.Thread(target=self.sync_loop,arg...
thread.start()
def stop_sync(self):
self.sync=False
def sync_loop(self):
print("start sync_loop")
#return
servo_list=[(1,1),(2,1)]
for id_sio in servo_list:
(id,sio)=id_sio
self.set_free(id,sio)
while self.sync:
for id_sio in servo_list:
(id,sio)=id_sio
pos=self.get_pos(id,sio)
cmd="remote self.set_pos("+str(id)+","+st...
#print(cmd)
self.reader.return_message(cmd)
cmd="remote self.set_free("+str(id)+","+s...
#print(cmd)
self.reader.return_message(cmd)
time.sleep(0.1)
#
# command
# shutdown
#
def parse_command(self,command):
rest=[""]
strc=[""]
ix=[""]
print("parse_command("+command+")")
return True
def parse_key(self,key,inx,rest):
keylen=len(key)
inlen=len(inx)
if inx.startswith(key):
rest[0]=inx[keylen:inlen]
return True
rest[0]=inx
return False
def parse_p_int(self,command,ix,rest):
print("parse_p_int("+command+",ix,rest)")
i=0
c=command[0]
if c in ['0','1','2','3','4','5','6','7','8','9']:
clen=len(command)
while True:
i=i*10+int(c)
command=command[1:clen]
clen=len(command)
if clen==0:
ix[0]=i
rest[0]=""
return True
print("parse_p_int command="+command)
c=command[0]
if not (c in ['0','1','2','3','4','5','6'...
ix[0]=i
rest[0]=command
return True
rest[0]=command
ix[0]=0
return False
def r_b(self,inx):
rx=[""]
while self.parse_key(" ",inx,rx):
inx=rx[0]
return rx[0]
def parse_String_Constant(self,inx,strc,rest):
rx=[""]
xstr=""
if self.parse_key('"',inx,rx):
inx=rx[0]
fx=inx[0]
xlen=len(inx)
while fx!='"':
if xlen==0:
return False
if fx=='\\':
inx=inx[1:xlen]
else:
xstr=xstr+fx
xlen=len(inx)
inx=inx[1:xlen]
fx=inx[0]
if self.parse_key('"',inx,rx):
strc[0]=xstr
rest[0]=rx[0]
return True
return False
def ex(self,cmd):
exec_locals={'self':self}
try:
rtn=eval(cmd,globals(),exec_locals)
print(rtn)
except Exception as e:
print("error in "+cmd)
print(e)
def set_id(self,id):
self.id=id
print('set_id('+id+')')
def parse_local_command(self,line):
rest=[""]
strc=[""]
ix=[""]
print("parse_local_command("+line+")")
linex=self.r_b(line)
idkey='id='+self.id+' '
if self.parse_key("local ",linex,rest):
cmd=rest[0]
self.ex(cmd)
return True
elif self.parse_key("remote ",linex,rest):
self.reader.return_message(linex)
return True
#elif self.parse_key(idkey,linex,rest):
# cmd=rest[0]
# self.ex(cmd)
#cmd=rest[0]
return True
else:
self.reader.return_message(line)
return True
def parse_remote_command(self,line):
rest=[""]
strc=[""]
ix=[""]
print("parse_remote_command("+line+")")
linex=self.r_b(line)
idkey='id='+self.id+' '
if self.parse_key("remote ",linex,rest):
print("remote ...")
cmd=rest[0]
self.ex(cmd)
return True
elif self.parse_key(idkey,linex,rest):
print(idkey+"...")
cmd=rest[0]
self.ex(cmd)
return True
else:
print(line)
return True
if __name__=="__main__":
args = sys.argv
print(args)
alen=len(args)
if alen<2:
print("usage: python3 chat_client_01.py <server_i...
exit()
ipx= args[1]
print('ip='+ipx)
app = App()
app.set_id("a")
rcr=Remote_Command_Reader(app)
app.set_Remote_Command_Reader(rcr)
app.connect(ipx)
ip=rcr.get_my_address()
print(ip)
app.rcb4_open()
}}
----
#counter
ページ名: