chat_client_ex01.py
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
検索
|
最終更新
|
ヘルプ
|
ログイン
]
開始行:
[[chat_server_ex01.py]]
#code(Python){{
# -*- coding:utf-8 -*-
# takashi yamanoue, fukuyama university, 2024/2/27
# 20230503
#
#import serial
import threading
import time
import math
import sys
import requests
import os
import socket
from collections import deque
import subprocess
import copy
import re
import ipget
class Remote_Command_Reader:
sock = socket.socket(socket.AF_INET, socket.SOCK_STRE...
HOST = 'localhost'
PORT = 9998
def __init__(self,app):
print("start chat_client_01.__init__")
self.app=app
self.my_address='localhost'
self.get_my_address()
def get_my_address(self):
self.my_address = socket.gethostbyname(socket.get...
ipx=ipget.ipget()
#self.my_address=ipx.ipaddr("wlan0")
self.my_address=ipx.ipaddr("eth0")
return self.my_address
def parse(self,line):
#self.python2fwb(">"+line)
#self.gui.parse_command(line)
print(line)
def shutdown(self):
os.system("sudo shutdown -h now")
def client_start(self,addr):
"""クライアントのスタート"""
try:
self.sock.connect((addr, self.PORT))
print("chat_lient_ex01.py connected to the serv...
except Exception as e:
msg1="connect error, addr="+addr+" port="+str(s...
print(msg1)
tb=sys.exec_info()[2]
msg2="[message]{0}".format(e.with_traceback(tb))
print(msg2)
return
addr=self.sock.getsockname()[0]
print("connect, addr="+addr)
handle_thread = threading.Thread(target=self.handle...
handle_thread.start()
def return_message(self,x):
self.sock.send((x+'\n').encode('utf-8'))
def handler(self,sock):
"""サーバからメッセージを受信し、表示する"""
print("start handler from pic_ex04.py")
sf=sock.makefile()
while True:
try:
line=sf.readline()
self.parse(line)
except Exception as e:
tb=sys.exec_info()[2]
msg2="message]{0}".format(e.with_traceback(...
print(msg2)
class App:
def __init__(self):
self.message_line=0
#self.serial=serial.Serial('/dev/ttyUSB0',115200)
#self.serial.flush()
thread=threading.Thread(target=self.read_loop,arg...
thread.start()
# Functions are defined here
def set_Remote_Command_Reader(self,reader):
self.reader=reader
def connect(self, addr):
#self.reader.client_start(self.server_address)
self.reader.client_start(addr)
self.ipx=ipget.ipget()
def shutdown(self):
os.system("sudo shutdown -h now")
def send_rcb4(self,message):
line=message+'\n'
self.serial.write(line.encode('utf-8'))
print("rcb4>"+message)
def exit_tesla(self):
self.reader.sock.close()
exit()
def shutdown(self):
self.rcb4_close()
self.reader.sock.close()
os.system("sudo shutdown -h now")
def read_loop(self):
print("start read_loop")
while True:
line=input()
l=self.r_b(line)
rest=[""]
if self.parse_key("exit.",l,rest):
self.exit_tesla()
elif self.parse_key("shutdown.",l,rest):
self.shutdown()
elif self.parse_key("a.",l,rest):
print("start a.")
self.reader.return_message("remote self.s...
self.reader.return_message("id=a self.sta...
elif self.parse_key("b.",l,rest):
print("start b.")
self.reader.return_message("remote self.s...
self.reader.return_message("id=b self.sta...
elif self.parse_key("stop.",l,rest):
self.reader.return_message("remote_self.s...
elif self.reader!=None:
self.reader.return_message(line)
else:
print('no connection to tcp server(reader...
#
# command
# shutdown
#
def parse_command(self,command):
rest=[""]
strc=[""]
ix=[""]
print("parse_command("+command+")")
return True
def parse_key(self,key,inx,rest):
keylen=len(key)
inlen=len(inx)
if inx.startswith(key):
rest[0]=inx[keylen:inlen]
return True
rest[0]=inx
return False
def parse_p_int(self,command,ix,rest):
print("parse_p_int("+command+",ix,rest)")
i=0
c=command[0]
if c in ['0','1','2','3','4','5','6','7','8','9']:
clen=len(command)
while True:
i=i*10+int(c)
command=command[1:clen]
clen=len(command)
if clen==0:
ix[0]=i
rest[0]=""
return True
print("parse_p_int command="+command)
c=command[0]
if not (c in ['0','1','2','3','4','5','6'...
ix[0]=i
rest[0]=command
return True
rest[0]=command
ix[0]=0
return False
def r_b(self,inx):
rx=[""]
while self.parse_key(" ",inx,rx):
inx=rx[0]
return rx[0]
def parse_String_Constant(self,inx,strc,rest):
rx=[""]
xstr=""
if self.parse_key('"',inx,rx):
inx=rx[0]
fx=inx[0]
xlen=len(inx)
while fx!='"':
if xlen==0:
return False
if fx=='\\':
inx=inx[1:xlen]
else:
xstr=xstr+fx
xlen=len(inx)
inx=inx[1:xlen]
fx=inx[0]
if self.parse_key('"',inx,rx):
strc[0]=xstr
rest[0]=rx[0]
return True
return False
if __name__=="__main__":
args = sys.argv
print(args)
alen=len(args)
if alen<2:
print("usage: python3 chat_client_01.py <server_i...
exit()
ipx= args[1]
print('ip='+ipx)
app = App()
rcr=Remote_Command_Reader(app)
app.set_Remote_Command_Reader(rcr)
app.connect(ipx)
ip=rcr.get_my_address()
print(ip)
}}
----
#counter
終了行:
[[chat_server_ex01.py]]
#code(Python){{
# -*- coding:utf-8 -*-
# takashi yamanoue, fukuyama university, 2024/2/27
# 20230503
#
#import serial
import threading
import time
import math
import sys
import requests
import os
import socket
from collections import deque
import subprocess
import copy
import re
import ipget
class Remote_Command_Reader:
sock = socket.socket(socket.AF_INET, socket.SOCK_STRE...
HOST = 'localhost'
PORT = 9998
def __init__(self,app):
print("start chat_client_01.__init__")
self.app=app
self.my_address='localhost'
self.get_my_address()
def get_my_address(self):
self.my_address = socket.gethostbyname(socket.get...
ipx=ipget.ipget()
#self.my_address=ipx.ipaddr("wlan0")
self.my_address=ipx.ipaddr("eth0")
return self.my_address
def parse(self,line):
#self.python2fwb(">"+line)
#self.gui.parse_command(line)
print(line)
def shutdown(self):
os.system("sudo shutdown -h now")
def client_start(self,addr):
"""クライアントのスタート"""
try:
self.sock.connect((addr, self.PORT))
print("chat_lient_ex01.py connected to the serv...
except Exception as e:
msg1="connect error, addr="+addr+" port="+str(s...
print(msg1)
tb=sys.exec_info()[2]
msg2="[message]{0}".format(e.with_traceback(tb))
print(msg2)
return
addr=self.sock.getsockname()[0]
print("connect, addr="+addr)
handle_thread = threading.Thread(target=self.handle...
handle_thread.start()
def return_message(self,x):
self.sock.send((x+'\n').encode('utf-8'))
def handler(self,sock):
"""サーバからメッセージを受信し、表示する"""
print("start handler from pic_ex04.py")
sf=sock.makefile()
while True:
try:
line=sf.readline()
self.parse(line)
except Exception as e:
tb=sys.exec_info()[2]
msg2="message]{0}".format(e.with_traceback(...
print(msg2)
class App:
def __init__(self):
self.message_line=0
#self.serial=serial.Serial('/dev/ttyUSB0',115200)
#self.serial.flush()
thread=threading.Thread(target=self.read_loop,arg...
thread.start()
# Functions are defined here
def set_Remote_Command_Reader(self,reader):
self.reader=reader
def connect(self, addr):
#self.reader.client_start(self.server_address)
self.reader.client_start(addr)
self.ipx=ipget.ipget()
def shutdown(self):
os.system("sudo shutdown -h now")
def send_rcb4(self,message):
line=message+'\n'
self.serial.write(line.encode('utf-8'))
print("rcb4>"+message)
def exit_tesla(self):
self.reader.sock.close()
exit()
def shutdown(self):
self.rcb4_close()
self.reader.sock.close()
os.system("sudo shutdown -h now")
def read_loop(self):
print("start read_loop")
while True:
line=input()
l=self.r_b(line)
rest=[""]
if self.parse_key("exit.",l,rest):
self.exit_tesla()
elif self.parse_key("shutdown.",l,rest):
self.shutdown()
elif self.parse_key("a.",l,rest):
print("start a.")
self.reader.return_message("remote self.s...
self.reader.return_message("id=a self.sta...
elif self.parse_key("b.",l,rest):
print("start b.")
self.reader.return_message("remote self.s...
self.reader.return_message("id=b self.sta...
elif self.parse_key("stop.",l,rest):
self.reader.return_message("remote_self.s...
elif self.reader!=None:
self.reader.return_message(line)
else:
print('no connection to tcp server(reader...
#
# command
# shutdown
#
def parse_command(self,command):
rest=[""]
strc=[""]
ix=[""]
print("parse_command("+command+")")
return True
def parse_key(self,key,inx,rest):
keylen=len(key)
inlen=len(inx)
if inx.startswith(key):
rest[0]=inx[keylen:inlen]
return True
rest[0]=inx
return False
def parse_p_int(self,command,ix,rest):
print("parse_p_int("+command+",ix,rest)")
i=0
c=command[0]
if c in ['0','1','2','3','4','5','6','7','8','9']:
clen=len(command)
while True:
i=i*10+int(c)
command=command[1:clen]
clen=len(command)
if clen==0:
ix[0]=i
rest[0]=""
return True
print("parse_p_int command="+command)
c=command[0]
if not (c in ['0','1','2','3','4','5','6'...
ix[0]=i
rest[0]=command
return True
rest[0]=command
ix[0]=0
return False
def r_b(self,inx):
rx=[""]
while self.parse_key(" ",inx,rx):
inx=rx[0]
return rx[0]
def parse_String_Constant(self,inx,strc,rest):
rx=[""]
xstr=""
if self.parse_key('"',inx,rx):
inx=rx[0]
fx=inx[0]
xlen=len(inx)
while fx!='"':
if xlen==0:
return False
if fx=='\\':
inx=inx[1:xlen]
else:
xstr=xstr+fx
xlen=len(inx)
inx=inx[1:xlen]
fx=inx[0]
if self.parse_key('"',inx,rx):
strc[0]=xstr
rest[0]=rx[0]
return True
return False
if __name__=="__main__":
args = sys.argv
print(args)
alen=len(args)
if alen<2:
print("usage: python3 chat_client_01.py <server_i...
exit()
ipx= args[1]
print('ip='+ipx)
app = App()
rcr=Remote_Command_Reader(app)
app.set_Remote_Command_Reader(rcr)
app.connect(ipx)
ip=rcr.get_my_address()
print(ip)
}}
----
#counter
ページ名: