wiki_bot_07.py
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
検索
|
最終更新
|
ヘルプ
|
ログイン
]
開始行:
#code(Python){{
# -*- coding: utf-8 -*-
#
# wiki_bot_06.py
# t.yamanoue
# 20250227
#
import time
import requests
from requests_toolbelt import MultipartEncoder
import socket
try:
import ustruct as struct
except:
import struct
from time import sleep
import gc
import copy
import threading
import sys
from tkinter import *
from tkinter import Label
from tkinter import Button
import tkinter as tk
from tkinter import scrolledtext
#from PIL import Image, ImageTk, ImageOps
import tkinter.ttk as ttk
from tkinter import messagebox
from tkinter import filedialog
import time
import json
import pickle
import serial
import mimetypes
import traceback
import os
import subprocess
# Default property values; Properties starts from this table and the GUI
# fields are filled from it key by key.
# NOTE(review): the two object_page_* lines below are cut off ("...") in this
# copy of the file — restore the full URLs from the original source.
initials = {'bot_id':'yama_bot_0000_0000_0000_0001',
'waiting_term':600000,
'object_page_1':'http://192.168.10.10/fwb4pi/...
'object_page_2':'http://192.168.10.11/fwb4pi/...
'object_page_pass_1':'',
'object_page_pass_2':'',
# interval/length values are integers (read back with int() in the GUI);
# units are not stated here — presumably milliseconds, TODO confirm
'read_interval': 20000,
'exec_interval': 0,
'write_interval':0,
'report_length':120,
# settings for the external helper program shown on the "external" tab
'external_program_path':'/home/pi/python/',
'external_program_name':'tcp_server_ex2.py',
'external_program_address':'localhost',
'external_program_port':'9999',
'external_program_autostart':False,
'external_program_autoconnect':False
}
class Properties:
    """Key/value settings of the bot, mirrored to and from the GUI.

    The table starts as a copy of the module-level ``initials`` defaults and
    can be pickled to / restored from a file (default ``bot.properties``).
    """

    def __init__(self):
        # Copy the defaults: assigning ``initials`` directly would make every
        # set()/load() silently rewrite the shared module-level dict.
        self.properties = dict(initials)
        self.file_name = "bot.properties"

    def set_gui(self, gui):
        """Attach the GUI object used by map_to_gui() and map_from_gui()."""
        self.gui = gui

    def load(self, fn):
        """Merge the pickled table stored in *fn* and push it to the GUI.

        Any failure is reported on stdout and the current values are kept.
        """
        # NOTE: pickle can execute arbitrary code while loading; only open
        # property files that this program wrote itself.
        try:
            with open(fn, mode='rb') as f:
                prop_w = pickle.load(f)
            for key in prop_w:
                self.properties[key] = prop_w[key]
            self.map_to_gui()
        except Exception as e:
            print("file "+fn+" read error")
            print("message:{0}".format(e))

    def map_to_gui(self):
        """Show every property in its GUI widget."""
        for key in self.properties:
            self.gui.map_to_gui(key, self.properties[key])

    def map_from_gui(self):
        """Read every property back from its GUI widget."""
        for key in self.properties:
            self.properties[key] = self.gui.lookup_prop(key)

    def get(self, key):
        """Return the value stored for *key*, or None when it is unknown."""
        return self.properties.get(key)

    def set(self, key, value):
        """Store *value* under *key*."""
        self.properties[key] = value

    def save(self, fn):
        """Refresh the table from the GUI and pickle it to *fn*.

        Write failures are reported on stdout and otherwise ignored.
        """
        self.map_from_gui()
        try:
            with open(fn, mode='wb') as f:
                pickle.dump(self.properties, f)
        except Exception as e:
            print("file "+fn+" write error")
            print("message:{0}".format(e))
class GUI:
def __init__(self,root,bot):
    """Build the "Wiki Bot panel" window with a "main" and an "external" tab.

    root is the Tk root window; bot is the controller object that the button
    handlers of this class call into (it may be None).  All widgets are
    positioned with place() at fixed coordinates; yy tracks the current row.
    NOTE(review): many widget-construction lines are cut off ("...") in this
    copy of the file and are reproduced as found.
    """
    self.bot=bot
    self.recv_queue=[]
    self.root=root
    self.root.title("Wiki Bot panel")
    self.root.configure(background="white")
    self.root.geometry("850x800")
    self.notebook =ttk.Notebook(root)
    self.main_tab=tk.Frame(self.notebook, bg='white')
    self.external_tab=tk.Frame(self.notebook, bg='whi...
    self.notebook.add(self.main_tab, text="main", und...
    self.notebook.add(self.external_tab, text="extern...
    self.notebook.pack(expand=True, fill='both', padx...
    yy=10
    # command buttons
    self.start_button=Button(self.main_tab, text="sta...
    self.start_button.place(x=10,y=yy)
    self.stop_button=Button(self.main_tab, text="stop...
    self.stop_button.place(x=80,y=yy)
    self.save_prop_button=Button(self.main_tab, text=...
    self.save_prop_button.place(x=150,y=yy)
    self.clear_message_button=Button(self.main_tab, t...
    self.clear_message_button.place(x=280,y=yy)
    yy=40
    # this bot id, waiting term
    self.bot_label=Label(self.main_tab,text="bot", wi...
    self.bot_label.place(x=10,y=yy)
    self.bot_id_field=Entry(self.main_tab,width=50)
    self.bot_id_field.place(x=50,y=yy)
    self.waiting_term_label=Label(self.main_tab,text=...
    self.waiting_term_label.place(x=380,y=yy)
    self.waiting_term_field=Entry(self.main_tab,width...
    self.waiting_term_field.place(x=450,y=yy)
    yy=70
    # table of object pages: one row per page, columns URI / name / pass
    self.page_frame = ttk.Frame(self.main_tab)
    self.page_frame.grid(row=0, column=0)
    # create the column headers
    items = ['URI', 'name', 'pass']
    for i in range(0, len(items)):
        label_item = ttk.Label(self.page_frame,text=i...
        label_item.grid(row=0, column=i)
    n = 2 # number of rows
    # column 1
    self.object_page_table_uri = [0] * n
    for i in range(0, n):
        self.object_page_table_uri[i] = ttk.Entry(sel...
        self.object_page_table_uri[i].grid(row=i+1, c...
    # column 2
    self.object_page_table_name = [0] * n
    for i in range(0, n):
        self.object_page_table_name[i] = ttk.Entry(se...
        self.object_page_table_name[i].grid(row=i+1, ...
    # column 3
    self.object_page_table_pass = [0] * n
    for i in range(0, n):
        self.object_page_table_pass[i] = ttk.Entry(se...
        self.object_page_table_pass[i].grid(row=i+1, ...
    self.page_frame.place(x=10,y=yy)
    # table of numeric parameters: columns property / value
    self.param_frame = ttk.Frame(self.main_tab)
    self.param_frame.grid(row=0, column=0)
    # create the column headers
    param_items = ['property', 'value']
    for i in range(0, len(param_items)):
        label_param_item = ttk.Label(self.param_frame...
        label_param_item.grid(row=0, column=i)
    n = 4 # number of rows
    # column 1
    self.param_table_property = [0] * n
    for i in range(0, n):
        self.param_table_property[i] = ttk.Entry(self...
        self.param_table_property[i].grid(row=i+1, co...
    # column 2
    self.param_table_value = [0] * n
    for i in range(0, n):
        self.param_table_value[i] = ttk.Entry(self.pa...
        self.param_table_value[i].grid(row=i+1, colum...
    self.param_frame.place(x=550,y=yy)
    # fixed property names shown in the first column of the parameter table
    self.param_table_property[0].insert(END,'read_int...
    self.param_table_property[1].insert(END,'exec_int...
    self.param_table_property[2].insert(END,'write_in...
    self.param_table_property[3].insert(END,'report_l...
    yy=200
    self.command_field_label=Label(self.main_tab,text...
    self.command_field_label.place(x=10,y=yy)
    # NOTE(review): self.command_field is assigned again further down for the
    # external tab, so after __init__ the attribute refers to that later
    # Entry and this main-tab Entry is no longer reachable through it.
    self.command_field=Entry(self.main_tab,width=50)
    self.command_field.place(x=150,y=yy)
    self.command_enter_button=Button(self.main_tab,te...
    self.command_enter_button.place(x=500,y=yy)
    self.clear_command_button=Button(self.main_tab,te...
    self.clear_command_button.place(x=580,y=yy)
    yy=230
    # script area
    self.script_frame=tk.Frame(self.main_tab,width=50...
    self.script_frame.place(x=10,y=yy)
    self.script_area=scrolledtext.ScrolledText(self.s...
    self.script_area.pack()
    yy=600
    # output area
    self.message_frame=tk.Frame(self.main_tab,width=5...
    self.message_frame.place(x=10,y=yy)
    self.message_area=scrolledtext.ScrolledText(self....
    self.message_area.pack()
    #external tab
    yy=10
    self.external_label=Label(self.external_tab,text=...
    self.external_label.place(x=30,y=yy)
    yy=50
    self.dir_name_label=Label(self.external_tab,text=...
    self.dir_name_label.place(x=30,y=yy)
    self.dir_name_field=Entry(self.external_tab,width...
    self.dir_name_field.place(x=100,y=yy)
    #self.read_button=Button(self.external_tab, text=...
    #self.read_button.place(x=500,y=yy)
    self.dir_button=Button(self.external_tab, text="d...
    self.dir_button.place(x=320,y=yy)
    # state of the auto-run check box, read back by lookup_prop
    self.is_autorun_selected=BooleanVar()
    self.run_external_button2=Button(self.external_ta...
    self.run_external_button2.place(x=420,y=yy)
    self.auto_run_check=Checkbutton(self.external_tab...
    self.auto_run_check.place(x=520,y=yy)
    yy=100
    self.file_name_label=Label(self.external_tab,text...
    self.file_name_label.place(x=30,y=yy)
    self.file_name_field=Entry(self.external_tab,widt...
    self.file_name_field.place(x=100,y=yy)
    self.read_button=Button(self.external_tab, text="...
    self.read_button.place(x=420,y=yy)
    self.select_button=Button(self.external_tab, text...
    self.select_button.place(x=320,y=yy)
    yy=140
    # state of the auto-connect check box, read back by lookup_prop
    self.is_autoconnect_selected=BooleanVar()
    self.address_label=Label(self.external_tab,text="...
    self.address_label.place(x=30,y=yy)
    self.address_field=Entry(self.external_tab,width=...
    self.address_field.place(x=100,y=yy)
    self.port_label=Label(self.external_tab,text="por...
    self.port_label.place(x=200,y=yy)
    self.port_field=Entry(self.external_tab,width=10)
    self.port_field.place(x=270,y=yy)
    self.connect_button=Button(self.external_tab, tex...
    self.connect_button.place(x=320,y=yy)
    self.auto_connect_check=Checkbutton(self.external...
    self.auto_connect_check.place(x=420,y=yy)
    yy=200
    self.command_label=Label(self.external_tab,text="...
    self.command_label.place(x=30,y=yy)
    # second assignment of self.command_field (see the note above)
    self.command_field=Entry(self.external_tab,width=...
    self.command_field.place(x=100,y=yy)
    self.send_button=Button(self.external_tab, text="...
    self.send_button.place(x=500,y=yy)
    yy=240
    self.external_frame=tk.Frame(self.external_tab,wi...
    self.external_frame.place(x=50,y=yy)
    self.external_area=scrolledtext.ScrolledText(self...
    self.external_area.pack()
    #self.run_script()
    #self.set_script()
    #self.display_help()
    # property table bound to this GUI so it can fill and read the widgets
    self.prop=Properties()
    self.prop.set_gui(self)
def set_Pico_Time(self,time):
    """Store the time helper whose get_now() stamps appended log lines."""
    setattr(self, "Pico_Time", time)
def init_params(self):
    """Load the saved property file, then refresh every GUI field from the table."""
    settings = self.prop
    settings.load(settings.file_name)
    settings.map_to_gui()
def click_read_button(self):
    """Load the file named in the file-name field into the script area.

    The script area is cleared and refilled line by line (each line is also
    echoed to stdout).  Open and read failures are reported on stdout and
    the method returns normally.
    """
    fn = self.file_name_field.get()
    print("fn="+fn)
    try:
        f = open(fn, "r")
    except Exception as e:
        print("file open error.")
        # The original called sys.exec_info(), which does not exist and made
        # the handler itself raise; the exception already carries the message.
        print("message:{0}".format(e))
        return
    # The with-block closes the file even when reading or a widget call fails.
    with f:
        try:
            self.script_area.delete(1.0, END)
            for line in f:
                print(line)
                self.script_area.insert(END, line)
        except Exception as e:
            print("file "+fn+" read error")
            print("message:{0}".format(e))
def click_dir_button(self):
    """Ask for a folder and put the chosen path into the directory field.

    The dialog is started from the directory that contains this script.
    NOTE(review): the chosen path is inserted without clearing the field
    first, so it is appended to whatever the field already holds — confirm
    whether that is intended (click_select_file_button clears its field).
    """
    iDir = os.path.abspath(os.path.dirname(__file__))
    folder_name=tk.filedialog.askdirectory(initialdir...
    if len(folder_name)==0:
        # an empty result means the dialog was cancelled
        print("cancel selcting folder")
    else:
        self.dir_name_field.insert(END,folder_name)
def click_select_file_button(self):
    """Ask for a Python file and put its path into the file-name field.

    The starting directory is the content of the directory field, or the
    directory of this script when that field is empty.
    """
    fType = [("python", "*.py")]
    iDir=""
    xdir=self.dir_name_field.get()
    if xdir==None or xdir=="":
        iDir=os.path.abspath(os.path.dirname(__file__))
    else:
        iDir=xdir
    iFilePath=filedialog.askopenfilename(filetypes=fT...
    if iFilePath==None or iFilePath=="":
        # nothing selected
        print("cancel")
    else:
        # replace the previous file name with the new selection
        self.file_name_field.delete(0,END)
        self.file_name_field.insert(END,iFilePath)
def enter_command(self):
    """Run the text currently in the command field through ex()."""
    print("enter_command")
    self.ex(self.command_field.get())
def click_start_button(self):
    """Handle the start button: create a worker thread and start it.

    NOTE(review): the Thread(...) line is cut off in this copy; the target
    appears to be a method of this object whose name starts with "s" —
    presumably start_script, TODO confirm.
    """
    print('click_start_button')
    #exec(self.script)
    self.script_thread=threading.Thread(target=self.s...
    print('new thread, start thread')
    #self.start_script()
    self.script_thread.start()
    print('script_started.')
def click_stop_button(self):
    """Handle the stop button: ask the bot, if there is one, to leave its loop."""
    print('click_stop_button')
    controller = self.bot
    if not (controller==None):
        controller.stop_read_eval_print_loop()
def click_connect_button(self):
    """Handle the connect button by calling the bot's connect_external()."""
    print('click_connect_button')
    controller = self.bot
    controller.connect_external()
def click_send_button(self):
    """Send the command-field text: log it in the external area, then pass it to the bot.

    The command field is emptied before the text is forwarded.
    """
    print('click_send_button')
    entry = self.command_field
    outgoing = entry.get()
    entry.delete(0,END)
    self.write_external(outgoing)
    self.bot.send_external(outgoing)
def ex_external(self,command):
    """Put *command* into the command field and send it as if the send button was clicked."""
    entry = self.command_field
    entry.delete(0,END)
    entry.insert(END,command)
    self.click_send_button()
def start_script(self):
    """Run the bot's read_eval_print_loop(); does nothing when no bot is attached."""
    print('start_script')
    controller = self.bot
    if not (controller==None):
        controller.read_eval_print_loop()
def run_external(self):
    """Ask the bot to start the external program."""
    print('run_external')
    controller = self.bot
    controller.start_external()
def click_save_prop_button(self):
    """Save the property table to its default file."""
    settings = self.prop
    settings.save(settings.file_name)
def add_uri(self,uri,i):
    """Replace the URI shown in row *i* (0 or 1) of the page table; other rows are ignored."""
    if not (0 <= i < 2):
        return
    cell = self.object_page_table_uri[i]
    cell.delete(0,END)
    cell.insert(END,uri)
def map_to_gui(self,key,value):
    """Show *value* in the widget that belongs to property *key*.

    Entry widgets are cleared and refilled; the two check boxes are selected
    or deselected according to the truth of *value*.  Unknown keys are ignored.
    """
    # key -> (attribute name, row index or None, convert with str()?)
    entries = {
        'bot_id': ('bot_id_field', None, False),
        'waiting_term': ('waiting_term_field', None, True),
        'object_page_1': ('object_page_table_uri', 0, False),
        'object_page_2': ('object_page_table_uri', 1, False),
        'object_page_pass_1': ('object_page_table_pass', 0, False),
        'object_page_pass_2': ('object_page_table_pass', 1, False),
        'read_interval': ('param_table_value', 0, True),
        'exec_interval': ('param_table_value', 1, True),
        'write_interval': ('param_table_value', 2, True),
        'report_length': ('param_table_value', 3, True),
        'external_program_path': ('dir_name_field', None, True),
        'external_program_name': ('file_name_field', None, True),
        'external_program_address': ('address_field', None, True),
        'external_program_port': ('port_field', None, True),
    }
    checks = {
        'external_program_autostart': 'auto_run_check',
        'external_program_autoconnect': 'auto_connect_check',
    }
    if key in entries:
        attr, row, as_text = entries[key]
        widget = getattr(self, attr)
        if row is not None:
            widget = widget[row]
        widget.delete(0,END)
        widget.insert(END, str(value) if as_text else value)
    elif key in checks:
        box = getattr(self, checks[key])
        if value:
            box.select()
        else:
            box.deselect()
def lookup_prop(self,key):
    """Return the current GUI value of property *key*.

    Interval, length and waiting-term fields are converted with int() (which
    raises ValueError for non-numeric text); the two check-box properties
    come from their BooleanVar.  Unknown keys yield None.
    """
    # key -> (attribute name, row index or None, convert with int()?)
    sources = {
        'bot_id': ('bot_id_field', None, False),
        'waiting_term': ('waiting_term_field', None, True),
        'object_page_1': ('object_page_table_uri', 0, False),
        'object_page_2': ('object_page_table_uri', 1, False),
        'object_page_pass_1': ('object_page_table_pass', 0, False),
        'object_page_pass_2': ('object_page_table_pass', 1, False),
        'read_interval': ('param_table_value', 0, True),
        'exec_interval': ('param_table_value', 1, True),
        'write_interval': ('param_table_value', 2, True),
        'report_length': ('param_table_value', 3, True),
        'external_program_path': ('dir_name_field', None, False),
        'external_program_name': ('file_name_field', None, False),
        'external_program_address': ('address_field', None, False),
        'external_program_port': ('port_field', None, False),
        'external_program_autostart': ('is_autorun_selected', None, False),
        'external_program_autoconnect': ('is_autoconnect_selected', None, False),
    }
    if key not in sources:
        return None
    attr, row, as_int = sources[key]
    holder = getattr(self, attr)
    if row is not None:
        holder = holder[row]
    raw = holder.get()
    return int(raw) if as_int else raw
def update(self):
    """Copy the GUI values into the property table and save it to the default file."""
    settings = self.prop
    settings.map_from_gui()
    settings.save(settings.file_name)
def ini_recv_queue(self):
    """Replace the receive queue with a new empty list."""
    self.recv_queue = list()
def is_not_empty_recv_queue(self):
    """Return True when at least one received message is queued, else False.

    The state is also logged to stdout (the original printed the two
    messages the wrong way round).
    """
    if len(self.recv_queue)>0:
        print('recv_queue is not empty')
        return True
    print('recv_queue is empty')
    return False
def are_in_the_recv_queue(self,messages):
    """Return True when every string in *messages* occurs inside some queued message.

    The match is a substring test (``x in m``).  Returns False as soon as one
    expected string is not found in any entry of self.recv_queue; an empty
    *messages* yields True.
    """
    print('start are_in_the_recv_queue')
    for x in messages:
        print('is '+x+' in the recv_queue?')
        flag=False
        for m in self.recv_queue:
            print(m+' in the received queue')
            if x in m:
                print(x + ' is in the '+m)
                flag=True
                self.write_message("message "+x+" fou...
                # first hit is enough for this expected string
                break
        if flag==False:
            return False
    return True
def enter_all_command(self):
    """Send the text of the all-server command field to every server, then clear the field.

    NOTE(review): command_field_for_all_server and coms are not created in
    the visible __init__ — confirm they are set elsewhere before calling.
    """
    entry = self.command_field_for_all_server
    command = entry.get()
    notice = "send "+command+" to all"
    print(notice)
    self.write_message(notice)
    self.coms.send_command_to_all(command)
    self.command_field_for_all_server.delete(0,END)
def ex(self,command):
    """Parse *command* and report "OK" or "ERROR" in the message area and on stdout."""
    print("parse "+command)
    prepared = self.r_b(command)
    verdict = "OK" if self.parse_command(prepared) else "ERROR"
    self.write_message(verdict)
    print(verdict)
def write_script(self, message):
    """Replace the content of the script area with *message*."""
    for step in (self.clear_script, lambda: self.append_script(message)):
        step()
def write_script_start(self,message):
    """Start a background thread that writes *message* into the script area.

    NOTE(review): the Thread(...) line is cut off in this copy, so the exact
    target and arguments cannot be seen here.
    """
    self.write_script_thread=threading.Thread(target=...
    self.write_script_thread.start()
def write_script_list(self,xlines):
    """Replace the script area content with the lines in *xlines*, one per row.

    The widget is refreshed after every appended line.
    """
    print('start write_script_lines')
    self.write_message('start_write_script_lines')
    self.clear_script()
    for row in xlines:
        self.append_script(row)
        self.script_area.update()
    print('end write_script_lines')
def write_script_lines(self,lines):
    """Split the text *lines* at line breaks and show the pieces in the script area.

    The widget is refreshed after every appended line.
    """
    print('start write_script_lines')
    self.write_message('start_write_script_lines')
    rows = lines.splitlines()
    self.clear_script()
    for row in rows:
        self.append_script(row)
        self.script_area.update()
    print('end write_script_lines')
def append_script(self, message):
    """Append *message* and a line break to the end of the script area."""
    for piece in (message, '\n'):
        self.script_area.insert(tk.END,piece)
def clear_script(self):
    """Delete everything in the script area and refresh the widget."""
    area = self.script_area
    area.delete("1.0",END)
    area.update()
def write_message(self, message):
    """Append *message*, stripped of surrounding whitespace, to the message area.

    Messages that are empty after stripping are dropped.
    """
    text = message.strip()
    if text != '':
        self.append_message(text)
def clear_and_append_message(self, message):
    """Empty the message area, then append *message* (not stripped) to it."""
    for step in (self.clear_message, lambda: self.append_message(message)):
        step()
def append_message(self, message):
    """Append a time-stamped line to the message area and scroll to the end.

    The stamp comes from self.Pico_Time.get_now().  Once the area holds 500
    or more lines, the text from index 0.0 up to index 98.0 is deleted to
    keep the log short.
    """
    stamp = self.Pico_Time.get_now()
    entry = stamp+': '+message
    area = self.message_area
    area.insert(tk.END,entry)
    area.insert(tk.END,'\n')
    area.yview_moveto(1)
    line_total = area.get(0.0,tk.END).count('\n') + 1
    if line_total>=500:
        area.delete(0.0,98.0)
    area.update()
def clear_message(self):
    """Delete everything in the message area and refresh the widget."""
    area = self.message_area
    area.delete("1.0",END)
    area.update()
def write_external(self, message):
    """Append *message*, stripped of surrounding whitespace, to the external area.

    Messages that are empty after stripping are dropped.
    """
    text = message.strip()
    if text != '':
        self.append_external(text)
def clear_and_append_external(self, message):
    """Empty the external area, then append *message* (not stripped) to it."""
    for step in (self.clear_external, lambda: self.append_external(message)):
        step()
def append_external(self, message):
    """Append a time-stamped line to the external area and scroll to the end.

    The stamp comes from self.Pico_Time.get_now().  Once the area holds 500
    or more lines, the text from index 0.0 up to index 98.0 is deleted to
    keep the log short.
    """
    stamp = self.Pico_Time.get_now()
    entry = stamp+': '+message
    area = self.external_area
    area.insert(tk.END,entry)
    area.insert(tk.END,'\n')
    area.yview_moveto(1)
    line_total = area.get(0.0,tk.END).count('\n') + 1
    if line_total>=500:
        area.delete(0.0,98.0)
    area.update()
def clear_external(self):
    """Delete everything in the external area and refresh the widget."""
    area = self.external_area
    area.delete("1.0",END)
    area.update()
class pico_net:
    """Wi-Fi station helper built on a ``network.WLAN`` object.

    NOTE(review): this class uses the names ``network``, ``ssid`` and
    ``password`` as globals; none of them is imported or defined in the
    visible part of the file (pico_wiki_driver has its pico_net lines
    commented out) — presumably left over from a MicroPython version,
    TODO confirm before use.
    """
    def __init__(self):
        # reads the global name ssid, not self.ssid
        print('current ssid= '+ssid)
    def set_access_point(self, ssid, password):
        """Remember the access point name and password on this object."""
        self.ssid = ssid
        self.password = password
    def connect(self):
        """Activate the station interface and wait up to about 10 s for a connection.

        Raises RuntimeError('network connection failed') when the final
        status is not 3; otherwise stores the ifconfig() result in
        self.status and prints the first entry as the IP address.
        """
        self.wlan = network.WLAN(network.STA_IF)
        self.wlan.active(True)
        chipid = int.from_bytes(self.wlan.config('mac'), ...
        # NOTE(review): uses the globals ssid/password rather than the values
        # stored by set_access_point — confirm which is intended
        self.wlan.connect(ssid, password)
        max_wait = 10
        while max_wait > 0:
            if self.wlan.status() < 0 or self.wlan.status...
                break
            max_wait -= 1
            print('waiting for connection...')
            time.sleep(1)
        if self.wlan.status() != 3:
            raise RuntimeError('network connection failed')
        else:
            print('connected')
            self.status = self.wlan.ifconfig()
            print('ip = ' + self.status[0])
class pico_http:
    """Thin HTTP helper around ``requests``.

    get(), put() and post() return the response body as text, or "" when
    the request fails; failures are reported on stdout.
    """

    def __init__(self):
        print('pico_http')

    def set_url(self, url):
        """Remember *url* on this object."""
        self.url = url

    def get(self, url):
        """GET *url* and return the response text ("" on any error)."""
        self.url = url
        try:
            r = requests.get(url)
            r_text = r.text
            r.close()
            return r_text
        except Exception:
            print('pico_http.get...exception')
            return ""

    def put(self, url, payload):
        """Send *payload* (a dict of strings) as a query string with GET.

        The pairs are joined as ``key=value&...`` without URL-encoding,
        exactly as before.  Returns the response text, or "" on any error:
        the original compared the response object with "" (never true) and
        let request errors escape, unlike get() and post().
        """
        query = '&'.join(key + '=' + payload[key] for key in payload)
        # with an empty payload the original ended up with the bare url
        uri = url + '?' + query if query else url
        try:
            r = requests.get(uri)
            r_text = r.text
            r.close()
            return r_text
        except Exception:
            print('pico_http.put...exception')
            return ""

    #post, ref: https://docs.micropython.org/en/latest/li...
    def post(self, url, body, headers):
        """POST *body* with *headers* to *url* and return the response text ("" on any error)."""
        try:
            r = requests.post(url, data=body, headers=headers)
            r_text = r.text
            r.close()
            print('pico_http.post...return')
            return r_text
        except Exception:
            print('pico_http.post...exception')
            return ""
import datetime
class pico_time:
    """Clock helper: NTP query (ptime) and a printable local time stamp (get_now)."""
    # reference: https://wisteriahill.sakura.ne.jp/CMS/Wo...
    # NTP server (alternatives: time-c.nist.gov, time.cloudflare.com, time.google.com)
    ntp_host = "ntp.nict.jp"
    wday = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun']

    def __init__(self):
        print('pico_time')

    def ptime(self):
        """Query ntp_host over UDP port 123 and return the server time.

        The 32-bit value at bytes 40..43 of the reply is reduced by
        NTP_DELTA (2208988800).  No timeout is set, so the call blocks
        until a reply arrives; socket errors propagate to the caller.
        """
        NTP_DELTA = 2208988800
        NTP_QUERY = bytearray(48)
        NTP_QUERY[0] = 0x1b
        addr = (self.ntp_host, 123)
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.sendto(NTP_QUERY, addr)
            msg = s.recv(48)
        finally:
            # close the socket even when sendto/recv raises
            s.close()
        val = struct.unpack("!I", msg[40:44])[0]
        return val - NTP_DELTA

    def zfill(self, s, width):
        """Return the string *s* left-padded with "0" to at least *width* characters."""
        return s.rjust(width, "0")

    def get_now(self):
        """Return the local date and time as ``str(datetime.datetime.now())``.

        An earlier variant built the stamp from ptime() with a +9 h offset;
        it is no longer used.
        """
        return str(datetime.datetime.now())
class html_parser:
# Class-level defaults and conversion tables of html_parser.
# NOTE(review): several literals below are cut off ("...") and the HTML
# entity strings appear to have been decoded by the page this copy was taken
# from (e.g. '<':'<') — restore them from the original file.
html=""
# NOTE(review): tokens is a class-level list, so it is shared by all
# instances unless an instance assigns its own.
tokens=[]
reserved_words={'<':'<','>':'>','&':'&','"'...
' ':' ','\n':'<br>','\t':' ...
'\\':'\','{':'{','}':'}...
}
# character -> percent-encoded form, used by code_convert
a2c={' ':"+",'\n':"%0D%0A",'\r':"%0D",'\t':"%09",'!':...
'#':"%23",'$':"%24",'%':"%25",'&':"%26", '\'':"%2...
'*':"%2A",'+':"%2B",',':"%2C",'-':"%2D",'/':"%2F"...
';':"%3B",'<':"%3C",'=':"%3D",'>':"%3E",'?':"%3F",
'@':"%40",'[':"%5B",'\\':"%5C",']':"%5D",'^':"%5E",
'`':"%60",'{':"%7B",'|':"%7C",'}':"%7D",'~':"%7E"}
# special character to ASCII code
sc2a={'<':"<",'>':">",'&':"&",'"':'"',...
def __init__(self,html):
    """Create a parser for the text *html*.

    Each instance gets its own token list and read position: the class-level
    ``tokens`` list would otherwise be shared (and keep growing) across all
    parsers, and next_token()/has_more_tokens() would fail before the first
    html_tokenizer() call because token_ith did not exist yet.
    """
    self.html=html
    self.tokens=[]
    self.token_ith=0
def html_tokenizer(self):
    """Split self.html into tokens and append them to self.tokens.

    Three kinds of token are produced: ``<...>`` tags (via parse_tag),
    ``"..."`` string constants (via parse_String_Constant) and the runs of
    other characters between them.  The read position token_ith is reset to 0.
    NOTE(review): tokens are appended to the existing list, so calling this
    twice duplicates them.
    """
    #print("html_tokenizer")
    self.token_ith=0
    w=""          # plain text collected since the last tag/string
    tx=[""]       # out-parameter: content of the tag/string just parsed
    rest=[""]     # out-parameter: text remaining after the parsed item
    inx=self.html
    xlen=len(self.html)
    while xlen>0:
        if self.parse_tag(inx,tx,rest):
            # flush pending plain text before the tag token
            if w!="":
                self.tokens.append(w)
                w=""
            self.tokens.append('<'+tx[0]+'>')
            #print('<'+tx[0]+'>')
            inx=rest[0]
            xlen=len(inx)
        elif self.parse_String_Constant(inx,tx,rest):
            if w!="":
                self.tokens.append(w)
                w=""
            self.tokens.append('"'+tx[0]+'"')
            #print('"'+tx[0]+'"')
            inx=rest[0]
            xlen=len(inx)
        else:
            # ordinary character: move it from the input to the pending text
            wc=inx[0]
            inx=inx[1:xlen]
            xlen=len(inx)
            w=w+wc
    # flush the trailing plain text
    if w!="":
        self.tokens.append(w)
        w=""
def get_html(self):
    """Return the text this parser works on."""
    source = self.html
    return source
def parse_key(self,key,inx,rest):
    """Test whether *inx* begins with *key*.

    rest[0] receives the text after the key on success and the unchanged
    *inx* on failure.  Returns True or False accordingly.
    """
    matched = inx.startswith(key)
    rest[0] = inx[len(key):len(inx)] if matched else inx
    return matched
def parse_String_Constant(self,inx,strc,rest):
    """Parse a double-quoted string at the start of *inx*.

    On success strc[0] receives the text between the quotes, rest[0] the
    text after the closing quote, and True is returned.  A backslash and
    the character after it are both skipped (neither is copied), as in the
    original.  Returns False, leaving strc and rest untouched, when *inx*
    does not start with a quote or the string is never closed — the
    original raised IndexError on unterminated input.
    """
    if not inx.startswith('"'):
        return False
    collected = []
    pos = 1
    end = len(inx)
    while pos < end:
        ch = inx[pos]
        if ch == '"':
            strc[0] = "".join(collected)
            rest[0] = inx[pos+1:]
            return True
        if ch == '\\':
            # drop the backslash together with the escaped character
            pos += 2
        else:
            collected.append(ch)
            pos += 1
    return False
def parse_tag(self,inx,tag,rest):
    """Parse a ``<...>`` tag at the start of *inx*.

    On success tag[0] receives the text between '<' and '>' (quoted strings
    inside the tag are re-quoted with double quotes), rest[0] the text after
    '>', and True is returned.  Returns False when *inx* does not start
    with '<'.
    NOTE(review): the block structure below was reconstructed from a copy
    without indentation; ``fx=inx[0]`` raises IndexError when the input ends
    before '>' is seen, so the xlen<=0 check is never reached in that case —
    confirm against the original file.
    """
    rx=[""]
    xstr=""
    wstr=[""]
    if self.parse_key('<',inx,rx):
        #print("parse_tag")
        inx=rx[0]
        fx=inx[0]
        xlen=len(inx)
        while fx!='>':
            if xlen<=0:
                return False
            if fx=='\\':
                # skip the backslash itself
                inx=inx[1:xlen]
            if self.parse_String_Constant(inx,wstr,rx):
                # a quoted attribute value: copy it as one unit
                inx=rx[0]
                xstr=xstr+'"'+wstr[0]+'"'
            else:
                xstr=xstr+fx
                inx=inx[1:xlen]
            fx=inx[0]
            xlen=len(inx)
        if self.parse_key('>',inx,rx):
            tag[0]=xstr
            rest[0]=rx[0]
            return True
    return False
def get_tokens(self):
    """Return the list of tokens collected so far (the list itself, not a copy)."""
    collected = self.tokens
    return collected
def next_token(self):
    """Return the next unread token and advance the read position; "" when none is left."""
    pos = self.token_ith
    if pos >= len(self.tokens):
        return ""
    self.token_ith = pos + 1
    return self.tokens[pos]
def has_more_tokens(self):
    """Return True while next_token() still has an unread token."""
    return self.token_ith < len(self.tokens)
def index_from(self,i,str):
    """Return the offset of *str* within self.html[i:], counted from *i*.

    Raises ValueError when *str* does not occur there.
    """
    tail = self.html[i:]
    return tail.index(str)
def p_url(self,p,url):
    """Parse an http:// or https:// URL that starts at position *p* of self.html.

    The URL extends to the next space, newline, tab or carriage return, or
    to the end of the text.  On success url[0] receives the URL and the
    position just after it is returned; otherwise *p* is returned and url
    is left untouched.  The end-of-text case no longer raises IndexError
    when the scheme is the last thing in the text.
    """
    text = self.html
    for scheme in ('http://', 'https://'):
        if text.startswith(scheme, p):
            break
    else:
        return p
    start = p + len(scheme)
    end = start
    limit = len(text)
    while end < limit and text[end] not in (' ', '\n', '\t', '\r'):
        end += 1
    url[0] = scheme + text[start:end]
    return end
def p_key(self,key,p):
    """Return the position just after *key* when self.html has it at *p*, else *p*."""
    stop = p + len(key)
    if stop <= len(self.html) and self.html[p:stop] == key:
        return stop
    return p
def p_String_Constant(self,p,strc):
    """Parse a double-quoted string that starts at position *p* of self.html.

    On success strc[0] receives the raw text between the quotes (backslashes
    included) and the position just after the closing quote is returned.
    A backslash protects the character after it from ending the string.
    Returns *p*, leaving strc untouched, when there is no opening quote at
    *p*, when a newline, tab or carriage return occurs inside the string, or
    when the text ends before the closing quote — the original raised
    IndexError in that last case.
    """
    text = self.html
    if not text.startswith('"', p):
        return p
    pos = p + 1
    limit = len(text)
    while pos < limit:
        ch = text[pos]
        if ch == '"':
            strc[0] = text[p+1:pos]
            return pos + 1
        if ch in ('\n', '\t', '\r'):
            return p
        # skip the escaped character together with its backslash
        pos += 2 if ch == '\\' else 1
    return p
def p_Date(self,p,date_str):
    """Parse ``year/month/day hour:minute:second`` at position *p* of self.html.

    '/' or '-' is accepted between the date parts, at least one blank must
    separate date and time, and ':' separates the time parts.  On success
    date_str[0] receives a string built from the parsed numbers and the
    position after the seconds is returned; on failure the start position
    is returned.
    NOTE(review): the failure branch after the day number returns False
    instead of the start position like all the other branches — looks like
    an oversight, confirm.  The line that builds date_str[0] is cut off in
    this copy.
    """
    #print("p_Date p="+str(p)+' '+self.html[p:p+10])
    rx=[""]
    xstr=""
    pw=p   # start position, returned on failure
    year=[0]
    q=self.p_number(p,year)
    if p==q:
        return pw
    p=q
    # first separator: '/' or '-'
    q=self.p_key('/',p)
    if p==q:
        q=self.p_key('-',p)
        if p==q:
            return pw
    p=q
    month=[0]
    q=self.p_number(p,month)
    if p==q:
        return pw
    p=q
    # second separator: '/' or '-'
    q=self.p_key('/',p)
    if p==q:
        q=self.p_key('-',p)
        if p==q:
            return pw
    p=q
    day=[0]
    q=self.p_number(p,day)
    if p==q:
        return False
    p=q
    # blanks between date and time are required
    q=self.p_b(p)
    if p==q:
        return pw
    p=q
    hour=[0]
    q=self.p_number(p,hour)
    if p==q:
        return pw
    p=q
    q=self.p_key(':',p)
    if p==q:
        return pw
    p=q
    minute=[0]
    q=self.p_number(p,minute)
    if p==q:
        return pw
    p=q
    q=self.p_key(':',p)
    if p==q:
        return pw
    p=q
    second=[0]
    q=self.p_number(p,second)
    if p==q:
        return pw
    p=q
    date_str[0]=str(year[0])+'-'+str(month[0])+'-'+st...
    return p
def p_b(self,p):
    """Skip space characters in self.html from position *p* and return the new position."""
    limit = len(self.html)
    while p < limit and self.html[p] == ' ':
        p += 1
    return p
def p_name(self,p,name):
    """Read a name starting at position *p* of self.html.

    name[0] receives the run of allowed characters (possibly empty) and the
    position after it is returned; a return value equal to *p* means no name
    was found.
    NOTE(review): the set of allowed characters is cut off in this copy; it
    starts with the lower-case letters.  ``self.html[q]`` raises IndexError
    when *p* is already at the end of the text.
    """
    #print("p_name, self.html="+ self.html[p:p+10])
    #print("p="+str(p)+"..."+self.html[p:p+10])
    q=p
    last_p=len(self.html)
    while self.html[q] in 'abcdefghijklmnopqrstuvwxyz...
        #print("self.html["+str(q)+"]="+self.html[q])
        q=q+1
        if q>=last_p:
            break
    name[0]=self.html[p:q]
    #print("p_name p="+str(p)+" q="+str(q)+" name="+s...
    #print("p="+str(p)+" q="+str(q)+" name="+name[0])
    #print("name[0]= "+name[0])
    return q
def p_number(self,p,num):
    """Parse an unsigned decimal integer at position *p* of self.html.

    On success num[0] receives the value and the position after the last
    digit is returned.  When there is no digit at *p* (including *p* at the
    end of the text) *p* is returned and num is left untouched, which is the
    "p == q" failure signal the callers test for.

    The original loop tested ``self.html[p]`` instead of the moving index,
    so it either ran to the end of the text or called ``int('')``; both
    paths normally ended in ValueError.
    """
    text = self.html
    limit = len(text)
    q = p
    while q < limit and text[q] in ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9'):
        q += 1
    if q > p:
        num[0] = int(text[p:q])
    return q
def p_an_equation(self,p,eqs):
    """Parse one ``name = value`` pair at position *p* and store it in *eqs*.

    The value is either a double-quoted string (stored as text) or a number
    (stored as int); blanks around the parts are skipped.  Returns the
    position after the value, or the original *p* when no pair is found.
    """
    origin = p
    name_box = [""]
    text_box = [""]

    # name
    here = self.p_b(p)
    after = self.p_name(here, name_box)
    if after == here:
        return origin

    # '='
    here = self.p_b(after)
    after = self.p_key('=', here)
    if after == here:
        return origin

    # value: quoted string first, number as the fallback
    here = self.p_b(after)
    after = self.p_String_Constant(here, text_box)
    if after != here:
        eqs[name_box[0]] = text_box[0]
        return after
    num_box = [0]
    after = self.p_number(here, num_box)
    if after == here:
        return origin
    eqs[name_box[0]] = num_box[0]
    return after
def p_equations(self,p,eqs):
    """Parse as many ``name = value`` pairs as possible from *p* into *eqs*.

    Returns the position after the last pair (the input *p* when none matched).
    """
    following = self.p_an_equation(p, eqs)
    while following != p:
        p = following
        following = self.p_an_equation(p, eqs)
    return p
def p_get_equations_of_the_tag(self,p,tag_name,eqs):
    """Find the next ``<tag_name`` at or after *p* and collect its attributes into *eqs*.

    Returns the position after the closing '>' or '/>' of that tag.  Returns
    the unchanged *p* when the tag is not found, when it has no attributes,
    or when the attributes are not followed by '>' or '/>'.
    """
    pw=p
    try:
        q=self.html.index('<'+tag_name,pw)
    except:
        # str.index raises when '<tag_name' does not occur
        return p
    # move past '<tag_name'
    q=q+len('<'+tag_name)
    if pw==q:
        return p
    pw=q
    q=self.p_equations(pw,eqs)
    if pw==q:
        # no attribute could be parsed
        return p
    pw=q
    pw=self.p_b(pw)
    q=self.p_key('>',pw)
    if pw!=q:
        return q
    q=self.p_key('/>',pw)
    if pw!=q:
        return q
    return p
def p_get_between_tag_with_cond(self,p,tag_name,cond,...
    """Locate a ``<tag_name ...>`` whose attribute cond[0] equals cond[1].

    On success rtn[0] receives a (start, end) pair of positions in self.html
    for the text up to the matching ``</tag_name`` and the position after
    that closing tag's '>' is returned; on failure the start position is
    returned in most branches.
    NOTE(review): the signature line is cut off in this copy (the body uses
    an ``rtn`` out-parameter) and the nesting of the if/else chain inside
    the loop was reconstructed from a copy without indentation — verify
    against the original file.  The '/>' branch compares against pw where
    p_get_between compares against p, and its failure branch returns p
    rather than pw.
    """
    pw=p   # start position, returned on failure
    q=pw
    while True:
        try:
            q=self.html.index('<'+tag_name,p)
            if p==q:
                return pw
            # move past '<tag_name' and read its attributes
            q=q+len('<'+tag_name)
            p=q
            eqs={}
            q=self.p_equations(p,eqs)
            if p!=q:
                if cond[0] in eqs:
                    if eqs[cond[0]]!=cond[1]:
                        # attribute present but different: try the next tag
                        continue
                    else:
                        break
        except:
            # no further '<tag_name' (or any other error while scanning)
            return pw
    p=self.p_b(p)
    q=self.p_key('>',p)
    if p!=q:
        start_p=q
    else:
        q=self.p_key('/>',p)
        if pw!=q:
            start_p=q
        else:
            return p
    try:
        q=self.html.index('</'+tag_name,start_p)
    except:
        return pw
    # end_p is the position just before '</tag_name'
    end_p=q-1
    p=q
    q=q+len('</'+tag_name)
    p=self.p_b(q)
    q=self.p_key('>',p)
    if q==p:
        return pw
    rtn[0]=(start_p,end_p)
    return q
def p_get_between(self,p,tag_name,eqs,rtn):
    """Find the next ``<tag_name ...>...</tag_name>`` at or after *p*.

    The tag's attributes are collected into *eqs*.  For a normal tag rtn[0]
    receives (start, end): start is the position after the opening tag's
    '>' and end is the position just before ``</tag_name`` (inclusive end
    of the content).  For a self-closing ``/>`` tag rtn[0] is set to "".
    Returns the position after the closing '>' (or after '/>'), or the
    original *p* when the element cannot be parsed.
    """
    pw=p   # start position, returned on failure
    q=pw
    try:
        q=self.html.index('<'+tag_name,p)
    except:
        return pw
    # move past '<tag_name'
    q=q+len('<'+tag_name)
    if p==q:
        return pw
    p=q
    q=self.p_equations(p,eqs)
    p=q
    p=self.p_b(p)
    q=self.p_key('>',p)
    if p!=q:
        start_p=q
    else:
        q=self.p_key('/>',p)
        if p!=q:
            # self-closing tag: there is no content
            end_p=q
            rtn[0]=""
            return q
        else:
            return pw
    try:
        q=self.html.index('</'+tag_name,start_p)
    except:
        return pw
    end_p=q-1
    p=q
    q=q+len('</'+tag_name)
    p=self.p_b(q)
    q=self.p_key('>',p)
    if q==p:
        return pw
    rtn[0]=(start_p,end_p)
    return q
#def text_to_post_form(self,text):
# #print("html_parser.text_to_post_form")
# #print(text)
# post_form=""
# for i in range(len(text)):
# post_form=post_form+self.code_convert(text[i])
# #print("html_parser.text_to_post_form return")
# return post_form
def code_convert(self,code):
    """Return the a2c replacement for the character *code*, or *code* itself when it has none."""
    table = self.a2c
    return table[code] if code in table else code
def text_to_post_form(self,text):
    """Return *text* with every character passed through code_convert()."""
    pieces = [self.code_convert(text[i]) for i in range(len(text))]
    return "".join(pieces)
def special_char_to_text(self,text):
    """Return *text* with HTML character references turned back into characters.

    In this copy of the file the entity literals had themselves been decoded
    (``text.replace("<","<")`` ...), which made every replacement a no-op
    and left one line syntactically invalid.  The standard library decoder
    restores the intended behaviour and also covers the numeric and named
    forms of the quote characters.
    """
    # local import: the name is only needed here
    import html
    return html.unescape(text)
def post(self,url,body):
    """POST the already form-encoded string *body* to *url*.

    Returns the response text, or "" when the request raises (the exception
    is printed).
    NOTE(review): the header and request lines are cut off in this copy.
    """
    #payload is a dictionary
    headers={'Content-Type': 'application/x-www-form-...
    #def http_post(url, value):
    # https://docs.openmv.io/library/urequests.html
    #body = "value=%s" % (value)
    #headers = {'Content-Type': 'application/x-www-fo...
    try:
        response = requests.post(url, data=body.encod...
        r_text=response.text
        response.close()
        return r_text
    except Exception as e:
        print(e)
        return ""
class pico_wiki_driver:
url=""
def __init__(self):
    """Create the time and HTTP helpers and print the current time once."""
    print('pico_wiki_driver')
    self.Pico_Time = pico_time()
    self.Pico_Http = pico_http()
    started = self.Pico_Time.get_now()
    print('time now='+started)
def set_url(self,url):
    """Set the base URL of the wiki script; it is combined with the page name as url?page."""
    setattr(self, "url", url)
def set_page(self,page):
    """Set the name of the wiki page to read or replace."""
    setattr(self, "page", page)
def get_url(self):
    """Return the base URL set with set_url() ("" by default)."""
    base = self.url
    return base
def get_html(self):
    """Fetch ``url?page`` through the HTTP helper and return the whole response text."""
    return self.Pico_Http.get(self.url+"?"+self.page)
def get_wiki_page(self):
    """Fetch ``url?page`` and return the content of the selected ``<div`` element.

    The element is located with html_parser.p_get_between_tag_with_cond and
    its text is passed through special_char_to_text.
    NOTE(review): the condition passed to the parser is cut off in this
    copy, and the indentation was reconstructed: as written here the method
    falls through and returns None when no matching element is found —
    confirm against the original file.  The slice uses the end position as
    an exclusive bound, while replace_wiki_page slices with end_p+1.
    """
    uri = self.url+"?"+self.page
    all_page=self.Pico_Http.get(uri)
    print("get_wiki_page...")
    #print(all_page)
    parser=html_parser(all_page)
    p=0
    rtn=[(0,0)]   # out-parameter: (start, end) of the element content
    q=parser.p_get_between_tag_with_cond(p,"div",('id...
    if p!=q:
        #print("get_wiki_page id=body..."+all_page[rt...
        body_x=all_page[rtn[0][0]:rtn[0][1]]
        body=parser.special_char_to_text(body_x)
        return body
def replace_wiki_page(self,new_body):
    """Replace the text of the current wiki page with *new_body*.

    Steps, as far as visible here: request the edit form (cmd=edit) for the
    page, collect the attributes of every ``<input`` tag, copy selected
    form values into the payload, take the first ``<textarea`` as the
    message and the second as the original text, then POST the assembled
    form body to self.url.
    NOTE(review): many lines are cut off in this copy and the indentation
    was reconstructed; in particular the payload keys 'encode_hint',
    'digest', 'write', 'msg' and 'original' are only set inside
    conditionals, so the final assembly raises KeyError when the edit form
    lacks one of them — confirm against the original file.
    """
    print("pico_wiki_driver.replace_wiki_page...")
    payload={'cmd':'edit', 'page':self.page}
    #r=Pico_Http.get(url+'index.php?work202401')
    #print('uri='+r)
    r = self.Pico_Http.put(self.url,payload)
    #print(r)
    edit_html=html_parser(r)
    #edit_html.html_tokenizer()
    # collect the attribute table of every <input ...> tag of the edit form
    p=0
    array_of_equations=[]
    equations={}
    q=edit_html.p_get_equations_of_the_tag(p,'input',...
    while p!=q:
        #print('p='+str(p)+' q='+str(q)+' <input....>')
        array_of_equations.append(equations)
        p=q
        equations={}
        q=edit_html.p_get_equations_of_the_tag(p,'inp...
    # copy the form values the POST needs into the payload
    for i in range(len(array_of_equations)):
        #print('i='+str(i))
        eqs=array_of_equations[i]
        for key in eqs:
            #print(key+'='+array_of_equations[i][key])
            if key=='name' and array_of_equations[i][...
                payload['encode_hint']=array_of_equat...
            elif key=='name' and array_of_equations[i...
                payload['template']=array_of_equation...
            elif key=='name' and array_of_equations[i...
                payload['cmd']=array_of_equations[i][...
            elif key=='name' and array_of_equations[...
                payload['page']=array_of_equations[i]...
            elif key=='name' and array_of_equations[i...
                payload['digest']=array_of_equations[...
            elif key=='name' and array_of_equations[i...
                payload['write']=array_of_equations[i...
    #tokens=edit_html.get_tokens()
    #for i in range(len(tokens)):_
    # print(str(i)+'-'+tokens[i])
    # first <textarea>: the editable message
    eqs_msg={}
    rtn=[(0,0)]
    p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
    if rtn[0]!=(0,0):
        start_p=rtn[0][0]
        end_p=rtn[0][1]
        body=r[0:start_p]+new_body+r[end_p+1:]
        #print("body=......")
        #print(new_body)
        payload['msg']=edit_html.text_to_post_form(ne...
        #print('msg=......')
        #print(payload['msg'])
    # second <textarea>: the original text sent back with the form
    eqs_original={}
    rtn=[(0,0)]
    p=edit_html.p_get_between(p,'textarea',eqs_origin...
    if rtn[0]!=(0,0):
        start_p=rtn[0][0]
        end_p=rtn[0][1]
        original_pre=r[start_p:end_p+1]
        original=edit_html.special_char_to_text(origi...
        payload['original']=edit_html.text_to_post_fo...
    # assemble the form body by hand; the values are expected to be encoded already
    body=""
    body=body+'encode_hint='+(payload['encode_hint'])...
    body=body+'template_page=&'
    body=body+'cmd='+payload['cmd']+'&'
    body=body+'page='+payload['page']+'&'
    body=body+'digest='+payload['digest']+'&'
    body=body+'msg='+payload['msg']+'&'
    body=body+'write='+payload['write']+'&'
    body=body+'original='+payload['original']
    #print('body='+body)
    edit_html.post(self.url,body)
    #print(r)
# Upload file_dir+file_name to the wiki as a multipart POST carrying the
# fields plugin=attach / pcmd=post and refer=self.page.
#   file_dir  -- directory prefix; it is concatenated directly with file_name
#   file_name -- name of the file, also sent as the attachment name
#   pass_word -- sent as the 'pass' form field
# Returns the HTTP status code of the response.
def upload_file(self,file_dir,file_name,pass_word):
# MIME type definition
#XLSX_MIMETYPE = 'application/vnd.openxmlformats-...
#file_Name = '2024-12-20-222611.jpg'
#file_dir='C:/vscode/python/bot_computing/'
file_path=file_dir+file_name
MIMETYPE = mimetypes.guess_type(file_path)[0]
print('upload file '+file_path)
# Get the file name and data
# NOTE(review): the file object is never closed explicitly
file_Data_Binary = open(file_path, 'rb').read()
# Upload destination URL
#url = 'http://www.yama-lab.org/fwb4pi/index.php'
#page = 'test02'
url=self.url
page=self.page
# Store the information about the file to upload in a dictionary
files = {'attach_file': (file_name, file_Data_Bin...
'encode_hint': 'ぷ',
'pass': pass_word,
'refer': page,
'pcmd': 'post',
'plugin': 'attach'}
m = MultipartEncoder(files)
hx={'Content-Type': m.content_type,
# 'Accept:': 'text/html,application/xhtml+...
'Referer': url+'?'+'plugin=attach&pcmd=upload...
# 'Accept-Language': 'ja,en;q=0.9,en-GB;q=...
}
# Upload the file
response = requests.post(url, data=m, headers=hx)
# Show the response
print(response.status_code)
#print(response.content)
#wfile_path=file_dir+'return.html'
#result_file= open(wfile_path, 'w')
#result_file.write((response.content).decode(enco...
return response.status_code
# Fetch "<url>?<page>" and collect text taken from the <a> elements found
# inside one <div> of the page.
# The div condition is truncated in this copy (starts with ('id...).
# Returns a list of strings; the list is empty when the div is not found.
def get_attachment_list(self):
uri = self.url+"?"+self.page
all_page=self.Pico_Http.get(uri)
# NOTE(review): this message says get_wiki_page although this is get_attachment_list
print("get_wiki_page...")
#print(all_page)
parser=html_parser(all_page)
p=0
rtn=[(0,0)]
q=parser.p_get_between_tag_with_cond(p,"div",('id...
# position returned for the div search; links located past it are ignored
attach_end=q
attach_list=[]
if p==q:
return attach_list
#print("get_wiki_page id=body..."+all_page[rtn[0]...
# continue scanning from the start of the div content
p=rtn[0][0]
eqs={}
while True:
q=parser.p_get_between(p,'a',eqs,rtn)
# stop when no further <a> is found ...
if p==q:
break
# ... or when the link lies beyond the div
if q>attach_end:
break
print(parser.html[rtn[0][0]:rtn[0][1]])
eqs_img={}
q2=parser.p_get_equations_of_the_tag(rtn[0][0...
# the name is the text from q2 up to and including the end of the <a> content
fname=parser.html[q2:(rtn[0][1]+1)]
print(fname)
attach_list.append(fname)
# NOTE(review): a second <a> search from q2 decides where the next round starts;
# presumably it skips a companion link -- confirm
q=parser.p_get_between(q2,'a',eqs,rtn)
p=q
print(attach_list)
return attach_list
# Ask the wiki to delete one attachment of self.page by sending a multipart
# POST with plugin=attach / pcmd=delete.
#   attach_name -- attachment file name (sent as both 'file' and 'newname')
#   pass_word   -- sent as the 'pass' form field
# Returns the HTTP status code of the response.
def delete_attachment(self,attach_name,pass_word):
# Store the request fields in a dictionary
files = {'encode_hint': 'ぷ',
'plugin': 'attach',
'refer': self.page,
'file': attach_name,
'age': '0',
'pcmd': 'delete',
'newname': attach_name,
'pass': pass_word}
m = MultipartEncoder(files)
hx={'Content-Type': m.content_type,
# 'Accept:': 'text/html,application/xhtml+...
'Referer': self.url+'?'+'plugin=attach&pcmd=i...
+'file='+attach_name+'&refer='+self.page
# 'Accept-Language': 'ja,en;q=0.9,en-GB;q=...
}
# Send the delete request
response = requests.post(self.url, data=m, header...
# Show the response
print(response.status_code)
#print(response.content)
#wfile_path='return2.html'
#result_file= open(wfile_path, 'w')
#result_file.write((response.content).decode(enco...
return response.status_code
def get_wiki_source_of_the_uri(self, uri):
    """Point the driver at ``uri`` and return that page's wiki source.

    The original definition lacked the ``self`` parameter although the body
    uses ``self``, so every call failed; it also discarded the fetched source.

    Args:
        uri: address of the form "<url>?<page>".

    Returns:
        Whatever get_wiki_source() returns for that page.

    Raises:
        IndexError: if ``uri`` contains no '?'.
    """
    url_page = uri.split('?')
    self.set_url(url_page[0])
    self.set_page(url_page[1])
    return self.get_wiki_source()
# Return the wiki source of self.page.
# The edit form is requested through Pico_Http.put with cmd=edit, the content
# of the first <textarea> is extracted and passed through special_char_to_text.
# If no textarea is found, wiki_source stays "" before that conversion.
def get_wiki_source(self):
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
array_of_equations=[]
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'input',...
# collect the attributes of every <input> tag; p==q means none is left
while p!=q:
#print('p='+str(p)+' q='+str(q)+' <input....>')
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
# NOTE(review): the payload entries filled in below are only referenced by the
# commented-out cancel request near the end of this method
for i in range(len(array_of_equations)):
#print('i='+str(i))
eqs=array_of_equations[i]
for key in eqs:
#print(key+'='+array_of_equations[i][key])
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['cancel']=array_of_equations[...
#tokens=edit_html.get_tokens()
#for i in range(len(tokens)):_
# print(str(i)+'-'+tokens[i])
eqs_msg={}
rtn=[(0,0)]
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
wiki_source=""
# rtn[0] left at (0,0) means no textarea was found
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
wiki_source=r[start_p:end_p]
#body=""
#body=body+'encode_hint='+payload['encode_hint']+...
#body=body+'cmd='+payload['cmd']+'&'
#body=body+'page='+payload['page']+'&'
#body=body+'cancel='+payload['cancel']
##print('body='+body)
#r=edit_html.post(self.url,body)
wiki_source=edit_html.special_char_to_text(wiki_s...
return wiki_source
def get_result(self):
    """Return the text between "result:" and "current_device=" in the wiki source.

    The original looked up "current_device=" with ``str.index`` and then
    tested ``q<0``. ``index`` raises ValueError instead of returning -1, so
    the intended "run to the end of the text" fallback could never happen.
    ``str.find`` is used so the fallback works.

    Returns:
        None or "" when get_wiki_source() returns None or "". Otherwise the
        slice that starts one character after "result:" and stops one
        character before "current_device=", or at the end of the text when
        that marker is absent.

    Raises:
        ValueError: if the text contains no "result:" marker (unchanged).
    """
    all_text = self.get_wiki_source()
    if all_text is None:
        return None
    if all_text == "":
        return ""
    start = all_text.index("result:") + len("result:")
    end = all_text.find("current_device=", start)
    if end < 0:
        end = len(all_text)
    else:
        # drop the single character that precedes the marker
        end = end - 1
    # start+1 skips the character that follows "result:"
    return all_text[start + 1:end]
def get_result_of_the_uri(self, uri):
    """Point the driver at ``uri`` ("<url>?<page>") and return get_result().

    Any exception is reported on stdout and None is returned.
    """
    try:
        pieces = uri.split('?')
        self.set_url(pieces[0])
        self.set_page(pieces[1])
        return self.get_result()
    except Exception as err:
        print('error in get_result_of_the_uri:' + uri)
        print(err)
    return None
# Serial link to a device on a USB serial port; PORT0 is tried first, then PORT1.
class USB_Serial:
PORT0 = '/dev/ttyACM0' # on Windows use COM* (original comment was mis-encoded; presumably this meaning)
PORT1 = '/dev/ttyACM1'
# Open the serial port (PORT0, falling back to PORT1) and start the input thread.
# If neither port can be opened, self.ser is set to None.
# The serial.Serial arguments after the port and the Thread arguments are
# truncated in this copy; the thread target is presumably receive_loop -- confirm.
def __init__(self):
try:
self.ser = serial.Serial(port=self.PORT0,baud...
except Exception as e:
try:
self.ser = serial.Serial(port=self.PORT1,...
except Exception as e:
print(e)
#self.GUI.write_message(str(e))
self.ser=None
self.usb_serial_input_thread=threading.Thread(tar...
self.usb_serial_input_thread.start()
def set_gui(self, gui):
    """Remember the GUI object that send_command() reports write errors to."""
    self.GUI = gui
def send_command(self, command):
    """Write ``command`` plus CR LF to the serial port and wait for a reply.

    The reply is whatever another part of the object stores in ``self.rtn``.
    The attribute is polled every 10 ms; after more than 1000 polls without
    a reply "0" is returned. A failing write is reported to stdout and the
    GUI, and also yields "0".

    Args:
        command: text to send, without line terminator.

    Returns:
        The value found in ``self.rtn``, or "0" on timeout or write error.
    """
    self.rtn = None
    print('writing ' + command)
    payload = (command + '\r\n').encode('utf-8')
    try:
        if self.ser != None:
            self.ser.write(payload)
            self.ser.flush()
    except Exception as err:
        print("serial write error")
        self.GUI.write_message("serial write error.")
        print(err)
        self.GUI.write_message(str(err))
        self.rtn = "0"
    polls = 0
    while True:
        if self.rtn != None:
            reply = self.rtn
            self.rtn = None
            return reply
        if polls > 1000:
            self.rtn = None
            return "0"
        time.sleep(0.01)
        polls += 1
# Endless loop that reads lines from the serial port and publishes a reply in
# self.rtn, where send_command() picks it up.
# On any exception the port is reopened (PORT... arguments are truncated in this
# copy); if reopening fails self.ser becomes None, and self.rtn is set to "0".
def receive_loop(self):
while True:
try:
# NOTE(review): this continue is inside the try; if the sleep at the bottom
# is part of the same loop body it is skipped, so the loop would spin without
# pause while no port is open -- confirm
if self.ser==None:
continue
buf = self.ser.readlines()
buf = [i.decode().strip() for i in buf]
if len(buf)>0:
print(buf)
for l in buf:
# a line containing '>>>' only triggers the message below
if '>>>' in l:
print('please reset pico.')
continue
#print("buf[i]="+buf)
# NOTE(review): always takes the second line read; with fewer than two lines
# this raises IndexError, which the except below treats as a receive error
self.rtn= buf[1]
except Exception as e:
print("error receiving")
#self.GUI.write_message("error receiving")
print(e)
#self.GUI.write_message(str(e))
try:
self.ser = serial.Serial(port=self.PO...
except Exception as e:
print(e)
try:
self.ser = serial.Serial(port=sel...
except Exception as e:
print(e)
#self.GUI.write_message('error '+...
self.ser=None
self.rtn="0"
time.sleep(0.01)
#HOST="localhost"
#PORT=9999
class Python_Connector:
def __init__(self,host,port,mw):
self.host=host
self.port=port
self.connected=False
self.soc=None
self.handle_thread=None
self.message_window=mw
def set_message_window(self, window):
    """Replace the object that receives status messages."""
    self.message_window = window
def set_bot(self, bot):
    """Remember the bot whose handler()/write_line() process received lines."""
    self.bot = bot
def is_connected(self):
    """Return the ``connected`` flag maintained by connect().

    The original named its parameter ``sekf`` while the body read
    ``self.connected``, so every call raised NameError.
    """
    return self.connected
def connect_address(self, address):
    """Switch the target host to ``address`` and (re)connect."""
    self.host = address
    self.connect()
# (Re)connect to self.host:self.port.
# An existing socket is closed first (followed by a 2 s pause), a new socket is
# created and connected, then handle_thread is created and started and
# self.connected is set to True.
def connect(self):
print("re connect to host "+self.host)
if self.soc !=None:
print("close the previous host, "+self.soc.ge...
try:
self.soc.close()
self.soc=None
time.sleep(2.0)
except:
print("close failed")
try:
#socket.socket ... create a socket and assign it to soc
self.soc = socket.socket(socket.AF_INET, sock...
#self.soc.setsockopt(socket.SOL_SOCKET, socke...
#connect soc to the server socket whose IP address is host
self.soc.connect((self.host,int(self.port)))
except Exception as e:
print("connect error.")
if self.message_window:
self.message_window.write_message("error ...
self.connected=False
# NOTE(review): indentation is lost in this copy. If the lines below are not
# nested in the try above, a failed connect still starts the handler thread
# and ends with connected=True -- confirm
#run the receiving function handler on a separate thread (original comment truncated)
self.handle_thread=threading.Thread(target=self.h...
#start handle_thread
self.handle_thread.start()
self.connected=True
if self.message_window:
self.message_window.write_external("connected...
# Send command followed by '\n', UTF-8 encoded, over the socket.
# Without a socket only a message is written to the message window.
def send_command(self,command):
if self.soc!=None:
try:
self.soc.send(bytes(command+'\n',"utf-8"))
# only KeyboardInterrupt is handled here: the socket is closed and exit() is called
except KeyboardInterrupt:
if self.message_window:
self.message_window.write_external("e...
self.soc.close()
exit()
else:
# NOTE(review): unlike the branch above, message_window is used here without
# checking that it is set
if self.host is not None:
self.message_window.write_external("no so...
else:
self.message_window.write_external("no ho...
#Receive processing; runs concurrently, separately from the sending thread.
# Reads lines from the socket in an endless loop, prints each one, shows it in
# the message window, passes it to bot.handler() and hands the result to
# bot.write_line(). A socket error closes the socket; any other exception
# prints a traceback, closes the socket and sets self.soc to None.
def handler(self):
print("at client, connected, start handler")
fx=self.soc.makefile('r')
try:
while True:
l=fx.readline()
line="[receive]- {}".format(l)
print(line)
if self.message_window:
self.message_window.write_external("f...
x=self.bot.handler(l)
self.bot.write_line(x)
except socket.error:
print("socket error. exit")
self.soc.close()
except Exception as e:
import traceback
traceback.print_exc()
if self.message_window:
self.message_window.write_message(sel...
if self.soc is not None:
self.soc.close()
self.soc=None
# NOTE(review): the sys module has exc_info(), not exec_info(); this line raises
# AttributeError when reached
tb=sys.exec_info()[2]
print("message:{0}".format(e.with_traceba...
def close(self):
    """Close the socket if one is open and forget it.

    The original called ``self.soc.close()`` unconditionally, which raised
    AttributeError when no socket had been opened or it was already closed.
    Calling close() repeatedly is now harmless.
    """
    sock = self.soc
    self.soc = None
    if sock is not None:
        sock.close()
# Bot that reads a script from a wiki page, interprets it and writes results back.
# The names below are class-level defaults. The list and dict objects are created
# once in the class body, so they are shared by all instances until an instance
# assigns its own value.
class pico_wiki_bot:
result_lines=[]
# lines of the script currently being interpreted
script_lines=[]
# candidate page URIs collected by is_object_page
urls=[]
# program name -> program text
programs={}
# values assigned by the script's "set" command
environments={}
time_millis=0
timer=None
current_program_name=""
current_program=""
# report lines that replace_result writes back to the wiki
report=[]
#reportLength=120
current_url=""
current_page=""
current_text=""
# starts as "" here; __init__ and init_params set it to 0
next_url_index=""
def __init__(self):
    """Create the helper objects (time, HTTP, wiki driver, USB serial).

    read_initials() is called while ``self.bot`` is None. A Timer is only
    created when the freshly created wiki driver already reports a
    non-empty URL.
    """
    print('wiki_bot')
    self.Pico_Time = pico_time()
    self.Pico_Http = pico_http()
    now_text = self.Pico_Time.get_now()
    self.bot = None
    print('time now=' + now_text)
    self.read_initials()
    self.next_url_index = 0
    self.Pico_Wiki_Driver = pico_wiki_driver()
    self.all_text = ""
    self.u_serial = USB_Serial()
    if self.Pico_Wiki_Driver.get_url() != "":
        self.timer = Timer()
# Copy settings from self.bot.prop into self.environments and split the
# 'object_page_1' URI into url and page.
# Returns early when self.bot or self.bot.prop is None.
# The property keys on the right-hand sides are truncated in this copy.
def read_initials(self):
print("read_initials")
if self.bot==None:
return
if self.bot.prop==None:
return
self.environments['waiting_term']=self.bot.prop.g...
self.environments['read_interval']=self.bot.prop....
self.environments['exec_interval']=self.bot.prop....
self.environments['write_interval']=self.bot.prop...
self.environments['report_length']=self.bot.prop....
uri=self.bot.prop.get('object_page_1')
# NOTE(review): url is only a local variable and is not stored; page is stored as self.page
url=uri.split('?')[0]
self.page=uri.split('?')[1]
def set_current_page(self, page):
    """Store ``page`` as the current page name."""
    self.current_page = page
def set_current_url(self, url):
    """Store ``url`` as the current wiki base URL."""
    self.current_url = url
def set_current_pass(self, pass_w):
    """Store ``pass_w`` as the password for the current page."""
    self.pass_word = pass_w
def clear_report(self):
    """Empty the report list in place (the list object itself is kept)."""
    self.report.clear()
# Append send to self.report, first discarding the oldest entries while the
# report is longer than a limit read from the GUI (the property name is
# truncated in this copy; presumably 'report_length' -- confirm).
def add_report_line(self,send):
#print("add_report_line, report_length="+str(self...
#print(" current report_length="+str(len(self.rep...
#print(" adding..."+send)
#while len(self.report)>(int(self.environments['r...
while len(self.report)>(int(self.GUI.lookup_prop(...
# shift every entry one position towards the front, then drop the last slot:
# the net effect is removing the oldest (first) entry
for i in range(len(self.report)-1):
self.report[i]=self.report[i+1]
self.report.pop()
self.report.append(send)
def set_GUI(self, gui):
    """Attach the GUI to the bot and to its USB serial helper."""
    self.GUI = gui
    serial_link = self.u_serial
    serial_link.set_gui(gui)
def init_params(self):
    """Forward to GUI.init_params() unless the GUI is None.

    NOTE(review): a second ``init_params`` is defined further down in this
    class; if both are in the same class body the later one replaces this one.
    """
    if self.GUI != None:
        self.GUI.init_params()
def add_one_second_to_time_millis(self, timer):
    """Advance ``time_millis`` by 1000; ``timer`` is accepted but not used."""
    self.time_millis += 1000
def stop_timer(self):
    """Drop the reference to the timer by setting ``self.timer`` to None."""
    self.timer = None
def get_the_object_page(self, url):
    """Fetch the page addressed by ``url`` ("<url>?<page>") with a fresh driver.

    A new pico_wiki_driver is used, so the bot's own driver and its
    url/page attributes are left untouched.

    Returns:
        Whatever the driver's get_html() returns.
    """
    pieces = url.split('?')
    one_shot_driver = pico_wiki_driver()
    one_shot_driver.set_url(pieces[0])
    one_shot_driver.set_page(pieces[1])
    return one_shot_driver.get_html()
# Fetch the page addressed by uri ("<url>?<page>") with a fresh driver.
# If that yields "", fall back to an object page looked up from the GUI
# (property names truncated in this copy), or from initials when the lookup
# raises, and fetch that page through self.Pico_Wiki_Driver.
# No return statement is reached when the fallback also yields "".
def get_the_page(self,uri):
#print("pico_wiki_bot.get_the_object_page")
#print("uri="+uri)
url_page=uri.split('?')
url=url_page[0]
page=url_page[1]
driver=pico_wiki_driver()
driver.set_url(url)
driver.set_page(page)
html=driver.get_html()
if html!="":
return html
try:
object_page=self.GUI.lookup_prop('object_page...
pass_word=self.GUI.lookup_prop('object_page_p...
except:
# NOTE(review): this fallback pairs 'object_page_2' with 'object_page_pass_1' -- confirm intended
object_page=initials['object_page_2']
pass_word=initials['object_page_pass_1']
url_page=object_page.split('?')
# the fallback also overwrites the bot's own url/page/pass_word
self.url=url_page[0]
self.page=url_page[1]
self.pass_word=pass_word
self.Pico_Wiki_Driver.set_url(self.url)
self.Pico_Wiki_Driver.set_page(self.page)
html=self.Pico_Wiki_Driver.get_html()
if html!="":
return html
# Fetch the object page at url, extract the content of its first <pre> element,
# convert it with special_char_to_text (argument truncated in this copy), store
# the result in self.current_text and return it.
# Returns "" when the page HTML is "".
def get_script_and_result(self,url):
#print("pico_wiki_bot.get_script_and_result")
html=self.get_the_object_page(url)
if html=="":
return ""
parser=html_parser(html)
p=0
eqs={}
# rtn is an out-parameter: the parser stores the (start, end) span in rtn[0]
rtn=[(0,0)]
q=parser.p_get_between(p,'pre',eqs,rtn)
be=rtn[0]
# NOTE(review): q is not checked; if no <pre> is found be stays (0,0) and the
# slice below is the first character of the page -- confirm
command_and_result=parser.html[be[0]:(be[1]+1)]
self.current_text=parser.special_char_to_text(com...
return self.current_text
def get_result(self, url):
    """Return the text between "result:" and "current_device=" on the page at ``url``.

    The page text is loaded by get_script_and_result(), which stores it in
    ``self.current_text``.

    The original looked up "current_device=" with ``str.index`` and then
    tested ``q<0``. ``index`` raises ValueError instead of returning -1, so
    the intended "run to the end of the text" fallback could never happen.
    ``str.find`` is used so the fallback works.

    Returns:
        None or "" when the page text is None or "". Otherwise the slice
        that starts one character after "result:" and stops one character
        before "current_device=", or at the end of the text when that
        marker is absent.

    Raises:
        ValueError: if the text contains no "result:" marker (unchanged).
    """
    self.get_script_and_result(url)
    all_text = self.current_text
    if all_text is None:
        return None
    if all_text == "":
        return ""
    start = all_text.index("result:") + len("result:")
    end = all_text.find("current_device=", start)
    if end < 0:
        end = len(all_text)
    else:
        # drop the single character that precedes the marker
        end = end - 1
    # start+1 skips the character that follows "result:"
    return all_text[start + 1:end]
def get_script(self, url):
    """Return the script part of the page at ``url`` as a list of lines.

    The script is the text before the "result:" marker (the single
    character right before the marker is dropped), or the whole text when
    there is no marker. The first line recognised by is_include() is
    replaced by the lines of the included page.

    Fixes relative to the original:
      * the lines kept before an include were ``text_lines[:i-1]``, which
        lost the line directly before the include and, for an include on
        the first line (i == 0), wrapped around to ``text_lines[:-1]``;
        ``text_lines[:i]`` is used now;
      * a "result:" marker at position 0 produced the end index -1, which
        sliced from the end of the text; the index is clamped to 0;
      * the bare ``except`` around the marker search is replaced by
        ``str.find``.

    Returns:
        None or "" when the page text is None or "", otherwise a list of
        script lines.
    """
    all_text = self.get_script_and_result(url)
    if all_text is None:
        return None
    if all_text == "":
        return ""
    marker = all_text.find("result:")
    if marker < 0:
        end = len(all_text)
    else:
        end = max(marker - 1, 0)
    text_lines = all_text[:end].split('\n')
    # out-parameter: is_include stores the included lines in include_text[0]
    include_text = [[]]
    for i, line in enumerate(text_lines):
        if self.is_include(html_parser(line), include_text):
            merged = text_lines[:i]
            merged.extend(include_text[0])
            merged.extend(text_lines[i + 1:])
            # as before, an empty merge result leaves the script unchanged
            if merged != []:
                text_lines = merged
            break
    return text_lines
# Return text taken from self.current_text after "current_device=" up to the
# end of that line, or to the end of the text when no newline follows. The
# slice expression is truncated in this copy.
# Returns None when self.current_text is None.
def get_current_device_line(self):
#command_and_result=self.get_script_and_result()
command_and_result=self.current_text
# NOTE(review): the parser is constructed before the None check below
parser=html_parser(command_and_result)
if command_and_result==None:
return None
# str.index raises ValueError when the marker is absent
p=command_and_result.index("current_device=")
p=p+len("current_device=")
try:
q=command_and_result.index("\n",p)
except:
q=len(command_and_result)
return parser.special_char_to_text(command_and_re...
def replace_wiki(self, new_wiki):
    """Hand ``new_wiki`` to the bot's wiki driver via replace_wiki_page()."""
    driver = self.Pico_Wiki_Driver
    driver.replace_wiki_page(new_wiki)
# Rewrite the result area of the current wiki page from self.report.
# The edit form is requested, the current source is read from the first
# <textarea>, everything up to and including "result:" is kept, each report line
# is appended with a leading space, a current_device line built from the GUI's
# bot_id is added (rest of that line truncated in this copy), and the text is
# written back through Pico_Wiki_Driver.replace_wiki_page.
def replace_result(self):
#print("pico_wiki_bot.replace_result")
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
eqs_msg={}
rtn=[(0,0)]
body=""
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
body=edit_html.special_char_to_text(r[start_p...
#print("body=......")
#print(body)
eqs_original={}
rtn=[(0,0)]
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
# NOTE(review): payload is not passed on after this point in the visible code
payload['original']=edit_html.text_to_post_fo...
#print("body="+body)
# NOTE(review): str.index raises ValueError when a marker is missing, so the
# q<0 branch below cannot be taken
px=body.index("result:")+len("result:")
q=body.index("current_device=",px)
if q<0:
q=len(body)
head_area=body[0:px]
#tail_area=msg[q:]
#result_area=msg[px:q]
new_area=head_area+'\n'
for i in range(len(self.report)):
line=self.report[i]
# each report line ends up with exactly one trailing newline
if line.endswith('\n'):
new_area=new_area+' '+line
else:
new_area=new_area+' '+line+'\n'
#new_area=new_area+' '+self.report[i]+'\n'
bot_id=self.GUI.lookup_prop('bot_id')
new_area=new_area+" current_device=\""+bot_id+"\"...
gc.collect()
self.Pico_Wiki_Driver.replace_wiki_page(new_area)
def get_attachment_list(self):
    """Return the attachment list reported by the bot's wiki driver."""
    driver = self.Pico_Wiki_Driver
    return driver.get_attachment_list()
# Forward an upload request to the wiki driver (remaining arguments are truncated
# in this copy). The driver's return value is not passed back to the caller.
def upload_file(self,file_path,file_name):
self.Pico_Wiki_Driver.upload_file(file_path,file_...
# Forward a delete request for attach_name to the wiki driver (remaining
# arguments are truncated in this copy). The driver's return value is not
# passed back to the caller.
def delete_attachment(self,attach_name):
self.Pico_Wiki_Driver.delete_attachment(attach_na...
# Load the script of the configured object page.
#   script_lines -- one-element list used as out-parameter: script_lines[0]
#                   receives the list of script lines, or None on failure
# Returns True when a non-empty script was read, otherwise False.
# The page address and password come from the GUI (property names truncated in
# this copy), or from initials when the lookup raises.
def read_script(self,script_lines):
#print("read_script")
if self.GUI!=None:
try:
object_page=self.GUI.lookup_prop('object_...
pass_word=self.GUI.lookup_prop('object_pa...
except:
object_page=initials['object_page_1']
pass_word=initials['object_page_pass_1'] ...
else:
print("failed to read")
return False
url_page=object_page.split('?')
self.url=url_page[0]
self.page=url_page[1]
self.pass_word=pass_word
self.Pico_Wiki_Driver.set_url(self.url)
self.Pico_Wiki_Driver.set_page(self.page)
# NOTE(review): this html value is not used below; get_script fetches the page again
html=self.Pico_Wiki_Driver.get_html()
all_text=self.get_script(object_page)
# NOTE(review): get_script can also return ""; that value is not treated as a
# failure here
if all_text==None or all_text==[]:
script_lines[0]=None
return False
script_lines[0]=all_text
#print("-----script_lines-----")
#for i in range(len(self.script_lines)):
# print(str(i)+' - '+self.script_lines[i])
#if self.GUI != None:
# #self.GUI.write_script_start(all_text)
# self.GUI.write_script_lines(all_text)
return True
def eval_script(self):
    """Announce the evaluation on stdout and interpret the loaded script lines."""
    print("eval_script")
    self.interpret_lines()
def is_object_page(self, parser):
    """Recognise an "object_page <url> [or <url> ...]" line.

    When the line starts (after blanks) with the key "object_page ",
    ``self.urls`` is replaced by the URLs listed on the line, which are
    separated by the key "or ".

    Args:
        parser: parser positioned on one script line; its p_b, p_key and
            p_url methods are used. They return the unchanged position
            when nothing matched.

    Returns:
        True if the key was found (even when no URL follows), else False.
    """
    start = parser.p_b(0)
    after_key = parser.p_key("object_page ", start)
    if start == after_key:
        return False
    pos = parser.p_b(after_key)
    self.urls = []
    holder = [""]
    nxt = parser.p_url(pos, holder)
    while pos != nxt:
        self.urls.append(holder[0])
        pos = parser.p_b(nxt)
        nxt = parser.p_key("or ", pos)
        if pos == nxt:
            # no further alternative on this line
            break
        pos = parser.p_b(nxt)
        holder = [""]
        nxt = parser.p_url(pos, holder)
    return True
def is_device(self, parser):
    """Parse a 'current_device="<name>", Date=...' line.

    The device name is stored in ``self.current_device`` as soon as the
    string constant has been read.

    Returns:
        False when the key, the string constant or the comma is missing.
        When the whole prefix matches, the function finishes without an
        explicit value, i.e. it returns None.

    NOTE(review): p_Date is called with the position *before* the "Date="
    key and its result is not used -- confirm the intent.
    """
    pos = parser.p_b(0)
    nxt = parser.p_key("current_device=", pos)
    if pos == nxt:
        return False
    pos = parser.p_b(nxt)
    name_out = [""]
    nxt = parser.p_String_Constant(pos, name_out)
    if pos == nxt:
        return False
    pos = parser.p_b(nxt)
    self.current_device = name_out[0]
    nxt = parser.p_key(",", pos)
    if pos == nxt:
        return False
    pos = parser.p_b(nxt)
    nxt = parser.p_key("Date=", pos)
    date_out = []
    nxt = parser.p_Date(pos, date_out)
    return None
# Recognise an "include <url> [or <url> ...]" line and load the included script.
#   parser       -- parser positioned on one script line
#   script_lines -- one-element list used as out-parameter: script_lines[0]
#                   receives the lines of the first listed page whose script
#                   is neither None nor ""
# Returns False when the line does not start with the key "include " (or when
# parsing a further URL raises), otherwise True.
def is_include(self,parser,script_lines):
#print("is_object_page, line="+line[:10]+"...")
p=0
p=parser.p_b(p)
q=parser.p_key("include ",p)
if p==q:
#print("is_object_page return False")
return False
p=q
p=parser.p_b(p)
include_pages=[]
xurl=[""]
i=0
q=parser.p_url(p,xurl)
#print(str(i)+"th url="+xurl[0])
while p!=q:
i=i+1
include_pages.append(xurl[0])
p=q
p=parser.p_b(p)
q=parser.p_key("or ",p)
#print("after key or p="+str(p)+" q="+str(q))
if p==q:
break
p=q
try:
p=parser.p_b(p)
xurl=[""]
q=parser.p_url(p,xurl)
#print(str(i)+"th url="+xurl[0])
#print("p="+str(p)+" q="+str(q))
except Exception as e:
# NOTE(review): Python exceptions have no toString() method; this line itself
# raises AttributeError -- str(e) was probably intended
print('error in include '+e.toString())
return False
# try the listed pages in order and take the first one that yields a script
for i in range(len(include_pages)):
print("include_pages["+str(i)+"]="+include_pa...
x_lines=self.get_script(include_pages[i])
if x_lines==None or x_lines=="":
continue
else:
script_lines[0]=x_lines
break
return True
def set_page_name(self, eq):
    """Apply a "set pageName=..." command to both object pages.

    ``eq['pageName']`` may contain the placeholders "<hour>" and "<day>",
    which are filled in from the current time reported by Pico_Time.get_now().
    The code relies on that text being "<YYYY-MM-DD> <HH:MM:SS...>".

    * "<hour>": the page part of the GUI properties 'object_page_1' and
      'object_page_2' is replaced and written back with map_to_gui().
    * "<day>": the same replacement is made, but the new addresses are
      registered with add_uri() at index 0 and 1.

    Each branch substitutes only its own placeholder in the original name.
    """
    rv = eq['pageName']
    print("set pageName=" + rv)
    date = self.Pico_Time.get_now()
    print('ok?')
    time_x = (date.split())[1]
    print('time_x=' + time_x)
    prop_keys = ('object_page_1', 'object_page_2')
    if "<hour>" in rv:
        hour = time_x.split(':')[0]
        xrv = rv.replace("<hour>", hour)
        for prop_key in prop_keys:
            url_and_page = self.GUI.lookup_prop(prop_key).split('?')
            url_and_page[1] = xrv
            new_page = url_and_page[0] + '?' + url_and_page[1]
            print(new_page)
            self.GUI.map_to_gui(prop_key, new_page)
    if "<day>" in rv:
        date_x = (date.split())[0]
        print('ok?')
        print('date_x=' + date_x)
        day = date_x.split('-')[2]
        xrv = rv.replace("<day>", day)
        for slot, prop_key in enumerate(prop_keys):
            url_and_page = self.GUI.lookup_prop(prop_key).split('?')
            url_and_page[1] = xrv
            new_page = url_and_page[0] + '?' + url_and_page[1]
            print(new_page)
            self.GUI.add_uri(new_page, slot)
# Recognise "set <name>=<value>" at position p of the parser's line.
# A 'pageName' assignment is delegated to set_page_name; any other assignment
# is stored in self.environments, reported to the GUI (message text truncated
# in this copy) and mirrored with map_to_gui.
# Returns False when the key "set " or the equation is missing, otherwise True.
def is_set_command(self,parser,p):
#print("is_set_command.."+parser.html[p:p+10])
#p=parser.p_b(p)
q=parser.p_key("set ",p)
if p==q:
#print("is_set_command return False")
return False
p=parser.p_b(q)
# eq is an out-parameter filled by p_an_equation
eq={}
q=parser.p_an_equation(p,eq)
if p==q:
return False
if "pageName" in eq:
print(eq)
self.set_page_name(eq)
return True
for key in eq:
self.environments[key]=eq[key]
self.GUI.write_message("set "+str(key)+"="+st...
self.GUI.map_to_gui(key,self.environments[key])
return True
def is_py_command(self, parser, p):
    """Recognise "py <name>", which starts the definition of a program.

    On a match the program name is stored in ``self.current_program_name``
    and ``self.current_program`` is reset to "".

    The original fell off the end after a successful match and therefore
    returned None; is_command() tests the result for truth, so a matched
    "py" command was reported as unmatched and the remaining command
    recognisers were still tried. True is returned now.

    Args:
        parser: parser positioned on one script line.
        p: position in the line where the command starts.

    Returns:
        True if the key "py " is present at ``p``, else False.
    """
    q = parser.p_key("py ", p)
    if p == q:
        return False
    p = parser.p_b(q)
    xname = [""]
    # the returned position is not needed; the name arrives in xname[0]
    parser.p_name(p, xname)
    self.current_program_name = xname[0]
    self.current_program = ""
    return True
# Recognise "end <name>", which finishes a program definition: an entry is
# stored in self.programs under self.current_program_name (the right-hand side
# is truncated in this copy).
# Returns False when the key "end " is missing.
# NOTE(review): nothing is returned after a match, so the caller sees None
# (falsy) -- confirm whether True was intended.
def is_end_command(self,parser,p):
#print("is_end_command.."+parser.html[p:p+10])
#p=parser.p_b(p)
q=parser.p_key("end ",p)
if p==q:
return False
p=parser.p_b(q)
xname=[""]
# NOTE(review): the name read here is not used; the program is stored under current_program_name
q=parser.p_name(p,xname)
self.programs[self.current_program_name]=self.cur...
def ex_pico(self, command):
    """Send ``command`` over the USB serial link and return its reply."""
    reply = self.u_serial.send_command(command)
    return reply
def handler(self, x):
    """Return ``x`` unchanged."""
    return x
def external(self, x):
    """Pass ``x`` to the GUI's ex_external()."""
    gui = self.GUI
    gui.ex_external(x)
def send_external(self, x):
    """Send ``x`` through the connector stored in ``self.ext_soc``."""
    connector = self.ext_soc
    connector.send_command(x)
# Recognise "run <name>" and execute the stored program of that name.
# Before the program runs, the report is reloaded from the result area of the
# current page. The program text is run with exec(), with globals() and a
# locals dict that exposes the bot as 'self'.
# Returns False when the key "run " or the name is missing; nothing is
# returned after a run.
# NOTE(review): the program text originates from the wiki page, so anyone who
# can edit that page can run arbitrary Python here.
def is_run_command(self,parser,p):
# reference : https://qiita.com/kyoshidajp/items/...
# reference : https://qiita.com/kammultica/items/...
print("is_run_command.."+parser.html[p:p+10])
q=parser.p_key("run ",p)
if p==q:
#print("is_run_command return False")
return False
p=parser.p_b(q)
xname=[""]
q=parser.p_name(p,xname)
if p==q:
#print("is_run_command return False")
return False
try:
#r=self.get_result(self.current_url+'?'+self....
r=self.get_result(self.url+'?'+self.page)
except Exception as e:
print("error in run command, get_result..:"+s...
# NOTE(review): if get_result raised, r was never assigned and the test below
# fails with an unbound-variable error
if r!=None and r!="":
self.clear_report()
lines=r.split('\n')
for i in range(len(lines)):
self.add_report_line(lines[i])
exec_locals={'self':self}
#print("line="+parser.html)
#print("xname[0]="+xname[0])
# KeyError here if no program of that name has been stored
program=self.programs[xname[0]]
print("program="+program)
try:
exec(program,globals(),exec_locals)
except Exception as e:
print("fail to exec "+program)
self.add_report_line("fail to exec:"+xname[0])
self.GUI.write_message("fail to exec "+program)
tb = sys.exc_info()[2]
print("message:{0}".format(e.with_traceback(t...
#print(str(e))
self.GUI.write_message(str(e))
traceback.print_stack()
# NOTE(review): format_exc() returns a string that is discarded here
traceback.format_exc()
#self.add_report_line(str(e))
#sys.print_exception()
#try:
# exec(program)
#except:
# print("fail to exec "+program)
# self.add_report_line("fail to exec:"+xname[0])
# sys.print_exception()
def is_command(self, parser):
    """Recognise a "command:" line and dispatch it to a command recogniser.

    After the key "command:" the recognisers for set, py, end and run are
    tried in that order; the first one that returns a true value ends the
    search.

    Returns:
        True if one recogniser accepted the line, else False.
    """
    start = parser.p_b(0)
    after_key = parser.p_key("command:", start)
    if start == after_key:
        return False
    body_pos = parser.p_b(after_key)
    recognisers = (
        self.is_set_command,
        self.is_py_command,
        self.is_end_command,
        self.is_run_command,
    )
    for recognise in recognisers:
        if recognise(parser, body_pos):
            return True
    return False
def is_py(self, parser):
    """Recognise a "py: <code>" line and append the code to the current program.

    The text after the key "py: " plus a newline is appended to
    ``self.current_program``.

    The original returned None after a successful match although it
    returns False on a mismatch and callers test the result for truth;
    True is returned now.

    Returns:
        True if the line starts (after blanks) with "py: ", else False.
    """
    start = parser.p_b(0)
    after_key = parser.p_key("py: ", start)
    if start == after_key:
        return False
    self.current_program = self.current_program + parser.html[after_key:] + '\n'
    return True
def interpret_a_line(self, line):
    """Interpret one script line.

    The recognisers are tried in this order: object_page, device, command,
    py. After an object_page line every collected URL is registered in the
    GUI with its index. Evaluation stops at the first recogniser that
    returns a true value.
    """
    parser = html_parser(line)
    if self.is_object_page(parser):
        for index, uri in enumerate(self.urls):
            self.GUI.add_uri(uri, index)
        return
    # short-circuit chain: later recognisers run only if the earlier ones
    # returned a false value
    _ = (self.is_device(parser)
         or self.is_command(parser)
         or self.is_py(parser))
def interpret_lines(self):
    """Interpret every line of ``self.script_lines`` and send the result.

    An exception raised while interpreting a line does not stop the run:
    the failing line and the exception text are printed and added to the
    report through write_line(). send_result() is called once at the end.

    Fix: the second report line was the plain string "...:{e}" (no
    formatting applied), so the literal text "{e}" was reported instead of
    the exception. It is formatted with str.format now.
    """
    for script_line in self.script_lines:
        try:
            self.interpret_a_line(script_line)
        except Exception as e:
            line = "error in '" + script_line + "'"
            print(line)
            self.write_line(line)
            line = "...:{0}".format(e)
            print(line)
            self.write_line(line)
    self.send_result()
def next_url(self):
    """Advance to the next entry of ``self.urls`` (round robin).

    The chosen "<url>?<page>" entry is split at the first '?' into
    ``current_url`` and ``current_page``, the wiki driver is pointed at
    it, and ``next_url_index`` is moved forward. Nothing happens when
    ``self.urls`` is empty.
    """
    print("next_url")
    if len(self.urls) == 0:
        return
    index = int(self.next_url_index)
    if index >= len(self.urls):
        # wrap around to the first entry
        self.next_url_index = 0
        index = 0
    target = self.urls[index]
    sep = target.index("?")
    self.current_page = target[sep + 1:]
    self.current_url = target[:sep]
    print("current_url=" + self.current_url)
    print("current_page=" + self.current_page)
    self.next_url_index = index + 1
    driver = self.Pico_Wiki_Driver
    driver.set_url(self.current_url)
    driver.set_page(self.current_page)
def send_result(self):
    """Write the report back to the wiki via replace_result().

    The attempt is announced on stdout and in the GUI. An exception from
    replace_result() is printed and not propagated.
    """
    label = "send_result"
    print(label)
    self.GUI.write_message(label)
    try:
        self.replace_result()
    except Exception as err:
        print('error while send_result')
        print(err)
def write_line(self, line):
    """Show ``line`` in the GUI and append it to the report.

    An exception from either step is printed and not propagated.
    """
    try:
        self.GUI.write_message("add_report " + line)
        self.add_report_line(line)
    except Exception as err:
        print('error while write_line(' + line + ')')
        print(err)
# Main loop of the bot; runs until stop_read_eval_print_loop() clears the flag.
# Each round (about every 0.1 s) it:
#   * re-reads the script when the read interval has elapsed,
#   * evaluates it on its own schedule when the GUI property 'exec_interval' is > 0,
#   * sends the result when 'write_interval' is > 0 and has elapsed.
# Times are compared in milliseconds (time.time()*1000). Several GUI property
# lookups are truncated in this copy.
def read_eval_print_loop(self):
print("read_eval_print_loop")
self.continue_read_eval_print_loop=True
last_read_time=int(time.time() * 1000)
last_eval_time=last_read_time
last_write_time=last_read_time
while self.continue_read_eval_print_loop:
gc.collect()
current_time=int(time.time() * 1000)
# guard against the clock having moved backwards
if current_time<last_read_time:
last_read_time=current_time
if current_time-last_read_time>self.GUI.looku...
last_read_time=current_time
# out-parameter for read_script: new_lines[0] receives the script lines
new_lines=[None]
if self.read_script(new_lines):
# NOTE(review): GUI is tested for None here although it was already used in
# the interval test above
if self.GUI != None:
#self.GUI.write_script_start(all_...
self.GUI.write_script_list(new_li...
self.script_lines=new_lines[0]
if self.GUI.lookup_prop('exec_interva...
self.eval_script()
self.GUI.update()
#else:
# self.next_url()
# continue
if self.GUI.lookup_prop('exec_interval')>0:
exec_interval_time=self.GUI.lookup_prop('...
if current_time-last_eval_time>exec_inter...
last_eval_time=current_time
self.eval_script()
if self.GUI.lookup_prop('write_interval')>0:
if current_time-last_write_time>self.GUI....
last_write_time=current_time
self.send_result()
#time.sleep_ms(100)
time.sleep(0.1)
def stop_read_eval_print_loop(self):
    """Clear the flag that read_eval_print_loop() checks at the top of each round."""
    self.continue_read_eval_print_loop = False
# Initialise current_url / current_page / pass_word from the GUI properties and
# reset next_url_index to 0.
# The password property name is truncated in this copy.
# NOTE(review): an earlier method in this class has the same name; if both are
# in the same class body, this later definition is the one in effect.
def init_params(self):
uri=self.GUI.lookup_prop('object_page_1')
pass_word=self.GUI.lookup_prop('object_page_pass_...
# IndexError here if the URI contains no '?'
page=uri.split('?')[1]
url=uri.split('?')[0]
self.set_current_url(url)
self.set_current_page(page)
self.set_current_pass(pass_word)
self.next_url_index=0
# Launch the external program configured in the GUI ('external_program_path' +
# 'external_program_name') with "python3" through a shell, then report the start
# in the GUI's external window.
def start_external(self):
dir=self.GUI.lookup_prop('external_program_path')
file=self.GUI.lookup_prop('external_program_name')
path=dir+file
try:
# NOTE(review): the command is a concatenated string run with shell=True, so
# the configured path/name are interpreted by the shell
subprocess.Popen('python3 '+path,shell=True) ...
except Exception as e:
import traceback
# NOTE(review): the traceback module has print_exc(), not print_exec(); this
# line raises AttributeError when reached
traceback.print_exec()
print(e)
print('error')
return
self.GUI.write_external('start '+file)
# Wait 5 s, then create a Python_Connector for the external program's address
# and port (GUI property names and the third constructor argument are truncated
# in this copy), store it in self.ext_soc, attach this bot and connect.
# Any exception is printed with its traceback and not propagated.
def connect_external(self):
# presumably gives a freshly started external program time to come up -- confirm
time.sleep(5.0)
try:
address=self.GUI.lookup_prop('external_progra...
port=self.GUI.lookup_prop('external_program_p...
self.ext_soc=Python_Connector(address,port,se...
self.ext_soc.set_bot(self)
self.ext_soc.connect()
except Exception as e:
import traceback
traceback.print_exc()
print(e)
print('error')
return
def get_external_connection(self):
    """Return the connector stored in ``self.ext_soc``.

    The attribute is read as is; it must have been assigned beforehand.
    """
    connector = self.ext_soc
    return connector
def start_thread(gui):
    """Trigger the GUI's start button handler."""
    gui.click_start_button()
# Program entry point: build the Tk root, the bot and the GUI, wire them
# together, initialise parameters and enter the Tk main loop.
# The first command-line argument is read as an option; with '-s' the external
# program is started and/or connected according to the GUI's autorun /
# autoconnect settings.
# NOTE(review): indentation is lost in this copy, so it cannot be told from
# here whether start_thread(gui) belongs to the '-s' branch -- confirm.
def main() -> None:
args = sys.argv
if len(args)>=2:
fst_opt=args[1]
else:
fst_opt=None
print('fst_opt='+str(fst_opt))
root=Tk()
bot=pico_wiki_bot()
gui=GUI(root,bot)
bot.set_GUI(gui)
gui.set_Pico_Time(bot.Pico_Time)
gui.init_params()
bot.init_params()
#if gui.is_autorun_selected.get():
# bot.start_external()
#if gui.is_autoconnect_selected.get():
# bot.connect_external()
if fst_opt=='-s':
if gui.is_autorun_selected.get():
bot.start_external()
if gui.is_autoconnect_selected.get():
bot.connect_external()
start_thread(gui)
root.mainloop()
#print("-----------get_script_and_result-------------...
#print(bot.get_script_and_result())
#print("-----------get_result-----------------")
#print(bot.get_result())
#print("-----------get_script-----------------")
#print(bot.get_script())
#print("-----------get_current_device_line-----------...
#print(bot.get_current_device_line())
#print("-----------replace_result-----------------")
#bot.replace_result("Hello!\nThis is a test2....\nxxx...
#print("-----------stop_timer--------------------")
#bot.stop_timer()
#bot.read_eval_print_loop()
# run main() only when the file is executed as a script
if __name__=='__main__':
main()
}}
終了行:
#code(Python){{
# -*- coding: utf-8 -*-
#
# wiki_bot_06.py
# t.yamanoue
# 20250227
#
import time
import requests
from requests_toolbelt import MultipartEncoder
import socket
try:
import ustruct as struct
except:
import struct
from time import sleep
import gc
import copy
import threading
import sys
from tkinter import *
from tkinter import Label
from tkinter import Button
import tkinter as tk
from tkinter import scrolledtext
#from PIL import Image, ImageTk, ImageOps
import tkinter.ttk as ttk
from tkinter import messagebox
from tkinter import filedialog
import time
import json
import pickle
import serial
import mimetypes
import traceback
import os
import subprocess
# Default property values of the bot.
# Properties.__init__ assigns this dict object itself (not a copy) to
# self.properties, so changes made through a Properties instance also change
# this module-level dict.
# The two object page URLs are truncated in this copy.
# NOTE(review): the interval values are presumably milliseconds (the main loop
# compares them with time.time()*1000 differences) -- confirm.
initials = {'bot_id':'yama_bot_0000_0000_0000_0001',
'waiting_term':600000,
'object_page_1':'http://192.168.10.10/fwb4pi/...
'object_page_2':'http://192.168.10.11/fwb4pi/...
'object_page_pass_1':'',
'object_page_pass_2':'',
'read_interval': 20000,
'exec_interval': 0,
'write_interval':0,
'report_length':120,
'external_program_path':'/home/pi/python/',
'external_program_name':'tcp_server_ex2.py',
'external_program_address':'localhost',
# the port is stored as a string
'external_program_port':'9999',
'external_program_autostart':False,
'external_program_autoconnect':False
}
# Key/value settings of the bot, persisted with pickle and mirrored to/from the GUI.
class Properties:
def __init__(self):
# NOTE(review): this stores the module-level initials dict itself, not a copy;
# every Properties instance and initials share one dict
self.properties=initials
self.file_name="bot.properties"
# Attach the GUI used by map_to_gui / map_from_gui.
def set_gui(self,gui):
self.gui=gui
# Load properties from the pickle file fn, merge them over the current values
# and push the values to the GUI. Errors are printed, not raised.
# NOTE(review): pickle.load can execute arbitrary code from the file; only load
# files from a trusted location.
def load(self,fn):
try:
with open(fn,mode='rb') as f:
prop_w=pickle.load(f)
for key in prop_w:
self.properties[key]=prop_w[key]
self.map_to_gui()
except Exception as e:
print("file "+fn+" read error")
tb=sys.exc_info()[2]
print("message:{0}".format(e.with_traceback(t...
# Push every property value to the GUI.
def map_to_gui(self):
for key in self.properties:
self.gui.map_to_gui(key,self.properties[key])
# Read every property value back from the GUI.
def map_from_gui(self):
for key in self.properties:
v=self.gui.lookup_prop(key)
self.properties[key]=v
# Return the value stored for key, or None when the key is unknown.
def get(self,key):
if key in self.properties:
return self.properties[key]
return None
# Store value under key.
def set(self,key,value):
self.properties[key]=value
# Refresh the values from the GUI, then write them to fn with pickle.
# Errors are printed, not raised.
def save(self,fn):
self.map_from_gui()
try:
with open(fn,mode='wb') as f:
pickle.dump(self.properties,f)
except Exception as e:
print("file "+fn+" write error")
tb=sys.exc_info()[2]
print("message:{0}".format(e.with_traceback(t...
class GUI:
# Build the whole control panel inside `root`: a notebook with a main tab and
# an external-program tab, all widgets placed at fixed coordinates, and a
# Properties object bound to this GUI. `bot` is the object the button
# handlers call into.
# NOTE(review): many widget-construction lines below are cut off ("...") in
# this copy of the file; the callbacks wired to each button are not visible.
def __init__(self,root,bot):
self.bot=bot
self.recv_queue=[]
self.root=root
self.root.title("Wiki Bot panel")
self.root.configure(background="white")
self.root.geometry("850x800")
self.notebook =ttk.Notebook(root)
self.main_tab=tk.Frame(self.notebook, bg='white')
self.external_tab=tk.Frame(self.notebook, bg='whi...
self.notebook.add(self.main_tab, text="main", und...
self.notebook.add(self.external_tab, text="extern...
self.notebook.pack(expand=True, fill='both', padx...
yy=10
# command list
self.start_button=Button(self.main_tab, text="sta...
self.start_button.place(x=10,y=yy)
self.stop_button=Button(self.main_tab, text="stop...
self.stop_button.place(x=80,y=yy)
self.save_prop_button=Button(self.main_tab, text=...
self.save_prop_button.place(x=150,y=yy)
self.clear_message_button=Button(self.main_tab, t...
self.clear_message_button.place(x=280,y=yy)
yy=40
# this bot id, waiting term
self.bot_label=Label(self.main_tab,text="bot", wi...
self.bot_label.place(x=10,y=yy)
self.bot_id_field=Entry(self.main_tab,width=50)
self.bot_id_field.place(x=50,y=yy)
self.waiting_term_label=Label(self.main_tab,text=...
self.waiting_term_label.place(x=380,y=yy)
self.waiting_term_field=Entry(self.main_tab,width...
self.waiting_term_field.place(x=450,y=yy)
yy=70
# Table of object pages: one row per page, columns URI / name / pass.
self.page_frame = ttk.Frame(self.main_tab)
self.page_frame.grid(row=0, column=0)
# build the column headers
items = ['URI', 'name', 'pass']
for i in range(0, len(items)):
label_item = ttk.Label(self.page_frame,text=i...
label_item.grid(row=0, column=i)
n = 2 # number of rows
# column 1
self.object_page_table_uri = [0] * n
for i in range(0, n):
self.object_page_table_uri[i] = ttk.Entry(sel...
self.object_page_table_uri[i].grid(row=i+1, c...
# column 2
self.object_page_table_name = [0] * n
for i in range(0, n):
self.object_page_table_name[i] = ttk.Entry(se...
self.object_page_table_name[i].grid(row=i+1, ...
# column 3
self.object_page_table_pass = [0] * n
for i in range(0, n):
self.object_page_table_pass[i] = ttk.Entry(se...
self.object_page_table_pass[i].grid(row=i+1, ...
# place() is called after grid() on the same frame, so the fixed position wins
self.page_frame.place(x=10,y=yy)
# Table of numeric parameters: property name / value, one row each.
self.param_frame = ttk.Frame(self.main_tab)
self.param_frame.grid(row=0, column=0)
# build the column headers
param_items = ['property', 'value']
for i in range(0, len(param_items)):
label_param_item = ttk.Label(self.param_frame...
label_param_item.grid(row=0, column=i)
n = 4 # number of rows
# column 1
self.param_table_property = [0] * n
for i in range(0, n):
self.param_table_property[i] = ttk.Entry(self...
self.param_table_property[i].grid(row=i+1, co...
# column 2
self.param_table_value = [0] * n
for i in range(0, n):
self.param_table_value[i] = ttk.Entry(self.pa...
self.param_table_value[i].grid(row=i+1, colum...
self.param_frame.place(x=550,y=yy)
self.param_table_property[0].insert(END,'read_int...
self.param_table_property[1].insert(END,'exec_int...
self.param_table_property[2].insert(END,'write_in...
self.param_table_property[3].insert(END,'report_l...
yy=200
self.command_field_label=Label(self.main_tab,text...
self.command_field_label.place(x=10,y=yy)
self.command_field=Entry(self.main_tab,width=50)
self.command_field.place(x=150,y=yy)
self.command_enter_button=Button(self.main_tab,te...
self.command_enter_button.place(x=500,y=yy)
self.clear_command_button=Button(self.main_tab,te...
self.clear_command_button.place(x=580,y=yy)
yy=230
# script area
self.script_frame=tk.Frame(self.main_tab,width=50...
self.script_frame.place(x=10,y=yy)
self.script_area=scrolledtext.ScrolledText(self.s...
self.script_area.pack()
yy=600
# output area
self.message_frame=tk.Frame(self.main_tab,width=5...
self.message_frame.place(x=10,y=yy)
self.message_area=scrolledtext.ScrolledText(self....
self.message_area.pack()
#external tab
yy=10
self.external_label=Label(self.external_tab,text=...
self.external_label.place(x=30,y=yy)
yy=50
self.dir_name_label=Label(self.external_tab,text=...
self.dir_name_label.place(x=30,y=yy)
self.dir_name_field=Entry(self.external_tab,width...
self.dir_name_field.place(x=100,y=yy)
#self.read_button=Button(self.external_tab, text=...
#self.read_button.place(x=500,y=yy)
self.dir_button=Button(self.external_tab, text="d...
self.dir_button.place(x=320,y=yy)
self.is_autorun_selected=BooleanVar()
self.run_external_button2=Button(self.external_ta...
self.run_external_button2.place(x=420,y=yy)
self.auto_run_check=Checkbutton(self.external_tab...
self.auto_run_check.place(x=520,y=yy)
yy=100
self.file_name_label=Label(self.external_tab,text...
self.file_name_label.place(x=30,y=yy)
self.file_name_field=Entry(self.external_tab,widt...
self.file_name_field.place(x=100,y=yy)
self.read_button=Button(self.external_tab, text="...
self.read_button.place(x=420,y=yy)
self.select_button=Button(self.external_tab, text...
self.select_button.place(x=320,y=yy)
yy=140
self.is_autoconnect_selected=BooleanVar()
self.address_label=Label(self.external_tab,text="...
self.address_label.place(x=30,y=yy)
self.address_field=Entry(self.external_tab,width=...
self.address_field.place(x=100,y=yy)
self.port_label=Label(self.external_tab,text="por...
self.port_label.place(x=200,y=yy)
self.port_field=Entry(self.external_tab,width=10)
self.port_field.place(x=270,y=yy)
self.connect_button=Button(self.external_tab, tex...
self.connect_button.place(x=320,y=yy)
self.auto_connect_check=Checkbutton(self.external...
self.auto_connect_check.place(x=420,y=yy)
yy=200
self.command_label=Label(self.external_tab,text="...
self.command_label.place(x=30,y=yy)
# NOTE(review): self.command_field was already assigned to the main-tab
# command entry above; this assignment replaces it, so enter_command and
# click_send_button both read the external-tab entry. A separate attribute
# name is probably intended — confirm.
self.command_field=Entry(self.external_tab,width=...
self.command_field.place(x=100,y=yy)
self.send_button=Button(self.external_tab, text="...
self.send_button.place(x=500,y=yy)
yy=240
self.external_frame=tk.Frame(self.external_tab,wi...
self.external_frame.place(x=50,y=yy)
self.external_area=scrolledtext.ScrolledText(self...
self.external_area.pack()
#self.run_script()
#self.set_script()
#self.display_help()
# Settings store; it calls back into map_to_gui / lookup_prop on this object.
self.prop=Properties()
self.prop.set_gui(self)
def set_Pico_Time(self,time):
self.Pico_Time=time
def init_params(self):
    """Load the saved properties file and show every property in the GUI."""
    store=self.prop
    store.load(store.file_name)
    store.map_to_gui()
def click_read_button(self):
    """Replace the script area's content with the file named in the file-name field.

    Open and read failures are reported on stdout; the method then returns
    without raising.
    """
    fn=self.file_name_field.get()
    print("fn="+fn)
    try:
        f=open(fn,"r")
    except Exception as e:
        print("file open error.")
        # was sys.exec_info(), which does not exist and raised AttributeError
        tb=sys.exc_info()[2]
        print("message:{0}".format(e.with_traceback(tb)))
        return
    # `with` closes the file even when reading or a widget call fails
    with f:
        try:
            self.script_area.delete(1.0,END)
            for line in f:
                print(line)
                self.script_area.insert(END,line)
        except Exception as e:
            print("file "+fn+" read error")
            tb=sys.exc_info()[2]
            print("message:{0}".format(e.with_traceback(tb)))
# Ask for a folder with a directory dialog and put the chosen path into the
# directory field; an empty result is treated as a cancel.
# NOTE(review): the path is inserted at END without clearing the field first,
# so it is appended to whatever text is already there
# (click_select_file_button deletes before inserting) — confirm intent.
# NOTE(review): the askdirectory call is cut off in this copy of the file.
def click_dir_button(self):
iDir = os.path.abspath(os.path.dirname(__file__))
folder_name=tk.filedialog.askdirectory(initialdir...
if len(folder_name)==0:
print("cancel selcting folder")
else:
self.dir_name_field.insert(END,folder_name)
# Ask for a Python file with a file dialog and put the chosen path into the
# file-name field, replacing its previous content. The dialog's start folder
# is the directory field's text, or this script's own folder when that field
# is empty.
# NOTE(review): the askopenfilename call is cut off in this copy of the file.
def click_select_file_button(self):
fType = [("python", "*.py")]
iDir=""
xdir=self.dir_name_field.get()
if xdir==None or xdir=="":
iDir=os.path.abspath(os.path.dirname(__file__))
else:
iDir=xdir
iFilePath=filedialog.askopenfilename(filetypes=fT...
if iFilePath==None or iFilePath=="":
print("cancel")
else:
self.file_name_field.delete(0,END)
self.file_name_field.insert(END,iFilePath)
def enter_command(self):
    """Pass the text of the command entry to self.ex()."""
    print("enter_command")
    self.ex(self.command_field.get())
# Create a threading.Thread, keep it in self.script_thread and start it.
# NOTE(review): the thread target is cut off in this copy of the file
# ("self.s..."); presumably self.start_script — TODO confirm.
def click_start_button(self):
print('click_start_button')
#exec(self.script)
self.script_thread=threading.Thread(target=self.s...
print('new thread, start thread')
#self.start_script()
self.script_thread.start()
print('script_started.')
def click_stop_button(self):
    """Ask the bot to stop its read-eval-print loop; does nothing without a bot."""
    print('click_stop_button')
    if not (self.bot==None):
        self.bot.stop_read_eval_print_loop()
def click_connect_button(self):
print('click_connect_button')
self.bot.connect_external()
def click_send_button(self):
    """Send the command entry's text to the external program.

    The entry is cleared, the text is echoed through write_external() and
    then handed to the bot's send_external().
    """
    print('click_send_button')
    entry=self.command_field
    sending=entry.get()
    entry.delete(0,END)
    self.write_external(sending)
    self.bot.send_external(sending)
def ex_external(self,command):
    """Put `command` into the command entry and send it as if Send was clicked."""
    entry=self.command_field
    entry.delete(0,END)
    entry.insert(END,command)
    self.click_send_button()
def start_script(self):
    """Run the bot's read-eval-print loop; does nothing without a bot."""
    print('start_script')
    if not (self.bot==None):
        self.bot.read_eval_print_loop()
def run_external(self):
print('run_external')
self.bot.start_external()
def click_save_prop_button(self):
self.prop.save(self.prop.file_name)
def add_uri(self,uri,i):
    """Show `uri` in row `i` (0 or 1) of the page table; other rows are ignored."""
    if 0<=i<2:
        entry=self.object_page_table_uri[i]
        entry.delete(0,END)
        entry.insert(END,uri)
def map_to_gui(self,key,value):
    """Show the property `key` with `value` in the widget that represents it.

    Entry-backed properties have their text replaced; the two boolean
    properties select or deselect their checkbutton. Unknown keys are ignored.
    """
    def fill(entry,text):
        # replace the entry's whole content
        entry.delete(0,END)
        entry.insert(END,text)

    def tick(check):
        if value:
            check.select()
        else:
            check.deselect()

    uri_keys=('object_page_1','object_page_2')
    pass_keys=('object_page_pass_1','object_page_pass_2')
    param_keys=('read_interval','exec_interval','write_interval','report_length')
    if key=='bot_id':
        fill(self.bot_id_field,value)
    elif key=='waiting_term':
        fill(self.waiting_term_field,str(value))
    elif key in uri_keys:
        fill(self.object_page_table_uri[uri_keys.index(key)],value)
    elif key in pass_keys:
        fill(self.object_page_table_pass[pass_keys.index(key)],value)
    elif key in param_keys:
        fill(self.param_table_value[param_keys.index(key)],str(value))
    elif key=='external_program_path':
        fill(self.dir_name_field,str(value))
    elif key=='external_program_name':
        fill(self.file_name_field,str(value))
    elif key=='external_program_address':
        fill(self.address_field,str(value))
    elif key=='external_program_port':
        fill(self.port_field,str(value))
    elif key=='external_program_autostart':
        tick(self.auto_run_check)
    elif key=='external_program_autoconnect':
        tick(self.auto_connect_check)
def lookup_prop(self,key):
    """Return the current GUI value of the property `key`.

    waiting_term and the four interval/length parameters are converted with
    int() (which raises ValueError on non-numeric text); the two auto*
    properties come from their BooleanVar. Unknown keys return None.
    """
    uri_keys=('object_page_1','object_page_2')
    pass_keys=('object_page_pass_1','object_page_pass_2')
    param_keys=('read_interval','exec_interval','write_interval','report_length')
    if key=='bot_id':
        return self.bot_id_field.get()
    if key=='waiting_term':
        return int(self.waiting_term_field.get())
    if key in uri_keys:
        return self.object_page_table_uri[uri_keys.index(key)].get()
    if key in pass_keys:
        return self.object_page_table_pass[pass_keys.index(key)].get()
    if key in param_keys:
        return int(self.param_table_value[param_keys.index(key)].get())
    if key=='external_program_path':
        return self.dir_name_field.get()
    if key=='external_program_name':
        return self.file_name_field.get()
    if key=='external_program_address':
        return self.address_field.get()
    if key=='external_program_port':
        return self.port_field.get()
    if key=='external_program_autostart':
        return self.is_autorun_selected.get()
    if key=='external_program_autoconnect':
        return self.is_autoconnect_selected.get()
    return None
def update(self):
self.prop.map_from_gui()
self.prop.save(self.prop.file_name)
def ini_recv_queue(self):
self.recv_queue=[]
def is_not_empty_recv_queue(self):
    """Return True when self.recv_queue holds at least one message.

    The state is also logged on stdout; the two log messages were swapped
    relative to the returned value and now match it.
    """
    if len(self.recv_queue)>0:
        print('recv_queue is not empty')
        return True
    print('recv_queue is empty')
    return False
# Check that every string in `messages` occurs as a substring of at least one
# entry of self.recv_queue. Returns False at the first string with no match
# and True when all of them were found (also True for an empty `messages`).
# Each match is logged through write_message.
# NOTE(review): the write_message text is cut off in this copy of the file.
def are_in_the_recv_queue(self,messages):
print('start are_in_the_recv_queue')
for x in messages:
print('is '+x+' in the recv_queue?')
flag=False
for m in self.recv_queue:
print(m+' in the received queue')
if x in m:
print(x + ' is in the '+m)
flag=True
self.write_message("message "+x+" fou...
break
if flag==False:
return False
return True
def enter_all_command(self):
    """Send the text of the all-servers command entry to every server.

    NOTE(review): self.command_field_for_all_server and self.coms are not
    created by the visible GUI.__init__ — confirm they are set elsewhere, or
    this method raises AttributeError.
    """
    entry=self.command_field_for_all_server
    command=entry.get()
    notice="send "+command+" to all"
    print(notice)
    self.write_message(notice)
    self.coms.send_command_to_all(command)
    entry.delete(0,END)
def ex(self,command):
    """Parse `command` and report "OK" or "ERROR" in the message area and on stdout.

    NOTE(review): self.r_b and self.parse_command are not defined in the
    visible part of this class — confirm they exist.
    """
    print("parse "+command)
    command=self.r_b(command)
    verdict="OK" if self.parse_command(command) else "ERROR"
    self.write_message(verdict)
    print(verdict)
def write_script(self, message):
    """Replace the script area's content with `message`."""
    self.clear_script()
    self.append_script(message)
# Create a threading.Thread, keep it in self.write_script_thread and start it.
# NOTE(review): the Thread arguments are cut off in this copy of the file, so
# the target and how `message` is passed are not visible here.
def write_script_start(self,message):
self.write_script_thread=threading.Thread(target=...
self.write_script_thread.start()
# Replace the script area's content with the strings in `xlines`, one per
# line, and refresh the widget.
# NOTE(review): the log texts say "write_script_lines" although this is
# write_script_list — looks copied from the sibling method.
# NOTE(review): indentation is lost in this copy, so whether
# script_area.update() runs per line or once after the loop cannot be told.
def write_script_list(self,xlines):
print('start write_script_lines')
self.write_message('start_write_script_lines')
self.clear_script()
for l in xlines:
self.append_script(l)
self.script_area.update()
print('end write_script_lines')
# Replace the script area's content with the text `lines`, split into
# individual lines with str.splitlines(), and refresh the widget.
# NOTE(review): indentation is lost in this copy, so whether
# script_area.update() runs per line or once after the loop cannot be told.
def write_script_lines(self,lines):
print('start write_script_lines')
self.write_message('start_write_script_lines')
xlines=lines.splitlines()
self.clear_script()
for l in xlines:
self.append_script(l)
self.script_area.update()
print('end write_script_lines')
def append_script(self, message):
    """Append `message` followed by a newline to the script area."""
    area=self.script_area
    for piece in (message,'\n'):
        area.insert(tk.END,piece)
def clear_script(self):
    """Delete all text from the script area and refresh it."""
    area=self.script_area
    area.delete("1.0",END)
    area.update()
def write_message(self, message):
    """Append `message` to the message area, ignoring blank messages.

    Surrounding whitespace is stripped before the message is appended.
    """
    text=message.strip()
    if text!='':
        self.append_message(text)
def clear_and_append_message(self, message):
self.clear_message()
self.append_message(message)
# Append "<timestamp>: <message>" plus a newline to the message area, scroll
# to the bottom, and trim old text from the top once the area holds 500 or
# more lines. The timestamp comes from self.Pico_Time.get_now(), so
# set_Pico_Time() must have been called first.
# NOTE(review): the whole text is fetched on every call just to count lines.
# NOTE(review): indentation is lost in this copy, so whether
# message_area.update() runs only after trimming or on every call cannot be
# told from here.
def append_message(self, message):
now = self.Pico_Time.get_now()
addline=now+': '+message
#print('start append_message')
self.message_area.insert(tk.END,addline)
self.message_area.insert(tk.END,'\n')
self.message_area.yview_moveto(1)
tx=self.message_area.get(0.0,tk.END)
count_lines = tx.count('\n') + 1
if count_lines>=500:
# 99.0-1 evaluates to 98.0, i.e. text index "98.0"
self.message_area.delete(0.0,99.0-1)
self.message_area.update()
#print('end append_message')
def clear_message(self):
    """Delete all text from the message area and refresh it."""
    area=self.message_area
    area.delete("1.0",END)
    area.update()
def write_external(self, message):
    """Append `message` to the external-program area, ignoring blank messages.

    Surrounding whitespace is stripped before the message is appended.
    """
    text=message.strip()
    if text!='':
        self.append_external(text)
def clear_and_append_external(self, message):
self.clear_external()
self.append_external(message)
# Append "<timestamp>: <message>" plus a newline to the external-program
# area, scroll to the bottom, and trim old text from the top once the area
# holds 500 or more lines. The timestamp comes from self.Pico_Time.get_now(),
# so set_Pico_Time() must have been called first.
# NOTE(review): indentation is lost in this copy, so whether
# external_area.update() runs only after trimming or on every call cannot be
# told from here.
def append_external(self, message):
now = self.Pico_Time.get_now()
addline=now+': '+message
#print('start append_message')
self.external_area.insert(tk.END,addline)
self.external_area.insert(tk.END,'\n')
self.external_area.yview_moveto(1)
tx=self.external_area.get(0.0,tk.END)
count_lines = tx.count('\n') + 1
if count_lines>=500:
# 99.0-1 evaluates to 98.0, i.e. text index "98.0"
self.external_area.delete(0.0,99.0-1)
self.external_area.update()
#print('end append_message')
def clear_external(self):
    """Delete all text from the external-program area and refresh it."""
    area=self.external_area
    area.delete("1.0",END)
    area.update()
# Wi-Fi station helper: stores access-point credentials and connects through
# network.WLAN, polling the link status for up to 10 seconds.
# NOTE(review): `network` is not among the imports visible at the top of this
# file; it looks like the MicroPython network module — confirm where it
# comes from before using this class on desktop Python.
class pico_net:
def __init__(self):
# NOTE(review): `ssid` is a bare global name here, not self.ssid; this raises
# NameError unless a module-level `ssid` exists elsewhere in the file.
print('current ssid= '+ssid)
def set_access_point(self, ssid, password):
self.ssid = ssid
self.password = password
def connect(self):
self.wlan = network.WLAN(network.STA_IF)
self.wlan.active(True)
# chipid is computed but not used in the visible code
chipid = int.from_bytes(self.wlan.config('mac'), ...
# NOTE(review): connects with the global names `ssid` / `password`, not the
# values stored by set_access_point — presumably self.ssid / self.password
# were intended; confirm.
self.wlan.connect(ssid, password)
max_wait = 10
while max_wait > 0:
if self.wlan.status() < 0 or self.wlan.status...
break
max_wait -= 1
print('waiting for connection...')
time.sleep(1)
# status 3 is treated as "connected"; anything else is a failure
if self.wlan.status() != 3:
raise RuntimeError('network connection failed')
else:
print('connected')
self.status = self.wlan.ifconfig()
print('ip = ' + self.status[0])
class pico_http:
    """Small wrapper around `requests` that returns response text.

    Every request method returns the body as text, or "" when the request
    fails for any reason (the failure is reported on stdout).
    """

    def __init__(self):
        print('pico_http')

    def set_url(self, url):
        """Remember `url` as the current URL."""
        self.url = url

    def get(self, url):
        """GET `url` and return the response text, or "" on failure."""
        self.url=url
        try:
            r = requests.get(url)
            r_text=r.text
            r.close()
            return r_text
        except Exception:
            print('pico_http.get...exception')
            return ""

    def put(self, url, payload):
        """GET `url` with `payload` (a dict of str -> str) as the query string.

        Despite its name this issues a GET request. Keys and values are
        concatenated as given and are not URL-encoded. Returns the response
        text, or "" on failure.
        """
        query='&'.join(key+'='+payload[key] for key in payload)
        # an empty payload yields the bare URL, as before
        uri=url+'?'+query if query else url
        # The request used to be unguarded and was followed by `if r=="":`,
        # which can never be true for a Response object; failures are now
        # handled like in get() and post().
        try:
            r = requests.get(uri)
            r_text=r.text
            r.close()
            return r_text
        except Exception:
            print('pico_http.put...exception')
            return ""

    def post(self, url, body, headers):
        """POST `body` to `url` with `headers`; return the response text or ""."""
        try:
            r = requests.post(url,data=body,headers=headers)
            r_text=r.text
            r.close()
            print('pico_http.post...return')
            return r_text
        except Exception:
            print('pico_http.post...exception')
            return ""
import datetime
class pico_time:
    """Clock helper: local timestamp strings plus an optional NTP query."""

    # reference: https://wisteriahill.sakura.ne.jp/CMS/Wo...
    # NTP server used by ptime(); alternatives the author tried:
    # time-c.nist.gov, time.cloudflare.com, time.google.com
    ntp_host = "ntp.nict.jp"
    wday = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun']

    def __init__(self):
        print('pico_time')

    def ptime(self, timeout=None):
        """Ask the NTP server for the time and return it as Unix seconds.

        timeout: optional socket timeout in seconds. The default (None)
        keeps the previous behaviour of waiting indefinitely for the reply.
        Raises OSError (socket.timeout when `timeout` is set) on network
        failure.
        """
        NTP_DELTA = 2208988800  # seconds between the 1900 NTP epoch and 1970
        NTP_QUERY = bytearray(48)
        NTP_QUERY[0] = 0x1b
        addr=(self.ntp_host, 123)
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            if timeout is not None:
                s.settimeout(timeout)
            s.sendto(NTP_QUERY, addr)
            msg = s.recv(48)
        finally:
            # the socket used to stay open when sendto/recv raised
            s.close()
        # bytes 40..43 hold the seconds part of the transmit timestamp
        val = struct.unpack("!I", msg[40:44])[0]
        return val - NTP_DELTA

    def zfill(self, s, width):
        """Left-pad the string `s` with "0" up to `width` characters."""
        return s.rjust(width, "0")

    def get_now(self):
        """Return the local date and time as text, e.g. '2025-02-27 10:20:30.123456'."""
        return str(datetime.datetime.now())
class html_parser:
html=""
tokens=[]
reserved_words={'<':'<','>':'>','&':'&','"'...
' ':' ','\n':'<br>','\t':' ...
'\\':'\','{':'{','}':'}...
}
a2c={' ':"+",'\n':"%0D%0A",'\r':"%0D",'\t':"%09",'!':...
'#':"%23",'$':"%24",'%':"%25",'&':"%26", '\'':"%2...
'*':"%2A",'+':"%2B",',':"%2C",'-':"%2D",'/':"%2F"...
';':"%3B",'<':"%3C",'=':"%3D",'>':"%3E",'?':"%3F",
'@':"%40",'[':"%5B",'\\':"%5C",']':"%5D",'^':"%5E",
'`':"%60",'{':"%7B",'|':"%7C",'}':"%7D",'~':"%7E"}
#Spechal char to ascii code
sc2a={'<':"<",'>':">",'&':"&",'"':'"',...
def __init__(self,html):
    """Create a parser over the text `html`."""
    self.html=html
    # Per-instance token state. The class-level `tokens` list is one object
    # shared by every parser, so appending to it mixed tokens from different
    # pages; `token_ith` was unset until html_tokenizer() ran, which made
    # next_token()/has_more_tokens() raise AttributeError.
    self.tokens=[]
    self.token_ith=0
def html_tokenizer(self):
    """Split self.html into tokens and store them in self.tokens.

    Three token kinds are produced: '<...>' for a tag, '"..."' for a
    double-quoted string, and plain text for everything between them.
    The token list and the read cursor are both reset first; previously only
    the cursor was reset, so a second call (or a second parser sharing the
    class-level list) appended to the old tokens.
    """
    self.tokens=[]
    self.token_ith=0
    w=""        # plain text collected since the last tag/string token
    tx=[""]
    rest=[""]
    inx=self.html
    while inx:
        if self.parse_tag(inx,tx,rest):
            token='<'+tx[0]+'>'
        elif self.parse_String_Constant(inx,tx,rest):
            token='"'+tx[0]+'"'
        else:
            # ordinary character: move it into the pending text
            w=w+inx[0]
            inx=inx[1:]
            continue
        if w!="":
            self.tokens.append(w)
            w=""
        self.tokens.append(token)
        inx=rest[0]
    if w!="":
        self.tokens.append(w)
def get_html(self):
return self.html
def parse_key(self,key,inx,rest):
    """Test whether `inx` begins with `key`.

    rest[0] receives the text after `key` on a match, or `inx` unchanged
    otherwise. Returns True on a match, False otherwise.
    """
    matched=inx.startswith(key)
    rest[0]=inx[len(key):] if matched else inx
    return matched
def parse_String_Constant(self,inx,strc,rest):
    """Parse a double-quoted string at the start of `inx`.

    On success strc[0] receives the string body, rest[0] the text after the
    closing quote, and True is returned. A backslash and the character after
    it are both left out of the body (unchanged from the original
    implementation).
    Returns False when `inx` does not start with '"' or the closing quote is
    missing; previously an unterminated string raised IndexError.
    """
    if not inx.startswith('"'):
        return False
    body=inx[1:]
    size=len(body)
    kept=[]
    i=0
    while i<size and body[i]!='"':
        if body[i]=='\\':
            i+=2            # skip the backslash and the escaped character
        else:
            kept.append(body[i])
            i+=1
    if i>=size:
        return False        # ran off the end without a closing quote
    strc[0]="".join(kept)
    rest[0]=body[i+1:]
    return True
# Parse a tag at the start of `inx`: after a leading '<', collect characters
# (and whole double-quoted strings, via parse_String_Constant) up to the
# closing '>'. On success tag[0] receives the text between '<' and '>',
# rest[0] the text after '>', and True is returned; otherwise False.
# NOTE(review): `fx=inx[0]` is evaluated before the length check, so input
# that ends right after '<' or has no closing '>' raises IndexError instead
# of returning False.
# NOTE(review): indentation is lost in this copy of the file, so it cannot be
# told whether the parse_String_Constant test is nested inside the backslash
# branch or follows it — confirm against the original before rewriting.
def parse_tag(self,inx,tag,rest):
rx=[""]
xstr=""
wstr=[""]
if self.parse_key('<',inx,rx):
#print("parse_tag")
inx=rx[0]
fx=inx[0]
xlen=len(inx)
while fx!='>':
if xlen<=0:
return False
if fx=='\\':
inx=inx[1:xlen]
if self.parse_String_Constant(inx,wstr,rx):
inx=rx[0]
xstr=xstr+'"'+wstr[0]+'"'
else:
xstr=xstr+fx
inx=inx[1:xlen]
fx=inx[0]
xlen=len(inx)
if self.parse_key('>',inx,rx):
tag[0]=xstr
rest[0]=rx[0]
return True
return False
def get_tokens(self):
return self.tokens
def next_token(self):
    """Return the token at the cursor and advance it; "" when none are left."""
    idx=self.token_ith
    if idx>=len(self.tokens):
        return ""
    self.token_ith=idx+1
    return self.tokens[idx]
def has_more_tokens(self):
    """Return True while next_token() still has a token to deliver."""
    return self.token_ith<len(self.tokens)
def index_from(self,i,str):
p=self.html[i:].index(str)
return p
def p_url(self,p,url):
    """Parse an http:// or https:// URL starting at position `p` of self.html.

    The URL extends to the next space, newline, tab or carriage return, or
    to the end of the text. On success url[0] receives the URL and the
    position after it is returned; otherwise `p` is returned unchanged.
    A scheme at the very end of the text used to raise IndexError.
    """
    for scheme in ('http://','https://'):
        if self.html.startswith(scheme,p):
            break
    else:
        return p
    start=p+len(scheme)
    end=len(self.html)
    q=start
    while q<end and self.html[q] not in (' ','\n','\t','\r'):
        q+=1
    url[0]=scheme+self.html[start:q]
    return q
def p_key(self,key,p):
    """Return the position just after `key` if self.html has it at `p`, else `p`."""
    if self.html.startswith(key,p):
        return p+len(key)
    return p
def p_String_Constant(self,p,strc):
    """Parse a double-quoted string at position `p` of self.html.

    On success strc[0] receives the text between the quotes (backslashes
    included) and the position after the closing quote is returned.
    Returns `p` unchanged when there is no opening quote, when a newline,
    tab or carriage return occurs before the closing quote, or when the text
    ends first; the last case used to raise IndexError.
    """
    start=p
    if not self.html.startswith('"',p):
        return start
    end=len(self.html)
    p=p+1
    while p<end:
        fx=self.html[p]
        if fx=='"':
            strc[0]=self.html[start+1:p]
            return p+1
        if fx in ('\n','\t','\r'):
            return start
        if fx=='\\':
            p+=1            # skip the escaped character as well
        p+=1
    return start
def p_Date(self,p,date_str):
    """Parse 'Y/M/D h:m:s' (or with '-' in the date) at position `p`.

    On success date_str[0] receives the normalised text and the position
    after the seconds is returned. On any mismatch the start position is
    returned unchanged. One failure path (missing day number) used to return
    False instead of the start position.

    NOTE(review): the expression that builds date_str[0] was cut off in this
    copy of the file after the month; the day/time part below is a
    reconstruction — confirm the separators against the original script.
    """
    pw=p
    NUMBER,BLANK=object(),object()
    date_sep=('/','-')
    time_sep=(':',)
    steps=(NUMBER,date_sep,NUMBER,date_sep,NUMBER,BLANK,
           NUMBER,time_sep,NUMBER,time_sep,NUMBER)
    values=[]
    for step in steps:
        q=p
        if step is NUMBER:
            num=[0]
            q=self.p_number(p,num)
            if q!=p:
                values.append(num[0])
        elif step is BLANK:
            q=self.p_b(p)
        else:
            for sep in step:
                q=self.p_key(sep,p)
                if q!=p:
                    break
        if q==p:
            return pw       # every failure leaves the position untouched
        p=q
    year,month,day,hour,minute,second=values
    date_str[0]=(str(year)+'-'+str(month)+'-'+str(day)+' '
                 +str(hour)+':'+str(minute)+':'+str(second))
    return p
def p_b(self,p):
    """Skip space characters from position `p`; return the first non-space position."""
    end=len(self.html)
    while p<end and self.html[p]==' ':
        p+=1
    return p
# Parse a name at position `p` of self.html: advance while the character is
# in the allowed set, store the consumed text in name[0] and return the
# position after it (equal to `p` when nothing matched).
# NOTE(review): the allowed-character string is cut off in this copy of the
# file; only the lowercase letters are visible.
# NOTE(review): self.html[q] is read before any bounds check, so calling this
# with p == len(self.html) raises IndexError.
def p_name(self,p,name):
#print("p_name, self.html="+ self.html[p:p+10])
#print("p="+str(p)+"..."+self.html[p:p+10])
q=p
last_p=len(self.html)
while self.html[q] in 'abcdefghijklmnopqrstuvwxyz...
#print("self.html["+str(q)+"]="+self.html[q])
q=q+1
if q>=last_p:
break
name[0]=self.html[p:q]
#print("p_name p="+str(p)+" q="+str(q)+" name="+s...
#print("p="+str(p)+" q="+str(q)+" name="+name[0])
#print("name[0]= "+name[0])
return q
def p_number(self,p,num):
    """Parse an unsigned decimal integer at position `p` of self.html.

    On success num[0] receives the value and the position after the last
    digit is returned. When no digit is found `p` is returned and num[0] is
    left untouched, which is how callers detect failure.

    Fixes: the loop tested self.html[p] instead of self.html[q], so one
    leading digit consumed the rest of the text; with no digit int("")
    raised ValueError; p at the end of the text raised IndexError.
    """
    last_p=len(self.html)
    q=p
    while q<last_p and self.html[q] in '0123456789':
        q+=1
    if q>p:
        num[0]=int(self.html[p:q])
    return q
def p_an_equation(self,p,eqs):
    """Parse one `name = value` pair at position `p` of self.html.

    The value is a double-quoted string or, failing that, a number. On
    success the pair is stored in the dict `eqs` and the position after the
    value is returned; otherwise the original `p` is returned.
    """
    origin=p
    name=[""]
    p=self.p_b(p)
    q=self.p_name(p,name)
    if q==p:
        return origin
    p=self.p_b(q)
    q=self.p_key('=',p)
    if q==p:
        return origin
    p=self.p_b(q)
    text=[""]
    q=self.p_String_Constant(p,text)
    if q!=p:
        eqs[name[0]]=text[0]
        return q
    number=[0]
    q=self.p_number(p,number)
    if q==p:
        return origin
    eqs[name[0]]=number[0]
    return q
def p_equations(self,p,eqs):
    """Parse consecutive `name = value` pairs from `p` into the dict `eqs`.

    Returns the position after the last pair (`p` itself when there is none).
    """
    q=self.p_an_equation(p,eqs)
    while q!=p:
        p=q
        q=self.p_an_equation(p,eqs)
    return p
def p_get_equations_of_the_tag(self,p,tag_name,eqs):
    """Find the next `<tag_name` at or after `p` and collect its attributes.

    The attribute pairs are stored in the dict `eqs`. Returns the position
    after the tag's closing '>' or '/>'. Returns `p` unchanged when the tag
    is not found, has no attributes, or is not closed right after them.
    """
    opener='<'+tag_name
    try:
        found=self.html.index(opener,p)
    except:
        return p
    attrs_at=found+len(opener)
    if attrs_at==p:
        return p
    attrs_end=self.p_equations(attrs_at,eqs)
    if attrs_end==attrs_at:
        return p
    close_at=self.p_b(attrs_end)
    for closer in ('>','/>'):
        after=self.p_key(closer,close_at)
        if after!=close_at:
            return after
    return p
# Find the next `<tag_name ...>` element at or after `p` whose attribute
# cond[0] equals cond[1], and report the span of its content.
# On success rtn[0] is set to (start, end) — start is the position after the
# opening tag's '>', end is the position just before '</tag_name' minus one —
# and the position after the closing tag is returned. On failure the original
# `p` is returned in most paths.
# NOTE(review): the parameter list is cut off in this copy of the file; the
# body assigns rtn[0], so the last parameter is presumably `rtn`.
def p_get_between_tag_with_cond(self,p,tag_name,cond,...
#print("p_get_between_with_cond <"+tag_name+">......
#print("p="+str(p)+' '+self.html[p:p+10])
pw=p
q=pw
# Scan forward over candidate opening tags until one satisfies the condition.
while True:
try:
q=self.html.index('<'+tag_name,p)
#print("p_get_between_with_cond p="+str(p...
if p==q:
return pw
q=q+len('<'+tag_name)
p=q
eqs={}
q=self.p_equations(p,eqs)
#print("after equations q="+str(q) +' '+s...
if p!=q:
if cond[0] in eqs:
#print("cond[0]="+cond[0]+" eqs[c...
if eqs[cond[0]]!=cond[1]:
continue
else:
break
except:
#print("fail to find <"+tag_name)
return pw
p=self.p_b(p)
#print("p_get_between q="+str(q)+' '+self.html[q:...
#print("pw="+str(pw))
q=self.p_key('>',p)
if p!=q:
start_p=q
else:
q=self.p_key('/>',p)
# NOTE(review): this compares against pw, while the sibling p_get_between
# compares against p at the same step; with pw the test is true whenever the
# scan advanced, even if neither '>' nor '/>' matched — confirm.
if pw!=q:
start_p=q
else:
# NOTE(review): returns p here, whereas the other failure paths return pw
return p
try:
q=self.html.index('</'+tag_name,start_p)
except:
return pw
end_p=q-1
p=q
q=q+len('</'+tag_name)
p=self.p_b(q)
#print("pw="+str(pw))
q=self.p_key('>',p)
if q==p:
return pw
rtn[0]=(start_p,end_p)
#rx=self.html[start_p:end_p]
#print("rx="+rx)
return q
def p_get_between(self,p,tag_name,eqs,rtn):
    """Find the next `<tag_name ...>...</tag_name>` element at or after `p`.

    The opening tag's attributes are stored in the dict `eqs`. For a normal
    element rtn[0] is set to (start, end), where start is the position after
    the opening '>' and end is the position of '</tag_name' minus one; for a
    self-closing tag rtn[0] is set to "". Returns the position after the
    element, or the original `p` when no complete element is found.
    """
    origin=p
    opener='<'+tag_name
    try:
        found=self.html.index(opener,p)
    except:
        return origin
    attrs_at=found+len(opener)
    if attrs_at==p:
        return origin
    cursor=self.p_b(self.p_equations(attrs_at,eqs))
    after_gt=self.p_key('>',cursor)
    if after_gt==cursor:
        # not a plain '>': accept a self-closing tag, otherwise give up
        after_slash=self.p_key('/>',cursor)
        if after_slash==cursor:
            return origin
        rtn[0]=""
        return after_slash
    start_p=after_gt
    closer='</'+tag_name
    try:
        close_at=self.html.index(closer,start_p)
    except:
        return origin
    cursor=self.p_b(close_at+len(closer))
    done=self.p_key('>',cursor)
    if done==cursor:
        return origin
    rtn[0]=(start_p,close_at-1)
    return done
#def text_to_post_form(self,text):
# #print("html_parser.text_to_post_form")
# #print(text)
# post_form=""
# for i in range(len(text)):
# post_form=post_form+self.code_convert(text[i])
# #print("html_parser.text_to_post_form return")
# return post_form
def code_convert(self,code):
    """Return the form-encoded replacement for `code` from self.a2c, or `code` itself."""
    return self.a2c.get(code,code)
def text_to_post_form(self,text):
    """Encode `text` for a form body by converting each character with code_convert()."""
    return "".join(self.code_convert(ch) for ch in text)
def special_char_to_text(self,text):
#print("special_char_to_text")
#print(text)
text=text.replace("<","<")
text=text.replace(">",">")
text=text.replace("&","&")
text=text.replace(""",'"')
text=text.replace("'","'")
return text
# POST the already-encoded form text `body` to `url` and return the response
# text; any exception is printed and "" is returned instead.
# NOTE(review): the headers dict and the requests.post call are cut off in
# this copy of the file, so the exact content type and encoding passed are
# not visible here.
def post(self,url,body):
#payload is a dictionary
headers={'Content-Type': 'application/x-www-form-...
#def http_post(url, value):
# https://docs.openmv.io/library/urequests.html
#body = "value=%s" % (value)
#headers = {'Content-Type': 'application/x-www-fo...
try:
response = requests.post(url, data=body.encod...
r_text=response.text
response.close()
return r_text
except Exception as e:
print(e)
return ""
class pico_wiki_driver:
url=""
def __init__(self):
print('pico_wiki_driver')
#self.url=url
#self.page=page
#self.Pico_Net=pico_net()
self.Pico_Time=pico_time()
self.Pico_Http=pico_http()
#self.Pico_Net.connect()
t=self.Pico_Time.get_now()
print('time now='+t)
def set_url(self,url):
self.url=url
def set_page(self,page):
self.page=page
def get_url(self):
return self.url
def get_html(self):
    """Fetch `<url>?<page>` through the HTTP helper and return the page text."""
    return self.Pico_Http.get(self.url+"?"+self.page)
# Fetch `<url>?<page>`, locate a <div> selected by an attribute condition,
# and return its content with HTML character references decoded.
# NOTE(review): the p_get_between_tag_with_cond call is cut off in this copy
# of the file; the condition starts with ('id... — presumably the div whose
# id is "body", TODO confirm.
# NOTE(review): indentation is lost in this copy, so whether `return body`
# is inside the `if p!=q:` branch cannot be told; if it is outside, a page
# without the div raises NameError, otherwise the method returns None.
# NOTE(review): the slice stops at rtn[0][1], which excludes the character at
# that index, whereas other callers of the span use end+1 — confirm.
def get_wiki_page(self):
uri = self.url+"?"+self.page
all_page=self.Pico_Http.get(uri)
print("get_wiki_page...")
#print(all_page)
parser=html_parser(all_page)
p=0
rtn=[(0,0)]
q=parser.p_get_between_tag_with_cond(p,"div",('id...
if p!=q:
#print("get_wiki_page id=body..."+all_page[rt...
body_x=all_page[rtn[0][0]:rtn[0][1]]
body=parser.special_char_to_text(body_x)
return body
def replace_wiki_page(self,new_body):
print("pico_wiki_driver.replace_wiki_page...")
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
array_of_equations=[]
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'input',...
while p!=q:
#print('p='+str(p)+' q='+str(q)+' <input....>')
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
for i in range(len(array_of_equations)):
#print('i='+str(i))
eqs=array_of_equations[i]
for key in eqs:
#print(key+'='+array_of_equations[i][key])
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['write']=array_of_equations[i...
#tokens=edit_html.get_tokens()
#for i in range(len(tokens)):_
# print(str(i)+'-'+tokens[i])
eqs_msg={}
rtn=[(0,0)]
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
body=r[0:start_p]+new_body+r[end_p+1:]
#print("body=......")
#print(new_body)
payload['msg']=edit_html.text_to_post_form(ne...
#print('msg=......')
#print(payload['msg'])
eqs_original={}
rtn=[(0,0)]
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
payload['original']=edit_html.text_to_post_fo...
body=""
body=body+'encode_hint='+(payload['encode_hint'])...
body=body+'template_page=&'
body=body+'cmd='+payload['cmd']+'&'
body=body+'page='+payload['page']+'&'
body=body+'digest='+payload['digest']+'&'
body=body+'msg='+payload['msg']+'&'
body=body+'write='+payload['write']+'&'
body=body+'original='+payload['original']
#print('body='+body)
edit_html.post(self.url,body)
#print(r)
# Upload the file file_dir+file_name as an attachment of the current wiki
# page with a multipart POST to self.url, and return the HTTP status code.
# `pass_word` is sent as the form's 'pass' field.
# NOTE(review): file_dir and file_name are joined by plain concatenation, so
# file_dir must end with a path separator.
# NOTE(review): the file is opened without a `with` block or close(), so the
# handle is only released when the object is garbage-collected.
# NOTE(review): the `files` dict's first entry and the Referer header are cut
# off in this copy of the file.
def upload_file(self,file_dir,file_name,pass_word):
# MIME type definition
#XLSX_MIMETYPE = 'application/vnd.openxmlformats-...
#file_Name = '2024-12-20-222611.jpg'
#file_dir='C:/vscode/python/bot_computing/'
file_path=file_dir+file_name
MIMETYPE = mimetypes.guess_type(file_path)[0]
print('upload file '+file_path)
# get the file name and data
file_Data_Binary = open(file_path, 'rb').read()
# upload destination URL
#url = 'http://www.yama-lab.org/fwb4pi/index.php'
#page = 'test02'
url=self.url
page=self.page
# store the upload form fields in a dict
files = {'attach_file': (file_name, file_Data_Bin...
'encode_hint': 'ぷ',
'pass': pass_word,
'refer': page,
'pcmd': 'post',
'plugin': 'attach'}
m = MultipartEncoder(files)
hx={'Content-Type': m.content_type,
# 'Accept:': 'text/html,application/xhtml+...
'Referer': url+'?'+'plugin=attach&pcmd=upload...
# 'Accept-Language': 'ja,en;q=0.9,en-GB;q=...
}
# upload the file
response = requests.post(url, data=m, headers=hx)
# show the response
print(response.status_code)
#print(response.content)
#wfile_path=file_dir+'return.html'
#result_file= open(wfile_path, 'w')
#result_file.write((response.content).decode(enco...
return response.status_code
def get_attachment_list(self):
# Fetch the current wiki page (self.url + "?" + self.page) and collect the
# text of the <a> elements found inside one particular <div> of that page.
# Returns a list of strings; the list is empty when the <div> is not matched.
uri = self.url+"?"+self.page
all_page=self.Pico_Http.get(uri)
print("get_wiki_page...")
#print(all_page)
parser=html_parser(all_page)
p=0
rtn=[(0,0)]
# Locate the <div> that holds the attachment links; rtn[0] receives its span
q=parser.p_get_between_tag_with_cond(p,"div",('id...
attach_end=q
attach_list=[]
# The parser returning its start position unchanged is treated as "not found"
if p==q:
return attach_list
#print("get_wiki_page id=body..."+all_page[rtn[0]...
p=rtn[0][0]
eqs={}
while True:
q=parser.p_get_between(p,'a',eqs,rtn)
# Stop when no further <a> is found or when the match lies past the <div>
if p==q:
break
if q>attach_end:
break
print(parser.html[rtn[0][0]:rtn[0][1]])
eqs_img={}
q2=parser.p_get_equations_of_the_tag(rtn[0][0...
# NOTE(review): presumably q2 points just after a nested tag so that the
# slice below is the visible file name - confirm against html_parser
fname=parser.html[q2:(rtn[0][1]+1)]
print(fname)
attach_list.append(fname)
q=parser.p_get_between(q2,'a',eqs,rtn)
p=q
print(attach_list)
return attach_list
def delete_attachment(self,attach_name,pass_word):
# Ask the wiki's attach plugin to delete one attachment of the current page.
#   attach_name -- sent as both the 'file' and 'newname' form fields
#   pass_word   -- sent as the 'pass' form field
# Returns the HTTP status code of the POST request.
# Store the form fields of the delete request in a dict
files = {'encode_hint': 'ぷ',
'plugin': 'attach',
'refer': self.page,
'file': attach_name,
'age': '0',
'pcmd': 'delete',
'newname': attach_name,
'pass': pass_word}
m = MultipartEncoder(files)
# The Content-Type must come from the encoder because it carries the multipart boundary
hx={'Content-Type': m.content_type,
# 'Accept:': 'text/html,application/xhtml+...
'Referer': self.url+'?'+'plugin=attach&pcmd=i...
+'file='+attach_name+'&refer='+self.page
# 'Accept-Language': 'ja,en;q=0.9,en-GB;q=...
}
# Send the delete request
response = requests.post(self.url, data=m, header...
# Show the response status
print(response.status_code)
#print(response.content)
#wfile_path='return2.html'
#result_file= open(wfile_path, 'w')
#result_file.write((response.content).decode(enco...
return response.status_code
def get_wiki_source_of_the_uri(self,uri):
    """Point the driver at ``uri`` and return that page's wiki source.

    uri -- string of the form '<url>?<page>'.

    Returns whatever ``get_wiki_source()`` returns for that page.
    Raises IndexError when ``uri`` contains no '?'.

    The original definition lacked the ``self`` parameter (so it could not be
    called as a method) and discarded the fetched source.
    """
    url_page=uri.split('?')
    self.set_url(url_page[0])
    self.set_page(url_page[1])
    return self.get_wiki_source()
def get_wiki_source(self):
# Request the edit form of self.page and return the wiki text held in the
# first <textarea> of the response, with special characters converted back
# to plain text. Returns the converted empty string when no textarea is found.
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
array_of_equations=[]
equations={}
# Collect the attribute dictionaries of every <input> tag in the form
q=edit_html.p_get_equations_of_the_tag(p,'input',...
while p!=q:
#print('p='+str(p)+' q='+str(q)+' <input....>')
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
# Copy selected <input> values into payload.
# NOTE(review): in the active code below payload is not used again; only the
# commented-out "cancel" request would have consumed these values.
for i in range(len(array_of_equations)):
#print('i='+str(i))
eqs=array_of_equations[i]
for key in eqs:
#print(key+'='+array_of_equations[i][key])
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['cancel']=array_of_equations[...
#tokens=edit_html.get_tokens()
#for i in range(len(tokens)):_
# print(str(i)+'-'+tokens[i])
eqs_msg={}
rtn=[(0,0)]
# rtn[0] receives the (start, end) span of the first <textarea> body
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
wiki_source=""
# (0,0) is the "nothing found" sentinel set just above
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
wiki_source=r[start_p:end_p]
#body=""
#body=body+'encode_hint='+payload['encode_hint']+...
#body=body+'cmd='+payload['cmd']+'&'
#body=body+'page='+payload['page']+'&'
#body=body+'cancel='+payload['cancel']
##print('body='+body)
#r=edit_html.post(self.url,body)
wiki_source=edit_html.special_char_to_text(wiki_s...
return wiki_source
def get_result(self):
    """Return the result section of the current page's wiki source.

    The result section is the text after the first "result:" marker (the
    character right after the marker is skipped) up to, but excluding, the
    character in front of the next "current_device=" marker. When that
    second marker is absent the section runs to the end of the text.

    Returns None when the source is None, and "" when the source is empty or
    has no "result:" marker.

    The original used ``str.index``, which raises ValueError instead of
    returning a negative number, so its ``q<0`` fallback could never run.
    """
    all_text=self.get_wiki_source()
    if all_text is None:
        return None
    if all_text=="":
        return ""
    start=all_text.find("result:")
    if start<0:
        return ""
    start+=len("result:")
    end=all_text.find("current_device=",start)
    if end<0:
        end=len(all_text)
    else:
        # drop the single character that precedes "current_device="
        end-=1
    return all_text[start+1:end]
def get_result_of_the_uri(self,uri):
    """Select the page named by ``uri`` ('<url>?<page>') and return its result.

    Any exception raised while splitting the uri or fetching the result is
    printed and swallowed; None is returned in that case.
    """
    outcome=None
    try:
        pieces=uri.split('?')
        self.set_url(pieces[0])
        self.set_page(pieces[1])
        outcome=self.get_result()
    except Exception as err:
        print('error in get_result_of_the_uri:'+uri)
        print(err)
    return outcome
class USB_Serial:
# Serial link to a device on one of two fixed ports, with a background
# thread that reads replies.
PORT0 = '/dev/ttyACM0' # on Windows this would be COM*
PORT1 = '/dev/ttyACM1'
def __init__(self):
# Try PORT0 first and fall back to PORT1; self.ser is None when both fail.
try:
self.ser = serial.Serial(port=self.PORT0,baud...
except Exception as e:
try:
self.ser = serial.Serial(port=self.PORT1,...
except Exception as e:
print(e)
#self.GUI.write_message(str(e))
self.ser=None
# The reader thread is created and started in the constructor.
# NOTE(review): indentation was lost in this paste; presumably the thread is
# started whether or not a port could be opened - confirm.
self.usb_serial_input_thread=threading.Thread(tar...
self.usb_serial_input_thread.start()
def set_gui(self,gui):
    """Store the GUI object that send_command() reports write errors to."""
    self.GUI=gui
def send_command(self,command):
    """Send one command line over the serial port and wait for the reply.

    command -- text to send; '\\r\\n' is appended and the result is encoded
               as UTF-8.

    Returns the value another thread stored in ``self.rtn``, or "0" when the
    write failed or no reply arrived within roughly 1000 polls of 10 ms.

    Unlike the original, a write failure no longer raises AttributeError when
    no GUI has been attached yet via set_gui().
    """
    self.rtn=None
    print('writing '+command)
    send_bytes=bytes(command+'\r\n','utf-8')
    try:
        if self.ser is not None:
            self.ser.write(send_bytes)
            self.ser.flush()
    except Exception as e:
        # the GUI is attached after construction, so it may still be missing
        gui=getattr(self,'GUI',None)
        print("serial write error")
        if gui is not None:
            gui.write_message("serial write error.")
        print(e)
        if gui is not None:
            gui.write_message(str(e))
        # makes the polling loop below return "0" immediately
        self.rtn="0"
    count=0
    while True:
        if self.rtn is not None:
            rtnx=self.rtn
            self.rtn=None
            return rtnx
        if count>1000:
            self.rtn=None
            return "0"
        time.sleep(0.01)
        count=count+1
def receive_loop(self):
# Endless reader loop: read all pending lines from the serial port and
# publish a reply through self.rtn, which send_command() polls.
# On any error the port is reopened (first port, then the fallback).
while True:
try:
# NOTE(review): with no open port this 'continue' appears to skip the sleep
# at the bottom and spin without pausing - confirm against the original
# indentation.
if self.ser==None:
continue
buf = self.ser.readlines()
buf = [i.decode().strip() for i in buf]
if len(buf)>0:
print(buf)
for l in buf:
# A '>>>' in the output only triggers the hint below
if '>>>' in l:
print('please reset pico.')
continue
#print("buf[i]="+buf)
# NOTE(review): always takes the second received line; a single-line reply
# would raise IndexError and land in the handler below - confirm intent.
self.rtn= buf[1]
except Exception as e:
print("error receiving")
#self.GUI.write_message("error receiving")
print(e)
#self.GUI.write_message(str(e))
# Try to reopen the serial port after a failure
try:
self.ser = serial.Serial(port=self.PO...
except Exception as e:
print(e)
try:
self.ser = serial.Serial(port=sel...
except Exception as e:
print(e)
#self.GUI.write_message('error '+...
self.ser=None
# "0" is the value send_command() also returns on failure
self.rtn="0"
time.sleep(0.01)
#HOST="localhost"
#PORT=9999
class Python_Connector:
    """Socket client that links the bot to an external program."""
    def __init__(self,host,port,mw):
        """Remember the target and start in the disconnected state.

        host -- address passed to socket.connect() by connect()
        port -- port; converted with int() when connecting
        mw   -- message window used for status output (may be None)
        """
        self.host,self.port=host,port
        # nothing is open until connect() is called
        self.connected=False
        self.soc=None
        self.handle_thread=None
        self.message_window=mw
def set_message_window(self,window):
    """Replace the message window used for status output."""
    self.message_window=window
def set_bot(self,bot):
    """Attach the bot whose handler()/write_line() process received lines."""
    self.bot=bot
def is_connected(self):
    """Return the ``connected`` flag maintained by connect().

    The original spelled the parameter ``sekf``, so the body's reference to
    ``self`` raised NameError on every call.
    """
    return self.connected
def connect_address(self,address):
    """Switch the target host to ``address`` and (re)connect to it."""
    self.host=address
    self.connect()
def connect(self):
# (Re)connect to self.host:self.port. Any previously open socket is closed
# first, then a receiver thread is started.
print("re connect to host "+self.host)
if self.soc !=None:
print("close the previous host, "+self.soc.ge...
try:
self.soc.close()
self.soc=None
# pause after closing the old connection before opening a new one
time.sleep(2.0)
except:
print("close failed")
try:
# socket.socket ... create a socket and assign it to soc
self.soc = socket.socket(socket.AF_INET, sock...
#self.soc.setsockopt(socket.SOL_SOCKET, socke...
# connect soc to the server socket whose IP address is host
self.soc.connect((self.host,int(self.port)))
except Exception as e:
print("connect error.")
if self.message_window:
self.message_window.write_message("error ...
self.connected=False
# NOTE(review): indentation was lost in this paste; as written there is no
# return after a failed connect, so the receiver thread below is presumably
# still started and connected set to True - confirm.
# Run the receiving function handler in a separate thread (handle_thread)
self.handle_thread=threading.Thread(target=self.h...
# start handle_thread
self.handle_thread.start()
self.connected=True
if self.message_window:
self.message_window.write_external("connected...
def send_command(self,command):
# Send one command line (with '\n' appended, UTF-8 encoded) to the external
# program, or report through the message window that no socket is open.
if self.soc!=None:
try:
self.soc.send(bytes(command+'\n',"utf-8"))
# Only KeyboardInterrupt is handled here; socket errors propagate to the caller
except KeyboardInterrupt:
if self.message_window:
self.message_window.write_external("e...
self.soc.close()
exit()
else:
# NOTE(review): unlike the branch above, message_window is used here without
# a None check - confirm it is always set on this path.
if self.host is not None:
self.message_window.write_external("no so...
else:
self.message_window.write_external("no ho...
# Receive processing; runs concurrently, separate from the sending thread.
def handler(self):
# Read lines from the socket forever; each line is printed, passed to
# self.bot.handler() and the returned value is written with
# self.bot.write_line().
print("at client, connected, start handler")
fx=self.soc.makefile('r')
try:
while True:
# NOTE(review): readline() returns '' once the peer closes the connection;
# nothing here leaves the loop in that case - confirm.
l=fx.readline()
line="[receive]- {}".format(l)
print(line)
if self.message_window:
self.message_window.write_external("f...
x=self.bot.handler(l)
self.bot.write_line(x)
except socket.error:
print("socket error. exit")
self.soc.close()
except Exception as e:
import traceback
traceback.print_exc()
if self.message_window:
self.message_window.write_message(sel...
if self.soc is not None:
self.soc.close()
self.soc=None
# NOTE(review): 'sys.exec_info' is not an attribute of the sys module (the
# function is sys.exc_info), so this line raises AttributeError when reached.
tb=sys.exec_info()[2]
print("message:{0}".format(e.with_traceba...
def close(self):
    """Close the socket, if one is open, and forget it.

    Safe to call when no connection exists; the original raised
    AttributeError in that case because it called close() on None.
    """
    soc,self.soc=self.soc,None
    if soc is not None:
        soc.close()
class pico_wiki_bot:
    """Bot that reads a script from a wiki page, interprets it and reports back."""
    # Class-level defaults. The mutable ones (lists/dicts) are shadowed by
    # fresh per-instance objects in __init__ so that two bots never share
    # their report, URL list, programs or environment.
    result_lines=[]
    script_lines=[]
    urls=[]
    programs={}
    environments={}
    time_millis=0
    timer=None
    current_program_name=""
    current_program=""
    report=[]
    #reportLength=120
    current_url=""
    current_page=""
    current_text=""
    next_url_index=""
    def __init__(self):
        """Create the helper objects (time, HTTP, wiki driver, USB serial).

        A Timer is only created when the wiki driver already has a URL.
        """
        print('wiki_bot')
        # per-instance state instead of the shared class-level containers
        self.result_lines=[]
        self.script_lines=[]
        self.urls=[]
        self.programs={}
        self.environments={}
        self.report=[]
        # set later through set_GUI(); defined here so `self.GUI!=None`
        # checks elsewhere in the class work before that call
        self.GUI=None
        #self.Pico_Net=pico_net()
        self.Pico_Time=pico_time()
        self.Pico_Http=pico_http()
        #self.Pico_Net.connect()
        t=self.Pico_Time.get_now()
        self.bot=None
        print('time now='+t)
        # returns immediately at this point because self.bot is still None
        self.read_initials()
        #self.current_url=url
        #self.current_page=page
        self.next_url_index=0
        self.Pico_Wiki_Driver=pico_wiki_driver()
        self.all_text=""
        self.u_serial=USB_Serial()
        if self.Pico_Wiki_Driver.get_url()=="":
            return
        self.timer=Timer()
def read_initials(self):
# Copy interval/report settings from self.bot.prop into self.environments
# and take the page name from the 'object_page_1' property.
# Does nothing when no bot or no property object is attached.
print("read_initials")
if self.bot==None:
return
if self.bot.prop==None:
return
self.environments['waiting_term']=self.bot.prop.g...
self.environments['read_interval']=self.bot.prop....
self.environments['exec_interval']=self.bot.prop....
self.environments['write_interval']=self.bot.prop...
self.environments['report_length']=self.bot.prop....
# The property holds '<url>?<page>'
uri=self.bot.prop.get('object_page_1')
# NOTE(review): 'url' is a local that is never used; only the page is stored.
url=uri.split('?')[0]
self.page=uri.split('?')[1]
def set_current_page(self,page):
    """Record ``page`` as the current wiki page name."""
    self.current_page=page
def set_current_url(self,url):
    """Record ``url`` as the current wiki base URL."""
    self.current_url=url
def set_current_pass(self,pass_w):
    """Record ``pass_w`` as the password stored in ``self.pass_word``."""
    self.pass_word=pass_w
def clear_report(self):
    """Empty the report list in place (the list object itself is kept)."""
    del self.report[:]
def add_report_line(self,send):
# Append one line to self.report, first discarding the oldest entries while
# the report is longer than a limit looked up from the GUI properties.
#print("add_report_line, report_length="+str(self...
#print(" current report_length="+str(len(self.rep...
#print(" adding..."+send)
#while len(self.report)>(int(self.environments['r...
while len(self.report)>(int(self.GUI.lookup_prop(...
# Shift every entry one slot towards the front, then drop the last slot:
# the net effect is removing the oldest (first) entry.
for i in range(len(self.report)-1):
self.report[i]=self.report[i+1]
self.report.pop()
self.report.append(send)
def set_GUI(self,gui):
    """Attach the GUI to the bot and hand it on to the USB serial helper."""
    self.GUI=gui
    serial_link=self.u_serial
    serial_link.set_gui(gui)
def init_params(self):
    """Forward to the GUI's init_params() when a GUI is attached.

    NOTE(review): a second ``init_params`` is defined further down in this
    class; being later in the class body, that one replaces this definition.
    """
    if self.GUI!=None:
        self.GUI.init_params()
def add_one_second_to_time_millis(self,timer):
    """Advance ``self.time_millis`` by 1000; ``timer`` is accepted but unused."""
    self.time_millis+=1000
def stop_timer(self):
    """Drop the reference to the timer object."""
    self.timer=None
def get_the_object_page(self,url):
    """Fetch the HTML of the wiki page addressed by ``url``.

    url -- '<base url>?<page name>'.

    A throw-away pico_wiki_driver is used, so the bot's own driver and its
    url/page attributes are left untouched. Returns what the driver's
    get_html() returns; raises IndexError when ``url`` has no '?'.
    """
    parts=url.split('?')
    wiki=pico_wiki_driver()
    wiki.set_url(parts[0])
    wiki.set_page(parts[1])
    return wiki.get_html()
def get_the_page(self,uri):
# Fetch the HTML of the page named by uri ('<url>?<page>'); when that yields
# an empty string, fall back to a page taken from the GUI properties (or from
# the module-level 'initials' when the lookup fails).
# Returns the HTML, or None implicitly when the fallback is empty as well.
#print("pico_wiki_bot.get_the_object_page")
#print("uri="+uri)
url_page=uri.split('?')
url=url_page[0]
page=url_page[1]
# First attempt uses a temporary driver so the bot's own driver is untouched
driver=pico_wiki_driver()
driver.set_url(url)
driver.set_page(page)
html=driver.get_html()
if html!="":
return html
try:
object_page=self.GUI.lookup_prop('object_page...
pass_word=self.GUI.lookup_prop('object_page_p...
# NOTE(review): the bare except also hides errors unrelated to the lookup
except:
object_page=initials['object_page_2']
pass_word=initials['object_page_pass_1']
# The fallback overwrites the bot's url/page/pass_word and reuses its driver
url_page=object_page.split('?')
self.url=url_page[0]
self.page=url_page[1]
self.pass_word=pass_word
self.Pico_Wiki_Driver.set_url(self.url)
self.Pico_Wiki_Driver.set_page(self.page)
html=self.Pico_Wiki_Driver.get_html()
if html!="":
return html
def get_script_and_result(self,url):
# Fetch the page at url and return the content of its first <pre> element,
# converted from HTML special characters to plain text. The text is also
# cached in self.current_text. Returns "" when the page HTML is empty; in
# that case self.current_text keeps its previous value.
#print("pico_wiki_bot.get_script_and_result")
html=self.get_the_object_page(url)
if html=="":
return ""
parser=html_parser(html)
p=0
eqs={}
rtn=[(0,0)]
# rtn[0] receives the (start, end) span of the <pre> body
q=parser.p_get_between(p,'pre',eqs,rtn)
be=rtn[0]
# NOTE(review): when no <pre> is found rtn[0] stays (0,0), so this slice is
# the first character of the page - confirm that is acceptable.
command_and_result=parser.html[be[0]:(be[1]+1)]
self.current_text=parser.special_char_to_text(com...
return self.current_text
def get_result(self,url):
    """Return the result section of the wiki page at ``url``.

    The result section is the text after the first "result:" marker (the
    character right after the marker is skipped) up to, but excluding, the
    character in front of the next "current_device=" marker; without that
    second marker it runs to the end of the text.

    Returns None when the page text is None, and "" when the page text is
    empty or has no "result:" marker.

    Two defects of the original are fixed: it used ``str.index`` (which
    raises instead of returning a negative value, so its ``q<0`` fallback was
    dead), and it read ``self.current_text`` instead of the value just
    fetched, so a failed fetch silently reused the previous page's text.
    """
    all_text=self.get_script_and_result(url)
    if all_text is None:
        return None
    if all_text=="":
        return ""
    start=all_text.find("result:")
    if start<0:
        return ""
    start+=len("result:")
    end=all_text.find("current_device=",start)
    if end<0:
        end=len(all_text)
    else:
        # drop the single character that precedes "current_device="
        end-=1
    return all_text[start+1:end]
def get_script(self,url):
    """Return the script part of the wiki page at ``url`` as a list of lines.

    The script is everything in front of the first "result:" marker (minus
    the one character preceding the marker), or the whole text when there is
    no marker. The first line recognised by is_include() is replaced by the
    lines of the included page.

    Returns None when the page text is None and "" when it is empty;
    otherwise a list of strings.

    Fixes: the original built the head of the spliced list with
    ``text_lines[:i-1]``, which dropped the line in front of the include and,
    for an include on the first line, kept almost the whole page. The bare
    ``except`` around the marker search is replaced by ``str.find``.
    """
    all_text=self.get_script_and_result(url)
    if all_text is None:
        return None
    if all_text=="":
        return ""
    end=all_text.find("result:")
    if end<0:
        end=len(all_text)
    else:
        # drop the character before the marker, but never go negative
        end=max(end-1,0)
    text_lines=all_text[:end].split('\n')
    include_text=[[]]
    for i,line in enumerate(text_lines):
        if self.is_include(html_parser(line),include_text):
            merged=text_lines[:i]+list(include_text[0])+text_lines[i+1:]
            # as before, an empty splice result leaves the lines unchanged
            if merged:
                text_lines=merged
            # only the first include line is expanded
            break
    return text_lines
def get_current_device_line(self):
# Return the rest of the line that follows "current_device=" in the cached
# page text (self.current_text), converted by special_char_to_text.
# Returns None when no text is cached.
#command_and_result=self.get_script_and_result()
command_and_result=self.current_text
# NOTE(review): the parser is built before the None check below - confirm
# html_parser accepts None.
parser=html_parser(command_and_result)
if command_and_result==None:
return None
# str.index raises ValueError when the marker is missing
p=command_and_result.index("current_device=")
p=p+len("current_device=")
try:
q=command_and_result.index("\n",p)
except:
# no newline after the marker: take everything up to the end of the text
q=len(command_and_result)
return parser.special_char_to_text(command_and_re...
def replace_wiki(self,new_wiki):
    """Hand ``new_wiki`` to the wiki driver's replace_wiki_page()."""
    driver=self.Pico_Wiki_Driver
    driver.replace_wiki_page(new_wiki)
def replace_result(self):
# Rewrite the result section of the current wiki page from self.report:
# the text up to and including "result:" is kept, the report lines are
# appended (each prefixed with a space) followed by a new current_device
# line, and the page is replaced through the wiki driver.
#print("pico_wiki_bot.replace_result")
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
eqs_msg={}
rtn=[(0,0)]
body=""
# First <textarea>: the editable wiki text
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
body=edit_html.special_char_to_text(r[start_p...
#print("body=......")
#print(body)
eqs_original={}
rtn=[(0,0)]
# Second <textarea>: stored in payload['original']
# NOTE(review): payload is not used again in this method - confirm it is needed.
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
payload['original']=edit_html.text_to_post_fo...
#print("body="+body)
# NOTE(review): str.index raises ValueError when a marker is missing, so the
# 'q<0' branch below can never run; the caller send_result() catches the error.
px=body.index("result:")+len("result:")
q=body.index("current_device=",px)
if q<0:
q=len(body)
head_area=body[0:px]
#tail_area=msg[q:]
#result_area=msg[px:q]
new_area=head_area+'\n'
for i in range(len(self.report)):
line=self.report[i]
# every report line ends up as exactly one page line with a leading space
if line.endswith('\n'):
new_area=new_area+' '+line
else:
new_area=new_area+' '+line+'\n'
#new_area=new_area+' '+self.report[i]+'\n'
bot_id=self.GUI.lookup_prop('bot_id')
new_area=new_area+" current_device=\""+bot_id+"\"...
gc.collect()
self.Pico_Wiki_Driver.replace_wiki_page(new_area)
def get_attachment_list(self):
    """Return the attachment list reported by the wiki driver."""
    driver=self.Pico_Wiki_Driver
    return driver.get_attachment_list()
def upload_file(self,file_path,file_name):
# Delegate the upload to the wiki driver. Nothing is returned, so the
# driver's status code is discarded.
self.Pico_Wiki_Driver.upload_file(file_path,file_...
def delete_attachment(self,attach_name):
# Delegate the deletion to the wiki driver. Nothing is returned, so the
# driver's status code is discarded.
self.Pico_Wiki_Driver.delete_attachment(attach_na...
def read_script(self,script_lines):
# Read the script from the configured object page.
#   script_lines -- one-element list used as an output parameter; slot 0
#                   receives the script lines, or None on failure
# Returns True when a script was read, False otherwise.
#print("read_script")
if self.GUI!=None:
try:
object_page=self.GUI.lookup_prop('object_...
pass_word=self.GUI.lookup_prop('object_pa...
# fall back to the module-level defaults when the GUI lookup fails
except:
object_page=initials['object_page_1']
pass_word=initials['object_page_pass_1'] ...
else:
print("failed to read")
return False
# object_page has the form '<url>?<page>'
url_page=object_page.split('?')
self.url=url_page[0]
self.page=url_page[1]
self.pass_word=pass_word
self.Pico_Wiki_Driver.set_url(self.url)
self.Pico_Wiki_Driver.set_page(self.page)
# NOTE(review): 'html' is not used afterwards; get_script() fetches the page
# again through its own driver - confirm this extra request is needed.
html=self.Pico_Wiki_Driver.get_html()
all_text=self.get_script(object_page)
# NOTE(review): get_script() can also return "", which passes this check
if all_text==None or all_text==[]:
script_lines[0]=None
return False
script_lines[0]=all_text
#print("-----script_lines-----")
#for i in range(len(self.script_lines)):
# print(str(i)+' - '+self.script_lines[i])
#if self.GUI != None:
# #self.GUI.write_script_start(all_text)
# self.GUI.write_script_lines(all_text)
return True
def eval_script(self):
    """Announce the evaluation on stdout, then interpret the stored script lines."""
    print("eval_script")
    self.interpret_lines()
#def print_result(self):
# print("print_result")
def is_object_page(self,parser):
    """Recognise an 'object_page <url> [or <url> ...]' line.

    On a match ``self.urls`` is replaced by the list of URLs found (possibly
    empty) and True is returned. Returns False, leaving ``self.urls``
    untouched, when the line does not start with the keyword.
    """
    start=parser.p_b(0)
    after_key=parser.p_key("object_page ",start)
    # a parser step that does not advance means "no match"
    if after_key==start:
        return False
    pos=parser.p_b(after_key)
    self.urls=[]
    found=[""]
    nxt=parser.p_url(pos,found)
    while nxt!=pos:
        self.urls.append(found[0])
        pos=parser.p_b(nxt)
        nxt=parser.p_key("or ",pos)
        if nxt==pos:
            break
        pos=parser.p_b(nxt)
        found=[""]
        nxt=parser.p_url(pos,found)
    return True
def is_device(self,parser):
    """Recognise a 'current_device="<name>", Date=...' line.

    On a match the device name is stored in ``self.current_device`` and True
    is returned; otherwise False. The original fell off the end and returned
    None even for a matching line, unlike the other is_* predicates.
    """
    p=parser.p_b(0)
    q=parser.p_key("current_device=",p)
    if p==q:
        return False
    p=parser.p_b(q)
    xname=[""]
    q=parser.p_String_Constant(p,xname)
    if p==q:
        return False
    p=parser.p_b(q)
    self.current_device=xname[0]
    q=parser.p_key(",",p)
    if p==q:
        return False
    p=parser.p_b(q)
    # NOTE(review): as in the original, the position returned for "Date=" is
    # not used and p_Date starts at the same position; the parsed date is
    # discarded. Confirm whether p_Date should start after the key.
    parser.p_key("Date=",p)
    xdate=[]
    parser.p_Date(p,xdate)
    return True
def is_include(self,parser,script_lines):
    """Recognise an 'include <url> [or <url> ...]' line and load the script.

    parser       -- parser positioned on one script line
    script_lines -- one-element list used as output; slot 0 receives the
                    lines of the first listed page that yields a script

    Returns False when the line is not an include line or when parsing an
    alternative URL fails, True otherwise (even if no page could be loaded).

    Fix: the error handler called ``e.toString()``, which does not exist on
    Python exceptions, so reporting a parse error raised AttributeError.
    """
    p=parser.p_b(0)
    q=parser.p_key("include ",p)
    if p==q:
        return False
    p=parser.p_b(q)
    include_pages=[]
    xurl=[""]
    q=parser.p_url(p,xurl)
    while p!=q:
        include_pages.append(xurl[0])
        p=parser.p_b(q)
        q=parser.p_key("or ",p)
        if p==q:
            break
        p=q
        try:
            p=parser.p_b(p)
            xurl=[""]
            q=parser.p_url(p,xurl)
        except Exception as e:
            print('error in include '+str(e))
            return False
    # the first alternative that delivers a script wins
    for i,page in enumerate(include_pages):
        print("include_pages["+str(i)+"]="+page)
        x_lines=self.get_script(page)
        if x_lines is None or x_lines=="":
            continue
        script_lines[0]=x_lines
        break
    return True
def set_page_name(self,eq):
    """Apply a 'pageName' template to both object pages.

    eq -- dict holding the template under 'pageName'. "<hour>" is replaced by
          the hour and "<day>" by the day of month taken from
          Pico_Time.get_now() (expected as '<Y-M-D> <H:M:S...>').

    For an "<hour>" template the new URIs are written to the GUI properties
    'object_page_1'/'object_page_2'; for a "<day>" template they are added
    through GUI.add_uri() at index 0 and 1. Each placeholder is handled on
    its own, so the other one is left in the page name unreplaced.
    """
    rv=eq['pageName']
    print("set pageName="+rv)
    now=self.Pico_Time.get_now()
    print('ok?')
    clock=(now.split())[1]
    print('time_x='+clock)

    def retarget(prop_key,page_name):
        # swap the page part of the '<url>?<page>' stored under prop_key
        parts=self.GUI.lookup_prop(prop_key).split('?')
        parts[1]=page_name
        uri=parts[0]+'?'+parts[1]
        print(uri)
        return uri

    targets=('object_page_1','object_page_2')
    if "<hour>" in rv:
        hour_page=rv.replace("<hour>",clock.split(':')[0])
        for prop_key in targets:
            new_uri=retarget(prop_key,hour_page)
            self.GUI.map_to_gui(prop_key,new_uri)
    if "<day>" in rv:
        day_text=(now.split())[0]
        print('ok?')
        print('date_x='+day_text)
        day_page=rv.replace("<day>",day_text.split('-')[2])
        for slot,prop_key in enumerate(targets):
            new_uri=retarget(prop_key,day_page)
            self.GUI.add_uri(new_uri,slot)
def is_set_command(self,parser,p):
# Recognise 'set <key>=<value>' starting at position p.
# A 'pageName' setting is delegated to set_page_name(); any other key is
# stored in self.environments and mirrored to the GUI.
# Returns True when a set command was handled, False otherwise.
#print("is_set_command.."+parser.html[p:p+10])
#p=parser.p_b(p)
q=parser.p_key("set ",p)
# a parser step that does not advance means "no match"
if p==q:
#print("is_set_command return False")
return False
p=parser.p_b(q)
eq={}
# eq receives the parsed key/value pair(s)
q=parser.p_an_equation(p,eq)
if p==q:
return False
if "pageName" in eq:
print(eq)
self.set_page_name(eq)
return True
for key in eq:
self.environments[key]=eq[key]
self.GUI.write_message("set "+str(key)+"="+st...
self.GUI.map_to_gui(key,self.environments[key])
return True
def is_py_command(self,parser,p):
    """Recognise 'py <name>' at position ``p`` and start a new program.

    On a match the program name is stored in ``self.current_program_name``,
    the program text collected so far is reset to "" and True is returned;
    otherwise False. The original returned None on a match, unlike
    is_set_command().
    """
    q=parser.p_key("py ",p)
    if p==q:
        return False
    p=parser.p_b(q)
    xname=[""]
    # the returned position is not needed; only the parsed name is used
    parser.p_name(p,xname)
    self.current_program_name=xname[0]
    self.current_program=""
    return True
def is_end_command(self,parser,p):
# Recognise 'end <name>' starting at position p and store an entry in
# self.programs under the name recorded by the preceding 'py' command.
# Returns False when the keyword does not match.
# NOTE(review): there is no 'return True' on the success path, so a matching
# line yields None; the name parsed into xname is not used.
#print("is_end_command.."+parser.html[p:p+10])
#p=parser.p_b(p)
q=parser.p_key("end ",p)
if p==q:
return False
p=parser.p_b(q)
xname=[""]
q=parser.p_name(p,xname)
self.programs[self.current_program_name]=self.cur...
def ex_pico(self,command):
    """Send ``command`` over the USB serial link and return its reply."""
    reply=self.u_serial.send_command(command)
    return reply
def handler(self,x):
    """Return ``x`` unchanged.

    Python_Connector.handler() calls this on every line it receives.
    """
    return x
def external(self,x):
    """Forward ``x`` to the GUI's ex_external()."""
    gui=self.GUI
    gui.ex_external(x)
def send_external(self,x):
    """Send ``x`` to the external program through the connector in ext_soc."""
    connector=self.ext_soc
    connector.send_command(x)
def is_run_command(self,parser,p):
# Recognise 'run <name>' starting at position p and execute the program
# stored under that name in self.programs.
# Before running, the report is refilled from the result section currently
# on the wiki page. Returns False when the keyword or the name is missing.
# NOTE(review): there is no 'return True' on the success path.
# NOTE(review): the program text comes from a wiki page and is passed to
# exec(); anyone who can edit that page can run arbitrary code here.
# reference : https://qiita.com/kyoshidajp/items/...
# reference : https://qiita.com/kammultica/items/...
print("is_run_command.."+parser.html[p:p+10])
q=parser.p_key("run ",p)
if p==q:
#print("is_run_command return False")
return False
p=parser.p_b(q)
xname=[""]
q=parser.p_name(p,xname)
if p==q:
#print("is_run_command return False")
return False
try:
#r=self.get_result(self.current_url+'?'+self....
r=self.get_result(self.url+'?'+self.page)
except Exception as e:
print("error in run command, get_result..:"+s...
# NOTE(review): when get_result() raised, 'r' was never assigned, so the
# test below fails with a NameError - confirm and initialise r.
if r!=None and r!="":
self.clear_report()
lines=r.split('\n')
for i in range(len(lines)):
self.add_report_line(lines[i])
# the executed program sees the bot as the local name 'self'
exec_locals={'self':self}
#print("line="+parser.html)
#print("xname[0]="+xname[0])
# raises KeyError for a name that was never defined with py/end
program=self.programs[xname[0]]
print("program="+program)
try:
exec(program,globals(),exec_locals)
except Exception as e:
print("fail to exec "+program)
self.add_report_line("fail to exec:"+xname[0])
self.GUI.write_message("fail to exec "+program)
tb = sys.exc_info()[2]
print("message:{0}".format(e.with_traceback(t...
#print(str(e))
self.GUI.write_message(str(e))
traceback.print_stack()
# NOTE(review): the string returned by format_exc() is discarded
traceback.format_exc()
#self.add_report_line(str(e))
#sys.print_exception()
#try:
# exec(program)
#except:
# print("fail to exec "+program)
# self.add_report_line("fail to exec:"+xname[0])
# sys.print_exception()
def is_command(self,parser):
    """Dispatch a 'command: ...' line to the set/py/end/run handlers.

    The handlers are tried in that order and the first one returning a true
    value ends the search. Returns True in that case, False when the line
    does not start with "command:" or no handler reported success.
    """
    start=parser.p_b(0)
    body=parser.p_key("command:",start)
    if body==start:
        return False
    body=parser.p_b(body)
    handlers=(
        self.is_set_command,
        self.is_py_command,
        self.is_end_command,
        self.is_run_command,
    )
    return any(check(parser,body) for check in handlers)
def is_py(self,parser):
    """Recognise a 'py: <code>' line and append the code to the program text.

    On a match everything after "py: " plus a newline is appended to
    ``self.current_program`` and True is returned; otherwise False. The
    original returned None on a match instead of True.
    """
    p=parser.p_b(0)
    q=parser.p_key("py: ",p)
    if p==q:
        return False
    self.current_program=self.current_program+parser.html[q:]+'\n'
    return True
def interpret_a_line(self,line):
    """Interpret one script line.

    The line is offered, in this order, to the object_page, device, command
    and py recognisers; the first one that accepts it ends the processing.
    For an object_page line every URL found is also registered with the GUI
    under its list index.
    """
    parser=html_parser(line)
    if self.is_object_page(parser):
        for index,uri in enumerate(self.urls):
            self.GUI.add_uri(uri,index)
        return
    # include lines are expanded earlier, in get_script()
    if self.is_device(parser) or self.is_command(parser) or self.is_py(parser):
        return
def interpret_lines(self):
    """Interpret every stored script line, then send the result to the wiki.

    An exception raised for one line is reported (stdout and report) and the
    remaining lines are still processed.

    Fix: the second report line was the literal text "...:{e}" because the
    string was not an f-string; it now carries the exception message.

    NOTE(review): indentation was lost in this paste; send_result() is taken
    to run once after the loop rather than once per line - confirm.
    """
    for script_line in list(self.script_lines):
        try:
            self.interpret_a_line(script_line)
        except Exception as e:
            line="error in '"+str(script_line)+"'"
            print(line)
            self.write_line(line)
            line="...:"+str(e)
            print(line)
            self.write_line(line)
    self.send_result()
def next_url(self):
    """Advance to the next entry of ``self.urls`` (wrapping around).

    The entry ('<url>?<page>') is split at its first '?' into current_url and
    current_page, the index is advanced and the wiki driver is pointed at the
    new page. Does nothing when the list is empty; raises ValueError for an
    entry without '?'.
    """
    print("next_url")
    if not len(self.urls):
        return
    index=int(self.next_url_index)
    if index>=len(self.urls):
        # wrap around to the first entry
        self.next_url_index=0
        index=0
    entry=self.urls[index]
    cut=entry.index("?")
    self.current_page=entry[cut+1:]
    #self.current_url=urlx[0:p-1]
    self.current_url=entry[:cut]
    print("current_url="+self.current_url)
    print("current_page="+self.current_page)
    self.next_url_index=int(self.next_url_index)+1
    driver=self.Pico_Wiki_Driver
    driver.set_url(self.current_url)
    driver.set_page(self.current_page)
def send_result(self):
    """Write the report back to the wiki page.

    The attempt is announced on stdout and in the GUI; an exception raised by
    replace_result() is printed and swallowed.
    """
    announcement="send_result"
    print(announcement)
    self.GUI.write_message(announcement)
    try:
        self.replace_result()
    except Exception as err:
        print('error while send_result')
        print(err)
def write_line(self,line):
    """Show ``line`` in the GUI and append it to the report.

    An exception from either step is printed and swallowed.
    """
    try:
        self.GUI.write_message("add_report "+line)
        self.add_report_line(line)
    except Exception as err:
        print('error while write_line('+line+')')
        print(err)
def read_eval_print_loop(self):
# Main loop of the bot: periodically read the script from the wiki,
# evaluate it and write the report back. The periods come from the GUI
# properties and are compared against time.time() in milliseconds.
# The loop runs until stop_read_eval_print_loop() clears the flag.
print("read_eval_print_loop")
self.continue_read_eval_print_loop=True
last_read_time=int(time.time() * 1000)
last_eval_time=last_read_time
last_write_time=last_read_time
while self.continue_read_eval_print_loop:
gc.collect()
current_time=int(time.time() * 1000)
# guard against the clock having moved backwards
if current_time<last_read_time:
last_read_time=current_time
# read phase
if current_time-last_read_time>self.GUI.looku...
last_read_time=current_time
# one-element list used as an output parameter of read_script()
new_lines=[None]
if self.read_script(new_lines):
if self.GUI != None:
#self.GUI.write_script_start(all_...
self.GUI.write_script_list(new_li...
self.script_lines=new_lines[0]
if self.GUI.lookup_prop('exec_interva...
self.eval_script()
self.GUI.update()
#else:
# self.next_url()
# continue
# eval phase, only when an exec interval greater than 0 is configured
if self.GUI.lookup_prop('exec_interval')>0:
exec_interval_time=self.GUI.lookup_prop('...
if current_time-last_eval_time>exec_inter...
last_eval_time=current_time
self.eval_script()
# write phase, only when a write interval greater than 0 is configured
if self.GUI.lookup_prop('write_interval')>0:
if current_time-last_write_time>self.GUI....
last_write_time=current_time
self.send_result()
#time.sleep_ms(100)
time.sleep(0.1)
def stop_read_eval_print_loop(self):
    """Clear the flag that keeps read_eval_print_loop() running."""
    self.continue_read_eval_print_loop=False
def init_params(self):
# Initialise current url/page/password from the GUI properties and restart
# the URL rotation at index 0.
# NOTE(review): this is the second definition of init_params in the class;
# being the later one, it replaces the earlier definition.
uri=self.GUI.lookup_prop('object_page_1')
pass_word=self.GUI.lookup_prop('object_page_pass_...
# the property has the form '<url>?<page>'
page=uri.split('?')[1]
url=uri.split('?')[0]
self.set_current_url(url)
self.set_current_page(page)
self.set_current_pass(pass_word)
self.next_url_index=0
def start_external(self):
    """Launch the configured external program with python3.

    Directory and file name come from the GUI properties
    'external_program_path' and 'external_program_name'; they are simply
    concatenated, so the path must end with a separator. On success the GUI
    is told that the program was started; on failure the traceback is
    printed and nothing is reported to the GUI.

    Fix: the error handler called ``traceback.print_exec()``, which does not
    exist (the function is ``print_exc``), so a launch failure raised
    AttributeError from inside the handler.
    """
    prog_dir=self.GUI.lookup_prop('external_program_path')
    prog_file=self.GUI.lookup_prop('external_program_name')
    path=prog_dir+prog_file
    try:
        # NOTE(review): the command line is built by string concatenation and
        # run through the shell; keep the two properties under trusted control.
        subprocess.Popen('python3 '+path,shell=True)
    except Exception as e:
        import traceback
        traceback.print_exc()
        print(e)
        print('error')
        return
    self.GUI.write_external('start '+prog_file)
def connect_external(self):
# Create a Python_Connector for the external program, attach this bot to
# it and connect. Errors are printed together with a traceback and swallowed.
# NOTE(review): the fixed 5 second pause presumably gives a freshly started
# external program time to open its port - confirm.
time.sleep(5.0)
try:
address=self.GUI.lookup_prop('external_progra...
port=self.GUI.lookup_prop('external_program_p...
self.ext_soc=Python_Connector(address,port,se...
# the connector calls back into this bot for every received line
self.ext_soc.set_bot(self)
self.ext_soc.connect()
except Exception as e:
import traceback
traceback.print_exc()
print(e)
print('error')
return
def get_external_connection(self):
    """Return the connector stored in ``ext_soc`` by connect_external().

    Raises AttributeError when ``ext_soc`` has not been set yet.
    """
    connector=self.ext_soc
    return connector
def start_thread(gui):
    """Trigger the GUI's start button handler."""
    press=gui.click_start_button
    press()
def main() -> None:
    """Build the bot and its GUI, wire them together and run the Tk main loop.

    With '-s' as first command-line argument the external program is started
    and/or connected according to the GUI's autorun/autoconnect selections,
    and the start button handler is triggered automatically.
    """
    argv=sys.argv
    fst_opt=argv[1] if len(argv)>=2 else None
    print('fst_opt='+str(fst_opt))
    root=Tk()
    bot=pico_wiki_bot()
    gui=GUI(root,bot)
    # wire bot and GUI together before either initialises its parameters
    bot.set_GUI(gui)
    gui.set_Pico_Time(bot.Pico_Time)
    gui.init_params()
    bot.init_params()
    if fst_opt=='-s':
        autorun=gui.is_autorun_selected.get()
        if autorun:
            bot.start_external()
        autoconnect=gui.is_autoconnect_selected.get()
        if autoconnect:
            bot.connect_external()
        start_thread(gui)
    root.mainloop()


if __name__=='__main__':
    main()
}}
ページ名: