#author("2026-05-10T11:34:44+09:00","default:TESLA2","TESLA2")
#author("2026-05-20T16:17:02+09:00","default:TESLA2","TESLA2")
# [[chat_server_ex01.py]]
#code(Python){{
# -*- coding:utf-8 -*-
# takashi yamanoue, fukuyama city university, 2026/5/10
# 20260510
# takashi yamanoue, fukuyama university, 2024/2/27
# 20230503
#
"""Line-oriented TCP chat client.

The script connects to a TCP server on port 9998, prints every line the
server sends, and forwards lines typed on standard input to the server.
A few keywords typed at the start of a line ("exit.", "shutdown.", "a.",
"b.", "stop.") are handled locally instead of being forwarded verbatim.

The ``#author``, ``[[...]]``, ``#code(Python){{``, ``}}``, ``----`` and
``#counter`` lines at the top and bottom are the wiki-page wrapper this
listing was embedded in; they are kept as comments so the file stays
importable as a Python module.
"""
#import serial
import threading
import time
import math
import sys
import os
import socket
from collections import deque
import subprocess
import copy
import re

# Third-party modules are optional: the client still runs without them
# (address discovery then falls back to the standard socket module).
try:
    import requests
except ImportError:
    requests = None
try:
    import ipget
except ImportError:
    ipget = None

_DIGITS = "0123456789"


class Remote_Command_Reader:
    """TCP client side: owns the socket and the receive thread."""

    HOST = 'localhost'
    PORT = 9998

    def __init__(self, app):
        print("start chat_client_01.__init__")
        self.app = app
        # One socket per instance; creating it at class-definition time
        # would open it on import and share it between all instances.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.my_address = 'localhost'
        self.get_my_address()

    def get_my_address(self, interfaces=("eth0", "wlan0")):
        """Determine this host's address, store it in ``my_address`` and return it.

        The interfaces are tried in order through ``ipget`` (when that
        module is installed); the first one that yields an address wins.
        If none does, the address resolved from the host name is used, and
        ``'localhost'`` as a last resort.
        """
        address = None
        if ipget is not None:
            try:
                ipx = ipget.ipget()
            except Exception:
                # Best effort: ipget inspects the system's network
                # configuration and its failure modes are not visible here.
                ipx = None
            if ipx is not None:
                for name in interfaces:
                    try:
                        address = ipx.ipaddr(name)
                    except Exception:
                        # presumably raised when the interface does not
                        # exist -- TODO confirm against ipget
                        continue
                    if address:
                        break
        if not address:
            try:
                address = socket.gethostbyname(socket.gethostname())
            except OSError:
                address = 'localhost'
        self.my_address = address
        return self.my_address

    def parse(self, line):
        """Handle one line received from the server (currently: print it)."""
        #self.python2fwb(">"+line)
        #self.gui.parse_command(line)
        print(line)

    def shutdown(self):
        """Power the machine off."""
        os.system("sudo shutdown -h now")

    def client_start(self, addr):
        """Start the client: connect to ``addr`` and launch the receive thread.

        On a connection failure the error is printed and the method
        returns without starting the thread.
        """
        try:
            self.sock.connect((addr, self.PORT))
        except OSError as e:
            print("connect error, addr=" + addr + " port=" + str(self.PORT))
            print("[message]{0}".format(e))
            return
        print("chat_lient_ex01.py connected to the server")
        local_addr = self.sock.getsockname()[0]
        print("connect, addr=" + local_addr)
        handle_thread = threading.Thread(
            target=self.handler, args=(self.sock,), daemon=True)
        handle_thread.start()

    def return_message(self, x):
        """Send ``x`` to the server as one UTF-8 encoded, newline-terminated line."""
        # sendall: plain send() may transmit only part of the buffer.
        self.sock.sendall((x + '\n').encode('utf-8'))

    def handler(self, sock):
        """Receive messages from the server and display them.

        Reads ``sock`` line by line and passes each line to ``parse``.
        Returns when the server closes the connection or the socket fails.
        """
        print("start handler from pic_ex04.py")
        # Messages are sent as UTF-8, so decode them the same way.
        with sock.makefile('r', encoding='utf-8', errors='replace') as sf:
            while True:
                try:
                    line = sf.readline()
                except (OSError, ValueError) as e:
                    # Socket error, or the socket was closed under us.
                    print("message]{0}".format(e))
                    break
                if not line:
                    # Empty string means EOF: the peer closed the connection.
                    break
                try:
                    self.parse(line)
                except Exception as e:
                    # A bad line must not kill the receive thread.
                    print("message]{0}".format(e))


class App:
    """Console front end: reads stdin and talks to a Remote_Command_Reader."""

    # Defaults so the attributes always exist, even before __init__ or
    # set_Remote_Command_Reader has run.
    reader = None
    serial = None

    def __init__(self):
        self.message_line = 0
        self.reader = None
        self.serial = None
        #self.serial=serial.Serial('/dev/ttyUSB0',115200)
        #self.serial.flush()
        thread = threading.Thread(target=self.read_loop, args=[], name='thread1')
        thread.start()

    # Functions are defined here
    def set_Remote_Command_Reader(self, reader):
        """Attach the reader used to reach the server."""
        self.reader = reader

    def connect(self, addr):
        """Connect the attached reader to the server at ``addr``."""
        #self.reader.client_start(self.server_address)
        self.reader.client_start(addr)
        self.ipx = ipget.ipget() if ipget is not None else None

    def send_rcb4(self, message):
        """Write ``message`` as a line to the serial port, if one is open."""
        if self.serial is None:
            print("rcb4 serial port is not open, message dropped: " + message)
            return
        line = message + '\n'
        self.serial.write(line.encode('utf-8'))
        print("rcb4>" + message)

    def rcb4_close(self):
        """Close the serial port if one is open."""
        if self.serial is not None:
            self.serial.close()
            self.serial = None

    def _close_reader(self):
        """Close the reader's socket if a reader is attached."""
        if self.reader is not None:
            self.reader.sock.close()

    def exit_tesla(self):
        """Close the connection and terminate the calling thread."""
        self._close_reader()
        sys.exit()

    def shutdown(self):
        """Close the serial port and the connection, then power off."""
        self.rcb4_close()
        self._close_reader()
        os.system("sudo shutdown -h now")

    def _handle_line(self, line):
        """Dispatch one line typed by the user."""
        if self.reader is None:
            print('no connection to tcp server(reader)')
            return
        l = self.r_b(line)
        rest = [""]
        if self.parse_key("exit.", l, rest):
            self.exit_tesla()
        elif self.parse_key("shutdown.", l, rest):
            self.shutdown()
        elif self.parse_key("a.", l, rest):
            print("start a.")
            self.reader.return_message("remote self.stop_sync()")
            self.reader.return_message("id=a self.start_sync()")
        elif self.parse_key("b.", l, rest):
            print("start b.")
            self.reader.return_message("remote self.stop_sync()")
            self.reader.return_message("id=b self.start_sync()")
        elif self.parse_key("stop.", l, rest):
            # NOTE(review): "remote_self" (underscore) differs from the
            # "remote self" used for a./b. -- confirm against the server.
            self.reader.return_message("remote_self.stop_sync()")
        else:
            self.reader.return_message(line)

    def read_loop(self):
        """Read lines from standard input until EOF and dispatch each one."""
        print("start read_loop")
        while True:
            try:
                line = input()
            except EOFError:
                # stdin closed: nothing more to read.
                break
            try:
                self._handle_line(line)
            except OSError as e:
                # e.g. the socket is not connected or was reset.
                print("[message]{0}".format(e))

    #
    # command
    #   shutdown
    #
    def parse_command(self, command):
        """Placeholder command parser: logs the command and returns True."""
        print("parse_command(" + command + ")")
        return True

    def parse_key(self, key, inx, rest):
        """Test whether ``inx`` starts with ``key``.

        On a match ``rest[0]`` receives the text after the key and True is
        returned; otherwise ``rest[0]`` receives ``inx`` and False is returned.
        """
        if inx.startswith(key):
            rest[0] = inx[len(key):]
            return True
        rest[0] = inx
        return False

    def parse_p_int(self, command, ix, rest):
        """Parse a non-negative decimal integer at the start of ``command``.

        On success ``ix[0]`` receives the value, ``rest[0]`` the remaining
        text, and True is returned.  If ``command`` is empty or does not
        start with a digit, ``ix[0]`` is set to 0, ``rest[0]`` to
        ``command`` and False is returned.
        """
        print("parse_p_int(" + command + ",ix,rest)")
        if not command or command[0] not in _DIGITS:
            rest[0] = command
            ix[0] = 0
            return False
        value = 0
        while command and command[0] in _DIGITS:
            value = value * 10 + int(command[0])
            command = command[1:]
            if command:
                print("parse_p_int command=" + command)
        ix[0] = value
        rest[0] = command
        return True

    def r_b(self, inx):
        """Return ``inx`` with leading space characters removed."""
        return inx.lstrip(" ")

    def parse_String_Constant(self, inx, strc, rest):
        """Parse a double-quoted string constant at the start of ``inx``.

        A backslash makes the following character part of the string
        (so ``\\"`` embeds a quote).  On success ``strc[0]`` receives the
        string contents, ``rest[0]`` the text after the closing quote, and
        True is returned.  False is returned, with ``strc`` and ``rest``
        untouched, when ``inx`` does not start with a quote or the string
        is not terminated.
        """
        if not inx.startswith('"'):
            return False
        chars = []
        pos = 1
        end = len(inx)
        while pos < end:
            ch = inx[pos]
            if ch == '"':
                strc[0] = "".join(chars)
                rest[0] = inx[pos + 1:]
                return True
            if ch == '\\':
                pos += 1
                if pos >= end:
                    # Dangling backslash at end of input.
                    return False
            chars.append(inx[pos])
            pos += 1
        return False


def main(argv=None):
    """Run the client: ``python3 chat_client_01.py <server_ip>``."""
    args = sys.argv if argv is None else argv
    print(args)
    if len(args) < 2:
        print("usage: python3 chat_client_01.py <server_ip>")
        sys.exit()
    ipx = args[1]
    print('ip=' + ipx)
    app = App()
    rcr = Remote_Command_Reader(app)
    app.set_Remote_Command_Reader(rcr)
    app.connect(ipx)
    ip = rcr.get_my_address()
    print(ip)


if __name__ == "__main__":
    main()
# }}
# ----
#counter