#author("2026-05-10T20:11:28+09:00","default:TESLA2","TESLA2")
#author("2026-05-10T20:11:54+09:00","default:TESLA2","TESLA2")
[[Rcb4BaseLib.py]]

#code(Python){{
# -*- coding:utf-8 -*-
# takashi yamanoue, fukuyama university, 2024/2/27
# 20230503
# takashi yamanoue, fukuyama university, 2026/5/10
# 20260510
#
#import serial
import threading
import time
import math
import sys
import requests
import os
import socket
from collections import deque
import subprocess
import copy
import re
import ipget

class Remote_Command_Reader:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    HOST = 'localhost'
    PORT = 9998
    def __init__(self,app):
        print("start chat_client_01.__init__")
        self.app=app
        self.my_address='localhost'
        self.get_my_address()

    def get_my_address(self):
        self.my_address = socket.gethostbyname(socket.gethostname())
        ipx=ipget.ipget()
        self.my_address=ipx.ipaddr("wlan0")
        return self.my_address

    def parse(self,line):
        #self.python2fwb(">"+line)
        #self.gui.parse_command(line)
        self.app.parse_remote_command(line)
        #print(line)

    def shutdown(self):
        os.system("sudo shutdown -h now")

    def client_start(self,addr):
      """クライアントのスタート"""
      try:
          self.sock.connect((addr, self.PORT))
          print("chat_lient_ex01.py connected to the server")
      except Exception as e:
          msg1="connect error, addr="+addr+" port="+str(self.PORT)
          print(msg1)
          tb=sys.exec_info()[2]
          msg2="[message]{0}".format(e.with_traceback(tb))
          print(msg2)
          return
      addr=self.sock.getsockname()[0]
      print("connect, addr="+addr)
      handle_thread = threading.Thread(target=self.handler, args=(self.sock,), daemon=True)
      handle_thread.start()

    def return_message(self,x):
        self.sock.send((x+'\n').encode('utf-8'))

    def handler(self,sock):
      """サーバからメッセージを受信し、表示する"""
      print("start handler from pic_ex04.py")
      sf=sock.makefile()
      while True:
          try:
              line=sf.readline()
              self.parse(line)
          except Exception as e:
              tb=sys.exec_info()[2]
              msg2="message]{0}".format(e.with_traceback(tb))
              print(msg2)

class App:
    def __init__(self):
        self.message_line=0

        #self.serial=serial.Serial('/dev/ttyUSB0',115200)
        #self.serial.flush()
        sys.path.append('RCB4Lib_for_Python_V100B/Rcb4Lib') #Rcb4Libの検索パスを追加
        from Rcb4BaseLib import Rcb4BaseLib            #Rcb4BaseLib.pyの中のRcb4BaseLibが使えるように設定
        self.rcb4 = Rcb4BaseLib()      #rcb4をインスタンス(定義)

        thread=threading.Thread(target=self.read_loop,args=[],name='thread1')
        thread.start()

