wiki_bot_04.py
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
検索
|
最終更新
|
ヘルプ
|
ログイン
]
開始行:
[[FrontPage]]
#code(Python){{
# -*- coding: utf-8 -*-
#
# 20250117
#
import time
import requests
from requests_toolbelt import MultipartEncoder
import socket
try:
import ustruct as struct
except:
import struct
from time import sleep
import gc
import copy
import threading
import sys
from tkinter import *
from tkinter import Label
from tkinter import Button
import tkinter as tk
from tkinter import scrolledtext
#from PIL import Image, ImageTk, ImageOps
import tkinter.ttk as ttk
from tkinter import messagebox
from tkinter import filedialog
import time
import json
import pickle
import serial
import mimetypes
initials = {'bot_id':'yama_bot_0000_0000_0000_0001',
'waiting_term':600000,
'object_page_1':'http://192.168.10.10/fwb4pi/...
'object_page_2':'http://192.168.10.11/fwb4pi/...
'object_page_pass_1':'',
'object_page_pass_2':'',
'read_interval': 20000,
'exec_interval': 0,
'write_interval':0,
'report_length':120
}
class Properties:
def __init__(self):
    """Create the property store, seeded with a copy of the module defaults.

    The defaults are copied so that later ``set``/``load`` calls change this
    instance only and never rewrite the module-level ``initials`` dict.
    """
    self.properties=dict(initials)
    self.file_name="bot.properties"
