#author("2025-02-19T14:58:24+09:00","default:BotComputing-WikiIoT","BotComputing-WikiIoT") #author("2025-02-20T09:26:16+09:00","default:BotComputing-WikiIoT","BotComputing-WikiIoT") [[wiki_bot_05.py]] #code(Python){{ # -*- coding: utf-8 -*- # # wiki_bot_06 .py # wiki_bot_06.py # t.yamanoue # 20250219 # 20250220 # import time import requests from requests_toolbelt import MultipartEncoder import socket try: import ustruct as struct except: import struct from time import sleep import gc import copy import threading import sys from tkinter import * from tkinter import Label from tkinter import Button import tkinter as tk from tkinter import scrolledtext #from PIL import Image, ImageTk, ImageOps import tkinter.ttk as ttk from tkinter import messagebox from tkinter import filedialog import time import json import pickle import serial import mimetypes import traceback initials = {'bot_id':'yama_bot_0000_0000_0000_0001', 'waiting_term':600000, 'object_page_1':'http://192.168.10.10/fwb4pi/index.php?work20240902-01', 'object_page_2':'http://192.168.10.11/fwb4pi/index.php?work20240902-01', 'object_page_pass_1':'', 'object_page_pass_2':'', 'read_interval': 20000, 'exec_interval': 0, 'write_interval':0, 'report_length':120 } class Properties: def __init__(self): self.properties=initials self.file_name="bot.properties" def set_gui(self,gui): self.gui=gui def load(self,fn): try: with open(fn,mode='rb') as f: prop_w=pickle.load(f) for key in prop_w: self.properties[key]=prop_w[key] self.map_to_gui() except Exception as e: print("file "+fn+" read error") tb=sys.exc_info()[2] print("message:{0}".format(e.with_traceback(tb))) def map_to_gui(self): for key in self.properties: self.gui.map_to_gui(key,self.properties[key]) def map_from_gui(self): for key in self.properties: v=self.gui.lookup_prop(key) self.properties[key]=v def get(self,key): if key in self.properties: return self.properties[key] return None def set(self,key,value): self.properties[key]=value def save(self,fn): self.map_from_gui() try: with open(fn,mode='wb') as f: pickle.dump(self.properties,f) except Exception as e: print("file "+fn+" write error") tb=sys.exc_info()[2] print("message:{0}".format(e.with_traceback(tb))) class GUI: def __init__(self,root,bot): self.bot=bot self.recv_queue=[] self.root=root self.root.title("Wiki Bot panel") self.root.configure(background="white") self.root.geometry("850x800") self.notebook =ttk.Notebook(root) self.main_tab=tk.Frame(self.notebook, bg='white') self.script_tab=tk.Frame(self.notebook, bg='white') self.notebook.add(self.main_tab, text="main", underline=0) self.notebook.add(self.script_tab, text="script") self.notebook.pack(expand=True, fill='both', padx=10, pady=10) yy=10 # command list self.start_button=Button(self.main_tab, text="start",bd=2,bg='white',command=self.click_start_button, width=12, relief=RIDGE) self.start_button.place(x=10,y=yy) self.stop_button=Button(self.main_tab, text="stop",bd=2,bg='white',command=self.click_stop_button, width=12, relief=RIDGE) self.stop_button.place(x=80,y=yy) self.save_prop_button=Button(self.main_tab, text="save prop",bd=2,bg='white',command=self.click_save_prop_button, width=24, relief=RIDGE) self.save_prop_button.place(x=150,y=yy) self.clear_message_button=Button(self.main_tab, text="clear",bd=2,bg='white',command=self.clear_message, width=15, relief=RIDGE) self.clear_message_button.place(x=280,y=yy) yy=40 # this bot id, waiting term self.bot_label=Label(self.main_tab,text="bot", width=6) self.bot_label.place(x=10,y=yy) self.bot_id_field=Entry(self.main_tab,width=50) self.bot_id_field.place(x=50,y=yy) self.waiting_term_label=Label(self.main_tab,text="wait", width=8) self.waiting_term_label.place(x=380,y=yy) self.waiting_term_field=Entry(self.main_tab,width=10) self.waiting_term_field.place(x=450,y=yy) yy=70 self.page_frame = ttk.Frame(self.main_tab) self.page_frame.grid(row=0, column=0) # 項目をつくる items = ['URI', 'name', 'pass'] for i in range(0, len(items)): label_item = ttk.Label(self.page_frame,text=items[i]) label_item.grid(row=0, column=i) n = 2 # 行数 # 項目1 self.object_page_table_uri = [0] * n for i in range(0, n): self.object_page_table_uri[i] = ttk.Entry(self.page_frame, width=40) self.object_page_table_uri[i].grid(row=i+1, column=0) # 項目2 self.object_page_table_name = [0] * n for i in range(0, n): self.object_page_table_name[i] = ttk.Entry(self.page_frame, width=10) self.object_page_table_name[i].grid(row=i+1, column=1) # 項目3 self.object_page_table_pass = [0] * n for i in range(0, n): self.object_page_table_pass[i] = ttk.Entry(self.page_frame, show='*', width=10) self.object_page_table_pass[i].grid(row=i+1, column=2) self.page_frame.place(x=10,y=yy) self.param_frame = ttk.Frame(self.main_tab) self.param_frame.grid(row=0, column=0) # 項目をつくる param_items = ['property', 'value'] for i in range(0, len(param_items)): label_param_item = ttk.Label(self.param_frame,text=param_items[i]) label_param_item.grid(row=0, column=i) n = 4 # 行数 # 項目1 self.param_table_property = [0] * n for i in range(0, n): self.param_table_property[i] = ttk.Entry(self.param_frame, width=20) self.param_table_property[i].grid(row=i+1, column=0) # 項目2 self.param_table_value = [0] * n for i in range(0, n): self.param_table_value[i] = ttk.Entry(self.param_frame, width=10) self.param_table_value[i].grid(row=i+1, column=1) self.param_frame.place(x=550,y=yy) self.param_table_property[0].insert(END,'read_interval') self.param_table_property[1].insert(END,'exec_interval') self.param_table_property[2].insert(END,'write_interval') self.param_table_property[3].insert(END,'report_length') yy=200 self.command_field_label=Label(self.main_tab,text="command", width=10) self.command_field_label.place(x=10,y=yy) self.command_field=Entry(self.main_tab,width=50) self.command_field.place(x=150,y=yy) self.command_enter_button=Button(self.main_tab,text="enter",bd=2,bg='white',command=self.enter_all_command,width=8,relief=RIDGE) self.command_enter_button.place(x=500,y=yy) self.clear_command_button=Button(self.main_tab,text="clear_message",bd=2,bg='white',command=self.clear_message,width=12,relief=RIDGE) self.clear_command_button.place(x=580,y=yy) yy=230 # script area self.script_frame=tk.Frame(self.main_tab,width=50,height=5) self.script_frame.place(x=10,y=yy) self.script_area=scrolledtext.ScrolledText(self.script_frame) self.script_area.pack() yy=600 # output area self.message_frame=tk.Frame(self.main_tab,width=50,height=5) self.message_frame.place(x=10,y=yy) self.message_area=scrolledtext.ScrolledText(self.message_frame) self.message_area.pack() #script tab yy=10 self.dir_name_label=Label(self.script_tab,text="dir name",width=10) self.dir_name_label.place(x=30,y=yy) self.dir_name_field=Entry(self.script_tab,width=50) self.dir_name_field.place(x=100,y=yy) #self.read_button=Button(self.script_tab, text="read",bd=2,bg='white',command=self.click_read_button, width=8, relief=RIDGE) #self.read_button.place(x=500,y=yy) self.dir_button=Button(self.script_tab, text="directory",bd=2,bg='white',command=self.click_dir_button, width=10, relief=RIDGE) self.dir_button.place(x=570,y=yy) self.run_script_button2=Button(self.script_tab,text="run script",bd=2,bg='white',command=self.run_script,width=8, relief=RIDGE) self.run_script_button2.place(x=680,y=yy) yy=50 self.file_name_label=Label(self.script_tab,text="file name",width=10) self.file_name_label.place(x=30,y=yy) self.file_name_field=Entry(self.script_tab,width=50) self.file_name_field.place(x=100,y=yy) self.read_button=Button(self.script_tab, text="read",bd=2,bg='white',command=self.click_read_button, width=8, relief=RIDGE) self.read_button.place(x=500,y=yy) self.dir_button=Button(self.script_tab, text="select",bd=2,bg='white',command=self.click_select_file_button, width=10, relief=RIDGE) self.dir_button.place(x=570,y=yy) yy=100 self.script_frame2=tk.Frame(self.script_tab,width=50,height=100) self.script_frame2.place(x=50,y=yy) self.script_area2=scrolledtext.ScrolledText(self.script_frame2) self.script_area2.pack() #self.run_script() #self.set_script() #self.display_help() self.prop=Properties() self.prop.set_gui(self) def set_Pico_Time(self,time): self.Pico_Time=time def init_params(self): self.prop.load(self.prop.file_name) self.prop.map_to_gui() def click_read_button(self): fn=self.file_name_field.get() print("fn="+fn) f=None try: f=open(fn,"r") except Exception as e: print("file open error.") tb=sys.exec_info()[2] print("message:{0}".format(e.with_traceback(tb))) return try: self.script_area.delete(1.0,END) line=f.readline() while line: print(line) self.script_area.insert(END,line) line=f.readline() f.close() except Exception as e: print("file "+fn+" read error") tb=sys.exec_info()[2] print("message:{0}".format(e.with_traceback(tb))) def click_dir_button(self): iDir = os.path.abspath(os.path.dirname(__file__)) folder_name=tk.filedialog.askdirectory(initialdir=iDir) if len(folder_name)==0: print("cancel selcting folder") else: self.dir_name_field.insert(END,folder_name) def click_select_file_button(self): fType = [("python", "*.py")] iDir="" xdir=self.dir_name_field.get() if xdir==None or xdir=="": iDir=os.path.abspath(os.path.dirname(__file__)) else: iDir=xdir iFilePath=filedialog.askopenfilename(filetypes=fType, initialdir=iDir) if iFilePath==None or iFilePath=="": print("cancel") else: self.file_name_field.delete(0,END) self.file_name_field.insert(END,iFilePath) def enter_command(self): print("enter_command") command=self.command_field.get() self.ex(command) def click_start_button(self): print('click_start_button') #exec(self.script) self.script_thread=threading.Thread(target=self.start_script) print('new thread, start thread') #self.start_script() self.script_thread.start() print('script_started.') def click_stop_button(self): print('click_stop_button') #exec(self.script) if self.bot==None: return self.bot.stop_read_eval_print_loop() def start_script(self): print('start_script') #self.script=self.script_area.get(0.,END) #print("run_script:"+self.script) #exec(self.script) if self.bot==None: return self.bot.read_eval_print_loop() def run_script(self): print('run_script') def click_save_prop_button(self): self.prop.save(self.prop.file_name) def add_uri(self,uri,i): if i<2 and i >= 0: self.object_page_table_uri[i].delete(0,END) self.object_page_table_uri[i].insert(END,uri) def map_to_gui(self,key,value): if key=='bot_id': self.bot_id_field.delete(0,END) self.bot_id_field.insert(END,value) elif key=='waiting_term': self.waiting_term_field.delete(0,END) self.waiting_term_field.insert(END,str(value)) elif key=='object_page_1': self.object_page_table_uri[0].delete(0,END) self.object_page_table_uri[0].insert(END,value) elif key=='object_page_2': self.object_page_table_uri[1].delete(0,END) self.object_page_table_uri[1].insert(END,value) elif key=='object_page_pass_1': self.object_page_table_pass[0].delete(0,END) self.object_page_table_pass[0].insert(END,value) elif key=='object_page_pass_2': self.object_page_table_pass[1].delete(0,END) self.object_page_table_pass[1].insert(END,value) elif key=='read_interval': self.param_table_value[0].delete(0,END) self.param_table_value[0].insert(END,str(value)) elif key=='exec_interval': self.param_table_value[1].delete(0,END) self.param_table_value[1].insert(END,str(value)) elif key=='write_interval': self.param_table_value[2].delete(0,END) self.param_table_value[2].insert(END,str(value)) elif key=='report_length': self.param_table_value[3].delete(0,END) self.param_table_value[3].insert(END,str(value)) def lookup_prop(self,key): if key=='bot_id': return self.bot_id_field.get() elif key=='waiting_term': return int(self.waiting_term_field.get()) elif key=='object_page_1': return self.object_page_table_uri[0].get() elif key=='object_page_2': return self.object_page_table_uri[1].get() elif key=='object_page_pass_1': return self.object_page_table_pass[0].get() elif key=='object_page_pass_2': return self.object_page_table_pass[1].get() elif key=='read_interval': return int(self.param_table_value[0].get()) elif key=='exec_interval': return int(self.param_table_value[1].get()) elif key=='write_interval': return int(self.param_table_value[2].get()) elif key=='report_length': return int(self.param_table_value[3].get()) return None def update(self): self.prop.map_from_gui() self.prop.save(self.prop.file_name) def ini_recv_queue(self): self.recv_queue=[] def is_not_empty_recv_queue(self): if len(self.recv_queue)>0 : print('recv_queue is empty') return True print('recv_queue is not empty') return False def are_in_the_recv_queue(self,messages): print('start are_in_the_recv_queue') for x in messages: print('is '+x+' in the recv_queue?') flag=False for m in self.recv_queue: print(m+' in the received queue') if x in m: print(x + ' is in the '+m) flag=True self.write_message("message "+x+" found!") break if flag==False: return False return True def enter_all_command(self): command=self.command_field_for_all_server.get() print("send "+command+" to all") self.write_message("send "+command+" to all") self.coms.send_command_to_all(command) self.command_field_for_all_server.delete(0,END) def ex(self,command): print("parse "+command) command=self.r_b(command) if self.parse_command(command): self.write_message("OK") print("OK") else: self.write_message("ERROR") print("ERROR") def write_script(self, message): self.clear_script() self.append_script(message) def write_script_start(self,message): self.write_script_thread=threading.Thread(target=self.write_script_lines, args=(message,)) self.write_script_thread.start() def write_script_list(self,xlines): print('start write_script_lines') self.write_message('start_write_script_lines') self.clear_script() for l in xlines: self.append_script(l) self.script_area.update() print('end write_script_lines') def write_script_lines(self,lines): print('start write_script_lines') self.write_message('start_write_script_lines') xlines=lines.splitlines() self.clear_script() for l in xlines: self.append_script(l) self.script_area.update() print('end write_script_lines') def append_script(self, message): self.script_area.insert(tk.END,message) self.script_area.insert(tk.END,'\n') #self.script_area.yview_moveto(1) def clear_script(self): self.script_area.delete("1.0",END) self.script_area.update() def write_message(self, message): #self.write_message_thread=threading.Thread(target=self.clear_and_append_message, args=(message,)) #self.write_message_thread.start() self.append_message(message) def clear_and_append_message(self, message): self.clear_message() self.append_message(message) def append_message(self, message): now = self.Pico_Time.get_now() addline=now+': '+message #print('start append_message') self.message_area.insert(tk.END,addline) self.message_area.insert(tk.END,'\n') self.message_area.yview_moveto(1) tx=self.message_area.get(0.0,tk.END) count_lines = tx.count('\n') + 1 if count_lines>=500: self.message_area.delete(0.0,99.0-1) self.message_area.update() #print('end append_message') def clear_message(self): self.message_area.delete("1.0",END) self.message_area.update() class pico_net: def __init__(self): print('current ssid= '+ssid) def set_access_point(self, ssid, password): self.ssid = ssid self.password = password def connect(self): self.wlan = network.WLAN(network.STA_IF) self.wlan.active(True) chipid = int.from_bytes(self.wlan.config('mac'), 'big') % 65536 self.wlan.connect(ssid, password) max_wait = 10 while max_wait > 0: if self.wlan.status() < 0 or self.wlan.status() >= 3: break max_wait -= 1 print('waiting for connection...') time.sleep(1) if self.wlan.status() != 3: raise RuntimeError('network connection failed') else: print('connected') self.status = self.wlan.ifconfig() print('ip = ' + self.status[0]) class pico_http: def __init__(self): print('pico_http') def set_url(self, url): self.url = url def get(self, url): #print('pico_http.get(urL='+url+')') self.url=url #print('urL='+url) try: r = requests.get(url) #print(r) r_text=r.text r.close() #print('pico_http.get...return') return r_text except: print('pico_http.get...exception') return "" def put(self, url, payload): #print('pico_http.put(urL='+url+'payload='+str(payload)+')') #payload is a dictionary uri=url+'?' for key in payload: #print(key+'=',end="") #print(payload[key]) uri=uri+key+'='+payload[key]+'&' uri=uri[:-1] #print('uri='+uri) #r = urequests.post(uri) #r = urequests.post(url,data=payload) r = requests.get(uri) if r=="": print('pico_http.put...exception') return "" #print(r) r_text=r.text r.close() #print(r_json) return r_text #post, ref: https://docs.micropython.org/en/latest/library/urequests.html def post(self, url, body, headers): #print('pico_http.post(url='+url+' headers='+str(headers)+')') try: r = requests.post(url,data=body,headers=headers) #print(r) r_text=r.text #print(r_text) #r_json=r.json() r.close() #print(r_json) print('pico_http.post...return') return r_text except: print('pico_http.post...exception') return "" import datetime class pico_time: # reference: https://wisteriahill.sakura.ne.jp/CMS/WordPress/2023/05/10/raspberry-pi-pico-w-ntp-server/ # NTPサーバー #ntp_host = "time-c.nist.gov" #ntp_host = "time.cloudflare.com" #ntp_host = "time.google.com" ntp_host = "ntp.nict.jp" def __init__(self): print('pico_time') def ptime(self): NTP_DELTA = 2208988800 NTP_QUERY = bytearray(48) NTP_QUERY[0] = 0x1b #addr = socket.getaddrinfo(self.ntp_host, 123)[0][-1] addr=(self.ntp_host, 123) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # s.settimeout(1) res = s.sendto(NTP_QUERY, addr) msg = s.recv(48) s.close() val = struct.unpack("!I", msg[40:44])[0] return val - NTP_DELTA wday = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun'] def zfill(self, s, width): if len(s) < width: return ("0" * (width - len(s))) + s else: return s def get_now(self): #t=self.ptime() #t = t + 9*60*60 ##print(t) #datetimetuple = time.gmtime(t) #year = str(datetimetuple[0]) #month = self.zfill(str(datetimetuple[1]),2) #mday = self.zfill(str(datetimetuple[2]),2) #hour = self.zfill(str(datetimetuple[3]),2) #minute = self.zfill(str(datetimetuple[4]),2) #seconds = self.zfill(str(datetimetuple[5]),2) #wd_index = datetimetuple[6] ##日付と時刻を分けるデリミター("|")を付加 ##datetime = year + "/" + month + "/" + mday + " " + self.wday[wd_index] + "|" + hour + ":" + minute + ":" + seconds #datetime = year + "/" + month + "/" + mday + " " + hour + ":" + minute + ":" + seconds dt=str(datetime.datetime.now()) return dt class html_parser: html="" tokens=[] reserved_words={'<':'<','>':'>','&':'&','"':'"','\'':''', ' ':' ','\n':'<br>','\t':' ','\r':'', '\\':'\','{':'{','}':'}','[':'[',']':']' } a2c={' ':"+",'\n':"%0D%0A",'\r':"%0D",'\t':"%09",'!':"%21",'"':"%22", '#':"%23",'$':"%24",'%':"%25",'&':"%26", '\'':"%27",'(': "%28",')':"%29", '*':"%2A",'+':"%2B",',':"%2C",'-':"%2D",'/':"%2F",':':"%3A", ';':"%3B",'<':"%3C",'=':"%3D",'>':"%3E",'?':"%3F", '@':"%40",'[':"%5B",'\\':"%5C",']':"%5D",'^':"%5E", '`':"%60",'{':"%7B",'|':"%7C",'}':"%7D",'~':"%7E"} #Spechal char to ascii code sc2a={'<':"<",'>':">",'&':"&",'"':'"',''':"\'"} def __init__(self,html): self.html=html def html_tokenizer(self): #print("html_tokenizer") self.token_ith=0 w="" tx=[""] rest=[""] inx=self.html xlen=len(self.html) while xlen>0: if self.parse_tag(inx,tx,rest): if w!="": self.tokens.append(w) w="" self.tokens.append('<'+tx[0]+'>') #print('<'+tx[0]+'>') inx=rest[0] xlen=len(inx) elif self.parse_String_Constant(inx,tx,rest): if w!="": self.tokens.append(w) w="" self.tokens.append('"'+tx[0]+'"') #print('"'+tx[0]+'"') inx=rest[0] xlen=len(inx) else: wc=inx[0] inx=inx[1:xlen] xlen=len(inx) w=w+wc if w!="": self.tokens.append(w) w="" def get_html(self): return self.html def parse_key(self,key,inx,rest): #print("parse_key-"+key) keylen=len(key) inlen=len(inx) if inx.startswith(key): rest[0]=inx[keylen:inlen] return True rest[0]=inx return False def parse_String_Constant(self,inx,strc,rest): rx=[""] xstr="" if self.parse_key('"',inx,rx): #print("parse_String_Constant") inx=rx[0] fx=inx[0] xlen=len(inx) while fx!='"': if xlen==0: return False if fx=='\\': inx=inx[1:xlen] else: xstr=xstr+fx xlen=len(inx) inx=inx[1:xlen] fx=inx[0] if self.parse_key('"',inx,rx): strc[0]=xstr rest[0]=rx[0] return True return False def parse_tag(self,inx,tag,rest): rx=[""] xstr="" wstr=[""] if self.parse_key('<',inx,rx): #print("parse_tag") inx=rx[0] fx=inx[0] xlen=len(inx) while fx!='>': if xlen<=0: return False if fx=='\\': inx=inx[1:xlen] if self.parse_String_Constant(inx,wstr,rx): inx=rx[0] xstr=xstr+'"'+wstr[0]+'"' else: xstr=xstr+fx inx=inx[1:xlen] fx=inx[0] xlen=len(inx) if self.parse_key('>',inx,rx): tag[0]=xstr rest[0]=rx[0] return True return False def get_tokens(self): return self.tokens def next_token(self): if self.token_ith<len(self.tokens): self.token_ith=self.token_ith+1 return self.tokens[self.token_ith-1] return "" def has_more_tokens(self): if self.token_ith<len(self.tokens): return True return False def index_from(self,i,str): p=self.html[i:].index(str) return p def p_url(self,p,url): #print("p_url p="+str(p)+' '+self.html[p:p+10]) rx=[""] xurl="" pw=p xlen=len(self.html) q=self.p_key('http://',p) if p!=q: xurl='http://' else: q=self.p_key('https://',p) if p!=q: xurl='https://' else: return pw p=q while not (self.html[p] in [' ','\n','\t','\r']): p=p+1 if p>=xlen: p=xlen break url[0]=xurl+self.html[q:p] return p def p_key(self,key,p): #print("p_key key="+key+" p="+str(p)+" "+self.html[p:p+10]) keylen=len(key) if p+keylen>len(self.html): return p q=p if self.html[q:q+keylen]==key: #print("key="+key+" have found, q="+str(q+keylen)) return q+keylen #print("key="+key+" is not found, p="+str(p)) return p def p_String_Constant(self,p,strc): #print("p_String_Constant p="+str(p)+' '+self.html[p:p+10]) rx=[""] xstr="" pw=p q=self.p_key('"',p) if p!=q: #print("p_String_Constant p="+str(p)+' '+self.html[p:p+10]) p=q fx=self.html[p] xlen=len(self.html)-p while fx!='"': if xlen==0: return pw if fx in ['\n','\t','\r']: return pw if fx=='\\': p=p+1 p=p+1 fx=self.html[p] xlen=len(self.html)-p q=self.p_key('"',p) if p!=q: strc[0]=self.html[pw+1:q-1] #print("p_string_constant, strc="+str(strc[0])) return q #print("p_String_Constant, fail to find \"") return pw def p_Date(self,p,date_str): #print("p_Date p="+str(p)+' '+self.html[p:p+10]) rx=[""] xstr="" pw=p year=[0] q=self.p_number(p,year) if p==q: return pw p=q q=self.p_key('/',p) if p==q: q=self.p_key('-',p) if p==q: return pw p=q month=[0] q=self.p_number(p,month) if p==q: return pw p=q q=self.p_key('/',p) if p==q: q=self.p_key('-',p) if p==q: return pw p=q day=[0] q=self.p_number(p,day) if p==q: return False p=q q=self.p_b(p) if p==q: return pw p=q hour=[0] q=self.p_number(p,hour) if p==q: return pw p=q q=self.p_key(':',p) if p==q: return pw p=q minute=[0] q=self.p_number(p,minute) if p==q: return pw p=q q=self.p_key(':',p) if p==q: return pw p=q second=[0] q=self.p_number(p,second) if p==q: return pw p=q date_str[0]=str(year[0])+'-'+str(month[0])+'-'+str(day[0])+' '+str(hour[0])+':'+str(minute[0])+':'+str(second[0]) return p def p_b(self,p): #print("parse_b") #print("p="+str(p)) xlen=len(self.html) if p>=xlen: return p while self.html[p]==' ': p=p+1 if p>=xlen: p=xlen break return p def p_name(self,p,name): #print("p_name, self.html="+ self.html[p:p+10]) #print("p="+str(p)+"..."+self.html[p:p+10]) q=p last_p=len(self.html) while self.html[q] in 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_': #print("self.html["+str(q)+"]="+self.html[q]) q=q+1 if q>=last_p: break name[0]=self.html[p:q] #print("p_name p="+str(p)+" q="+str(q)+" name="+str(name[0])) #print("p="+str(p)+" q="+str(q)+" name="+name[0]) #print("name[0]= "+name[0]) return q def p_number(self,p,num): #print("p_number") #print("p="+str(p)) n="" last_p=len(self.html) q=p while self.html[p] in '0123456789': q=q+1 if q>=last_p: break num[0]=int(self.html[p:q]) #print("p_number p="+str(p)+" q="+str(q)+" num="+str(num[0])) return q def p_an_equation(self,p,eqs): #print("p_an_equation p="+str(p)+' '+self.html[p:p+10]) pw=p p=self.p_b(p) nx=[""] strc=[""] q=self.p_name(p,nx) #print("p_an_equation name="+nx[0]) if p==q: #print("p_an_equation fail to find name") return pw p=q p=self.p_b(p) q=self.p_key('=',p) if p==q: #print("p_an_equation fail to find =") return pw p=q p=self.p_b(p) q=self.p_String_Constant(p,strc) if p==q: num=[0] q=self.p_number(p,num) if p==q: return pw else: eqs[nx[0]]=num[0] #print("p_qn_equation get "+nx[0]+"="+str(num[0])) return q else: eqs[nx[0]]=strc[0] #print("p_an_equation get "+nx[0]+"="+str(strc[0])) return q def p_equations(self,p,eqs): #print("parse_equations") #print("p="+str(p)) pw=p while True: q=self.p_an_equation(p,eqs) if p==q: return p p=q def p_get_equations_of_the_tag(self,p,tag_name,eqs): #print("p_get_equations_of_the_tag <"+tag_name) #print("p="+str(p)+' '+self.html[p:p+10]) pw=p try: q=self.html.index('<'+tag_name,pw) except: #print("p_get_equation_of_the_tag fail to find <"+tag_name) return p #print("q="+str(q)+' '+self.html[q:q+10]) q=q+len('<'+tag_name) #print("q="+str(q)+' '+self.html[q:q+10]) if pw==q: #print("p_get_equation_of_the_tag fail to find <"+tag_name) return p pw=q q=self.p_equations(pw,eqs) #print("after equations q="+str(q) +' '+self.html[q:q+10]) if pw==q: return p pw=q pw=self.p_b(pw) #print("pw="+str(pw)) q=self.p_key('>',pw) #print("after > q="+str(q)+' '+self.html[q:q+10]) if pw!=q: return q q=self.p_key('/>',pw) if pw!=q: return q return p def p_get_between_tag_with_cond(self,p,tag_name,cond,rtn): #print("p_get_between_with_cond <"+tag_name+">...</"+tag_name+">") #print("p="+str(p)+' '+self.html[p:p+10]) pw=p q=pw while True: try: q=self.html.index('<'+tag_name,p) #print("p_get_between_with_cond p="+str(p)+" q="+str(q)+' '+self.html[q:q+20]) if p==q: return pw q=q+len('<'+tag_name) p=q eqs={} q=self.p_equations(p,eqs) #print("after equations q="+str(q) +' '+self.html[q:q+10]) if p!=q: if cond[0] in eqs: #print("cond[0]="+cond[0]+" eqs[cond[0]]="+eqs[cond[0]]+" cond[1]="+cond[1]) if eqs[cond[0]]!=cond[1]: continue else: break except: #print("fail to find <"+tag_name) return pw p=self.p_b(p) #print("p_get_between q="+str(q)+' '+self.html[q:q+10]) #print("pw="+str(pw)) q=self.p_key('>',p) if p!=q: start_p=q else: q=self.p_key('/>',p) if pw!=q: start_p=q else: return p try: q=self.html.index('</'+tag_name,start_p) except: return pw end_p=q-1 p=q q=q+len('</'+tag_name) p=self.p_b(q) #print("pw="+str(pw)) q=self.p_key('>',p) if q==p: return pw rtn[0]=(start_p,end_p) #rx=self.html[start_p:end_p] #print("rx="+rx) return q def p_get_between(self,p,tag_name,eqs,rtn): #print("p_get_between <"+tag_name+">...</"+tag_name+">") #print("p="+str(p)+' '+self.html[p:p+10]) pw=p q=pw try: q=self.html.index('<'+tag_name,p) except: return pw #print("p_get_between 2 q="+str(q)+' '+self.html[q:q+10]) q=q+len('<'+tag_name) #print("p_get_between 3 q="+str(q)+' '+self.html[q:q+10]) if p==q: return pw p=q q=self.p_equations(p,eqs) #print("after equations q="+str(q) +' '+self.html[q:q+10]) p=q p=self.p_b(p) #print("p="+str(p)) q=self.p_key('>',p) #print("after > q="+str(q)+' p='+str(p)+' '+self.html[p:q+10]) if p!=q: start_p=q else: q=self.p_key('/>',p) if p!=q: end_p=q rtn[0]="" return q else: #print("get between "+tag_name+",fail to find <"+tag_name+"/>") return pw try: q=self.html.index('</'+tag_name,start_p) except: #print("get between "+tag_name+",fail to find </"+tag_name) return pw end_p=q-1 p=q q=q+len('</'+tag_name) p=self.p_b(q) #print("p="+str(p)) q=self.p_key('>',p) if q==p: return pw rtn[0]=(start_p,end_p) #rx=self.html[start_p:end_p+1] #print("get between "+tag_name+" rx="+rx) return q #def text_to_post_form(self,text): # #print("html_parser.text_to_post_form") # #print(text) # post_form="" # for i in range(len(text)): # post_form=post_form+self.code_convert(text[i]) # #print("html_parser.text_to_post_form return") # return post_form def code_convert(self,code): if code in self.a2c: return self.a2c[code] else: return code def text_to_post_form(self,text): #print("html_parser.text_to_post_form") #print(text) rtn="" for i in range(len(text)): rtn+=self.code_convert(text[i]) return rtn def special_char_to_text(self,text): #print("special_char_to_text") #print(text) text=text.replace("<","<") text=text.replace(">",">") text=text.replace("&","&") text=text.replace(""",'"') text=text.replace("'","'") return text def post(self,url,body): #payload is a dictionary headers={'Content-Type': 'application/x-www-form-urlencoded'} #def http_post(url, value): # https://docs.openmv.io/library/urequests.html #body = "value=%s" % (value) #headers = {'Content-Type': 'application/x-www-form-urlencoded'} try: response = requests.post(url, data=body.encode('utf-8'), headers=headers) r_text=response.text response.close() return r_text except Exception as e: print(e) return "" class pico_wiki_driver: url="" def __init__(self): print('pico_wiki_driver') #self.url=url #self.page=page #self.Pico_Net=pico_net() self.Pico_Time=pico_time() self.Pico_Http=pico_http() #self.Pico_Net.connect() t=self.Pico_Time.get_now() print('time now='+t) def set_url(self,url): self.url=url def set_page(self,page): self.page=page def get_url(self): return self.url def get_html(self): uri = self.url+"?"+self.page all_page=self.Pico_Http.get(uri) #print("get_html..."+uri) #print(all_page) return all_page def get_wiki_page(self): uri = self.url+"?"+self.page all_page=self.Pico_Http.get(uri) print("get_wiki_page...") #print(all_page) parser=html_parser(all_page) p=0 rtn=[(0,0)] q=parser.p_get_between_tag_with_cond(p,"div",('id',"body"),rtn) if p!=q: #print("get_wiki_page id=body..."+all_page[rtn[0][0]:rtn[0][1]]) body_x=all_page[rtn[0][0]:rtn[0][1]] body=parser.special_char_to_text(body_x) return body def replace_wiki_page(self,new_body): print("pico_wiki_driver.replace_wiki_page...") payload={'cmd':'edit', 'page':self.page} #r=Pico_Http.get(url+'index.php?work202401') #print('uri='+r) r = self.Pico_Http.put(self.url,payload) #print(r) edit_html=html_parser(r) #edit_html.html_tokenizer() p=0 array_of_equations=[] equations={} q=edit_html.p_get_equations_of_the_tag(p,'input',equations) while p!=q: #print('p='+str(p)+' q='+str(q)+' <input....>') array_of_equations.append(equations) p=q equations={} q=edit_html.p_get_equations_of_the_tag(p,'input',equations) for i in range(len(array_of_equations)): #print('i='+str(i)) eqs=array_of_equations[i] for key in eqs: #print(key+'='+array_of_equations[i][key]) if key=='name' and array_of_equations[i][key]=="encode_hint": payload['encode_hint']=array_of_equations[i]['value'] elif key=='name' and array_of_equations[i][key]=="template": payload['template']=array_of_equations[i]['value'] elif key=='name' and array_of_equations[i][key]=="cmd": payload['cmd']=array_of_equations[i]['value'] elif key=='name' and array_of_equations[i][key]=="page": payload['page']=array_of_equations[i]['value'] elif key=='name' and array_of_equations[i][key]=="digest": payload['digest']=array_of_equations[i]['value'] elif key=='name' and array_of_equations[i][key]=="write": payload['write']=array_of_equations[i]['value'] #tokens=edit_html.get_tokens() #for i in range(len(tokens)):_ # print(str(i)+'-'+tokens[i]) eqs_msg={} rtn=[(0,0)] p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn) if rtn[0]!=(0,0): start_p=rtn[0][0] end_p=rtn[0][1] body=r[0:start_p]+new_body+r[end_p+1:] #print("body=......") #print(new_body) payload['msg']=edit_html.text_to_post_form(new_body) #print('msg=......') #print(payload['msg']) eqs_original={} rtn=[(0,0)] p=edit_html.p_get_between(p,'textarea',eqs_original,rtn) if rtn[0]!=(0,0): start_p=rtn[0][0] end_p=rtn[0][1] original_pre=r[start_p:end_p+1] original=edit_html.special_char_to_text(original_pre) payload['original']=edit_html.text_to_post_form(original) body="" body=body+'encode_hint='+(payload['encode_hint'])+'&' body=body+'template_page=&' body=body+'cmd='+payload['cmd']+'&' body=body+'page='+payload['page']+'&' body=body+'digest='+payload['digest']+'&' body=body+'msg='+payload['msg']+'&' body=body+'write='+payload['write']+'&' body=body+'original='+payload['original'] #print('body='+body) edit_html.post(self.url,body) #print(r) def upload_file(self,file_dir,file_name,pass_word): # MIMEタイプの定義 #XLSX_MIMETYPE = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' #file_Name = '2024-12-20-222611.jpg' #file_dir='C:/vscode/python/bot_computing/' file_path=file_dir+file_name MIMETYPE = mimetypes.guess_type(file_path)[0] print('upload file '+file_path) # ファイル名とデータの取得 file_Data_Binary = open(file_path, 'rb').read() # アップロード先のURL #url = 'http://www.yama-lab.org/fwb4pi/index.php' #page = 'test02' url=self.url page=self.page # アップロードするファイルの情報を辞書に格納 files = {'attach_file': (file_name, file_Data_Binary, MIMETYPE), 'encode_hint': 'ぷ', 'pass': pass_word, 'refer': page, 'pcmd': 'post', 'plugin': 'attach'} m = MultipartEncoder(files) hx={'Content-Type': m.content_type, # 'Accept:': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'Referer': url+'?'+'plugin=attach&pcmd=upload&page='+page # 'Accept-Language': 'ja,en;q=0.9,en-GB;q=0.8,en-US;q=0.7' } # ファイルのアップロード response = requests.post(url, data=m, headers=hx) # レスポンスの表示 print(response.status_code) #print(response.content) #wfile_path=file_dir+'return.html' #result_file= open(wfile_path, 'w') #result_file.write((response.content).decode(encoding='utf-8')) return response.status_code def get_attachment_list(self): uri = self.url+"?"+self.page all_page=self.Pico_Http.get(uri) print("get_wiki_page...") #print(all_page) parser=html_parser(all_page) p=0 rtn=[(0,0)] q=parser.p_get_between_tag_with_cond(p,"div",('id',"attach"),rtn) attach_end=q attach_list=[] if p==q: return attach_list #print("get_wiki_page id=body..."+all_page[rtn[0][0]:rtn[0][1]]) p=rtn[0][0] eqs={} while True: q=parser.p_get_between(p,'a',eqs,rtn) if p==q: break if q>attach_end: break print(parser.html[rtn[0][0]:rtn[0][1]]) eqs_img={} q2=parser.p_get_equations_of_the_tag(rtn[0][0],'img',eqs_img) fname=parser.html[q2:(rtn[0][1]+1)] print(fname) attach_list.append(fname) q=parser.p_get_between(q2,'a',eqs,rtn) p=q print(attach_list) return attach_list def delete_attachment(self,attach_name,pass_word): # アップロードするファイルの情報を辞書に格納 files = {'encode_hint': 'ぷ', 'plugin': 'attach', 'refer': self.page, 'file': attach_name, 'age': '0', 'pcmd': 'delete', 'newname': attach_name, 'pass': pass_word} m = MultipartEncoder(files) hx={'Content-Type': m.content_type, # 'Accept:': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'Referer': self.url+'?'+'plugin=attach&pcmd=info&' +'file='+attach_name+'&refer='+self.page # 'Accept-Language': 'ja,en;q=0.9,en-GB;q=0.8,en-US;q=0.7' } # ファイルのアップロード response = requests.post(self.url, data=m, headers=hx) # レスポンスの表示 print(response.status_code) #print(response.content) #wfile_path='return2.html' #result_file= open(wfile_path, 'w') #result_file.write((response.content).decode(encoding='utf-8')) return response.status_code def get_wiki_source_of_the_uri(uri): url_page=uri.split('?') self.set_url(url_page[0]) self.set_page(url_page[1]) self.get_wiki_source() def get_wiki_source(self): payload={'cmd':'edit', 'page':self.page} #r=Pico_Http.get(url+'index.php?work202401') #print('uri='+r) r = self.Pico_Http.put(self.url,payload) #print(r) edit_html=html_parser(r) #edit_html.html_tokenizer() p=0 array_of_equations=[] equations={} q=edit_html.p_get_equations_of_the_tag(p,'input',equations) while p!=q: #print('p='+str(p)+' q='+str(q)+' <input....>') array_of_equations.append(equations) p=q equations={} q=edit_html.p_get_equations_of_the_tag(p,'input',equations) for i in range(len(array_of_equations)): #print('i='+str(i)) eqs=array_of_equations[i] for key in eqs: #print(key+'='+array_of_equations[i][key]) if key=='name' and array_of_equations[i][key]=="encode_hint": payload['encode_hint']=array_of_equations[i]['value'] elif key=='name' and array_of_equations[i][key]=="template": payload['template']=array_of_equations[i]['value'] elif key=='name' and array_of_equations[i][key]=="cmd": payload['cmd']=array_of_equations[i]['value'] elif key=='name' and array_of_equations[i][key]=="page": payload['page']=array_of_equations[i]['value'] elif key=='name' and array_of_equations[i][key]=="digest": payload['digest']=array_of_equations[i]['value'] elif key=='name' and array_of_equations[i][key]=="cancel": payload['cancel']=array_of_equations[i]['value'] #tokens=edit_html.get_tokens() #for i in range(len(tokens)):_ # print(str(i)+'-'+tokens[i]) eqs_msg={} rtn=[(0,0)] p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn) wiki_source="" if rtn[0]!=(0,0): start_p=rtn[0][0] end_p=rtn[0][1] wiki_source=r[start_p:end_p] #body="" #body=body+'encode_hint='+payload['encode_hint']+'&' #body=body+'cmd='+payload['cmd']+'&' #body=body+'page='+payload['page']+'&' #body=body+'cancel='+payload['cancel'] ##print('body='+body) #r=edit_html.post(self.url,body) wiki_source=edit_html.special_char_to_text(wiki_source) return wiki_source def get_result(self): #print("pico_wiki_bot.get_result") #r=self.get_script_and_result() all_text=self.get_wiki_source() if all_text==None: return None if all_text=="": return "" #parser=html_parser(all_text) p=all_text.index("result:")+len("result:") q=all_text.index("current_device=",p) if q<0: q=len(all_text) else: q=q-1 print('pico_wiki_bot.get_result return') #print('pico_wiki_bot.get_result return') #print(all_text[p+1:q]) print('all_text size='+str(len(all_text))) print('p='+str(p)) print('q='+str(q)) #print('all_text size='+str(len(all_text))) #print('p='+str(p)) #print('q='+str(q)) return all_text[p+1:q] def get_result_of_the_uri(self,uri): try: url_page=uri.split('?') self.set_url(url_page[0]) self.set_page(url_page[1]) return self.get_result() except Exception as e: print('error in get_result_of_the_uri:'+uri) print(e) class USB_Serial: PORT0 = '/dev/ttyACM0' # Windows �̏ꍇCOM* PORT1 = '/dev/ttyACM1' def __init__(self): try: self.ser = serial.Serial(port=self.PORT0,baudrate=115200,timeout=2) except Exception as e: try: self.ser = serial.Serial(port=self.PORT1,baudrate=115200,timeout=2) except Exception as e: print(e) #self.GUI.write_message(str(e)) self.ser=None self.usb_serial_input_thread=threading.Thread(target=self.receive_loop) self.usb_serial_input_thread.start() def set_gui(self,gui): self.GUI=gui; def send_command(self,command): self.rtn=None print('writing '+command) send_bytes = bytes(command+'\r\n','utf-8') try: if self.ser!=None: self.ser.write(send_bytes) self.ser.flush() except Exception as e: print("serial write error") self.GUI.write_message("serial write error.") print(e) self.GUI.write_message(str(e)) self.rtn="0" count=0 while True: if self.rtn!=None: rtnx=self.rtn self.rtn=None return rtnx if count>1000: self.rtn=None return "0" time.sleep(0.01) count=count+1 def receive_loop(self): while True: try: if self.ser==None: continue buf = self.ser.readlines() buf = [i.decode().strip() for i in buf] if len(buf)>0: print(buf) for l in buf: if '>>>' in l: print('please reset pico.') continue #print("buf[i]="+buf) self.rtn= buf[1] except Exception as e: print("error receiving") #self.GUI.write_message("error receiving") print(e) #self.GUI.write_message(str(e)) try: self.ser = serial.Serial(port=self.PORT0,baudrate=115200,timeout=2) except Exception as e: print(e) try: self.ser = serial.Serial(port=self.PORT1,baudrate=115200,timeout=2) except Exception as e: print(e) #self.GUI.write_message('error '+str(e)+' at receive_loop') self.ser=None self.rtn="0" time.sleep(0.01) class pico_wiki_bot: result_lines=[] script_lines=[] urls=[] programs={} environments={} time_millis=0 timer=None current_program_name="" current_program="" report=[] #reportLength=120 current_url="" current_page="" current_text="" next_url_index="" def __init__(self): print('wiki_bot') #self.Pico_Net=pico_net() self.Pico_Time=pico_time() self.Pico_Http=pico_http() #self.Pico_Net.connect() t=self.Pico_Time.get_now() self.bot=None print('time now='+t) self.read_initials() #self.current_url=url #self.current_page=page self.next_url_index=0 self.Pico_Wiki_Driver=pico_wiki_driver() self.all_text="" self.u_serial=USB_Serial() if self.Pico_Wiki_Driver.get_url()=="": return self.timer=Timer() #all_text=self.Pico_Wiki_Driver.get_wiki_page() #if all_text==None: # return None #self.timer.init(mode=Timer.PERIODIC, freq=1, callback=self.add_one_second_to_time_millis) def read_initials(self): print("read_initials") if self.bot==None: return if self.bot.prop==None: return self.environments['waiting_term']=self.bot.prop.get('waiting_term') self.environments['read_interval']=self.bot.prop.get('read_interval') self.environments['exec_interval']=self.bot.prop.get('exec_interval') self.environments['write_interval']=self.bot.prop.get('write_interval') self.environments['report_length']=self.bot.prop.get('report_length') uri=self.bot.prop.get('object_page_1') url=uri.split('?')[0] self.page=uri.split('?')[1] def set_current_page(self,page): self.current_page=page def set_current_url(self,url): self.current_url=url def set_current_pass(self,pass_w): self.pass_word=pass_w def clear_report(self): self.report.clear() def add_report_line(self,send): #print("add_report_line, report_length="+str(self.environments['report_length'])) #print(" current report_length="+str(len(self.report))) #print(" adding..."+send) #while len(self.report)>(int(self.environments['report_length'])-1): while len(self.report)>(int(self.GUI.lookup_prop('report_length'))-1): for i in range(len(self.report)-1): self.report[i]=self.report[i+1] self.report.pop() self.report.append(send) def set_GUI(self,gui): self.GUI=gui self.u_serial.set_gui(gui) def init_params(self): if self.GUI==None: return self.GUI.init_params() def add_one_second_to_time_millis(self,timer): #print("pico_wiki_bot.add_one_second_to_time_millis") self.time_millis=self.time_millis+1000 #print("pico_wiki_bot.add_one_second_to_time_millis return") def stop_timer(self): self.timer=None def get_the_object_page(self,url): #print("pico_wiki_bot.get_the_object_page") #print("uri="+uri) #if self.GUI!=None: # try: # object_page=self.GUI.lookup_prop('object_page_1') # pass_word=self.GUI.lookup_prop('object_page_pass_1') # except: # object_page=initials['object_page_1'] # pass_word=initials['object_page_pass_1'] url_page=url.split('?') #self.url=url_page[0] #self.page=url_page[1] #self.pass_word=pass_word driver=pico_wiki_driver() driver.set_url(url_page[0]) driver.set_page(url_page[1]) html=driver.get_html() return html def get_the_page(self,uri): #print("pico_wiki_bot.get_the_object_page") #print("uri="+uri) url_page=uri.split('?') url=url_page[0] page=url_page[1] driver=pico_wiki_driver() driver.set_url(url) driver.set_page(page) html=driver.get_html() if html!="": return html try: object_page=self.GUI.lookup_prop('object_page_2') pass_word=self.GUI.lookup_prop('object_page_pass_2') except: object_page=initials['object_page_2'] pass_word=initials['object_page_pass_1'] url_page=object_page.split('?') self.url=url_page[0] self.page=url_page[1] self.pass_word=pass_word self.Pico_Wiki_Driver.set_url(self.url) self.Pico_Wiki_Driver.set_page(self.page) html=self.Pico_Wiki_Driver.get_html() if html!="": return html def get_script_and_result(self,url): #print("pico_wiki_bot.get_script_and_result") html=self.get_the_object_page(url) if html=="": return "" parser=html_parser(html) p=0 eqs={} rtn=[(0,0)] q=parser.p_get_between(p,'pre',eqs,rtn) be=rtn[0] command_and_result=parser.html[be[0]:(be[1]+1)] self.current_text=parser.special_char_to_text(command_and_result) return self.current_text def get_result(self,url): #print("pico_wiki_bot.get_result") r=self.get_script_and_result(url) all_text=self.current_text if all_text==None: return None if all_text=="": return "" #parser=html_parser(all_text) p=all_text.index("result:")+len("result:") q=all_text.index("current_device=",p) if q<0: q=len(all_text) else: q=q-1 #print('pico_wiki_bot.get_result return') return all_text[p+1:q] def get_script(self,url): #print("pico_wiki_bot.get_script") all_text=self.get_script_and_result(url) if all_text==None: return None if all_text=="": return "" #parser=html_parser(all_text) p=0 try: q=all_text.index("result:",p) q=q-1 except: q=len(all_text) x_text=all_text[p:q] text_lines=x_text.split('\n') include_text=[[]] head=[] for i in range(len(text_lines)): line=text_lines[i] parser=html_parser(line) if self.is_include(parser,include_text): inc=include_text[0] head=text_lines[:i-1] tail=text_lines[i+1:] head.extend(inc) head.extend(tail) break if head!=[]: text_lines=head return text_lines def get_current_device_line(self): #command_and_result=self.get_script_and_result() command_and_result=self.current_text parser=html_parser(command_and_result) if command_and_result==None: return None p=command_and_result.index("current_device=") p=p+len("current_device=") try: q=command_and_result.index("\n",p) except: q=len(command_and_result) return parser.special_char_to_text(command_and_result[p:q]) def replace_wiki(self,new_wiki): self.Pico_Wiki_Driver.replace_wiki_page(new_wiki) #print(r) def replace_result(self): #print("pico_wiki_bot.replace_result") payload={'cmd':'edit', 'page':self.page} #r=Pico_Http.get(url+'index.php?work202401') #print('uri='+r) r = self.Pico_Http.put(self.url,payload) #print(r) edit_html=html_parser(r) #edit_html.html_tokenizer() p=0 eqs_msg={} rtn=[(0,0)] body="" p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn) if rtn[0]!=(0,0): start_p=rtn[0][0] end_p=rtn[0][1] body=edit_html.special_char_to_text(r[start_p:end_p]) #print("body=......") #print(body) eqs_original={} rtn=[(0,0)] p=edit_html.p_get_between(p,'textarea',eqs_original,rtn) if rtn[0]!=(0,0): start_p=rtn[0][0] end_p=rtn[0][1] original_pre=r[start_p:end_p+1] original=edit_html.special_char_to_text(original_pre) payload['original']=edit_html.text_to_post_form(original) #print("body="+body) px=body.index("result:")+len("result:") q=body.index("current_device=",px) if q<0: q=len(body) head_area=body[0:px] #tail_area=msg[q:] #result_area=msg[px:q] new_area=head_area+'\n' for i in range(len(self.report)): new_area=new_area+' '+self.report[i]+'\n' bot_id=self.GUI.lookup_prop('bot_id') new_area=new_area+" current_device=\""+bot_id+"\". Date="+self.Pico_Time.get_now() gc.collect() self.Pico_Wiki_Driver.replace_wiki_page(new_area) def get_attachment_list(self): return self.Pico_Wiki_Driver.get_attachment_list() def upload_file(self,file_path,file_name): self.Pico_Wiki_Driver.upload_file(file_path,file_name,self.pass_word) def delete_attachment(self,attach_name): self.Pico_Wiki_Driver.delete_attachment(attach_name,self.pass_word) def read_script(self,script_lines): #print("read_script") if self.GUI!=None: try: object_page=self.GUI.lookup_prop('object_page_1') pass_word=self.GUI.lookup_prop('object_page_pass_1') except: object_page=initials['object_page_1'] pass_word=initials['object_page_pass_1'] else: print("failed to read") return False url_page=object_page.split('?') self.url=url_page[0] self.page=url_page[1] self.pass_word=pass_word self.Pico_Wiki_Driver.set_url(self.url) self.Pico_Wiki_Driver.set_page(self.page) html=self.Pico_Wiki_Driver.get_html() all_text=self.get_script(object_page) if all_text==None or all_text==[]: script_lines[0]=None return False script_lines[0]=all_text #print("-----script_lines-----") #for i in range(len(self.script_lines)): # print(str(i)+' - '+self.script_lines[i]) #if self.GUI != None: # #self.GUI.write_script_start(all_text) # self.GUI.write_script_lines(all_text) return True def eval_script(self): print("eval_script") self.interpret_lines() #def print_result(self): # print("print_result") def is_object_page(self,parser): #print("is_object_page, line="+line[:10]+"...") p=0 p=parser.p_b(p) q=parser.p_key("object_page ",p) if p==q: #print("is_object_page return False") return False p=q p=parser.p_b(p) self.urls=[] xurl=[""] i=0 q=parser.p_url(p,xurl) #print(str(i)+"th url="+xurl[0]) while p!=q: i=i+1 self.urls.append(xurl[0]) p=q p=parser.p_b(p) q=parser.p_key("or ",p) #print("after key or p="+str(p)+" q="+str(q)) if p==q: break p=q p=parser.p_b(p) xurl=[""] q=parser.p_url(p,xurl) #print(str(i)+"th url="+xurl[0]) #print("p="+str(p)+" q="+str(q)) #for i in range(len(self.urls)): # print("url["+str(i)+"]="+self.urls[i]) return True def is_device(self,parser): p=0 p=parser.p_b(p) q=parser.p_key("current_device=",p) if p==q: return False p=q p=parser.p_b(p) xname=[""] q=parser.p_String_Constant(p,xname) if p==q: return False p=parser.p_b(q) self.current_device=xname[0] q=parser.p_key(",",p) if p==q: return False p=parser.p_b(q) q=parser.p_key("Date=",p) xdate=[] q=parser.p_Date(p,xdate) def is_include(self,parser,script_lines): #print("is_object_page, line="+line[:10]+"...") p=0 p=parser.p_b(p) q=parser.p_key("include ",p) if p==q: #print("is_object_page return False") return False p=q p=parser.p_b(p) include_pages=[] xurl=[""] i=0 q=parser.p_url(p,xurl) #print(str(i)+"th url="+xurl[0]) while p!=q: i=i+1 include_pages.append(xurl[0]) p=q p=parser.p_b(p) q=parser.p_key("or ",p) #print("after key or p="+str(p)+" q="+str(q)) if p==q: break p=q try: p=parser.p_b(p) xurl=[""] q=parser.p_url(p,xurl) #print(str(i)+"th url="+xurl[0]) #print("p="+str(p)+" q="+str(q)) except Exception as e: print('error in include '+e.toString()) return False for i in range(len(include_pages)): print("include_pages["+str(i)+"]="+include_pages[i]) x_lines=self.get_script(include_pages[i]) if x_lines==None or x_lines=="": continue else: script_lines[0]=x_lines break return True def set_page_name(self,eq): rv=eq['pageName'] print("set pageName="+rv) date=self.Pico_Time.get_now() print('ok?') #date=2024-12-31 14:25:16.61944 time_x=(date.split())[1] print('time_x='+time_x) if "<hour>" in rv: #need to correct, to replace <day> and hour>simultanewously. time_xx=time_x.split(':') #hm=time_xx[0]+'_'+time_xx[1] xrv=rv.replace("<hour>",time_xx[0]) page_1=self.GUI.lookup_prop('object_page_1') url_and_page=page_1.split('?') url_and_page[1]=xrv new_page_1=url_and_page[0]+'?'+url_and_page[1] print(new_page_1) self.GUI.map_to_gui('object_page_1',new_page_1) page_2=self.GUI.lookup_prop('object_page_2') url_and_page=page_2.split('?') url_and_page[1]=xrv new_page_2=url_and_page[0]+'?'+url_and_page[1] print(new_page_2) self.GUI.map_to_gui('object_page_2',new_page_2) if "<day>" in rv: #need to correct date_x=(date.split())[0] print('ok?') print('date_x='+date_x) date_xx=date_x.split('-') xrv=rv.replace("<day>",date_xx[2]) page_1=self.GUI.lookup_prop('object_page_1') url_and_page=page_1.split('?') url_and_page[1]=xrv new_page_1=url_and_page[0]+'?'+url_and_page[1] print(new_page_1) self.GUI.add_uri(new_page_1,0) page_2=self.GUI.lookup_prop('object_page_2') url_and_page=page_2.split('?') url_and_page[1]=xrv new_page_2=url_and_page[0]+'?'+url_and_page[1] print(new_page_2) self.GUI.add_uri(new_page_2,1) def is_set_command(self,parser,p): #print("is_set_command.."+parser.html[p:p+10]) #p=parser.p_b(p) q=parser.p_key("set ",p) if p==q: #print("is_set_command return False") return False p=parser.p_b(q) eq={} q=parser.p_an_equation(p,eq) if p==q: return False if "pageName" in eq: print(eq) self.set_page_name(eq) return True for key in eq: self.environments[key]=eq[key] self.GUI.write_message("set "+str(key)+"="+str(eq[key])) self.GUI.map_to_gui(key,self.environments[key]) return True def is_py_command(self,parser,p): #print("is_py_command.."+parser.html[p:p+10]) #p=parser.p_b(p) q=parser.p_key("py ",p) if p==q: return False p=parser.p_b(q) #print("p="+str(p)+",line[p:10]="+parser.html[p:p+10]) xname=[""] q=parser.p_name(p,xname) self.current_program_name=xname[0] self.current_program="" def is_end_command(self,parser,p): #print("is_end_command.."+parser.html[p:p+10]) #p=parser.p_b(p) q=parser.p_key("end ",p) if p==q: return False p=parser.p_b(q) xname=[""] q=parser.p_name(p,xname) self.programs[self.current_program_name]=self.current_program def ex_pico(self,command): return(self.u_serial.send_command(command)) def is_run_command(self,parser,p): # reference : https://qiita.com/kyoshidajp/items/57ae371b3f5d8a84fb13 # reference : https://qiita.com/kammultica/items/3201f43eec53e3e56f54 print("is_run_command.."+parser.html[p:p+10]) q=parser.p_key("run ",p) if p==q: #print("is_run_command return False") return False p=parser.p_b(q) xname=[""] q=parser.p_name(p,xname) if p==q: #print("is_run_command return False") return False try: #r=self.get_result(self.current_url+'?'+self.current_page) r=self.get_result(self.url+'?'+self.page) except Exception as e: print("error in run command, get_result..:.{e}") if r!=None and r!="": self.clear_report() lines=r.split('\n') for i in range(len(lines)): self.add_report_line(lines[i]) exec_locals={'self':self} #print("line="+parser.html) #print("xname[0]="+xname[0]) program=self.programs[xname[0]] print("program="+program) try: exec(program,globals(),exec_locals) except Exception as e: print("fail to exec "+program) self.add_report_line("fail to exec:"+xname[0]) self.GUI.write_message("fail to exec "+program) tb = sys.exc_info()[2] print("message:{0}".format(e.with_traceback(tb))) #print(str(e)) self.GUI.write_message(str(e)) traceback.print_stack() traceback.format_exc() #self.add_report_line(str(e)) #sys.print_exception() #try: # exec(program) #except: # print("fail to exec "+program) # self.add_report_line("fail to exec:"+xname[0]) # sys.print_exception() def is_command(self,parser): p=0 p=parser.p_b(p) q=parser.p_key("command:",p) #print("line="+parser.html) #print("command="+parser.html[q:]) if p==q: return False q=parser.p_b(q) #lx=line[q:] if self.is_set_command(parser,q): return True if self.is_py_command(parser,q): return True if self.is_end_command(parser,q): return True if self.is_run_command(parser,q): return True return False def is_py(self,parser): #print("is_py line="+line[:10]) p=0 p=parser.p_b(p) q=parser.p_key("py: ",p) if p==q: return False lx=parser.html[q:] self.current_program=self.current_program+lx+'\n' def interpret_a_line(self,line): #print("interpret_a_line:"+line) parser=html_parser(line) if self.is_object_page(parser): for i in range(len(self.urls)): uri=self.urls[i] self.GUI.add_uri(uri,i) return if self.is_device(parser): return #if self.is_include(parser): # return if self.is_command(parser): return if self.is_py(parser): return def interpret_lines(self): #print("interpret_lines") for i in range(len(self.script_lines)): try: self.interpret_a_line(self.script_lines[i]) except Exception as e: line="error in '"+self.script_lines[i]+"'" print(line) self.write_line(line) line="...:{e}" print(line) self.write_line(line) self.send_result() def next_url(self): print("next_url") if len(self.urls)==0: return if int(self.next_url_index)>=len(self.urls): self.next_url_index=0 urlx=self.urls[int(self.next_url_index)] p=urlx.index("?") self.current_page=urlx[p+1:] #self.current_url=urlx[0:p-1] self.current_url=urlx[0:p] print("current_url="+self.current_url) print("current_page="+self.current_page) self.next_url_index=int(self.next_url_index)+1 self.Pico_Wiki_Driver.set_url(self.current_url) self.Pico_Wiki_Driver.set_page(self.current_page) def send_result(self): print("send_result") self.GUI.write_message("send_result") try: self.replace_result() except Exception as e: print('error while send_result') print(e) def write_line(self,line): #print("write_line") try: self.GUI.write_message("add_report "+line) self.add_report_line(line) except Exception as e: print('error while write_line('+line+')') print(e) def read_eval_print_loop(self): print("read_eval_print_loop") self.continue_read_eval_print_loop=True last_read_time=int(time.time() * 1000) last_eval_time=last_read_time last_write_time=last_read_time while self.continue_read_eval_print_loop: gc.collect() current_time=int(time.time() * 1000) if current_time<last_read_time: last_read_time=current_time if current_time-last_read_time>self.GUI.lookup_prop('read_interval'): last_read_time=current_time new_lines=[None] if self.read_script(new_lines): if self.GUI != None: #self.GUI.write_script_start(all_text) self.GUI.write_script_list(new_lines[0]) self.script_lines=new_lines[0] if self.GUI.lookup_prop('exec_interval')==0: self.eval_script() self.GUI.update() #else: # self.next_url() # continue if self.GUI.lookup_prop('exec_interval')>0: exec_interval_time=self.GUI.lookup_prop('exec_interval') if current_time-last_eval_time>exec_interval_time: last_eval_time=current_time self.eval_script() if self.GUI.lookup_prop('write_interval')>0: if current_time-last_write_time>self.GUI.lookup_prop('exec_interval'): last_write_time=current_time self.send_result() #time.sleep_ms(100) time.sleep(0.1) def stop_read_eval_print_loop(self): self.continue_read_eval_print_loop=False def init_params(self): uri=self.GUI.lookup_prop('object_page_1') pass_word=self.GUI.lookup_prop('object_page_pass_1') page=uri.split('?')[1] url=uri.split('?')[0] self.set_current_url(url) self.set_current_page(page) self.set_current_pass(pass_word) self.next_url_index=0 def start_thread(gui): gui.click_start_button() def main() -> None: args = sys.argv if len(args)>=2: fst_opt=args[1] else: fst_opt=None print('fst_opt='+str(fst_opt)) root=Tk() bot=pico_wiki_bot() gui=GUI(root,bot) bot.set_GUI(gui) gui.set_Pico_Time(bot.Pico_Time) gui.init_params() bot.init_params() if fst_opt=='-s': start_thread(gui) root.mainloop() #print("-----------get_script_and_result-----------------") #print(bot.get_script_and_result()) #print("-----------get_result-----------------") #print(bot.get_result()) #print("-----------get_script-----------------") #print(bot.get_script()) #print("-----------get_current_device_line-----------------") #print(bot.get_current_device_line()) #print("-----------replace_result-----------------") #bot.replace_result("Hello!\nThis is a test2....\nxxxx") #print("-----------stop_timer--------------------") #bot.stop_timer() #bot.read_eval_print_loop() if __name__=='__main__': main() }}