# Functions are defined here
    def set_Remote_Command_Reader(self,reader):
        self.reader=reader

    def connect(self, addr):
        #self.reader.client_start(self.server_address)
        self.reader.client_start(addr)
        self.ipx=ipget.ipget()

    def shutdown(self):
        os.system("sudo shutdown -h now")

    def send_rcb4(self,message):
        line=message+'\n'
        self.serial.write(line.encode('utf-8'))
        print("rcb4>"+message)

    def read_loop(self):
        print("start read_loop")
        while True:
            line=input()
            if self.reader!=None:
                print("read_loop self.reader!=None line="+line)
                self.parse_local_command(line)
            else:
                print('no connection to tcp server(reader)')

    def rcb4_open(self):
        #ポートをオープン
        #rcb4.open('/dev/ttyAMA0',115200,1.3)  #(portName,bundrate,timeout(s))
        self.rcb4.open('/dev/ttyUSB0',115200,1.3)
        #ACKコマンドを送ってRCB4と接続できるか確認
        if self.rcb4.checkAcknowledge() == True:
            print ('checkAcknowledge OK')
            print ('Version    -->' ,self.rcb4.Version)
        else:
            print ('checkAcknowledge error')

    def rcb4_get_config(self):
            print('get config =', end="")
            print(f"{self.rcb4.getConfig():x}")

    def rcb4_get_pio(self):
        print('get pio =',end="")
        print(f"{self.rcb4.getPio():x}")
    def rcb4_get_all_ad(self):
        print('get all ad data =',end="")
        print(f"{self.rcb4.getAllAdData()}")
    def rcb4_get_voltage(self):
        print('voltage =',end="")
        print(self.rcb4.getRcb4Voltage())

        '''
        len, rtn=Rcb4BaseLib.runConstFrameServoCmd([rcb4.ServoData(1,0,7500)],0x80)
        if len>0:
        tf=rcb4.synchronizeAck(rtn)
        if tf:
            print('success')
        else:
            print('fail')
        else:
            print("runConstFrameServoCmod error")
        '''
    def get_pos(self,id,sio):
        #sio=0x01
        (tf,pos)=self.rcb4.getSinglePos(1,sio)
        print(tf,end="")
        print(' pos=',end="")
        #print(pos)
        return pos
    def set_pos(self,id,sio,position):
        rtn=self.rcb4.setPos2(id,sio,position)
        print(rtn)
        return rtn
    def set_free(self,id,sio):
        rtn=self.rcb4.setFreeSingleServo(id,sio)
        print(rtn)
        return rtn
    def rcb4_close(self):
        self.rcb4.close()

    def start_sync(self):
        self.sync=True
        thread=threading.Thread(target=self.sync_loop,args=[],name='sync_thread1')
        thread.start()

    def start_sync2(self):
        rcmd="remote self.start_sync()"
        self.sync=True
        thread=threading.Thread(target=self.sync_loop,args=[],name='sync_thread1')
        thread.start()

    def stop_sync(self):
        self.sync=False

    def sync_loop(self):
        print("start sync_loop")
        servo_list=[(1,1)]
        while self.sync:
            for id_sio in servo_list:
                (id,sio)=id_sio
                pos=self.get_pos(id,sio)
                cmd="remote self.set_pos("+str(id)+","+str(sio)+","+str(pos)+")"
                print(cmd)
                self.reader.return_message(cmd)
                cmd="remote self.set_free("+str(id)+","+str(sio)+")"
                print(cmd)
                self.reader.return_message(cmd)
            time.sleep(0.1)

    #
    # command
    #    shutdown
    #
    def parse_command(self,command):
        rest=[""]
        strc=[""]
        ix=[""]
        print("parse_command("+command+")")
        return True

    def parse_key(self,key,inx,rest):
        keylen=len(key)
        inlen=len(inx)
        if inx.startswith(key):
            rest[0]=inx[keylen:inlen]
            return True
        rest[0]=inx
        return False

    def parse_p_int(self,command,ix,rest):
        print("parse_p_int("+command+",ix,rest)")
        i=0
        c=command[0]
        if c in ['0','1','2','3','4','5','6','7','8','9']:
            clen=len(command)
            while True:
                i=i*10+int(c)
                command=command[1:clen]
                clen=len(command)
                if clen==0:
                    ix[0]=i
                    rest[0]=""
                    return True
                print("parse_p_int command="+command)
                c=command[0]
                if not (c in ['0','1','2','3','4','5','6','7','8','9']):
                    ix[0]=i
                    rest[0]=command
                    return True
        rest[0]=command
        ix[0]=0
        return False

    def r_b(self,inx):
        rx=[""]
        while self.parse_key(" ",inx,rx):
            inx=rx[0]
        return rx[0]

    def parse_String_Constant(self,inx,strc,rest):
        rx=[""]
        xstr=""
        if self.parse_key('"',inx,rx):
            inx=rx[0]
            fx=inx[0]
            xlen=len(inx)
            while fx!='"':
                if xlen==0:
                    return False
                if fx=='\\':
                   inx=inx[1:xlen]
                else:
                   xstr=xstr+fx
                xlen=len(inx)
                inx=inx[1:xlen]
                fx=inx[0]
            if self.parse_key('"',inx,rx):
                strc[0]=xstr
                rest[0]=rx[0]
                return True
        return False

    def ex(self,cmd):
        exec_locals={'self':self}
        try:
            rtn=eval(cmd,globals(),exec_locals)
            print(rtn)
        except Exception as e:
            print("error in "+cmd)
            print(e)

    def parse_local_command(self,line):
        rest=[""]
        strc=[""]
        ix=[""]
        print("parse_local_command("+line+")")
        linex=self.r_b(line)
        if self.parse_key("local ",linex,rest):
            cmd=rest[0]
            self.ex(cmd)
            return True
        elif self.parse_key("remote ",linex,rest):
            #cmd=rest[0]
            self.reader.return_message(line)
        else:
            self.reader.return_message(line)
            return True

    def parse_remote_command(self,line):
        rest=[""]
        strc=[""]
        ix=[""]
        print("parse_remote_command("+line+")")
        linex=self.r_b(line)
        if self.parse_key("remote ",linex,rest):
            cmd=rest[0]
            self.ex(cmd)
            return True
        else:
            print(line)
            return True

if __name__=="__main__":
    args = sys.argv
    print(args)
    alen=len(args)
    if alen<2:
        print("usage: python3 chat_client_01.py <server_ip>")
        exit()
    ipx= args[1]
    print('ip='+ipx)
    app = App()
    rcr=Remote_Command_Reader(app)
    app.set_Remote_Command_Reader(rcr)
    app.connect(ipx)
    ip=rcr.get_my_address()
    print(ip)
}}
----
#counter

トップ   編集 差分 履歴 添付 複製 名前変更 リロード   新規 一覧 検索 最終更新   ヘルプ   最終更新のRSS