def set_gui(self,gui):
self.gui=gui
def load(self,fn):
try:
with open(fn,mode='rb') as f:
prop_w=pickle.load(f)
for key in prop_w:
self.properties[key]=prop_w[key]
self.map_to_gui()
except Exception as e:
print("file "+fn+" read error")
tb=sys.exc_info()[2]
print("message:{0}".format(e.with_traceback(t...
def map_to_gui(self):
    """Push every stored property to the attached GUI, one key at a time."""
    for name, value in self.properties.items():
        self.gui.map_to_gui(name, value)
def map_from_gui(self):
    """Refresh every stored property from the value the GUI reports for it."""
    for name in self.properties:
        self.properties[name]=self.gui.lookup_prop(name)
def get(self,key):
    """Return the value stored under *key*, or ``None`` when the key is absent."""
    return self.properties.get(key)
def set(self,key,value):
self.properties[key]=value
def save(self,fn):
self.map_from_gui()
try:
with open(fn,mode='wb') as f:
pickle.dump(self.properties,f)
except Exception as e:
print("file "+fn+" write error")
tb=sys.exc_info()[2]
print("message:{0}".format(e.with_traceback(t...
class GUI:
def __init__(self,root,bot):
self.bot=bot
self.recv_queue=[]
self.root=root
self.root.title("Wiki Bot panel")
self.root.configure(background="white")
self.root.geometry("850x800")
self.notebook =ttk.Notebook(root)
self.main_tab=tk.Frame(self.notebook, bg='white')
self.script_tab=tk.Frame(self.notebook, bg='white')
self.notebook.add(self.main_tab, text="main", und...
self.notebook.add(self.script_tab, text="script")
self.notebook.pack(expand=True, fill='both', padx...
yy=10
# command list
self.start_button=Button(self.main_tab, text="sta...
self.start_button.place(x=10,y=yy)
self.stop_button=Button(self.main_tab, text="stop...
self.stop_button.place(x=80,y=yy)
self.save_prop_button=Button(self.main_tab, text=...
self.save_prop_button.place(x=150,y=yy)
self.clear_message_button=Button(self.main_tab, t...
self.clear_message_button.place(x=280,y=yy)
yy=40
# this bot id, waiting term
self.bot_label=Label(self.main_tab,text="bot", wi...
self.bot_label.place(x=10,y=yy)
self.bot_id_field=Entry(self.main_tab,width=50)
self.bot_id_field.place(x=50,y=yy)
self.waiting_term_label=Label(self.main_tab,text=...
self.waiting_term_label.place(x=380,y=yy)
self.waiting_term_field=Entry(self.main_tab,width...
self.waiting_term_field.place(x=450,y=yy)
yy=70
self.page_frame = ttk.Frame(self.main_tab)
self.page_frame.grid(row=0, column=0)
# 項目をつくる
items = ['URI', 'name', 'pass']
for i in range(0, len(items)):
label_item = ttk.Label(self.page_frame,text=i...
label_item.grid(row=0, column=i)
n = 2 # 行数
# 項目1
self.object_page_table_uri = [0] * n
for i in range(0, n):
self.object_page_table_uri[i] = ttk.Entry(sel...
self.object_page_table_uri[i].grid(row=i+1, c...
# 項目2
self.object_page_table_name = [0] * n
for i in range(0, n):
self.object_page_table_name[i] = ttk.Entry(se...
self.object_page_table_name[i].grid(row=i+1, ...
# 項目3
self.object_page_table_pass = [0] * n
for i in range(0, n):
self.object_page_table_pass[i] = ttk.Entry(se...
self.object_page_table_pass[i].grid(row=i+1, ...
self.page_frame.place(x=10,y=yy)
self.param_frame = ttk.Frame(self.main_tab)
self.param_frame.grid(row=0, column=0)
# 項目をつくる
param_items = ['property', 'value']
for i in range(0, len(param_items)):
label_param_item = ttk.Label(self.param_frame...
label_param_item.grid(row=0, column=i)
n = 4 # 行数
# 項目1
self.param_table_property = [0] * n
for i in range(0, n):
self.param_table_property[i] = ttk.Entry(self...
self.param_table_property[i].grid(row=i+1, co...
# 項目2
self.param_table_value = [0] * n
for i in range(0, n):
self.param_table_value[i] = ttk.Entry(self.pa...
self.param_table_value[i].grid(row=i+1, colum...
self.param_frame.place(x=550,y=yy)
self.param_table_property[0].insert(END,'read_int...
self.param_table_property[1].insert(END,'exec_int...
self.param_table_property[2].insert(END,'write_in...
self.param_table_property[3].insert(END,'report_l...
yy=200
self.command_field_label=Label(self.main_tab,text...
self.command_field_label.place(x=10,y=yy)
self.command_field=Entry(self.main_tab,width=50)
self.command_field.place(x=150,y=yy)
self.command_enter_button=Button(self.main_tab,te...
self.command_enter_button.place(x=500,y=yy)
self.clear_command_button=Button(self.main_tab,te...
self.clear_command_button.place(x=580,y=yy)
yy=230
# script area
self.script_frame=tk.Frame(self.main_tab,width=50...
self.script_frame.place(x=10,y=yy)
self.script_area=scrolledtext.ScrolledText(self.s...
self.script_area.pack()
yy=600
# output area
self.message_frame=tk.Frame(self.main_tab,width=5...
self.message_frame.place(x=10,y=yy)
self.message_area=scrolledtext.ScrolledText(self....
self.message_area.pack()
#script tab
yy=10
self.dir_name_label=Label(self.script_tab,text="d...
self.dir_name_label.place(x=30,y=yy)
self.dir_name_field=Entry(self.script_tab,width=50)
self.dir_name_field.place(x=100,y=yy)
#self.read_button=Button(self.script_tab, text="r...
#self.read_button.place(x=500,y=yy)
self.dir_button=Button(self.script_tab, text="dir...
self.dir_button.place(x=570,y=yy)
self.run_script_button2=Button(self.script_tab,te...
self.run_script_button2.place(x=680,y=yy)
yy=50
self.file_name_label=Label(self.script_tab,text="...
self.file_name_label.place(x=30,y=yy)
self.file_name_field=Entry(self.script_tab,width=...
self.file_name_field.place(x=100,y=yy)
self.read_button=Button(self.script_tab, text="re...
self.read_button.place(x=500,y=yy)
self.dir_button=Button(self.script_tab, text="sel...
self.dir_button.place(x=570,y=yy)
yy=100
self.script_frame2=tk.Frame(self.script_tab,width...
self.script_frame2.place(x=50,y=yy)
self.script_area2=scrolledtext.ScrolledText(self....
self.script_area2.pack()
#self.run_script()
#self.set_script()
#self.display_help()
self.prop=Properties()
self.prop.set_gui(self)
def set_Pico_Time(self,time):
self.Pico_Time=time
def init_params(self):
self.prop.load(self.prop.file_name)
self.prop.map_to_gui()
def click_read_button(self):
    """Load the file named in the file-name field into the script area.

    The script area is cleared and refilled line by line; each line is also
    echoed to stdout. Open and read failures are reported on stdout and the
    method returns without raising.
    """
    fn=self.file_name_field.get()
    print("fn="+fn)
    try:
        f=open(fn,"r")
    except Exception as e:
        print("file open error.")
        # sys.exc_info() (not "exec_info") yields (type, value, traceback).
        tb=sys.exc_info()[2]
        print("message:{0}".format(e.with_traceback(tb)))
        return
    try:
        # The with-block closes the file even when a widget call fails midway.
        with f:
            self.script_area.delete(1.0,END)
            for line in f:
                print(line)
                self.script_area.insert(END,line)
    except Exception as e:
        print("file "+fn+" read error")
        tb=sys.exc_info()[2]
        print("message:{0}".format(e.with_traceback(tb)))
def click_dir_button(self):
iDir = os.path.abspath(os.path.dirname(__file__))
folder_name=tk.filedialog.askdirectory(initialdir...
if len(folder_name)==0:
print("cancel selcting folder")
else:
self.dir_name_field.insert(END,folder_name)
def click_select_file_button(self):
fType = [("python", "*.py")]
iDir=""
xdir=self.dir_name_field.get()
if xdir==None or xdir=="":
iDir=os.path.abspath(os.path.dirname(__file__))
else:
iDir=xdir
iFilePath=filedialog.askopenfilename(filetypes=fT...
if iFilePath==None or iFilePath=="":
print("cancel")
else:
self.file_name_field.delete(0,END)
self.file_name_field.insert(END,iFilePath)
def enter_command(self):
print("enter_command")
command=self.command_field.get()
self.ex(command)
def click_start_button(self):
print('click_start_button')
#exec(self.script)
self.script_thread=threading.Thread(target=self.s...
print('new thread, start thread')
#self.start_script()
self.script_thread.start()
print('script_started.')
def click_stop_button(self):
print('click_stop_button')
#exec(self.script)
if self.bot==None:
return
self.bot.stop_read_eval_print_loop()
def start_script(self):
print('start_script')
#self.script=self.script_area.get(0.,END)
#print("run_script:"+self.script)
#exec(self.script)
if self.bot==None:
return
self.bot.read_eval_print_loop()
def run_script(self):
print('run_script')
def click_save_prop_button(self):
self.prop.save(self.prop.file_name)
def add_url(self,url,i):
self.object_page_table_uri[i].delete(0,END)
self.object_page_table_uri[i].insert(END,url)
def map_to_gui(self,key,value):
if key=='bot_id':
self.bot_id_field.delete(0,END)
self.bot_id_field.insert(END,value)
elif key=='waiting_term':
self.waiting_term_field.delete(0,END)
self.waiting_term_field.insert(END,str(value))
elif key=='object_page_1':
self.object_page_table_uri[0].delete(0,END)
self.object_page_table_uri[0].insert(END,value)
elif key=='object_page_2':
self.object_page_table_uri[1].delete(0,END)
self.object_page_table_uri[1].insert(END,value)
elif key=='object_page_pass_1':
self.object_page_table_pass[0].delete(0,END)
self.object_page_table_pass[0].insert(END,val...
elif key=='object_page_pass_2':
self.object_page_table_pass[1].delete(0,END)
self.object_page_table_pass[1].insert(END,val...
elif key=='read_interval':
self.param_table_value[0].delete(0,END)
self.param_table_value[0].insert(END,str(valu...
elif key=='exec_interval':
self.param_table_value[1].delete(0,END)
self.param_table_value[1].insert(END,str(valu...
elif key=='write_interval':
self.param_table_value[2].delete(0,END)
self.param_table_value[2].insert(END,str(valu...
elif key=='report_length':
self.param_table_value[3].delete(0,END)
self.param_table_value[3].insert(END,str(valu...
def lookup_prop(self,key):
if key=='bot_id':
return self.bot_id_field.get()
elif key=='waiting_term':
return int(self.waiting_term_field.get())
elif key=='object_page_1':
return self.object_page_table_uri[0].get()
elif key=='object_page_2':
return self.object_page_table_uri[1].get()
elif key=='object_page_pass_1':
return self.object_page_table_pass[0].get()
elif key=='object_page_pass_2':
return self.object_page_table_pass[1].get() ...
elif key=='read_interval':
return int(self.param_table_value[0].get())
elif key=='exec_interval':
return int(self.param_table_value[1].get())
elif key=='write_interval':
return int(self.param_table_value[2].get())
elif key=='report_length':
return int(self.param_table_value[3].get())
return None
def update(self):
self.prop.map_from_gui()
self.prop.save(self.prop.file_name)
def ini_recv_queue(self):
self.recv_queue=[]
def is_not_empty_recv_queue(self):
    """Return True when the receive queue holds at least one message.

    The state is also logged to stdout; the message now matches the value
    that is returned.
    """
    if len(self.recv_queue)>0:
        print('recv_queue is not empty')
        return True
    print('recv_queue is empty')
    return False
def are_in_the_recv_queue(self,messages):
print('start are_in_the_recv_queue')
for x in messages:
print('is '+x+' in the recv_queue?')
flag=False
for m in self.recv_queue:
print(m+' in the received queue')
if x in m:
print(x + ' is in the '+m)
flag=True
self.write_message("message "+x+" fou...
break
if flag==False:
return False
return True
def enter_all_command(self):
command=self.command_field_for_all_server.get()
print("send "+command+" to all")
self.write_message("send "+command+" to all")
self.coms.send_command_to_all(command)
self.command_field_for_all_server.delete(0,END)
def ex(self,command):
print("parse "+command)
command=self.r_b(command)
if self.parse_command(command):
self.write_message("OK")
print("OK")
else:
self.write_message("ERROR")
print("ERROR")
def write_script(self, message):
self.clear_script()
self.append_script(message)
def write_script_start(self,message):
self.write_script_thread=threading.Thread(target=...
self.write_script_thread.start()
def write_script_lines(self,lines):
print('start write_script_lines')
self.write_message('start_write_script_lines')
xlines=lines.splitlines()
self.clear_script()
for l in xlines:
self.append_script(l)
self.script_area.update()
print('end write_script_lines')
def append_script(self, message):
self.script_area.insert(tk.END,message)
self.script_area.insert(tk.END,'\n')
#self.script_area.yview_moveto(1)
def clear_script(self):
self.script_area.delete("1.0",END)
self.script_area.update()
def write_message(self, message):
#self.write_message_thread=threading.Thread(targe...
#self.write_message_thread.start()
self.append_message(message)
def clear_and_append_message(self, message):
self.clear_message()
self.append_message(message)
def append_message(self, message):
now = self.Pico_Time.get_now()
addline=now+': '+message
#print('start append_message')
self.message_area.insert(tk.END,addline)
self.message_area.insert(tk.END,'\n')
self.message_area.yview_moveto(1)
tx=self.message_area.get(0.0,tk.END)
count_lines = tx.count('\n') + 1
if count_lines>=500:
self.message_area.delete(0.0,99.0-1)
self.message_area.update()
#print('end append_message')
def clear_message(self):
self.message_area.delete("1.0",END)
self.message_area.update()
class pico_net:
def __init__(self):
print('current ssid= '+ssid)
def set_access_point(self, ssid, password):
self.ssid = ssid
self.password = password
def connect(self):
self.wlan = network.WLAN(network.STA_IF)
self.wlan.active(True)
chipid = int.from_bytes(self.wlan.config('mac'), ...
self.wlan.connect(ssid, password)
max_wait = 10
while max_wait > 0:
if self.wlan.status() < 0 or self.wlan.status...
break
max_wait -= 1
print('waiting for connection...')
time.sleep(1)
if self.wlan.status() != 3:
raise RuntimeError('network connection failed')
else:
print('connected')
self.status = self.wlan.ifconfig()
print('ip = ' + self.status[0])
class pico_http:
def __init__(self):
print('pico_http')
def set_url(self, url):
self.url = url
def get(self, url):
    """Fetch *url* with HTTP GET and return the response body as text.

    The URL is remembered in ``self.url``. Any request or decoding failure
    is reported on stdout and an empty string is returned instead.
    """
    self.url=url
    try:
        r = requests.get(url)
        try:
            return r.text
        finally:
            # Release the connection even when reading the body fails.
            r.close()
    except Exception:
        # ``Exception`` (not a bare except) lets Ctrl-C / SystemExit through.
        print('pico_http.get...exception')
        return ""
def put(self, url, payload):
    """Send *payload* as a query string on a GET request and return the body text.

    payload is a dictionary of string keys and string values; they are
    joined as ``key=value`` pairs with ``&`` and appended to *url* after
    ``?`` without any further encoding. With an empty payload the bare
    *url* is requested. Request failures are reported on stdout and an
    empty string is returned, matching ``get`` and ``post``.
    """
    query='&'.join(key+'='+payload[key] for key in payload)
    uri=url+'?'+query if query else url
    try:
        r = requests.get(uri)
        try:
            return r.text
        finally:
            # Release the connection even when reading the body fails.
            r.close()
    except Exception:
        print('pico_http.put...exception')
        return ""
#post, ref: https://docs.micropython.org/en/latest/li...
def post(self, url, body, headers):
#print('pico_http.post(url='+url+' headers='+str(...
try:
r = requests.post(url,data=body,headers=heade...
#print(r)
r_text=r.text
#print(r_text)
#r_json=r.json()
r.close()
#print(r_json)
print('pico_http.post...return')
return r_text
except:
print('pico_http.post...exception')
return ""
import datetime
class pico_time:
# reference: https://wisteriahill.sakura.ne.jp/CMS/Wo...
# NTPサーバー
#ntp_host = "time-c.nist.gov"
#ntp_host = "time.cloudflare.com"
#ntp_host = "time.google.com"
ntp_host = "ntp.nict.jp"
def __init__(self):
print('pico_time')
def ptime(self):
NTP_DELTA = 2208988800
NTP_QUERY = bytearray(48)
NTP_QUERY[0] = 0x1b
#addr = socket.getaddrinfo(self.ntp_host, 123)[0]...
addr=(self.ntp_host, 123)
s = socket.socket(socket.AF_INET, socket.SOCK_DGR...
# s.settimeout(1)
res = s.sendto(NTP_QUERY, addr)
msg = s.recv(48)
s.close()
val = struct.unpack("!I", msg[40:44])[0]
return val - NTP_DELTA
wday = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun']
def zfill(self, s, width):
    """Left-pad *s* with '0' up to *width* characters; longer strings are returned unchanged."""
    return s.rjust(width, "0")
def get_now(self):
#t=self.ptime()
#t = t + 9*60*60
##print(t)
#datetimetuple = time.gmtime(t)
#year = str(datetimetuple[0])
#month = self.zfill(str(datetimetuple[1]),2)
#mday = self.zfill(str(datetimetuple[2]),2)
#hour = self.zfill(str(datetimetuple[3]),2)
#minute = self.zfill(str(datetimetuple[4]),2)
#seconds = self.zfill(str(datetimetuple[5]),2)
#wd_index = datetimetuple[6]
##日付と時刻を分けるデリミター("|")を付加
##datetime = year + "/" + month + "/" + mday + " ...
#datetime = year + "/" + month + "/" + mday + " "...
dt=str(datetime.datetime.now())
return dt
class html_parser:
html=""
tokens=[]
reserved_words={'<':'<','>':'>','&':'&','"'...
' ':' ','\n':'<br>','\t':' ...
'\\':'\','{':'{','}':'}...
}
a2c={' ':"+",'\n':"%0D%0A",'\r':"%0D",'\t':"%09",'!':...
'#':"%23",'$':"%24",'%':"%25",'&':"%26", '\'':"%2...
'*':"%2A",'+':"%2B",',':"%2C",'-':"%2D",'/':"%2F"...
';':"%3B",'<':"%3C",'=':"%3D",'>':"%3E",'?':"%3F",
'@':"%40",'[':"%5B",'\\':"%5C",']':"%5D",'^':"%5E",
'`':"%60",'{':"%7B",'|':"%7C",'}':"%7D",'~':"%7E"}
#Special char to ascii code
sc2a={'<':"<",'>':">",'&':"&",'"':'"',...
def __init__(self,html):
self.html=html
def html_tokenizer(self):
#print("html_tokenizer")
self.token_ith=0
w=""
tx=[""]
rest=[""]
inx=self.html
xlen=len(self.html)
while xlen>0:
if self.parse_tag(inx,tx,rest):
if w!="":
self.tokens.append(w)
w=""
self.tokens.append('<'+tx[0]+'>')
#print('<'+tx[0]+'>')
inx=rest[0]
xlen=len(inx)
elif self.parse_String_Constant(inx,tx,rest):
if w!="":
self.tokens.append(w)
w=""
self.tokens.append('"'+tx[0]+'"')
#print('"'+tx[0]+'"')
inx=rest[0]
xlen=len(inx)
else:
wc=inx[0]
inx=inx[1:xlen]
xlen=len(inx)
w=w+wc
if w!="":
self.tokens.append(w)
w=""
def get_html(self):
return self.html
def parse_key(self,key,inx,rest):
    """Test whether *inx* begins with *key*.

    On a match ``rest[0]`` receives *inx* without the leading key and True
    is returned; otherwise ``rest[0]`` receives *inx* unchanged and False
    is returned.
    """
    matched=inx.startswith(key)
    rest[0]=inx[len(key):] if matched else inx
    return matched
def parse_String_Constant(self,inx,strc,rest):
    """Parse a double-quoted string constant at the start of *inx*.

    On success ``strc[0]`` receives the text between the quotes, ``rest[0]``
    receives everything after the closing quote, and True is returned.
    A backslash and the character that follows it are both skipped and do
    not appear in ``strc[0]`` (this matches the previous behaviour).

    Returns False, leaving ``strc`` and ``rest`` untouched, when *inx* does
    not start with a quote or the closing quote is missing. Previously an
    unterminated constant raised IndexError.
    """
    if not inx.startswith('"'):
        return False
    body=inx[1:]
    limit=len(body)
    chars=[]
    i=0
    while i<limit:
        ch=body[i]
        if ch=='"':
            strc[0]="".join(chars)
            rest[0]=body[i+1:]
            return True
        if ch=='\\':
            # Skip the backslash together with the escaped character.
            i+=2
        else:
            chars.append(ch)
            i+=1
    # Ran out of input before the closing quote.
    return False
def parse_tag(self,inx,tag,rest):
# Try to read a "<...>" tag at the start of inx.
# On success tag[0] receives the text collected between '<' and '>',
# rest[0] receives the input after '>', and True is returned;
# otherwise False is returned.
rx=[""]
xstr=""
wstr=[""]
if self.parse_key('<',inx,rx):
#print("parse_tag")
inx=rx[0]
# NOTE(review): inx[0] raises IndexError when nothing follows '<'.
fx=inx[0]
xlen=len(inx)
# Collect characters until the closing '>' is the current character.
while fx!='>':
if xlen<=0:
return False
if fx=='\\':
inx=inx[1:xlen]
# A double-quoted constant is copied as a unit, re-wrapped in quotes.
if self.parse_String_Constant(inx,wstr,rx):
inx=rx[0]
xstr=xstr+'"'+wstr[0]+'"'
else:
xstr=xstr+fx
inx=inx[1:xlen]
# NOTE(review): inx[0] raises IndexError once the input is exhausted without
# a '>'; the xlen<=0 test above appears to be reached only after this
# index - confirm against the original indentation.
fx=inx[0]
xlen=len(inx)
if self.parse_key('>',inx,rx):
tag[0]=xstr
rest[0]=rx[0]
return True
return False
def get_tokens(self):
return self.tokens
def next_token(self):
    """Return the next unread token and advance the cursor; "" once all tokens are consumed."""
    pos=self.token_ith
    if pos>=len(self.tokens):
        return ""
    self.token_ith=pos+1
    return self.tokens[pos]
def has_more_tokens(self):
    """Return True while the token cursor has not reached the end of the token list."""
    return self.token_ith<len(self.tokens)
def index_from(self,i,str):
p=self.html[i:].index(str)
return p
def p_url(self,p,url):
#print("p_url p="+str(p)+' '+self.html[p:p+10])
rx=[""]
xurl=""
pw=p
xlen=len(self.html)
q=self.p_key('http://',p)
if p!=q:
xurl='http://'
else:
q=self.p_key('https://',p)
if p!=q:
xurl='https://'
else:
return pw
p=q
while not (self.html[p] in [' ','\n','\t','\r']):
p=p+1
if p>=xlen:
p=xlen
break
url[0]=xurl+self.html[q:p]
return p
def p_key(self,key,p):
    """Return the index just past *key* when ``self.html`` has it at *p*, else *p*.

    Callers detect "not found" by comparing the result with *p*. A key that
    would end exactly at the end of the text is reported as not found, so a
    successful result is always a valid index into ``self.html``.
    """
    end=p+len(key)
    if end<len(self.html) and self.html[p:end]==key:
        return end
    return p
def p_String_Constant(self,p,strc):
    """Parse a double-quoted string constant in ``self.html`` starting at *p*.

    On success ``strc[0]`` receives the text between the quotes (backslashes
    included) and the index just past the closing quote is returned.

    Returns *p* unchanged (no progress) when there is no opening quote at
    *p*, when a newline, tab or carriage return occurs before the closing
    quote, or when the text ends before the constant is closed. The last
    case previously raised IndexError.
    """
    start=p
    q=self.p_key('"',p)
    if q==start:
        return start
    limit=len(self.html)
    p=q
    while p<limit and self.html[p]!='"':
        ch=self.html[p]
        if ch in ['\n','\t','\r']:
            return start
        # A backslash also skips the character it escapes.
        p+=2 if ch=='\\' else 1
    if p>=limit:
        # Unterminated constant: ran off the end of the text.
        return start
    q=self.p_key('"',p)
    if q==p:
        return start
    strc[0]=self.html[start+1:q-1]
    return q
def p_Date(self,p,date_str):
# Parse a date-time in self.html starting at p, in the order:
# number, '/' or '-', number, '/' or '-', number, spaces, number, ':',
# number, ':', number.
# On success date_str[0] receives a string built from the parsed parts and
# the position after the last number is returned; the failure paths return
# the starting position pw (see the NOTE on the day branch for the exception).
#print("p_Date p="+str(p)+' '+self.html[p:p+10])
rx=[""]
xstr=""
pw=p
# year
year=[0]
q=self.p_number(p,year)
if p==q:
return pw
p=q
# separator: '/' is tried first, then '-'
q=self.p_key('/',p)
if p==q:
q=self.p_key('-',p)
if p==q:
return pw
p=q
# month
month=[0]
q=self.p_number(p,month)
if p==q:
return pw
p=q
q=self.p_key('/',p)
if p==q:
q=self.p_key('-',p)
if p==q:
return pw
p=q
# day
day=[0]
q=self.p_number(p,day)
if p==q:
# NOTE(review): every other failure path returns pw; returning False here
# looks like an oversight (False compares equal to position 0) - confirm.
return False
p=q
# p_b skips spaces; no movement means no space follows the day
q=self.p_b(p)
if p==q:
return pw
p=q
# hour
hour=[0]
q=self.p_number(p,hour)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
# minute
minute=[0]
q=self.p_number(p,minute)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
# second
second=[0]
q=self.p_number(p,second)
if p==q:
return pw
p=q
date_str[0]=str(year[0])+'-'+str(month[0])+'-'+st...
return p
def p_b(self,p):
    """Skip space characters in ``self.html`` from *p* and return the first index that is not a space.

    The result never exceeds ``len(self.html)`` unless *p* already did, in
    which case *p* is returned as is.
    """
    text=self.html
    limit=len(text)
    while p<limit and text[p]==' ':
        p+=1
    return p
def p_name(self,p,name):
#print("p_name, self.html="+ self.html[p:p+10])
#print("p="+str(p)+"..."+self.html[p:p+10])
q=p
last_p=len(self.html)
while self.html[q] in 'abcdefghijklmnopqrstuvwxyz...
#print("self.html["+str(q)+"]="+self.html[q])
q=q+1
if q>=last_p:
break
name[0]=self.html[p:q]
#print("p_name p="+str(p)+" q="+str(q)+" name="+s...
#print("p="+str(p)+" q="+str(q)+" name="+name[0])
#print("name[0]= "+name[0])
return q
def p_number(self,p,num):
    """Parse a run of decimal digits in ``self.html`` starting at *p*.

    On success ``num[0]`` receives the integer value and the index just
    past the last digit is returned. When there is no digit at *p*
    (including *p* at or beyond the end of the text) ``num`` is left
    untouched and *p* is returned, which is how callers detect failure.
    """
    text=self.html
    limit=len(text)
    q=p
    # Advance q (not p) over the digits; p marks the start of the number.
    while q<limit and text[q] in '0123456789':
        q+=1
    if q==p:
        return p
    num[0]=int(text[p:q])
    return q
def p_an_equation(self,p,eqs):
#print("p_an_equation p="+str(p)+' '+self.html[p:...
pw=p
p=self.p_b(p)
nx=[""]
strc=[""]
q=self.p_name(p,nx)
#print("p_an_equation name="+nx[0])
if p==q:
#print("p_an_equation fail to find name")
return pw
p=q
p=self.p_b(p)
q=self.p_key('=',p)
if p==q:
#print("p_an_equation fail to find =")
return pw
p=q
p=self.p_b(p)
q=self.p_String_Constant(p,strc)
if p==q:
num=[0]
q=self.p_number(p,num)
if p==q:
return pw
else:
eqs[nx[0]]=num[0]
#print("p_qn_equation get "+nx[0]+"="+str...
return q
else:
eqs[nx[0]]=strc[0]
#print("p_an_equation get "+nx[0]+"="+str(str...
return q
def p_equations(self,p,eqs):
    """Apply ``p_an_equation`` repeatedly from *p*, collecting results in *eqs*.

    Stops at the first call that makes no progress and returns the position
    reached by then.
    """
    nxt=self.p_an_equation(p,eqs)
    while nxt!=p:
        p=nxt
        nxt=self.p_an_equation(p,eqs)
    return p
def p_get_equations_of_the_tag(self,p,tag_name,eqs):
#print("p_get_equations_of_the_tag <"+tag_name)
#print("p="+str(p)+' '+self.html[p:p+10])
pw=p
try:
q=self.html.index('<'+tag_name,pw)
except:
#print("p_get_equation_of_the_tag fail to fin...
return p
#print("q="+str(q)+' '+self.html[q:q+10])
q=q+len('<'+tag_name)
#print("q="+str(q)+' '+self.html[q:q+10])
if pw==q:
#print("p_get_equation_of_the_tag fail to fin...
return p
pw=q
q=self.p_equations(pw,eqs)
#print("after equations q="+str(q) +' '+self.html...
if pw==q:
return p
pw=q
pw=self.p_b(pw)
#print("pw="+str(pw))
q=self.p_key('>',pw)
#print("after > q="+str(q)+' '+self.html[q:q+10])
if pw!=q:
return q
q=self.p_key('/>',pw)
if pw!=q:
return q
return p
def p_get_between_tag_with_cond(self,p,tag_name,cond,...
#print("p_get_between_with_cond <"+tag_name+">......
#print("p="+str(p)+' '+self.html[p:p+10])
pw=p
q=pw
while True:
try:
q=self.html.index('<'+tag_name,p)
#print("p_get_between_with_cond p="+str(p...
if p==q:
return pw
q=q+len('<'+tag_name)
p=q
eqs={}
q=self.p_equations(p,eqs)
#print("after equations q="+str(q) +' '+s...
if p!=q:
if cond[0] in eqs:
#print("cond[0]="+cond[0]+" eqs[c...
if eqs[cond[0]]!=cond[1]:
continue
else:
break
except:
#print("fail to find <"+tag_name)
return pw
p=self.p_b(p)
#print("p_get_between q="+str(q)+' '+self.html[q:...
#print("pw="+str(pw))
q=self.p_key('>',p)
if p!=q:
start_p=q
else:
q=self.p_key('/>',p)
if pw!=q:
start_p=q
else:
return p
try:
q=self.html.index('</'+tag_name,start_p)
except:
return pw
end_p=q-1
p=q
q=q+len('</'+tag_name)
p=self.p_b(q)
#print("pw="+str(pw))
q=self.p_key('>',p)
if q==p:
return pw
rtn[0]=(start_p,end_p)
#rx=self.html[start_p:end_p]
#print("rx="+rx)
return q
def p_get_between(self,p,tag_name,eqs,rtn):
#print("p_get_between <"+tag_name+">...</"+tag_na...
#print("p="+str(p)+' '+self.html[p:p+10])
pw=p
q=pw
try:
q=self.html.index('<'+tag_name,p)
except:
return pw
#print("p_get_between 2 q="+str(q)+' '+self.html[...
q=q+len('<'+tag_name)
#print("p_get_between 3 q="+str(q)+' '+self.html[...
if p==q:
return pw
p=q
q=self.p_equations(p,eqs)
#print("after equations q="+str(q) +' '+self.html...
p=q
p=self.p_b(p)
#print("p="+str(p))
q=self.p_key('>',p)
#print("after > q="+str(q)+' p='+str(p)+' '+self....
if p!=q:
start_p=q
else:
q=self.p_key('/>',p)
if p!=q:
end_p=q
rtn[0]=""
return q
else:
#print("get between "+tag_name+",fail to ...
return pw
try:
q=self.html.index('</'+tag_name,start_p)
except:
#print("get between "+tag_name+",fail to find...
return pw
end_p=q-1
p=q
q=q+len('</'+tag_name)
p=self.p_b(q)
#print("p="+str(p))
q=self.p_key('>',p)
if q==p:
return pw
rtn[0]=(start_p,end_p)
#rx=self.html[start_p:end_p+1]
#print("get between "+tag_name+" rx="+rx)
return q
#def text_to_post_form(self,text):
# #print("html_parser.text_to_post_form")
# #print(text)
# post_form=""
# for i in range(len(text)):
# post_form=post_form+self.code_convert(text[i])
# #print("html_parser.text_to_post_form return")
# return post_form
def code_convert(self,code):
    """Return the ``a2c`` replacement for *code*, or *code* itself when the table has no entry for it."""
    return self.a2c.get(code,code)
def text_to_post_form(self,text):
    """Return *text* with every character passed through ``code_convert`` and the results concatenated."""
    return "".join(self.code_convert(ch) for ch in text)
def special_char_to_text(self,text):
    """Decode the basic HTML character references in *text* back to plain characters.

    Handles ``&lt;``, ``&gt;``, ``&quot;``, the apostrophe forms ``&#039;``,
    ``&#39;`` and ``&apos;``, and ``&amp;``.
    """
    replacements=(
        ("&lt;","<"),
        ("&gt;",">"),
        ("&quot;",'"'),
        ("&#039;","'"),
        ("&#39;","'"),
        ("&apos;","'"),
    )
    for entity,char in replacements:
        text=text.replace(entity,char)
    # '&amp;' is decoded last so that an escaped ampersand such as
    # "&amp;lt;" becomes the literal text "&lt;" and is not decoded twice.
    return text.replace("&amp;","&")
def post(self,url,body):
#payload is a dictionary
headers={'Content-Type': 'application/x-www-form-...
#def http_post(url, value):
# https://docs.openmv.io/library/urequests.html
#body = "value=%s" % (value)
#headers = {'Content-Type': 'application/x-www-fo...
try:
response = requests.post(url, data=body.encod...
r_text=response.text
response.close()
return r_text
except Exception as e:
print(e)
return ""
class pico_wiki_driver:
url=""
def __init__(self):
print('pico_wiki_driver')
#self.url=url
#self.page=page
#self.Pico_Net=pico_net()
self.Pico_Time=pico_time()
self.Pico_Http=pico_http()
#self.Pico_Net.connect()
t=self.Pico_Time.get_now()
print('time now='+t)
def set_url(self,url):
self.url=url
def set_page(self,page):
self.page=page
def get_url(self):
return self.url
def get_html(self):
uri = self.url+"?"+self.page
all_page=self.Pico_Http.get(uri)
#print("get_html..."+uri)
#print(all_page)
return all_page
def get_wiki_page(self):
uri = self.url+"?"+self.page
all_page=self.Pico_Http.get(uri)
print("get_wiki_page...")
#print(all_page)
parser=html_parser(all_page)
p=0
rtn=[(0,0)]
q=parser.p_get_between_tag_with_cond(p,"div",('id...
if p!=q:
#print("get_wiki_page id=body..."+all_page[rt...
body_x=all_page[rtn[0][0]:rtn[0][1]]
body=parser.special_char_to_text(body_x)
return body
def replace_wiki_page(self,new_body):
print("pico_wiki_driver.replace_wiki_page...")
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
array_of_equations=[]
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'input',...
while p!=q:
#print('p='+str(p)+' q='+str(q)+' <input....>')
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
for i in range(len(array_of_equations)):
#print('i='+str(i))
eqs=array_of_equations[i]
for key in eqs:
#print(key+'='+array_of_equations[i][key])
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['write']=array_of_equations[i...
#tokens=edit_html.get_tokens()
#for i in range(len(tokens)):_
# print(str(i)+'-'+tokens[i])
eqs_msg={}
rtn=[(0,0)]
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
body=r[0:start_p]+new_body+r[end_p+1:]
#print("body=......")
#print(new_body)
payload['msg']=edit_html.text_to_post_form(ne...
#print('msg=......')
#print(payload['msg'])
eqs_original={}
rtn=[(0,0)]
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
payload['original']=edit_html.text_to_post_fo...
body=""
body=body+'encode_hint='+(payload['encode_hint'])...
body=body+'template_page=&'
body=body+'cmd='+payload['cmd']+'&'
body=body+'page='+payload['page']+'&'
body=body+'digest='+payload['digest']+'&'
body=body+'msg='+payload['msg']+'&'
body=body+'write='+payload['write']+'&'
body=body+'original='+payload['original']
#print('body='+body)
edit_html.post(self.url,body)
#print(r)
def upload_file(self,file_dir,file_name,pass_word):
# MIMEタイプの定義
#XLSX_MIMETYPE = 'application/vnd.openxmlformats-...
#file_Name = '2024-12-20-222611.jpg'
#file_dir='C:/vscode/python/bot_computing/'
file_path=file_dir+file_name
MIMETYPE = mimetypes.guess_type(file_path)[0]
print('upload file '+file_path)
# ファイル名とデータの取得
file_Data_Binary = open(file_path, 'rb').read()
# アップロード先のURL
#url = 'http://www.yama-lab.org/fwb4pi/index.php'
#page = 'test02'
url=self.url
page=self.page
# アップロードするファイルの情報を辞書に格納
files = {'attach_file': (file_name, file_Data_Bin...
'encode_hint': 'ぷ',
'pass': pass_word,
'refer': page,
'pcmd': 'post',
'plugin': 'attach'}
m = MultipartEncoder(files)
hx={'Content-Type': m.content_type,
# 'Accept:': 'text/html,application/xhtml+...
'Referer': url+'?'+'plugin=attach&pcmd=upload...
# 'Accept-Language': 'ja,en;q=0.9,en-GB;q=...
}
# ファイルのアップロード
response = requests.post(url, data=m, headers=hx)
# レスポンスの表示
print(response.status_code)
#print(response.content)
#wfile_path=file_dir+'return.html'
#result_file= open(wfile_path, 'w')
#result_file.write((response.content).decode(enco...
return response.status_code
def get_attachment_list(self):
uri = self.url+"?"+self.page
all_page=self.Pico_Http.get(uri)
print("get_wiki_page...")
#print(all_page)
parser=html_parser(all_page)
p=0
rtn=[(0,0)]
q=parser.p_get_between_tag_with_cond(p,"div",('id...
attach_end=q
attach_list=[]
if p==q:
return attach_list
#print("get_wiki_page id=body..."+all_page[rtn[0]...
p=rtn[0][0]
eqs={}
while True:
q=parser.p_get_between(p,'a',eqs,rtn)
if p==q:
break
if q>attach_end:
break
print(parser.html[rtn[0][0]:rtn[0][1]])
eqs_img={}
q2=parser.p_get_equations_of_the_tag(rtn[0][0...
fname=parser.html[q2:(rtn[0][1]+1)]
print(fname)
attach_list.append(fname)
q=parser.p_get_between(q2,'a',eqs,rtn)
p=q
print(attach_list)
return attach_list
def delete_attachment(self,attach_name,pass_word):
# アップロードするファイルの情報を辞書に格納
files = {'encode_hint': 'ぷ',
'plugin': 'attach',
'refer': self.page,
'file': attach_name,
'age': '0',
'pcmd': 'delete',
'newname': attach_name,
'pass': pass_word}
m = MultipartEncoder(files)
hx={'Content-Type': m.content_type,
# 'Accept:': 'text/html,application/xhtml+...
'Referer': self.url+'?'+'plugin=attach&pcmd=i...
+'file='+attach_name+'&refer='+self.page
# 'Accept-Language': 'ja,en;q=0.9,en-GB;q=...
}
# ファイルのアップロード
response = requests.post(self.url, data=m, header...
# レスポンスの表示
print(response.status_code)
#print(response.content)
#wfile_path='return2.html'
#result_file= open(wfile_path, 'w')
#result_file.write((response.content).decode(enco...
return response.status_code
# Return the wiki source text of self.page.
# Requests the page with cmd=edit through self.Pico_Http.put, gathers the
# attributes of the input elements of the returned HTML into payload, takes the
# content of the first textarea element and converts it with
# special_char_to_text. Returns "" when no textarea is found.
def get_wiki_source(self):
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
array_of_equations=[]
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'input',...
# one attribute dictionary is collected per input element until the parser stops advancing
while p!=q:
#print('p='+str(p)+' q='+str(q)+' <input....>')
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
# copy selected form fields into payload (the compared names are cut off in this listing)
# NOTE(review): payload is not used again in the active code of this method
for i in range(len(array_of_equations)):
#print('i='+str(i))
eqs=array_of_equations[i]
for key in eqs:
#print(key+'='+array_of_equations[i][key])
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['cancel']=array_of_equations[...
#tokens=edit_html.get_tokens()
#for i in range(len(tokens)):_
# print(str(i)+'-'+tokens[i])
eqs_msg={}
rtn=[(0,0)]
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
wiki_source=""
# rtn[0] still equal to (0,0) means no textarea was located
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
wiki_source=r[start_p:end_p]
#body=""
#body=body+'encode_hint='+payload['encode_hint']+...
#body=body+'cmd='+payload['cmd']+'&'
#body=body+'page='+payload['page']+'&'
#body=body+'cancel='+payload['cancel']
##print('body='+body)
#r=edit_html.post(self.url,body)
wiki_source=edit_html.special_char_to_text(wiki_s...
return wiki_source
def get_result(self):
    """Return the result section of the wiki source.

    The section starts one character after the ``result:`` marker and ends
    one character before the ``current_device=`` marker (or at the end of
    the text when that marker is absent).

    Returns:
        None when the source is None, "" when the source is empty or has no
        ``result:`` marker, otherwise the extracted text.
    """
    all_text = self.get_wiki_source()
    if all_text is None:
        return None
    if all_text == "":
        return ""
    marker = "result:"
    # str.find reports a missing marker with -1; str.index would raise
    # ValueError and make the fallback below unreachable.
    start = all_text.find(marker)
    if start < 0:
        return ""
    start += len(marker)
    end = all_text.find("current_device=", start)
    if end < 0:
        end = len(all_text)
    else:
        # drop the single character that precedes the marker
        end -= 1
    print('pico_wiki_bot.get_result return')
    return all_text[start + 1:end]
def get_script(self):
    """Return the script part of the wiki source (the text before ``result:``).

    The character immediately preceding the marker is dropped as well. When
    the marker is absent the whole source is returned.

    Returns:
        None when the source is None, otherwise a string.
    """
    all_text = self.get_wiki_source()
    if all_text is None:
        return None
    if all_text == "":
        return ""
    marker_pos = all_text.find("result:")
    if marker_pos < 0:
        return all_text
    # Clamp at 0: with the marker at the very start, marker_pos - 1 would be
    # -1 and the slice would return everything but the last character.
    return all_text[:max(marker_pos - 1, 0)]
# Serial link to a device: opens a port, starts a reader thread and exchanges
# command/reply strings (see send_command / receive_loop).
class USB_Serial:
PORT0 = '/dev/ttyACM0' # on Windows use COM*
PORT1 = '/dev/ttyACM1'
# Try PORT0 first and PORT1 second; self.ser is None when both fail.
# The reader thread is started in either case (its target is cut off in this listing).
def __init__(self):
try:
self.ser = serial.Serial(port=self.PORT0,baud...
except Exception as e:
try:
self.ser = serial.Serial(port=self.PORT1,...
except Exception as e:
print(e)
#self.GUI.write_message(str(e))
self.ser=None
self.usb_serial_input_thread=threading.Thread(tar...
self.usb_serial_input_thread.start()
# Attach the GUI object whose write_message is used for error reporting.
def set_gui(self,gui):
self.GUI=gui;
def send_command(self,command):
    """Send one command line over the serial port and wait for the reply.

    The command is written followed by CR LF. The reply is whatever another
    thread stores in ``self.rtn``; it is polled every 10 ms for roughly ten
    seconds.

    Args:
        command: text to send (without line terminator).

    Returns:
        The reply stored in ``self.rtn``, or "0" when the port is not open,
        the write fails, or no reply arrives in time.
    """
    self.rtn = None
    print('writing '+command)
    if self.ser is None:
        # Nothing can be written, so waiting for a reply would only block
        # the caller for the whole timeout.
        return "0"
    send_bytes = bytes(command+'\r\n','utf-8')
    try:
        self.ser.write(send_bytes)
        self.ser.flush()
    except Exception as e:
        print("serial write error")
        print(e)
        # set_gui() may not have been called yet
        gui = getattr(self, "GUI", None)
        if gui is not None:
            gui.write_message("serial write error.")
            gui.write_message(str(e))
        return "0"
    count = 0
    while True:
        if self.rtn is not None:
            reply = self.rtn
            self.rtn = None
            return reply
        if count > 1000:
            self.rtn = None
            return "0"
        time.sleep(0.01)
        count += 1
# Endless reader: reads lines from self.ser, prints them and publishes a reply
# through self.rtn (the attribute send_command polls).
# On any exception the port is reopened (port arguments are cut off in this
# listing) and self.rtn is set to "0".
# NOTE(review): the original nesting is lost in this listing, so it cannot be
# told whether the sleep at the end also runs when self.ser is None.
def receive_loop(self):
while True:
try:
if self.ser==None:
continue
buf = self.ser.readlines()
buf = [i.decode().strip() for i in buf]
if len(buf)>0:
print(buf)
for l in buf:
# a '>>>' prompt in the output is reported and the line is skipped
if '>>>' in l:
print('please reset pico.')
continue
#print("buf[i]="+buf)
# NOTE(review): buf[1] is read without checking that two lines were received
self.rtn= buf[1]
except Exception as e:
print("error receiving")
#self.GUI.write_message("error receiving")
print(e)
#self.GUI.write_message(str(e))
try:
self.ser = serial.Serial(port=self.PO...
except Exception as e:
print(e)
try:
self.ser = serial.Serial(port=sel...
except Exception as e:
print(e)
#self.GUI.write_message('error '+...
self.ser=None
self.rtn="0"
time.sleep(0.01)
# Bot that reads a script from a wiki page, interprets its lines and writes a
# report back to the page.
# NOTE(review): the attributes below are class-level; the lists and dicts are
# shared by all instances unless an instance rebinds them — confirm that only
# one bot object is created.
class pico_wiki_bot:
result_lines=[]
script_lines=[]
# URLs collected from an "object_page" script line
urls=[]
# program name -> program text
programs={}
# settings read from properties and "set" commands
environments={}
time_millis=0
timer=None
current_program_name=""
current_program=""
# lines written back to the wiki as the result section
report=[]
#reportLength=120
current_url=""
current_page=""
current_text=""
# NOTE(review): starts as "" here and is set to 0 in __init__
next_url_index=""
# Build the helper objects (time, HTTP, wiki driver, serial link).
# self.bot is set to None just before read_initials() is called, so that call
# returns at its first guard.
# NOTE(review): Timer is not defined or imported in the visible part of the
# file — confirm where it comes from.
def __init__(self):
print('wiki_bot')
#self.Pico_Net=pico_net()
self.Pico_Time=pico_time()
self.Pico_Http=pico_http()
#self.Pico_Net.connect()
t=self.Pico_Time.get_now()
self.bot=None
print('time now='+t)
self.read_initials()
#self.current_url=url
#self.current_page=page
self.next_url_index=0
self.Pico_Wiki_Driver=pico_wiki_driver()
self.all_text=""
# constructing USB_Serial also starts its reader thread
self.u_serial=USB_Serial()
# without a driver URL no timer is created
if self.Pico_Wiki_Driver.get_url()=="":
return
self.timer=Timer()
#all_text=self.Pico_Wiki_Driver.get_wiki_page()
#if all_text==None:
# return None
#self.timer.init(mode=Timer.PERIODIC, freq=1, cal...
# Copy interval and length settings from self.bot.prop into self.environments
# and split 'object_page_1' at '?' into a URL part and a page part.
# Does nothing when self.bot or self.bot.prop is None.
def read_initials(self):
print("read_initials")
if self.bot==None:
return
if self.bot.prop==None:
return
self.environments['waiting_term']=self.bot.prop.g...
self.environments['read_interval']=self.bot.prop....
self.environments['exec_interval']=self.bot.prop....
self.environments['write_interval']=self.bot.prop...
self.environments['report_length']=self.bot.prop....
uri=self.bot.prop.get('object_page_1')
# NOTE(review): url is a local that is never stored; only self.page is kept
url=uri.split('?')[0]
self.page=uri.split('?')[1]
# Remember the page name of the wiki page currently worked on.
def set_current_page(self,page):
self.current_page=page
# Remember the base URL of the wiki currently worked on.
def set_current_url(self,url):
self.current_url=url
# Remember the password that belongs to the current page.
def set_current_pass(self,pass_w):
self.pass_word=pass_w
# Empty the report list in place.
def clear_report(self):
self.report.clear()
# Append one line to self.report.
# While the report is longer than a limit taken from self.environments (the key
# is cut off in this listing; presumably 'report_length'), the oldest entry is
# removed by shifting every element one slot to the left and popping the tail.
def add_report_line(self,send):
#print("add_report_line, report_length="+str(self...
#print(" current report_length="+str(len(self.rep...
#print(" adding..."+send)
while len(self.report)>(int(self.environments['re...
for i in range(len(self.report)-1):
self.report[i]=self.report[i+1]
self.report.pop()
self.report.append(send)
# Attach the GUI and hand it on to the serial link.
def set_GUI(self,gui):
self.GUI=gui
self.u_serial.set_gui(gui)
# Forward to GUI.init_params when a GUI is attached.
# NOTE(review): another init_params is defined further down in this listing;
# if both belong to the same class the later one replaces this one — confirm.
def init_params(self):
if self.GUI==None:
return
self.GUI.init_params()
# Advance self.time_millis by 1000; the timer argument is not used.
def add_one_second_to_time_millis(self,timer):
#print("pico_wiki_bot.add_one_second_to_time_mill...
self.time_millis=self.time_millis+1000
#print("pico_wiki_bot.add_one_second_to_time_mill...
# Drop the reference to the timer object.
def stop_timer(self):
self.timer=None
# Fetch the HTML of the object page, trying a first page and then a second one.
# For each attempt the page URI and password come from the GUI (falling back to
# the module-level initials when the lookup raises), the URI is split at '?'
# into self.url / self.page, the driver is pointed at it and get_html() is
# called. Returns the first non-empty HTML, or "" when both attempts are empty.
# NOTE(review): when self.GUI is None the fall-through is not visible here;
# the method would then return None — confirm.
def get_the_object_page(self):
#print("pico_wiki_bot.get_the_object_page")
#print("uri="+uri)
if self.GUI!=None:
try:
object_page=self.GUI.lookup_prop('object_...
pass_word=self.GUI.lookup_prop('object_pa...
except:
object_page=initials['object_page_1']
pass_word=initials['object_page_pass_1'] ...
url_page=object_page.split('?')
self.url=url_page[0]
self.page=url_page[1]
self.pass_word=pass_word
self.Pico_Wiki_Driver.set_url(self.url)
self.Pico_Wiki_Driver.set_page(self.page)
html=self.Pico_Wiki_Driver.get_html()
if html!="":
return html
# second attempt
try:
object_page=self.GUI.lookup_prop('object_...
pass_word=self.GUI.lookup_prop('object_pa...
except:
object_page=initials['object_page_2']
# NOTE(review): the fallback below reads 'object_page_pass_1' although the page is 'object_page_2' — confirm
pass_word=initials['object_page_pass_1'] ...
url_page=object_page.split('?')
self.url=url_page[0]
self.page=url_page[1]
self.pass_word=pass_word
self.Pico_Wiki_Driver.set_url(self.url)
self.Pico_Wiki_Driver.set_page(self.page)
html=self.Pico_Wiki_Driver.get_html()
if html!="":
return html
else:
return ""
# Fetch the object page and extract the content of its first pre element.
# The text is converted with special_char_to_text, stored in self.current_text
# and returned; "" is returned when no HTML could be fetched.
def get_script_and_result(self):
#print("pico_wiki_bot.get_script_and_result")
html=self.get_the_object_page()
if html=="":
return ""
parser=html_parser(html)
p=0
eqs={}
rtn=[(0,0)]
q=parser.p_get_between(p,'pre',eqs,rtn)
# rtn[0] is used as the (start, end) range of the element content
be=rtn[0]
command_and_result=parser.html[be[0]:be[1]]
self.current_text=parser.special_char_to_text(com...
return self.current_text
def get_result(self):
    """Return the result section of ``self.current_text``.

    The section starts one character after the ``result:`` marker and ends
    one character before the ``current_device=`` marker (or at the end of
    the text when that marker is absent).

    Returns:
        None when the text is None, "" when the text is empty or has no
        ``result:`` marker, otherwise the extracted text.
    """
    all_text = self.current_text
    if all_text is None:
        return None
    if all_text == "":
        return ""
    marker = "result:"
    # str.find reports a missing marker with -1; str.index would raise
    # ValueError and make the fallback below unreachable.
    start = all_text.find(marker)
    if start < 0:
        return ""
    start += len(marker)
    end = all_text.find("current_device=", start)
    if end < 0:
        end = len(all_text)
    else:
        # drop the single character that precedes the marker
        end -= 1
    print('pico_wiki_bot.get_result return')
    return all_text[start + 1:end]
def get_script(self):
    """Reload the object page and return its script part.

    ``get_script_and_result()`` is called for its side effect of refreshing
    ``self.current_text``. The script is the text before the ``result:``
    marker, without the character immediately preceding the marker; when
    the marker is absent the whole text is returned.

    Returns:
        None when the text is None, otherwise a string.
    """
    self.get_script_and_result()
    all_text = self.current_text
    if all_text is None:
        return None
    if all_text == "":
        return ""
    marker_pos = all_text.find("result:")
    if marker_pos < 0:
        return all_text
    # Clamp at 0: with the marker at the very start, marker_pos - 1 would be
    # -1 and the slice would return everything but the last character.
    return all_text[:max(marker_pos - 1, 0)]
# Return the text that follows "current_device=" in self.current_text, up to the
# next newline (or the end of the text), converted with special_char_to_text.
# NOTE(review): html_parser is constructed before the None check, and
# str.index raises ValueError when the marker is missing — confirm callers
# only use this when the marker is present.
def get_current_device_line(self):
#command_and_result=self.get_script_and_result()
command_and_result=self.current_text
parser=html_parser(command_and_result)
if command_and_result==None:
return None
p=command_and_result.index("current_device=")
p=p+len("current_device=")
try:
q=command_and_result.index("\n",p)
except:
q=len(command_and_result)
return parser.special_char_to_text(command_and_re...
# Hand the new page text to the wiki driver's replace_wiki_page.
def replace_wiki(self,new_wiki):
self.Pico_Wiki_Driver.replace_wiki_page(new_wiki)
#print(r)
# Rewrite the result section of the wiki page from self.report.
# Loads the edit form of self.page, takes the first textarea as the current
# body, keeps everything up to and including "result:", appends one line per
# report entry (each prefixed with a space) followed by a current_device line
# built from the GUI's bot_id, and passes the text to replace_wiki_page.
# NOTE(review): both body.index calls raise ValueError when a marker is missing,
# so the q<0 branch cannot be reached.
def replace_result(self):
#print("pico_wiki_bot.replace_result")
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
eqs_msg={}
rtn=[(0,0)]
body=""
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
body=edit_html.special_char_to_text(r[start_p...
#print("body=......")
#print(body)
# a second textarea, when present, is stored in payload['original']
eqs_original={}
rtn=[(0,0)]
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
payload['original']=edit_html.text_to_post_fo...
#print("body="+body)
px=body.index("result:")+len("result:")
q=body.index("current_device=",px)
if q<0:
q=len(body)
head_area=body[0:px]
#tail_area=msg[q:]
#result_area=msg[px:q]
new_area=head_area+'\n'
for i in range(len(self.report)):
new_area=new_area+' '+self.report[i]+'\n'
bot_id=self.GUI.lookup_prop('bot_id')
new_area=new_area+" current_device=\""+bot_id+"\"...
gc.collect()
self.Pico_Wiki_Driver.replace_wiki_page(new_area)
# Thin pass-throughs to the wiki driver's attachment operations.
# Only get_attachment_list returns the driver's result; the other two discard it.
def get_attachment_list(self):
return self.Pico_Wiki_Driver.get_attachment_list()
def upload_file(self,file_path,file_name):
self.Pico_Wiki_Driver.upload_file(file_path,file_...
def delete_attachment(self,attach_name):
self.Pico_Wiki_Driver.delete_attachment(attach_na...
def read_script(self):
    """Load the script text and split it into ``self.script_lines``.

    Returns:
        False (with ``script_lines`` emptied) when no script text is
        available, True otherwise. When a GUI is attached the text is also
        shown through ``write_script_lines``.
    """
    script_text = self.get_script()
    if script_text is None or script_text == "":
        self.script_lines = []
        return False
    self.script_lines = script_text.split('\n')
    gui = self.GUI
    if gui is not None:
        gui.write_script_lines(script_text)
    return True
# Interpret the script lines that are currently loaded.
def eval_script(self):
print("eval_script")
self.interpret_lines()
#def print_result(self):
# print("print_result")
# Recognise a line of the form: object_page URL [or URL ...].
# On a match self.urls is replaced by the list of parsed URLs and True is
# returned; False is returned when the line does not start with "object_page ".
# The parser helpers return the position they were given when nothing matches,
# which is why every step compares p with q.
def is_object_page(self,parser):
#print("is_object_page, line="+line[:10]+"...")
p=0
p=parser.p_b(p)
q=parser.p_key("object_page ",p)
if p==q:
#print("is_object_page return False")
return False
p=q
p=parser.p_b(p)
self.urls=[]
# xurl is a one-element list used as an output parameter of p_url
xurl=[""]
i=0
q=parser.p_url(p,xurl)
#print(str(i)+"th url="+xurl[0])
while p!=q:
i=i+1
self.urls.append(xurl[0])
p=q
p=parser.p_b(p)
# another URL is expected only after the keyword "or "
q=parser.p_key("or ",p)
#print("after key or p="+str(p)+" q="+str(q))
if p==q:
break
p=q
p=parser.p_b(p)
xurl=[""]
q=parser.p_url(p,xurl)
#print(str(i)+"th url="+xurl[0])
#print("p="+str(p)+" q="+str(q))
#for i in range(len(self.urls)):
# print("url["+str(i)+"]="+self.urls[i])
return True
def is_device(self,parser):
    """Recognise a ``current_device="name", Date=...`` line.

    On a match the quoted name is stored in ``self.current_device``.

    Args:
        parser: line parser; its ``p_*`` helpers return the position they
            were given when nothing matches.

    Returns:
        True when the line starts with ``current_device=`` followed by a
        string constant and a comma, False otherwise.
    """
    p = parser.p_b(0)
    q = parser.p_key("current_device=", p)
    if p == q:
        return False
    p = parser.p_b(q)
    xname = [""]
    q = parser.p_String_Constant(p, xname)
    if p == q:
        return False
    p = parser.p_b(q)
    self.current_device = xname[0]
    q = parser.p_key(",", p)
    if p == q:
        return False
    p = parser.p_b(q)
    # NOTE(review): as in the original, p_Date is started at p rather than at
    # the position returned by p_key("Date=") — confirm which one p_Date expects.
    q = parser.p_key("Date=", p)
    xdate = []
    q = parser.p_Date(p, xdate)
    # The original fell off the end here and returned None, although callers
    # test the result as a boolean.
    return True
# Recognise "set key=value" starting at position p.
# Every parsed key/value pair is stored in self.environments, reported through
# the GUI message area and pushed into the matching GUI field.
# Returns True on a match, False when the keyword or the equation is missing.
def is_set_command(self,parser,p):
#print("is_set_command.."+parser.html[p:p+10])
#p=parser.p_b(p)
q=parser.p_key("set ",p)
if p==q:
#print("is_set_command return False")
return False
p=parser.p_b(q)
# eq is filled by p_an_equation
eq={}
q=parser.p_an_equation(p,eq)
if p==q:
return False
for key in eq:
self.environments[key]=eq[key]
self.GUI.write_message("set "+str(key)+"="+st...
self.GUI.map_to_gui(key,self.environments[key])
return True
def is_py_command(self,parser,p):
    """Recognise ``py NAME`` at position ``p`` and start collecting a program.

    On a match ``self.current_program_name`` is set to the parsed name and
    ``self.current_program`` is reset to an empty string.

    Args:
        parser: line parser; ``p_key`` returns ``p`` unchanged on no match.
        p: position in the line where the command text starts.

    Returns:
        True when the keyword ``py `` is present, False otherwise.
    """
    q = parser.p_key("py ", p)
    if p == q:
        return False
    name_start = parser.p_b(q)
    xname = [""]
    parser.p_name(name_start, xname)
    self.current_program_name = xname[0]
    self.current_program = ""
    # The original returned None here, so the caller went on to try the
    # remaining command recognisers on a line that was already handled.
    return True
# Recognise "end NAME" at position p and store the collected program in
# self.programs under self.current_program_name (the stored value is cut off in
# this listing). The name parsed after "end" is not used for the key.
# Returns False when the keyword is missing.
# NOTE(review): nothing is returned on the success path, so callers see None.
def is_end_command(self,parser,p):
#print("is_end_command.."+parser.html[p:p+10])
#p=parser.p_b(p)
q=parser.p_key("end ",p)
if p==q:
return False
p=parser.p_b(q)
xname=[""]
q=parser.p_name(p,xname)
self.programs[self.current_program_name]=self.cur...
# Send a command through the serial link and return its reply.
def ex_pico(self,command):
return(self.u_serial.send_command(command))
def is_run_command(self,parser,p):
    """Recognise ``run NAME`` at position ``p`` and execute the stored program.

    Before the program runs, the report is re-seeded from the current
    result text (when there is any). The program text is executed with
    ``self`` available as a local name. Failures are written to the report
    and to the GUI message area instead of being raised.

    Args:
        parser: line parser; ``p_key``/``p_name`` return the position they
            were given when nothing matches.
        p: position in the line where the command text starts.

    Returns:
        True when the line is a run command (whether or not the program
        succeeded), False otherwise.
    """
    print("is_run_command.."+parser.html[p:p+10])
    q = parser.p_key("run ", p)
    if p == q:
        return False
    p = parser.p_b(q)
    xname = [""]
    q = parser.p_name(p, xname)
    if p == q:
        return False
    name = xname[0]
    previous = self.get_result()
    if previous is not None and previous != "":
        self.clear_report()
        for line in previous.split('\n'):
            self.add_report_line(line)
    program = self.programs.get(name)
    if program is None:
        # An unknown name used to surface as a bare KeyError.
        print("fail to exec "+name)
        self.add_report_line("fail to exec:"+name)
        self.GUI.write_message("fail to exec "+name+": program is not defined")
        return True
    print("program="+program)
    exec_locals = {'self': self}
    try:
        # SECURITY: the program text comes from the wiki page; anyone who can
        # edit that page can run arbitrary code on this machine.
        exec(program, globals(), exec_locals)
    except Exception as e:
        print("fail to exec "+program)
        self.add_report_line("fail to exec:"+name)
        self.GUI.write_message("fail to exec "+program)
        print(str(e))
        self.GUI.write_message(str(e))
    return True
def is_command(self,parser):
    """Recognise a ``command:`` line and dispatch it.

    After the keyword, the set / py / end / run recognisers are tried in
    that order at the position following any blanks; the first one that
    returns a true value ends the search.

    Returns:
        True when one recogniser accepted the line, False otherwise.
    """
    start = parser.p_b(0)
    after_keyword = parser.p_key("command:", start)
    if start == after_keyword:
        return False
    body_pos = parser.p_b(after_keyword)
    recogniser_names = ("is_set_command", "is_py_command",
                        "is_end_command", "is_run_command")
    for recogniser_name in recogniser_names:
        # looked up one at a time so later recognisers are untouched once
        # an earlier one has matched
        if getattr(self, recogniser_name)(parser, body_pos):
            return True
    return False
def is_py(self,parser):
    """Recognise a ``py: `` line and append its code to the current program.

    Everything after the keyword, plus a newline, is added to
    ``self.current_program``.

    Returns:
        True when the line starts (after blanks) with ``py: ``, False
        otherwise.
    """
    p = parser.p_b(0)
    q = parser.p_key("py: ", p)
    if p == q:
        return False
    self.current_program = self.current_program + parser.html[q:] + '\n'
    # The original returned None on success although the caller tests the
    # result as a boolean.
    return True
def interpret_a_line(self,line):
    """Interpret one script line.

    The line is tried as an object_page line, a device line, a command line
    and a py line, in that order; the first recogniser that accepts it ends
    the processing. For an object_page line every parsed URL is also shown
    in the GUI.
    """
    parser = html_parser(line)
    if self.is_object_page(parser):
        for index, url in enumerate(self.urls):
            self.GUI.add_url(url, index)
        return
    for recognise in (self.is_device, self.is_command, self.is_py):
        if recognise(parser):
            return
def interpret_lines(self):
    """Interpret every line of ``self.script_lines`` and send the result.

    A failure in one line does not stop the run: the offending line and the
    exception text are printed and written to the report, then the next
    line is processed. ``send_result()`` is called once after the last line.
    """
    for script_line in self.script_lines:
        try:
            self.interpret_a_line(script_line)
        except Exception as e:
            line = "error in '"+script_line+"'"
            print(line)
            self.write_line(line)
            # The original used the plain literal "...:{e}", which reported
            # the text "{e}" instead of the exception message.
            line = "...:"+str(e)
            print(line)
            self.write_line(line)
    self.send_result()
def next_url(self):
    """Switch to the next URL of ``self.urls`` (round robin).

    The chosen entry is split at its first ``?`` into ``self.current_url``
    and ``self.current_page``, the wiki driver is pointed at both, and
    ``self.next_url_index`` is advanced. Does nothing when ``self.urls`` is
    empty.
    """
    print("next_url")
    if len(self.urls) == 0:
        return
    # the class-level default of next_url_index is "", which int() rejects
    index = int(self.next_url_index or 0)
    if index >= len(self.urls):
        index = 0
    urlx = self.urls[index]
    # partition tolerates an entry without "?": the page is then empty
    # (str.index raised ValueError for such an entry)
    self.current_url, _, self.current_page = urlx.partition("?")
    print("current_url="+self.current_url)
    print("current_page="+self.current_page)
    self.next_url_index = index + 1
    self.Pico_Wiki_Driver.set_url(self.current_url)
    self.Pico_Wiki_Driver.set_page(self.current_page)
# Announce the upload in the GUI message area and rewrite the wiki result section.
def send_result(self):
#print("send_result")
self.GUI.write_message("send_result")
self.replace_result()
# Show a line in the GUI message area and append it to the report.
def write_line(self,line):
#print("write_line")
self.GUI.write_message("add_report "+line)
self.add_report_line(line)
# Main loop of the bot; runs until stop_read_eval_print_loop() clears the flag.
# Timestamps are wall-clock milliseconds (time.time()*1000). Three timers are
# kept: reading the script, evaluating it and sending the result; their
# intervals are looked up in the GUI on every pass (several comparisons are cut
# off in this listing). The loop sleeps 0.1 s at the end of a pass.
# NOTE(review): this runs in a worker thread (see GUI.click_start_button) while
# calling GUI methods — confirm this is safe with the GUI toolkit in use.
def read_eval_print_loop(self):
print("read_eval_print_loop")
self.continue_read_eval_print_loop=True
last_read_time=int(time.time() * 1000)
last_eval_time=last_read_time
last_write_time=last_read_time
while self.continue_read_eval_print_loop:
gc.collect()
current_time=int(time.time() * 1000)
# guard against the clock having moved backwards
if current_time<last_read_time:
last_read_time=current_time
if current_time-last_read_time>self.GUI.looku...
last_read_time=current_time
if self.read_script():
if self.GUI.lookup_prop('exec_interva...
self.eval_script()
self.GUI.update()
else:
# no script could be read: move on to the next URL
self.next_url()
continue
if self.GUI.lookup_prop('exec_interval')>0:
exec_interval_time=self.GUI.lookup_prop('...
if current_time-last_eval_time>exec_inter...
last_eval_time=current_time
self.eval_script()
if self.GUI.lookup_prop('write_interval')>0:
if current_time-last_write_time>self.GUI....
last_write_time=current_time
self.send_result()
#time.sleep_ms(100)
time.sleep(0.1)
# Ask read_eval_print_loop to finish after its current pass.
def stop_read_eval_print_loop(self):
self.continue_read_eval_print_loop=False
# Initialise the current URL, page and password from the first object page
# configured in the GUI and restart the URL rotation.
# NOTE(review): an earlier init_params appears in this listing; if both belong
# to the same class this later definition is the one in effect — confirm.
def init_params(self):
uri=self.GUI.lookup_prop('object_page_1')
pass_word=self.GUI.lookup_prop('object_page_pass_...
# the URI is expected to contain '?'; index [1] raises IndexError otherwise
page=uri.split('?')[1]
url=uri.split('?')[0]
self.set_current_url(url)
self.set_current_page(page)
self.set_current_pass(pass_word)
self.next_url_index=0
# Start the bot exactly as a click on the GUI's start button would.
def start_thread(gui):
gui.click_start_button()
def main() -> None:
    """Create the Tk root, the bot and its GUI, then enter the Tk main loop.

    When the first command-line argument is ``-s`` the bot is started
    immediately instead of waiting for the start button.
    """
    argv = sys.argv
    fst_opt = argv[1] if len(argv) >= 2 else None
    print('fst_opt='+str(fst_opt))
    root = Tk()
    bot = pico_wiki_bot()
    gui = GUI(root, bot)
    # wire bot and GUI together before any parameter is loaded
    bot.set_GUI(gui)
    gui.set_Pico_Time(bot.Pico_Time)
    gui.init_params()
    bot.init_params()
    if fst_opt == '-s':
        start_thread(gui)
    root.mainloop()
#print("-----------get_script_and_result-------------...
#print(bot.get_script_and_result())
#print("-----------get_result-----------------")
#print(bot.get_result())
#print("-----------get_script-----------------")
#print(bot.get_script())
#print("-----------get_current_device_line-----------...
#print(bot.get_current_device_line())
#print("-----------replace_result-----------------")
#bot.replace_result("Hello!\nThis is a test2....\nxxx...
#print("-----------stop_timer--------------------")
#bot.stop_timer()
#bot.read_eval_print_loop()
if __name__=='__main__':
    main()
}}
----
#counter
終了行:
[[FrontPage]]
#code(Python){{
# -*- coding: utf-8 -*-
#
# 20250117
#
import time
import requests
from requests_toolbelt import MultipartEncoder
import socket
try:
import ustruct as struct
except:
import struct
from time import sleep
import gc
import copy
import threading
import sys
from tkinter import *
from tkinter import Label
from tkinter import Button
import tkinter as tk
from tkinter import scrolledtext
#from PIL import Image, ImageTk, ImageOps
import tkinter.ttk as ttk
from tkinter import messagebox
from tkinter import filedialog
import time
import json
import pickle
import serial
import mimetypes
# Default property values of the bot.
# The *_interval values are presumably milliseconds (the main loop compares
# them with millisecond timestamps) — confirm. The two object_page values are
# cut off in this listing.
initials = {'bot_id':'yama_bot_0000_0000_0000_0001',
'waiting_term':600000,
'object_page_1':'http://192.168.10.10/fwb4pi/...
'object_page_2':'http://192.168.10.11/fwb4pi/...
'object_page_pass_1':'',
'object_page_pass_2':'',
'read_interval': 20000,
'exec_interval': 0,
'write_interval':0,
'report_length':120
}
class Properties:
    """Key/value settings of the bot, persisted with pickle and mirrored in the GUI."""

    def __init__(self):
        # The module-level defaults dictionary is used directly (not copied),
        # so every change made here is visible through ``initials`` as well.
        self.properties = initials
        self.file_name = "bot.properties"

    def set_gui(self, gui):
        """Attach the GUI used by map_to_gui / map_from_gui."""
        self.gui = gui

    def load(self, fn):
        """Merge the pickled dictionary stored in file ``fn`` and refresh the GUI.

        Any failure is reported on stdout and otherwise ignored.
        """
        # SECURITY: pickle.load executes code embedded in the file; only load
        # property files this program wrote itself.
        try:
            with open(fn, mode='rb') as f:
                stored = pickle.load(f)
            self.properties.update(stored)
            self.map_to_gui()
        except Exception as e:
            print("file "+fn+" read error")
            tb = sys.exc_info()[2]
            print("message:{0}".format(e.with_traceback(tb)))

    def map_to_gui(self):
        """Push every property into its GUI field."""
        for key, value in self.properties.items():
            self.gui.map_to_gui(key, value)

    def map_from_gui(self):
        """Replace every property value with the value shown in the GUI."""
        for key in self.properties:
            self.properties[key] = self.gui.lookup_prop(key)

    def get(self, key):
        """Return the value stored for ``key``, or None when it is unknown."""
        return self.properties.get(key)

    def set(self, key, value):
        """Store ``value`` under ``key``."""
        self.properties[key] = value

    def save(self, fn):
        """Read the values back from the GUI and pickle them into file ``fn``.

        A write failure is reported on stdout and otherwise ignored.
        """
        self.map_from_gui()
        try:
            with open(fn, mode='wb') as f:
                pickle.dump(self.properties, f)
        except Exception as e:
            print("file "+fn+" write error")
            tb = sys.exc_info()[2]
            print("message:{0}".format(e.with_traceback(tb)))
# Tk control panel of the bot: a "main" tab (buttons, bot id, object page
# table, parameter table, command line, script view, message log) and a
# "script" tab (directory/file selection and a second script view).
class GUI:
# Build all widgets inside root and create the Properties object bound to this
# GUI. Widgets are positioned with place(); yy holds the current vertical
# offset. Many widget option lists are cut off in this listing.
def __init__(self,root,bot):
self.bot=bot
self.recv_queue=[]
self.root=root
self.root.title("Wiki Bot panel")
self.root.configure(background="white")
self.root.geometry("850x800")
self.notebook =ttk.Notebook(root)
self.main_tab=tk.Frame(self.notebook, bg='white')
self.script_tab=tk.Frame(self.notebook, bg='white')
self.notebook.add(self.main_tab, text="main", und...
self.notebook.add(self.script_tab, text="script")
self.notebook.pack(expand=True, fill='both', padx...
yy=10
# command list
self.start_button=Button(self.main_tab, text="sta...
self.start_button.place(x=10,y=yy)
self.stop_button=Button(self.main_tab, text="stop...
self.stop_button.place(x=80,y=yy)
self.save_prop_button=Button(self.main_tab, text=...
self.save_prop_button.place(x=150,y=yy)
self.clear_message_button=Button(self.main_tab, t...
self.clear_message_button.place(x=280,y=yy)
yy=40
# this bot id, waiting term
self.bot_label=Label(self.main_tab,text="bot", wi...
self.bot_label.place(x=10,y=yy)
self.bot_id_field=Entry(self.main_tab,width=50)
self.bot_id_field.place(x=50,y=yy)
self.waiting_term_label=Label(self.main_tab,text=...
self.waiting_term_label.place(x=380,y=yy)
self.waiting_term_field=Entry(self.main_tab,width...
self.waiting_term_field.place(x=450,y=yy)
yy=70
# object page table: one row per page with URI / name / pass columns
self.page_frame = ttk.Frame(self.main_tab)
self.page_frame.grid(row=0, column=0)
# create the column headers
items = ['URI', 'name', 'pass']
for i in range(0, len(items)):
label_item = ttk.Label(self.page_frame,text=i...
label_item.grid(row=0, column=i)
n = 2 # number of rows
# column 1
self.object_page_table_uri = [0] * n
for i in range(0, n):
self.object_page_table_uri[i] = ttk.Entry(sel...
self.object_page_table_uri[i].grid(row=i+1, c...
# column 2
self.object_page_table_name = [0] * n
for i in range(0, n):
self.object_page_table_name[i] = ttk.Entry(se...
self.object_page_table_name[i].grid(row=i+1, ...
# column 3
self.object_page_table_pass = [0] * n
for i in range(0, n):
self.object_page_table_pass[i] = ttk.Entry(se...
self.object_page_table_pass[i].grid(row=i+1, ...
# NOTE(review): the frame is first managed with grid() above and then with place() — confirm which is intended
self.page_frame.place(x=10,y=yy)
# parameter table: property name / value
self.param_frame = ttk.Frame(self.main_tab)
self.param_frame.grid(row=0, column=0)
# create the column headers
param_items = ['property', 'value']
for i in range(0, len(param_items)):
label_param_item = ttk.Label(self.param_frame...
label_param_item.grid(row=0, column=i)
n = 4 # number of rows
# column 1
self.param_table_property = [0] * n
for i in range(0, n):
self.param_table_property[i] = ttk.Entry(self...
self.param_table_property[i].grid(row=i+1, co...
# column 2
self.param_table_value = [0] * n
for i in range(0, n):
self.param_table_value[i] = ttk.Entry(self.pa...
self.param_table_value[i].grid(row=i+1, colum...
self.param_frame.place(x=550,y=yy)
# the row order here is the order map_to_gui / lookup_prop rely on
self.param_table_property[0].insert(END,'read_int...
self.param_table_property[1].insert(END,'exec_int...
self.param_table_property[2].insert(END,'write_in...
self.param_table_property[3].insert(END,'report_l...
yy=200
self.command_field_label=Label(self.main_tab,text...
self.command_field_label.place(x=10,y=yy)
self.command_field=Entry(self.main_tab,width=50)
self.command_field.place(x=150,y=yy)
self.command_enter_button=Button(self.main_tab,te...
self.command_enter_button.place(x=500,y=yy)
self.clear_command_button=Button(self.main_tab,te...
self.clear_command_button.place(x=580,y=yy)
yy=230
# script area
self.script_frame=tk.Frame(self.main_tab,width=50...
self.script_frame.place(x=10,y=yy)
self.script_area=scrolledtext.ScrolledText(self.s...
self.script_area.pack()
yy=600
# output area
self.message_frame=tk.Frame(self.main_tab,width=5...
self.message_frame.place(x=10,y=yy)
self.message_area=scrolledtext.ScrolledText(self....
self.message_area.pack()
#script tab
yy=10
self.dir_name_label=Label(self.script_tab,text="d...
self.dir_name_label.place(x=30,y=yy)
self.dir_name_field=Entry(self.script_tab,width=50)
self.dir_name_field.place(x=100,y=yy)
#self.read_button=Button(self.script_tab, text="r...
#self.read_button.place(x=500,y=yy)
self.dir_button=Button(self.script_tab, text="dir...
self.dir_button.place(x=570,y=yy)
self.run_script_button2=Button(self.script_tab,te...
self.run_script_button2.place(x=680,y=yy)
yy=50
self.file_name_label=Label(self.script_tab,text="...
self.file_name_label.place(x=30,y=yy)
self.file_name_field=Entry(self.script_tab,width=...
self.file_name_field.place(x=100,y=yy)
self.read_button=Button(self.script_tab, text="re...
self.read_button.place(x=500,y=yy)
# NOTE(review): self.dir_button is assigned a second time here, replacing the reference to the button created above
self.dir_button=Button(self.script_tab, text="sel...
self.dir_button.place(x=570,y=yy)
yy=100
self.script_frame2=tk.Frame(self.script_tab,width...
self.script_frame2.place(x=50,y=yy)
self.script_area2=scrolledtext.ScrolledText(self....
self.script_area2.pack()
#self.run_script()
#self.set_script()
#self.display_help()
self.prop=Properties()
self.prop.set_gui(self)
# Attach the time helper whose get_now() is used to stamp log messages.
def set_Pico_Time(self,time):
self.Pico_Time=time
# Load the saved properties from the default file and show them in the widgets.
# NOTE(review): Properties.load already calls map_to_gui on success, so the
# second call matters only when loading failed.
def init_params(self):
self.prop.load(self.prop.file_name)
self.prop.map_to_gui()
def click_read_button(self):
    """Load the file named in the file-name field into the script area.

    The script area is cleared and then filled line by line. Open and read
    failures are reported on stdout; nothing is raised.
    """
    # tkinter's END index constant; imported here so the method does not
    # depend on the module-level star import
    from tkinter import END
    fn = self.file_name_field.get()
    print("fn="+fn)
    try:
        f = open(fn, "r")
    except Exception as e:
        # The original called sys.exec_info(), which does not exist, so the
        # handler itself failed with AttributeError.
        print("file open error.")
        print("message:{0}".format(e))
        return
    try:
        # the with block closes the file even when reading or inserting fails
        with f:
            self.script_area.delete(1.0, END)
            for line in f:
                print(line)
                self.script_area.insert(END, line)
    except Exception as e:
        print("file "+fn+" read error")
        print("message:{0}".format(e))
# Let the user choose a directory (dialog starts in this file's directory) and
# append the chosen path to the directory field.
# NOTE(review): os is used here but no "import os" is visible in the import
# block of this listing — confirm it is imported.
# NOTE(review): the field is not cleared before the insert, so repeated
# selections are concatenated.
def click_dir_button(self):
iDir = os.path.abspath(os.path.dirname(__file__))
folder_name=tk.filedialog.askdirectory(initialdir...
if len(folder_name)==0:
print("cancel selcting folder")
else:
self.dir_name_field.insert(END,folder_name)
# Let the user choose a Python file and put its path into the file-name field.
# The dialog starts in the directory shown in the directory field, or in this
# file's directory when that field is empty.
# NOTE(review): os is used here but no "import os" is visible in the import
# block of this listing — confirm it is imported.
def click_select_file_button(self):
fType = [("python", "*.py")]
iDir=""
xdir=self.dir_name_field.get()
if xdir==None or xdir=="":
iDir=os.path.abspath(os.path.dirname(__file__))
else:
iDir=xdir
iFilePath=filedialog.askopenfilename(filetypes=fT...
if iFilePath==None or iFilePath=="":
print("cancel")
else:
self.file_name_field.delete(0,END)
self.file_name_field.insert(END,iFilePath)
# Execute the text typed into the command field.
def enter_command(self):
print("enter_command")
command=self.command_field.get()
self.ex(command)
# Start the bot in a new thread (the thread target is cut off in this listing).
def click_start_button(self):
print('click_start_button')
#exec(self.script)
self.script_thread=threading.Thread(target=self.s...
print('new thread, start thread')
#self.start_script()
self.script_thread.start()
print('script_started.')
# Ask the bot's loop to stop; does nothing without a bot.
def click_stop_button(self):
print('click_stop_button')
#exec(self.script)
if self.bot==None:
return
self.bot.stop_read_eval_print_loop()
# Run the bot's read/eval/print loop in the calling thread; returns when the loop ends.
def start_script(self):
print('start_script')
#self.script=self.script_area.get(0.,END)
#print("run_script:"+self.script)
#exec(self.script)
if self.bot==None:
return
self.bot.read_eval_print_loop()
# Placeholder: only prints its name.
def run_script(self):
print('run_script')
# Save the properties to the default properties file.
def click_save_prop_button(self):
self.prop.save(self.prop.file_name)
def add_url(self,url,i):
    """Show ``url`` in row ``i`` of the object page table.

    Args:
        url: text to display.
        i: zero-based row index. A row beyond the end of the table is
            reported on stdout and ignored, because a script may list more
            URLs than the table has rows.
    """
    # tkinter's END index constant; imported here so the method does not
    # depend on the module-level star import
    from tkinter import END
    if i >= len(self.object_page_table_uri):
        print("add_url: no table row for url #"+str(i))
        return
    entry = self.object_page_table_uri[i]
    entry.delete(0, END)
    entry.insert(END, url)
def map_to_gui(self,key,value):
    """Show ``value`` in the widget that belongs to property ``key``.

    The widget is cleared and the value inserted; numeric settings are
    converted with ``str`` first. Unknown keys are ignored.
    """
    # key -> (attribute holding the widget or widget list, row or None, stringify?)
    layout = {
        'bot_id': ('bot_id_field', None, False),
        'waiting_term': ('waiting_term_field', None, True),
        'object_page_1': ('object_page_table_uri', 0, False),
        'object_page_2': ('object_page_table_uri', 1, False),
        'object_page_pass_1': ('object_page_table_pass', 0, False),
        'object_page_pass_2': ('object_page_table_pass', 1, False),
        'read_interval': ('param_table_value', 0, True),
        'exec_interval': ('param_table_value', 1, True),
        'write_interval': ('param_table_value', 2, True),
        'report_length': ('param_table_value', 3, True),
    }
    target = layout.get(key)
    if target is None:
        return
    attribute, row, stringify = target
    widget = getattr(self, attribute)
    if row is not None:
        widget = widget[row]
    widget.delete(0, END)
    widget.insert(END, str(value) if stringify else value)
def lookup_prop(self,key):
    """Return the value currently shown in the GUI for property ``key``.

    Text properties are returned as strings, numeric settings are converted
    with ``int`` (which raises ValueError for non-numeric text). Unknown
    keys yield None.
    """
    # key -> (attribute holding the widget or widget list, row or None, convert to int?)
    layout = {
        'bot_id': ('bot_id_field', None, False),
        'waiting_term': ('waiting_term_field', None, True),
        'object_page_1': ('object_page_table_uri', 0, False),
        'object_page_2': ('object_page_table_uri', 1, False),
        'object_page_pass_1': ('object_page_table_pass', 0, False),
        'object_page_pass_2': ('object_page_table_pass', 1, False),
        'read_interval': ('param_table_value', 0, True),
        'exec_interval': ('param_table_value', 1, True),
        'write_interval': ('param_table_value', 2, True),
        'report_length': ('param_table_value', 3, True),
    }
    source = layout.get(key)
    if source is None:
        return None
    attribute, row, as_int = source
    widget = getattr(self, attribute)
    if row is not None:
        widget = widget[row]
    text = widget.get()
    return int(text) if as_int else text
# Copy the widget values into the properties and save them to the default file.
def update(self):
self.prop.map_from_gui()
self.prop.save(self.prop.file_name)
# Reset the queue of received messages to an empty list.
def ini_recv_queue(self):
self.recv_queue=[]
def is_not_empty_recv_queue(self):
    """Return True when the receive queue holds at least one message.

    The state of the queue is also printed. (The original printed the two
    messages the wrong way round.)
    """
    if len(self.recv_queue) > 0:
        print('recv_queue is not empty')
        return True
    print('recv_queue is empty')
    return False
# Return True when every string in messages occurs as a substring of at least
# one entry of self.recv_queue; return False at the first string that does not.
# Each hit is also written to the message area (its text is cut off in this listing).
def are_in_the_recv_queue(self,messages):
print('start are_in_the_recv_queue')
for x in messages:
print('is '+x+' in the recv_queue?')
flag=False
for m in self.recv_queue:
print(m+' in the received queue')
if x in m:
print(x + ' is in the '+m)
flag=True
self.write_message("message "+x+" fou...
break
if flag==False:
return False
return True
# Send the text of the "all servers" command field through self.coms and clear the field.
# NOTE(review): neither command_field_for_all_server nor coms is created in the
# constructor shown in this listing — confirm this method is still reachable.
def enter_all_command(self):
command=self.command_field_for_all_server.get()
print("send "+command+" to all")
self.write_message("send "+command+" to all")
self.coms.send_command_to_all(command)
self.command_field_for_all_server.delete(0,END)
# Parse and run one command string; "OK" or "ERROR" is logged depending on the
# result of parse_command.
# NOTE(review): r_b and parse_command are not defined in the visible part of this class.
def ex(self,command):
print("parse "+command)
command=self.r_b(command)
if self.parse_command(command):
self.write_message("OK")
print("OK")
else:
self.write_message("ERROR")
print("ERROR")
# Replace the content of the script area with message.
def write_script(self, message):
self.clear_script()
self.append_script(message)
# Start a thread for writing the script text (the thread target is cut off in this listing).
def write_script_start(self,message):
self.write_script_thread=threading.Thread(target=...
self.write_script_thread.start()
# Replace the content of the script area with the given text, one line at a time.
def write_script_lines(self,lines):
print('start write_script_lines')
self.write_message('start_write_script_lines')
xlines=lines.splitlines()
self.clear_script()
for l in xlines:
self.append_script(l)
self.script_area.update()
print('end write_script_lines')
# Append message and a newline at the end of the script area.
def append_script(self, message):
self.script_area.insert(tk.END,message)
self.script_area.insert(tk.END,'\n')
#self.script_area.yview_moveto(1)
# Remove all text from the script area and redraw it.
def clear_script(self):
self.script_area.delete("1.0",END)
self.script_area.update()
# Add a time-stamped line to the message area.
def write_message(self, message):
#self.write_message_thread=threading.Thread(targe...
#self.write_message_thread.start()
self.append_message(message)
# Clear the message area, then add a time-stamped line.
def clear_and_append_message(self, message):
self.clear_message()
self.append_message(message)
# Append "<time>: <message>" plus a newline to the message area and scroll to
# the end. When the area holds 500 lines or more, a block at the top is deleted.
# Requires set_Pico_Time() to have been called (self.Pico_Time.get_now()).
def append_message(self, message):
now = self.Pico_Time.get_now()
addline=now+': '+message
#print('start append_message')
self.message_area.insert(tk.END,addline)
self.message_area.insert(tk.END,'\n')
self.message_area.yview_moveto(1)
tx=self.message_area.get(0.0,tk.END)
count_lines = tx.count('\n') + 1
if count_lines>=500:
# NOTE(review): the indices are floats (0.0 and 98.0); presumably read as "line.column" text indices — confirm the intended range
self.message_area.delete(0.0,99.0-1)
self.message_area.update()
#print('end append_message')
# Remove all text from the message area and redraw it.
def clear_message(self):
self.message_area.delete("1.0",END)
self.message_area.update()
# WLAN connection helper.
# NOTE(review): network, ssid and password are used as bare names but are not
# defined or imported in the visible part of the file; the only visible
# construction of this class is commented out — presumably left over from a
# version for another platform, confirm before use.
class pico_net:
def __init__(self):
print('current ssid= '+ssid)
# Remember the access point credentials.
# NOTE(review): connect() uses the bare names ssid/password, not these attributes.
def set_access_point(self, ssid, password):
self.ssid = ssid
self.password = password
# Activate the station interface, start connecting and poll the status once per
# second for at most ten rounds. Raises RuntimeError when the final status is
# not 3; otherwise stores the interface configuration in self.status.
def connect(self):
self.wlan = network.WLAN(network.STA_IF)
self.wlan.active(True)
chipid = int.from_bytes(self.wlan.config('mac'), ...
self.wlan.connect(ssid, password)
max_wait = 10
while max_wait > 0:
if self.wlan.status() < 0 or self.wlan.status...
break
max_wait -= 1
print('waiting for connection...')
time.sleep(1)
if self.wlan.status() != 3:
raise RuntimeError('network connection failed')
else:
print('connected')
self.status = self.wlan.ifconfig()
print('ip = ' + self.status[0])
class pico_http:
def __init__(self):
print('pico_http')
def set_url(self, url):
self.url = url
def get(self, url):
    """Fetch ``url`` with HTTP GET and return the response body as text.

    ``url`` is also remembered as ``self.url``.  Any request failure is
    reported on stdout and turned into an empty string, so callers can
    test the result against "".
    """
    self.url=url
    try:
        r = requests.get(url)
        try:
            return r.text
        finally:
            # Release the connection even if decoding the body fails.
            r.close()
    except Exception:
        # Exception (not a bare except) so Ctrl-C still interrupts the bot.
        print('pico_http.get...exception')
        return ""
def put(self, url, payload):
    """Send ``payload`` as a query string on a GET request and return the body.

    ``payload`` is a dictionary of string keys and string values; it is
    appended to ``url`` as ``?k1=v1&k2=v2`` without any escaping.
    Returns the response text, or "" if the request fails (the same
    convention as ``get`` and ``post``).
    """
    query='&'.join(key+'='+payload[key] for key in payload)
    # With an empty payload the original construction yields the bare url.
    uri=url+'?'+query if payload else url
    try:
        r = requests.get(uri)
        try:
            return r.text
        finally:
            r.close()
    except Exception:
        # The old `r==""` test could never be true for a response object,
        # so failures used to propagate instead of taking this path.
        print('pico_http.put...exception')
        return ""
#post, ref: https://docs.micropython.org/en/latest/li...
def post(self, url, body, headers):
# POST `body` to `url` and return the response text; returns "" when anything raises.
try:
r = requests.post(url,data=body,headers=heade...
r_text=r.text
r.close()
print('pico_http.post...return')
return r_text
# NOTE(review): the bare except hides the failure reason and also catches KeyboardInterrupt.
except:
print('pico_http.post...exception')
return ""
import datetime
class pico_time:
# reference: https://wisteriahill.sakura.ne.jp/CMS/Wo...
# NTP server
#ntp_host = "time-c.nist.gov"
#ntp_host = "time.cloudflare.com"
#ntp_host = "time.google.com"
ntp_host = "ntp.nict.jp"
def __init__(self):
print('pico_time')
def ptime(self):
# Query self.ntp_host over port 123 and return the value read from the reply minus NTP_DELTA.
# NTP_DELTA presumably shifts the NTP epoch (1900) to the Unix epoch (1970) — TODO confirm.
NTP_DELTA = 2208988800
# 48-byte request whose first byte is 0x1b.
NTP_QUERY = bytearray(48)
NTP_QUERY[0] = 0x1b
addr=(self.ntp_host, 123)
s = socket.socket(socket.AF_INET, socket.SOCK_DGR...
# NOTE(review): the timeout is commented out, so recv() below can block forever if no reply arrives.
# s.settimeout(1)
res = s.sendto(NTP_QUERY, addr)
msg = s.recv(48)
s.close()
# Network-byte-order unsigned 32-bit integer taken from bytes 40..43 of the reply.
val = struct.unpack("!I", msg[40:44])[0]
return val - NTP_DELTA
wday = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun']
def zfill(self, s, width):
    """Left-pad the string ``s`` with "0" up to ``width`` characters.

    Strings already at least ``width`` long are returned unchanged.
    """
    return s.rjust(width,"0")
def get_now(self):
    """Return the current local date and time as text.

    The value is ``str(datetime.datetime.now())``.  The NTP-based
    formatting that used to live here (via ``ptime``) is disabled.
    """
    current=datetime.datetime.now()
    return str(current)
class html_parser:
html=""
tokens=[]
reserved_words={'<':'<','>':'>','&':'&','"'...
' ':' ','\n':'<br>','\t':' ...
'\\':'\','{':'{','}':'}...
}
a2c={' ':"+",'\n':"%0D%0A",'\r':"%0D",'\t':"%09",'!':...
'#':"%23",'$':"%24",'%':"%25",'&':"%26", '\'':"%2...
'*':"%2A",'+':"%2B",',':"%2C",'-':"%2D",'/':"%2F"...
';':"%3B",'<':"%3C",'=':"%3D",'>':"%3E",'?':"%3F",
'@':"%40",'[':"%5B",'\\':"%5C",']':"%5D",'^':"%5E",
'`':"%60",'{':"%7B",'|':"%7C",'}':"%7D",'~':"%7E"}
#Spechal char to ascii code
sc2a={'<':"<",'>':">",'&':"&",'"':'"',...
def __init__(self,html):
self.html=html
def html_tokenizer(self):
# Split self.html into tokens appended to self.tokens: '<...>' tags, '"..."' string constants, and runs of other text.
# Also resets the read position self.token_ith to 0.
self.token_ith=0
w=""
tx=[""]
rest=[""]
inx=self.html
xlen=len(self.html)
while xlen>0:
# A tag at the current position: flush pending plain text, then emit the tag.
if self.parse_tag(inx,tx,rest):
if w!="":
self.tokens.append(w)
w=""
self.tokens.append('<'+tx[0]+'>')
inx=rest[0]
xlen=len(inx)
# Otherwise a double-quoted string constant, handled the same way.
elif self.parse_String_Constant(inx,tx,rest):
if w!="":
self.tokens.append(w)
w=""
self.tokens.append('"'+tx[0]+'"')
inx=rest[0]
xlen=len(inx)
else:
# Neither: move one character from the input into the pending text.
wc=inx[0]
inx=inx[1:xlen]
xlen=len(inx)
w=w+wc
# Flush whatever plain text is still pending.
if w!="":
self.tokens.append(w)
w=""
def get_html(self):
return self.html
def parse_key(self,key,inx,rest):
    """Test whether ``inx`` begins with ``key``.

    ``rest[0]`` receives the text after the key on a match, or ``inx``
    unchanged otherwise.  Returns True on a match, else False.
    """
    matched=inx.startswith(key)
    rest[0]=inx[len(key):] if matched else inx
    return matched
def parse_String_Constant(self,inx,strc,rest):
    """Parse a double-quoted string at the start of ``inx``.

    On success ``strc[0]`` receives the characters between the quotes,
    ``rest[0]`` the text after the closing quote, and True is returned.
    If ``inx`` does not start with '"' or the string is never closed,
    False is returned and both output lists are left untouched.  The
    previous version raised IndexError on unterminated input because
    it read the next character before checking the remaining length.
    """
    if not inx.startswith('"'):
        return False
    collected=[]
    i=1
    n=len(inx)
    while i<n:
        ch=inx[i]
        if ch=='"':
            strc[0]="".join(collected)
            rest[0]=inx[i+1:]
            return True
        if ch=='\\':
            # NOTE(review): as before, the backslash AND the character it
            # escapes are both dropped from the result — confirm whether
            # the escaped character should be kept.
            i+=2
            continue
        collected.append(ch)
        i+=1
    return False
def parse_tag(self,inx,tag,rest):
# Parse a '<...>' tag at the start of inx.
# On success tag[0] receives the text between '<' and '>' (string constants re-quoted), rest[0] the text after '>', and True is returned.
rx=[""]
xstr=""
wstr=[""]
if self.parse_key('<',inx,rx):
inx=rx[0]
# NOTE(review): inx[0] raises IndexError when nothing follows the '<'.
fx=inx[0]
xlen=len(inx)
while fx!='>':
if xlen<=0:
return False
# A backslash is skipped.
if fx=='\\':
inx=inx[1:xlen]
# A string constant inside the tag is copied as a unit, otherwise one character is copied.
if self.parse_String_Constant(inx,wstr,rx):
inx=rx[0]
xstr=xstr+'"'+wstr[0]+'"'
else:
xstr=xstr+fx
inx=inx[1:xlen]
# NOTE(review): this read happens before the length check above, so a tag that is never closed ends in IndexError rather than False — confirm.
fx=inx[0]
xlen=len(inx)
if self.parse_key('>',inx,rx):
tag[0]=xstr
rest[0]=rx[0]
return True
return False
def get_tokens(self):
return self.tokens
def next_token(self):
if self.token_ith<len(self.tokens):
self.token_ith=self.token_ith+1
return self.tokens[self.token_ith-1]
return ""
def has_more_tokens(self):
if self.token_ith<len(self.tokens):
return True
return False
def index_from(self,i,str):
p=self.html[i:].index(str)
return p
def p_url(self,p,url):
#print("p_url p="+str(p)+' '+self.html[p:p+10])
rx=[""]
xurl=""
pw=p
xlen=len(self.html)
q=self.p_key('http://',p)
if p!=q:
xurl='http://'
else:
q=self.p_key('https://',p)
if p!=q:
xurl='https://'
else:
return pw
p=q
while not (self.html[p] in [' ','\n','\t','\r']):
p=p+1
if p>=xlen:
p=xlen
break
url[0]=xurl+self.html[q:p]
return p
def p_key(self,key,p):
#print("p_key key="+key+" p="+str(p)+" "+self.htm...
keylen=len(key)
if p+keylen>=len(self.html):
return p
q=p
if self.html[q:q+keylen]==key:
#print("key="+key+" have found, q="+str(q+key...
return q+keylen
#print("key="+key+" is not found, p="+str(p))
return p
def p_String_Constant(self,p,strc):
#print("p_String_Constant p="+str(p)+' '+self.htm...
rx=[""]
xstr=""
pw=p
q=self.p_key('"',p)
if p!=q:
#print("p_String_Constant p="+str(p)+' '+self...
p=q
fx=self.html[p]
xlen=len(self.html)-p
while fx!='"':
if xlen==0:
return pw
if fx in ['\n','\t','\r']:
return pw
if fx=='\\':
p=p+1
p=p+1
fx=self.html[p]
xlen=len(self.html)-p
q=self.p_key('"',p)
if p!=q:
strc[0]=self.html[pw+1:q-1]
#print("p_string_constant, strc="+str(str...
return q
#print("p_String_Constant, fail to find \"")
return pw
def p_Date(self,p,date_str):
# Parse "year(/|-)month(/|-)day <spaces>hour:minute:second" starting at position p.
# On success date_str[0] receives a string built from the parts and the end position is returned; on failure the start position is returned.
rx=[""]
xstr=""
pw=p
year=[0]
q=self.p_number(p,year)
if p==q:
return pw
p=q
# Date separator: '/' or '-'.
q=self.p_key('/',p)
if p==q:
q=self.p_key('-',p)
if p==q:
return pw
p=q
month=[0]
q=self.p_number(p,month)
if p==q:
return pw
p=q
q=self.p_key('/',p)
if p==q:
q=self.p_key('-',p)
if p==q:
return pw
p=q
day=[0]
q=self.p_number(p,day)
# NOTE(review): this failure path returns False while every other one returns pw — looks unintended, confirm.
if p==q:
return False
p=q
# At least one space is required between the date and the time.
q=self.p_b(p)
if p==q:
return pw
p=q
hour=[0]
q=self.p_number(p,hour)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
minute=[0]
q=self.p_number(p,minute)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
second=[0]
q=self.p_number(p,second)
if p==q:
return pw
p=q
date_str[0]=str(year[0])+'-'+str(month[0])+'-'+st...
return p
def p_b(self,p):
#print("parse_b")
#print("p="+str(p))
xlen=len(self.html)
if p>=xlen:
return p
while self.html[p]==' ':
p=p+1
if p>=xlen:
p=xlen
break
return p
def p_name(self,p,name):
# Read a name starting at position p: the longest run of characters from the allowed set below.
# name[0] receives the run (possibly empty) and the position after it is returned.
q=p
last_p=len(self.html)
# NOTE(review): self.html[q] raises IndexError when p is already at the end of the text — confirm callers never do that.
while self.html[q] in 'abcdefghijklmnopqrstuvwxyz...
q=q+1
if q>=last_p:
break
name[0]=self.html[p:q]
return q
def p_number(self,p,num):
#print("p_number")
#print("p="+str(p))
n=""
last_p=len(self.html)
q=p
while self.html[p] in '0123456789':
q=q+1
if q>=last_p:
break
num[0]=int(self.html[p:q])
#print("p_number p="+str(p)+" q="+str(q)+" num="+...
return q
def p_an_equation(self,p,eqs):
#print("p_an_equation p="+str(p)+' '+self.html[p:...
pw=p
p=self.p_b(p)
nx=[""]
strc=[""]
q=self.p_name(p,nx)
#print("p_an_equation name="+nx[0])
if p==q:
#print("p_an_equation fail to find name")
return pw
p=q
p=self.p_b(p)
q=self.p_key('=',p)
if p==q:
#print("p_an_equation fail to find =")
return pw
p=q
p=self.p_b(p)
q=self.p_String_Constant(p,strc)
if p==q:
num=[0]
q=self.p_number(p,num)
if p==q:
return pw
else:
eqs[nx[0]]=num[0]
#print("p_qn_equation get "+nx[0]+"="+str...
return q
else:
eqs[nx[0]]=strc[0]
#print("p_an_equation get "+nx[0]+"="+str(str...
return q
def p_equations(self,p,eqs):
#print("parse_equations")
#print("p="+str(p))
pw=p
while True:
q=self.p_an_equation(p,eqs)
if p==q:
return p
p=q
def p_get_equations_of_the_tag(self,p,tag_name,eqs):
#print("p_get_equations_of_the_tag <"+tag_name)
#print("p="+str(p)+' '+self.html[p:p+10])
pw=p
try:
q=self.html.index('<'+tag_name,pw)
except:
#print("p_get_equation_of_the_tag fail to fin...
return p
#print("q="+str(q)+' '+self.html[q:q+10])
q=q+len('<'+tag_name)
#print("q="+str(q)+' '+self.html[q:q+10])
if pw==q:
#print("p_get_equation_of_the_tag fail to fin...
return p
pw=q
q=self.p_equations(pw,eqs)
#print("after equations q="+str(q) +' '+self.html...
if pw==q:
return p
pw=q
pw=self.p_b(pw)
#print("pw="+str(pw))
q=self.p_key('>',pw)
#print("after > q="+str(q)+' '+self.html[q:q+10])
if pw!=q:
return q
q=self.p_key('/>',pw)
if pw!=q:
return q
return p
def p_get_between_tag_with_cond(self,p,tag_name,cond,...
# Find the next <tag_name ...> whose attribute cond[0] equals cond[1] and locate its content.
# On success rtn[0] receives (start_p,end_p) for the text before the matching '</tag_name' and the position after the closing '>' is returned; most failures return the start position.
pw=p
q=pw
# Scan forward over candidate tags until one satisfies the condition.
while True:
try:
q=self.html.index('<'+tag_name,p)
if p==q:
return pw
q=q+len('<'+tag_name)
p=q
eqs={}
q=self.p_equations(p,eqs)
if p!=q:
if cond[0] in eqs:
# Attribute present with a different value: try the next tag.
if eqs[cond[0]]!=cond[1]:
continue
else:
break
# Tag not found (str.index raised), or any other error.
except:
return pw
p=self.p_b(p)
q=self.p_key('>',p)
if p!=q:
start_p=q
else:
q=self.p_key('/>',p)
# NOTE(review): this compares with pw while the branch above compares with p — possibly a typo, confirm.
if pw!=q:
start_p=q
else:
return p
try:
q=self.html.index('</'+tag_name,start_p)
except:
return pw
# end_p is the index of the last content character (one before '</').
end_p=q-1
p=q
q=q+len('</'+tag_name)
p=self.p_b(q)
q=self.p_key('>',p)
if q==p:
return pw
rtn[0]=(start_p,end_p)
return q
def p_get_between(self,p,tag_name,eqs,rtn):
#print("p_get_between <"+tag_name+">...</"+tag_na...
#print("p="+str(p)+' '+self.html[p:p+10])
pw=p
q=pw
try:
q=self.html.index('<'+tag_name,p)
except:
return pw
#print("p_get_between 2 q="+str(q)+' '+self.html[...
q=q+len('<'+tag_name)
#print("p_get_between 3 q="+str(q)+' '+self.html[...
if p==q:
return pw
p=q
q=self.p_equations(p,eqs)
#print("after equations q="+str(q) +' '+self.html...
p=q
p=self.p_b(p)
#print("p="+str(p))
q=self.p_key('>',p)
#print("after > q="+str(q)+' p='+str(p)+' '+self....
if p!=q:
start_p=q
else:
q=self.p_key('/>',p)
if p!=q:
end_p=q
rtn[0]=""
return q
else:
#print("get between "+tag_name+",fail to ...
return pw
try:
q=self.html.index('</'+tag_name,start_p)
except:
#print("get between "+tag_name+",fail to find...
return pw
end_p=q-1
p=q
q=q+len('</'+tag_name)
p=self.p_b(q)
#print("p="+str(p))
q=self.p_key('>',p)
if q==p:
return pw
rtn[0]=(start_p,end_p)
#rx=self.html[start_p:end_p+1]
#print("get between "+tag_name+" rx="+rx)
return q
#def text_to_post_form(self,text):
# #print("html_parser.text_to_post_form")
# #print(text)
# post_form=""
# for i in range(len(text)):
# post_form=post_form+self.code_convert(text[i])
# #print("html_parser.text_to_post_form return")
# return post_form
def code_convert(self,code):
if code in self.a2c:
return self.a2c[code]
else:
return code
def text_to_post_form(self,text):
#print("html_parser.text_to_post_form")
#print(text)
rtn=""
for i in range(len(text)):
rtn+=self.code_convert(text[i])
return rtn
def special_char_to_text(self,text):
    """Turn HTML character references in ``text`` back into plain characters.

    Handles ``&lt;``, ``&gt;``, ``&amp;``, ``&quot;`` and the numeric or
    named forms of the apostrophe, plus any other standard reference.

    The chained ``replace`` calls had lost their entity names (each one
    replaced a character with itself, and one was no longer valid
    syntax), so nothing was decoded.  ``html.unescape`` decodes in a
    single pass, so ``&amp;lt;`` correctly becomes ``&lt;``, not ``<``.
    """
    # Local import: the module name `html` is also used as an attribute
    # name in this class, and the file's import block is left untouched.
    from html import unescape
    return unescape(text)
def post(self,url,body):
# POST the already-encoded form text `body` to `url` and return the response text.
# Any exception is printed and turned into "".
headers={'Content-Type': 'application/x-www-form-...
# Reference for the request style: https://docs.openmv.io/library/urequests.html
try:
response = requests.post(url, data=body.encod...
r_text=response.text
response.close()
return r_text
except Exception as e:
print(e)
return ""
class pico_wiki_driver:
url=""
def __init__(self):
    """Create the time and HTTP helpers and print the current time.

    The wiki URL and page are not set here; use ``set_url`` and
    ``set_page``.
    """
    print('pico_wiki_driver')
    self.Pico_Time=pico_time()
    self.Pico_Http=pico_http()
    print('time now='+self.Pico_Time.get_now())
def set_url(self,url):
self.url=url
def set_page(self,page):
self.page=page
def get_url(self):
return self.url
def get_html(self):
uri = self.url+"?"+self.page
all_page=self.Pico_Http.get(uri)
#print("get_html..."+uri)
#print(all_page)
return all_page
def get_wiki_page(self):
# Fetch <url>?<page> and return the text of one <div> selected by an attribute condition, with HTML entities converted back.
# Falls off the end (returns None) when no matching <div> is found.
uri = self.url+"?"+self.page
all_page=self.Pico_Http.get(uri)
print("get_wiki_page...")
parser=html_parser(all_page)
p=0
rtn=[(0,0)]
q=parser.p_get_between_tag_with_cond(p,"div",('id...
if p!=q:
# The slice stops before rtn[0][1], so the character at that index is left out.
body_x=all_page[rtn[0][0]:rtn[0][1]]
body=parser.special_char_to_text(body_x)
return body
def replace_wiki_page(self,new_body):
# Replace the wiki source of the current page with new_body.
# Steps: fetch the edit form (cmd=edit), collect the form's <input> fields, then POST a form body rebuilt from them with `msg` set to new_body.
print("pico_wiki_driver.replace_wiki_page...")
payload={'cmd':'edit', 'page':self.page}
r = self.Pico_Http.put(self.url,payload)
edit_html=html_parser(r)
p=0
array_of_equations=[]
equations={}
# Gather the attribute dictionary of every <input ...> tag in the form.
q=edit_html.p_get_equations_of_the_tag(p,'input',...
while p!=q:
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
# Copy selected <input> fields into payload.
# NOTE(review): the field names compared below are cut off in this view.
for i in range(len(array_of_equations)):
eqs=array_of_equations[i]
for key in eqs:
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['write']=array_of_equations[i...
# First <textarea>: the editable source, which is replaced by new_body.
eqs_msg={}
rtn=[(0,0)]
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
# NOTE(review): this `body` is overwritten below before it is used.
body=r[0:start_p]+new_body+r[end_p+1:]
payload['msg']=edit_html.text_to_post_form(ne...
# Second <textarea>: the original text, sent back as `original`.
eqs_original={}
rtn=[(0,0)]
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
payload['original']=edit_html.text_to_post_fo...
# Build the form body by hand.
# NOTE(review): payload keys that were never filled in above raise KeyError here.
body=""
body=body+'encode_hint='+(payload['encode_hint'])...
body=body+'template_page=&'
body=body+'cmd='+payload['cmd']+'&'
body=body+'page='+payload['page']+'&'
body=body+'digest='+payload['digest']+'&'
body=body+'msg='+payload['msg']+'&'
body=body+'write='+payload['write']+'&'
body=body+'original='+payload['original']
edit_html.post(self.url,body)
def upload_file(self,file_dir,file_name,pass_word):
# Upload file_dir+file_name as an attachment of the current page and return the HTTP status code.
# The MIME type is guessed from the file path.
file_path=file_dir+file_name
MIMETYPE = mimetypes.guess_type(file_path)[0]
print('upload file '+file_path)
# Read the whole file into memory.
# NOTE(review): the file object is never closed explicitly.
file_Data_Binary = open(file_path, 'rb').read()
# Upload target.
url=self.url
page=self.page
# Form fields for the upload request.
files = {'attach_file': (file_name, file_Data_Bin...
'encode_hint': 'ぷ',
'pass': pass_word,
'refer': page,
'pcmd': 'post',
'plugin': 'attach'}
m = MultipartEncoder(files)
hx={'Content-Type': m.content_type,
# 'Accept:': 'text/html,application/xhtml+...
'Referer': url+'?'+'plugin=attach&pcmd=upload...
# 'Accept-Language': 'ja,en;q=0.9,en-GB;q=...
}
# Send the upload.
response = requests.post(url, data=m, headers=hx)
# Show the response status.
print(response.status_code)
return response.status_code
def get_attachment_list(self):
# Fetch the current page and return a list of names taken from <a> elements inside one <div> selected by an attribute condition.
# Returns an empty list when that <div> is not found.
uri = self.url+"?"+self.page
all_page=self.Pico_Http.get(uri)
print("get_wiki_page...")
parser=html_parser(all_page)
p=0
rtn=[(0,0)]
q=parser.p_get_between_tag_with_cond(p,"div",('id...
# Position after the selected <div>; links found beyond it are ignored.
attach_end=q
attach_list=[]
if p==q:
return attach_list
p=rtn[0][0]
eqs={}
while True:
q=parser.p_get_between(p,'a',eqs,rtn)
if p==q:
break
if q>attach_end:
break
print(parser.html[rtn[0][0]:rtn[0][1]])
eqs_img={}
# The name is the text from the end of a tag found inside the link up to the end of the link content.
q2=parser.p_get_equations_of_the_tag(rtn[0][0...
fname=parser.html[q2:(rtn[0][1]+1)]
print(fname)
attach_list.append(fname)
# NOTE(review): searches for another <a> starting at q2 after each hit; presumably skips a companion link — confirm.
q=parser.p_get_between(q2,'a',eqs,rtn)
p=q
print(attach_list)
return attach_list
def delete_attachment(self,attach_name,pass_word):
# Ask the wiki's attach plugin to delete the attachment attach_name of the current page; returns the HTTP status code.
# Form fields for the delete request.
files = {'encode_hint': 'ぷ',
'plugin': 'attach',
'refer': self.page,
'file': attach_name,
'age': '0',
'pcmd': 'delete',
'newname': attach_name,
'pass': pass_word}
m = MultipartEncoder(files)
hx={'Content-Type': m.content_type,
# 'Accept:': 'text/html,application/xhtml+...
'Referer': self.url+'?'+'plugin=attach&pcmd=i...
+'file='+attach_name+'&refer='+self.page
# 'Accept-Language': 'ja,en;q=0.9,en-GB;q=...
}
# Send the request.
response = requests.post(self.url, data=m, header...
# Show the response status.
print(response.status_code)
return response.status_code
def get_wiki_source(self):
# Return the wiki source text of the current page, read from the first <textarea> of the edit form (cmd=edit).
# Returns the converted form of "" when no <textarea> is found.
payload={'cmd':'edit', 'page':self.page}
r = self.Pico_Http.put(self.url,payload)
edit_html=html_parser(r)
p=0
array_of_equations=[]
equations={}
# Gather the attribute dictionary of every <input ...> tag in the form.
q=edit_html.p_get_equations_of_the_tag(p,'input',...
while p!=q:
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
# Copy selected <input> fields into payload.
# NOTE(review): payload is not used afterwards in this method (the code that posted it is commented out).
for i in range(len(array_of_equations)):
eqs=array_of_equations[i]
for key in eqs:
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['cancel']=array_of_equations[...
eqs_msg={}
rtn=[(0,0)]
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
wiki_source=""
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
# The slice stops before end_p, so the character at end_p is left out.
wiki_source=r[start_p:end_p]
wiki_source=edit_html.special_char_to_text(wiki_s...
return wiki_source
def get_result(self):
#print("pico_wiki_bot.get_result")
#r=self.get_script_and_result()
all_text=self.get_wiki_source()
if all_text==None:
return None
if all_text=="":
return ""
#parser=html_parser(all_text)
p=all_text.index("result:")+len("result:")
q=all_text.index("current_device=",p)
if q<0:
q=len(all_text)
else:
q=q-1
print('pico_wiki_bot.get_result return')
#print(all_text[p+1:q])
return all_text[p+1:q]
def get_script(self):
#print("pico_wiki_bot.get_script")
#r=self.get_script_and_result()
all_text=self.get_wiki_source()
if all_text==None:
return None
if all_text=="":
return ""
#parser=html_parser(all_text)
p=0
try:
q=all_text.index("result:",p)
q=q-1
except:
q=len(all_text)
return all_text[p:q]
class USB_Serial:
PORT0 = '/dev/ttyACM0' # on Windows, use COM* instead
PORT1 = '/dev/ttyACM1'
def __init__(self):
# Open the serial port (PORT0 first, then PORT1) and start the background input thread.
# self.ser is None when neither port could be opened.
try:
self.ser = serial.Serial(port=self.PORT0,baud...
except Exception as e:
try:
self.ser = serial.Serial(port=self.PORT1,...
except Exception as e:
print(e)
self.ser=None
# NOTE(review): self.rtn is first assigned in send_command/receive_loop, not here — confirm nothing reads it earlier.
self.usb_serial_input_thread=threading.Thread(tar...
self.usb_serial_input_thread.start()
def set_gui(self,gui):
self.GUI=gui;
def send_command(self,command):
    """Send ``command`` plus CR LF over the serial port and wait for a reply.

    The reply is whatever another part of the object stores in
    ``self.rtn``; it is polled every 10 ms.  Returns that value, or "0"
    when writing failed or no reply arrived within the polling limit.
    """
    self.rtn=None
    print('writing '+command)
    payload=bytes(command+'\r\n','utf-8')
    try:
        if self.ser!=None:
            self.ser.write(payload)
            self.ser.flush()
    except Exception as e:
        print("serial write error")
        self.GUI.write_message("serial write error.")
        print(e)
        self.GUI.write_message(str(e))
        self.rtn="0"
    polls=0
    while self.rtn==None:
        if polls>1000:
            self.rtn=None
            return "0"
        time.sleep(0.01)
        polls=polls+1
    reply=self.rtn
    self.rtn=None
    return reply
def receive_loop(self):
# Endless loop: read lines from the serial port and store a reply in self.rtn for send_command to pick up.
while True:
try:
# NOTE(review): with no open port this `continue` appears to skip the sleep at the bottom of the loop, i.e. a busy spin — confirm against the real indentation.
if self.ser==None:
continue
buf = self.ser.readlines()
buf = [i.decode().strip() for i in buf]
if len(buf)>0:
print(buf)
for l in buf:
# A '>>>' prompt in the input is reported and that line is skipped.
if '>>>' in l:
print('please reset pico.')
continue
# NOTE(review): stores buf[1], not the current line l; with fewer than two lines this raises IndexError and lands in the handler below — confirm intent.
self.rtn= buf[1]
except Exception as e:
print("error receiving")
print(e)
# After any error, try to reopen the port; if that also fails, give up on the port and answer "0".
try:
self.ser = serial.Serial(port=self.PO...
except Exception as e:
print(e)
try:
self.ser = serial.Serial(port=sel...
except Exception as e:
print(e)
self.ser=None
self.rtn="0"
time.sleep(0.01)
class pico_wiki_bot:
result_lines=[]
script_lines=[]
urls=[]
programs={}
environments={}
time_millis=0
timer=None
current_program_name=""
current_program=""
report=[]
#reportLength=120
current_url=""
current_page=""
current_text=""
next_url_index=""
def __init__(self):
# Build the helper objects (time, HTTP, wiki driver, USB serial) and reset per-run state.
print('wiki_bot')
self.Pico_Time=pico_time()
self.Pico_Http=pico_http()
t=self.Pico_Time.get_now()
self.bot=None
print('time now='+t)
# self.bot is None at this point, so read_initials() returns right away.
self.read_initials()
self.next_url_index=0
self.Pico_Wiki_Driver=pico_wiki_driver()
self.all_text=""
self.u_serial=USB_Serial()
# A driver whose URL is still "" ends construction here.
if self.Pico_Wiki_Driver.get_url()=="":
return
# NOTE(review): no definition or import of `Timer` is visible in this part of the file — confirm before relying on this line.
self.timer=Timer()
def read_initials(self):
# Copy settings from self.bot.prop into self.environments and split object_page_1 into URL and page.
# Does nothing when self.bot or self.bot.prop is None.
print("read_initials")
if self.bot==None:
return
if self.bot.prop==None:
return
self.environments['waiting_term']=self.bot.prop.g...
self.environments['read_interval']=self.bot.prop....
self.environments['exec_interval']=self.bot.prop....
self.environments['write_interval']=self.bot.prop...
self.environments['report_length']=self.bot.prop....
uri=self.bot.prop.get('object_page_1')
# NOTE(review): `url` is a local that is not used afterwards; [1] raises IndexError when the value contains no '?'.
url=uri.split('?')[0]
self.page=uri.split('?')[1]
def set_current_page(self,page):
self.current_page=page
def set_current_url(self,url):
self.current_url=url
def set_current_pass(self,pass_w):
self.pass_word=pass_w
def clear_report(self):
self.report.clear()
def add_report_line(self,send):
# Append `send` to self.report, first dropping the oldest entries while the list is over the limit derived from self.environments.
while len(self.report)>(int(self.environments['re...
# Shift every entry one slot towards the front, then drop the last slot; this removes the oldest entry.
for i in range(len(self.report)-1):
self.report[i]=self.report[i+1]
self.report.pop()
self.report.append(send)
def set_GUI(self,gui):
self.GUI=gui
self.u_serial.set_gui(gui)
def init_params(self):
if self.GUI==None:
return
self.GUI.init_params()
def add_one_second_to_time_millis(self,timer):
#print("pico_wiki_bot.add_one_second_to_time_mill...
self.time_millis=self.time_millis+1000
#print("pico_wiki_bot.add_one_second_to_time_mill...
def stop_timer(self):
self.timer=None
def get_the_object_page(self):
# Fetch the HTML of the bot's object page: try the first configured page, then a second one; returns "" when both come back empty.
# Each attempt also updates self.url, self.page, self.pass_word and the wiki driver's target.
if self.GUI!=None:
# First candidate: value from the GUI, falling back to the built-in `initials` when the lookup raises.
try:
object_page=self.GUI.lookup_prop('object_...
pass_word=self.GUI.lookup_prop('object_pa...
except:
object_page=initials['object_page_1']
pass_word=initials['object_page_pass_1'] ...
# The setting has the form "<url>?<page>".
url_page=object_page.split('?')
self.url=url_page[0]
self.page=url_page[1]
self.pass_word=pass_word
self.Pico_Wiki_Driver.set_url(self.url)
self.Pico_Wiki_Driver.set_page(self.page)
html=self.Pico_Wiki_Driver.get_html()
if html!="":
return html
# Second candidate, used when the first fetch returned "".
try:
object_page=self.GUI.lookup_prop('object_...
pass_word=self.GUI.lookup_prop('object_pa...
except:
object_page=initials['object_page_2']
# NOTE(review): this fallback reads 'object_page_pass_1' although the page is object_page_2 — confirm.
pass_word=initials['object_page_pass_1'] ...
url_page=object_page.split('?')
self.url=url_page[0]
self.page=url_page[1]
self.pass_word=pass_word
self.Pico_Wiki_Driver.set_url(self.url)
self.Pico_Wiki_Driver.set_page(self.page)
html=self.Pico_Wiki_Driver.get_html()
if html!="":
return html
else:
return ""
def get_script_and_result(self):
# Fetch the object page, take the content of its first <pre> element, convert it back to text, store it in self.current_text and return it.
# Returns "" when the page could not be fetched.
html=self.get_the_object_page()
if html=="":
return ""
parser=html_parser(html)
p=0
eqs={}
rtn=[(0,0)]
q=parser.p_get_between(p,'pre',eqs,rtn)
# NOTE(review): when no <pre> is found rtn[0] stays (0,0) and the slice below is empty.
be=rtn[0]
command_and_result=parser.html[be[0]:be[1]]
self.current_text=parser.special_char_to_text(com...
return self.current_text
def get_result(self):
#print("pico_wiki_bot.get_result")
#r=self.get_script_and_result()
all_text=self.current_text
if all_text==None:
return None
if all_text=="":
return ""
#parser=html_parser(all_text)
p=all_text.index("result:")+len("result:")
q=all_text.index("current_device=",p)
if q<0:
q=len(all_text)
else:
q=q-1
print('pico_wiki_bot.get_result return')
return all_text[p+1:q]
def get_script(self):
#print("pico_wiki_bot.get_script")
r=self.get_script_and_result()
all_text=self.current_text
if all_text==None:
return None
if all_text=="":
return ""
#parser=html_parser(all_text)
p=0
try:
q=all_text.index("result:",p)
q=q-1
except:
q=len(all_text)
return all_text[p:q]
def get_current_device_line(self):
# Return text taken from the "current_device=" line of self.current_text, with HTML entities converted back.
command_and_result=self.current_text
parser=html_parser(command_and_result)
if command_and_result==None:
return None
# NOTE(review): index() raises ValueError when the marker is missing.
p=command_and_result.index("current_device=")
p=p+len("current_device=")
# The line ends at the next newline, or at the end of the text.
try:
q=command_and_result.index("\n",p)
except:
q=len(command_and_result)
return parser.special_char_to_text(command_and_re...
def replace_wiki(self,new_wiki):
self.Pico_Wiki_Driver.replace_wiki_page(new_wiki)
#print(r)
def replace_result(self):
# Rewrite the result section of the page: keep everything up to "result:", append the lines of self.report and a current_device line, then save through the wiki driver.
payload={'cmd':'edit', 'page':self.page}
r = self.Pico_Http.put(self.url,payload)
edit_html=html_parser(r)
p=0
eqs_msg={}
rtn=[(0,0)]
body=""
# First <textarea> of the edit form: the current wiki source.
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
body=edit_html.special_char_to_text(r[start_p...
# Second <textarea>: the original text.
# NOTE(review): payload['original'] is not used later in this method.
eqs_original={}
rtn=[(0,0)]
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
payload['original']=edit_html.text_to_post_fo...
# NOTE(review): index() raises ValueError when a marker is missing, so the q<0 test below can never be true.
px=body.index("result:")+len("result:")
q=body.index("current_device=",px)
if q<0:
q=len(body)
head_area=body[0:px]
new_area=head_area+'\n'
# One report entry per line, each prefixed with a space.
for i in range(len(self.report)):
new_area=new_area+' '+self.report[i]+'\n'
bot_id=self.GUI.lookup_prop('bot_id')
new_area=new_area+" current_device=\""+bot_id+"\"...
gc.collect()
self.Pico_Wiki_Driver.replace_wiki_page(new_area)
def get_attachment_list(self):
return self.Pico_Wiki_Driver.get_attachment_list()
def upload_file(self,file_path,file_name):
# Upload a file as an attachment through the wiki driver; nothing is returned to the caller.
self.Pico_Wiki_Driver.upload_file(file_path,file_...
def delete_attachment(self,attach_name):
# Delete an attachment through the wiki driver; nothing is returned to the caller.
self.Pico_Wiki_Driver.delete_attachment(attach_na...
def read_script(self):
#print("read_script")
all_text=self.get_script()
if all_text==None or all_text=="":
self.script_lines=[]
return False
else:
self.script_lines=all_text.split('\n')
#print("-----script_lines-----")
#for i in range(len(self.script_lines)):
# print(str(i)+' - '+self.script_lines[i])
if self.GUI != None:
#self.GUI.write_script_start(all_text)
self.GUI.write_script_lines(all_text)
return True
def eval_script(self):
print("eval_script")
self.interpret_lines()
#def print_result(self):
# print("print_result")
def is_object_page(self,parser):
    """Recognise an ``object_page <url> [or <url> ...]`` line.

    Returns False when the line does not start with "object_page ".
    Otherwise ``self.urls`` is replaced by the URLs found on the line
    (possibly none) and True is returned.
    """
    start=parser.p_b(0)
    after_key=parser.p_key("object_page ",start)
    if after_key==start:
        return False
    self.urls=[]
    pos=parser.p_b(after_key)
    while True:
        holder=[""]
        url_end=parser.p_url(pos,holder)
        if url_end==pos:
            break
        self.urls.append(holder[0])
        # Further URLs are introduced by the word "or".
        pos=parser.p_b(url_end)
        after_or=parser.p_key("or ",pos)
        if after_or==pos:
            break
        pos=parser.p_b(after_or)
    return True
def is_device(self,parser):
    """Recognise a ``current_device="<name>", Date=<date>`` line.

    The device name is stored in ``self.current_device`` as soon as it
    has been read.  Returns True when the whole line, including the
    date, was parsed, and False otherwise.

    Fixes: the date holder is now a one-element list, so ``p_Date`` can
    store its result (it assigns to index 0); parsing of the date starts
    after "Date=" instead of at it; and success returns True rather
    than falling off the end.
    """
    p=parser.p_b(0)
    q=parser.p_key("current_device=",p)
    if p==q:
        return False
    p=parser.p_b(q)
    xname=[""]
    q=parser.p_String_Constant(p,xname)
    if p==q:
        return False
    self.current_device=xname[0]
    p=parser.p_b(q)
    q=parser.p_key(",",p)
    if p==q:
        return False
    p=parser.p_b(q)
    q=parser.p_key("Date=",p)
    if p==q:
        return False
    p=q
    xdate=[""]
    q=parser.p_Date(p,xdate)
    # p_Date reports failure by returning its start position; one of its
    # paths returns False instead, so both are treated as "no date".
    if q is False or p==q:
        return False
    return True
def is_set_command(self,parser,p):
# Recognise "set <name>=<value>" at position p.
# On success the pair is stored in self.environments, reported to the GUI and mapped to its widget; returns True, otherwise False.
q=parser.p_key("set ",p)
if p==q:
return False
p=parser.p_b(q)
eq={}
q=parser.p_an_equation(p,eq)
if p==q:
return False
for key in eq:
self.environments[key]=eq[key]
self.GUI.write_message("set "+str(key)+"="+st...
self.GUI.map_to_gui(key,self.environments[key])
return True
def is_py_command(self,parser,p):
    """Recognise ``py <name>``, which starts the definition of a program.

    On a match the program name is remembered in
    ``self.current_program_name`` and the program text is reset to "".
    Returns True on a match and False otherwise.  Previously a match
    fell off the end (None), so the caller kept trying the remaining
    command recognisers on the same line.
    """
    q=parser.p_key("py ",p)
    if p==q:
        return False
    p=parser.p_b(q)
    xname=[""]
    parser.p_name(p,xname)
    self.current_program_name=xname[0]
    self.current_program=""
    return True
def is_end_command(self,parser,p):
# Recognise "end <name>", which closes a program definition: the collected program is stored in self.programs under self.current_program_name.
# NOTE(review): the name parsed from the line (xname) is not compared with current_program_name, and no True is returned on success — confirm.
q=parser.p_key("end ",p)
if p==q:
return False
p=parser.p_b(q)
xname=[""]
q=parser.p_name(p,xname)
self.programs[self.current_program_name]=self.cur...
def ex_pico(self,command):
return(self.u_serial.send_command(command))
def is_run_command(self,parser,p):
    """Recognise ``run <name>`` and execute the stored program of that name.

    Before running, the report buffer is reloaded from the current
    result text (when there is one).  The program runs with ``self``
    available as a local name.  Failures inside the program are
    reported to stdout, the report buffer and the GUI.

    Returns True when the line was a run command (even if the program
    failed) and False otherwise.  Previously a match fell off the end
    (None).  Raises KeyError when no program of that name is stored.
    """
    print("is_run_command.."+parser.html[p:p+10])
    q=parser.p_key("run ",p)
    if p==q:
        return False
    p=parser.p_b(q)
    xname=[""]
    q=parser.p_name(p,xname)
    if p==q:
        return False
    r=self.get_result()
    if r is not None and r!="":
        self.clear_report()
        for report_line in r.split('\n'):
            self.add_report_line(report_line)
    program=self.programs[xname[0]]
    print("program="+program)
    # SECURITY: `program` is text taken from the wiki page and is run
    # with exec(), so anyone who can edit that page can run arbitrary
    # code on this machine.  Left as is because it is the purpose of the
    # command; restrict who can edit the page.
    try:
        exec(program,globals(),{'self':self})
    except Exception as e:
        print("fail to exec "+program)
        self.add_report_line("fail to exec:"+xname[0])
        self.GUI.write_message("fail to exec "+program)
        print(str(e))
        self.GUI.write_message(str(e))
    return True
def is_command(self,parser):
    """Recognise a ``command:`` line and dispatch it.

    The set, py, end and run recognisers are tried in that order on the
    text after "command:".  Returns True as soon as one of them returns
    a true value, and False otherwise.
    """
    start=parser.p_b(0)
    after_key=parser.p_key("command:",start)
    if after_key==start:
        return False
    body_at=parser.p_b(after_key)
    recognisers=(self.is_set_command,self.is_py_command,
                 self.is_end_command,self.is_run_command)
    for recognise in recognisers:
        if recognise(parser,body_at):
            return True
    return False
def is_py(self,parser):
    """Append the body of a ``py: `` line to the program being collected.

    Leading blanks are skipped; if the line then starts with ``py: `` the
    rest of the line plus a newline is appended to ``self.current_program``.

    Returns:
        ``True`` when the line was a ``py: `` line and has been consumed,
        ``False`` otherwise. (Previously the success path fell off the end
        and returned ``None``, which callers testing truthiness read as
        "not handled".)
    """
    #print("is_py line="+line[:10])
    start = parser.p_b(0)
    body_at = parser.p_key("py: ", start)
    if body_at == start:
        return False
    self.current_program = self.current_program + parser.html[body_at:] + '\n'
    return True
def interpret_a_line(self,line):
    """Classify one script line and let the matching handler consume it.

    The line is wrapped in an ``html_parser`` and offered to the recognisers
    in a fixed order: object page, device, command, py. The first one that
    returns a truthy value ends the processing of the line. When the line is
    an object-page line, every entry of ``self.urls`` is additionally passed
    to the GUI together with its index.
    """
    #print("interpret_a_line:"+line)
    parser = html_parser(line)
    if not self.is_object_page(parser):
        for recognise in (self.is_device, self.is_command, self.is_py):
            if recognise(parser):
                return
        return
    for index, url in enumerate(self.urls):
        self.GUI.add_url(url, index)
def interpret_lines(self):
    """Interpret every line of ``self.script_lines``, then send the result.

    A line whose interpretation raises does not stop the run: the offending
    line and the exception text are printed and written to the report via
    ``write_line``, and processing continues with the next line.
    ``send_result`` is called once after all lines have been processed.
    """
    #print("interpret_lines")
    for script_line in self.script_lines:
        try:
            self.interpret_a_line(script_line)
        except Exception as e:
            # The second message used to be the literal text "...:{e}"
            # (missing f-prefix); it now carries the exception message.
            for message in ("error in '" + script_line + "'", "...:" + str(e)):
                print(message)
                self.write_line(message)
    self.send_result()
def next_url(self):
    """Select the next entry of ``self.urls`` as the current target.

    Does nothing when ``self.urls`` is empty. The index wraps to 0 once it
    reaches the end of the list. The chosen entry is split at its first
    ``?``: the part before becomes ``self.current_url``, the part after
    becomes ``self.current_page``, and both are handed to
    ``self.Pico_Wiki_Driver``. An entry without ``?`` raises ``ValueError``.
    """
    print("next_url")
    if not len(self.urls):
        return
    index = int(self.next_url_index)
    if index >= len(self.urls):
        self.next_url_index = 0
        index = 0
    entry = self.urls[index]
    split_at = entry.index("?")
    self.current_page = entry[split_at + 1:]
    self.current_url = entry[:split_at]
    print("current_url="+self.current_url)
    print("current_page="+self.current_page)
    self.next_url_index = index + 1
    driver = self.Pico_Wiki_Driver
    driver.set_url(self.current_url)
    driver.set_page(self.current_page)
def send_result(self):
    """Announce the send in the GUI message area, then call ``replace_result``."""
    #print("send_result")
    gui = self.GUI
    gui.write_message("send_result")
    self.replace_result()
def write_line(self,line):
    """Echo ``line`` to the GUI message area and append it to the report.

    The GUI receives the line prefixed with ``add_report ``; the report
    receives the line unchanged via ``add_report_line``.
    """
    #print("write_line")
    notice = "add_report " + line
    self.GUI.write_message(notice)
    self.add_report_line(line)
# Polling loop: repeatedly reads the script, evaluates it and sends the
# result until self.continue_read_eval_print_loop becomes false.
# All timestamps are wall-clock milliseconds (int(time.time() * 1000)).
# NOTE(review): leading indentation and the tails of several long lines are
# missing in this copy, so the comments below describe only what is visible.
def read_eval_print_loop(self):
print("read_eval_print_loop")
self.continue_read_eval_print_loop=True
# The read, eval and write timers all start from the same instant.
last_read_time=int(time.time() * 1000)
last_eval_time=last_read_time
last_write_time=last_read_time
# stop_read_eval_print_loop() sets this flag to False to end the loop.
while self.continue_read_eval_print_loop:
gc.collect()
current_time=int(time.time() * 1000)
# The clock went backwards: restart the read timer from now.
if current_time<last_read_time:
last_read_time=current_time
# Read step: fires when the time since the last read exceeds a GUI
# property (name cut off here; presumably 'read_interval' -- TODO confirm).
if current_time-last_read_time>self.GUI.looku...
last_read_time=current_time
# A truthy read_script() leads to eval_script() (guarded by a truncated
# test on an 'exec_interva...' property) and GUI.update(); the else branch
# moves on with next_url().
# NOTE(review): the exact nesting of update()/continue cannot be recovered
# from this copy -- check against the original file.
if self.read_script():
if self.GUI.lookup_prop('exec_interva...
self.eval_script()
self.GUI.update()
else:
self.next_url()
continue
# Periodic evaluation, considered only when 'exec_interval' is above zero.
if self.GUI.lookup_prop('exec_interval')>0:
exec_interval_time=self.GUI.lookup_prop('...
if current_time-last_eval_time>exec_inter...
last_eval_time=current_time
self.eval_script()
# Periodic send_result(), considered only when 'write_interval' is above zero.
if self.GUI.lookup_prop('write_interval')>0:
if current_time-last_write_time>self.GUI....
last_write_time=current_time
self.send_result()
#time.sleep_ms(100)
# Pause for 0.1 s between iterations.
time.sleep(0.1)
def stop_read_eval_print_loop(self):
    """Clear the flag that ``read_eval_print_loop`` tests on each iteration."""
    keep_running = False
    self.continue_read_eval_print_loop = keep_running
# Initialise the current target from the 'object_page_1' GUI property.
# The value is split at '?': the text before it is passed to
# set_current_url and the text after it to set_current_page, so a value
# without '?' raises IndexError on the [1] lookup.
def init_params(self):
uri=self.GUI.lookup_prop('object_page_1')
# Property name is cut off in this copy; presumably 'object_page_pass_1'
# -- TODO confirm.
pass_word=self.GUI.lookup_prop('object_page_pass_...
page=uri.split('?')[1]
url=uri.split('?')[0]
self.set_current_url(url)
self.set_current_page(page)
self.set_current_pass(pass_word)
# Reset the index into self.urls that next_url() reads and advances.
self.next_url_index=0
def start_thread(gui):
    """Trigger the GUI's start button handler programmatically.

    Args:
        gui: Object providing ``click_start_button()``.
    """
    press_start = gui.click_start_button
    press_start()
def main() -> None:
    """Build the bot and its GUI, wire them together and run the Tk main loop.

    The first command-line argument, if present, is read as an option;
    ``-s`` triggers the start button right after initialisation.
    """
    fst_opt = sys.argv[1] if len(sys.argv) >= 2 else None
    print('fst_opt='+str(fst_opt))

    root = Tk()
    bot = pico_wiki_bot()
    gui = GUI(root, bot)

    # The bot and the GUI reference each other, so link them after both exist.
    bot.set_GUI(gui)
    gui.set_Pico_Time(bot.Pico_Time)
    gui.init_params()
    bot.init_params()

    if fst_opt == '-s':
        start_thread(gui)
    root.mainloop()
#print("-----------get_script_and_result-------------...
#print(bot.get_script_and_result())
#print("-----------get_result-----------------")
#print(bot.get_result())
#print("-----------get_script-----------------")
#print(bot.get_script())
#print("-----------get_current_device_line-----------...
#print(bot.get_current_device_line())
#print("-----------replace_result-----------------")
#bot.replace_result("Hello!\nThis is a test2....\nxxx...
#print("-----------stop_timer--------------------")
#bot.stop_timer()
#bot.read_eval_print_loop()
# Script entry point: call main() only when executed directly, not on import.
if __name__ == "__main__":
    main()
}}
----
#counter
ページ名: