pico_wiki_driver_03.py
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
単語検索
|
最終更新
|
ヘルプ
|
ログイン
]
開始行:
[[Pico_Wiki_Driver]]
#code(python){{
import machine
import time
import network
import urequests
import random
import socket
try:
import ustruct as struct
except:
import struct
from time import sleep
from machine import Timer
device_id="yama_pico_0000_0000_0000_0001"
class pico_net:
# Wi-Fi station helper: keeps the access-point credentials and brings the
# WLAN interface up.
# NOTE(review): this copy of the source has lost its indentation and long
# lines are cut off with "..."; the code lines are left exactly as found.
ssid = 'SSID-xxxx' # replace by the appropriate ssid
password = '12345678' # replace by the appropriate pa...
def __init__(self):
# Only reports the SSID currently configured on the class/instance.
print('current ssid= '+self.ssid)
def set_access_point(self, ssid, password):
# Override the class-level credentials for this instance.
self.ssid = ssid
self.password = password
def connect(self):
# Activate the station interface and start connecting with the stored
# credentials, then poll the status once per second, at most 10 times.
# Raises RuntimeError when the final status is not 3.
# NOTE(review): 3 is presumably the "got IP" status of network.WLAN -
# confirm against the port's constants.
self.wlan = network.WLAN(network.STA_IF)
self.wlan.active(True)
# chipid is not used anywhere else in this method.
chipid = int.from_bytes(self.wlan.config('mac'), ...
self.wlan.connect(self.ssid, self.password)
max_wait = 10
while max_wait > 0:
if self.wlan.status() < 0 or self.wlan.status...
break
max_wait -= 1
print('waiting for connection...')
time.sleep(1)
if self.wlan.status() != 3:
raise RuntimeError('network connection failed')
else:
print('connected')
# ifconfig() result is kept in self.status; element 0 is printed as the IP.
self.status = self.wlan.ifconfig()
print('ip = ' + self.status[0])
class pico_http:
def __init__(self):
    """Announce creation of the HTTP helper; no state is initialised."""
    print('pico_http')
def set_url(self, url):
    """Remember ``url`` on the instance."""
    self.url = url
def get(self, url):
    """Fetch ``url`` with HTTP GET and return the response body as text.

    The URL is stored in ``self.url``. On any failure a message is printed
    and "" is returned, so callers can treat "" as "no page".
    """
    self.url = url
    try:
        r = urequests.get(url)
        try:
            return r.text
        finally:
            # Always release the socket, even when reading the body fails.
            r.close()
    except Exception:
        # Exception (not a bare except) so that Ctrl-C still interrupts.
        print('pico_http.get...exception')
        return ""
def put(self, url, payload):
    """Send ``payload`` as a query string with HTTP GET; return the body text.

    Despite the name, the request is a GET to ``url?key=value&...``.
    ``payload`` is a dictionary; values are converted with ``str``. Keys and
    values are not URL-encoded here, so callers must pass safe text.
    Returns "" (after printing a message) when the request fails, matching
    ``get`` and ``post``.
    """
    query = '&'.join([key + '=' + str(payload[key]) for key in payload])
    # An empty payload yields the bare URL, as before.
    uri = url + '?' + query if query else url
    try:
        r = urequests.get(uri)
        try:
            return r.text
        finally:
            r.close()
    except Exception:
        print('pico_http.put...exception')
        return ""
#post, ref: https://docs.micropython.org/en/latest/li...
def post(self, url, body, headers):
# POST ``body`` to ``url`` and return the response text; prints a message
# and returns "" when anything raises.
# NOTE(review): the urequests.post line is cut off in this copy.
# NOTE(review): the bare except also swallows KeyboardInterrupt, and the
# response is not closed if reading r.text raises.
#print('pico_http.post(url='+url+' body='+str(bod...
try:
r = urequests.post(url,data=body,headers=head...
#print(r)
r_text=r.text
#print(r_text)
#r_json=r.json()
r.close()
#print(r_json)
#print('pico_http.post...return')
return r_text
except:
print('pico_http.post...exception')
return ""
class pico_time:
# Reads the time from an NTP server and formats it as a text timestamp.
# NOTE(review): indentation is lost and several lines are cut off with
# "..." in this copy; code lines are left exactly as found.
# reference: https://wisteriahill.sakura.ne.jp/CMS/Wo...
# NTP server
#ntp_host = "time-c.nist.gov"
#ntp_host = "time.cloudflare.com"
#ntp_host = "time.google.com"
ntp_host = "ntp.nict.jp"
def __init__(self):
print('pico_time')
def ptime(self):
# Sends a 48-byte query whose first byte is 0x1b to port 123 of ntp_host,
# unpacks a big-endian unsigned 32-bit value from reply bytes 40..43 and
# returns it minus NTP_DELTA.
# NOTE(review): NTP_DELTA is presumably the 1900->1970 epoch offset in
# seconds - confirm it matches the epoch used by time.gmtime on this port.
# NOTE(review): settimeout is commented out, so recv may block forever,
# and the socket is not closed if sendto/recv raises.
NTP_DELTA = 2208988800
NTP_QUERY = bytearray(48)
NTP_QUERY[0] = 0x1b
addr = socket.getaddrinfo(self.ntp_host, 123)[0][...
s = socket.socket(socket.AF_INET, socket.SOCK_DGR...
# s.settimeout(1)
res = s.sendto(NTP_QUERY, addr)
msg = s.recv(48)
s.close()
val = struct.unpack("!I", msg[40:44])[0]
return val - NTP_DELTA
# Weekday abbreviations, Monday first.
wday = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun']
def zfill(self, s, width):
# Left-pad the string s with "0" up to width characters.
if len(s) < width:
return ("0" * (width - len(s))) + s
else:
return s
def get_now(self):
# Returns the NTP time shifted by +9 hours as a formatted string.
# NOTE(review): the +9h shift is presumably JST - confirm.
t=self.ptime()
t = t + 9*60*60
#print(t)
datetimetuple = time.gmtime(t)
year = str(datetimetuple[0])
month = self.zfill(str(datetimetuple[1]),2)
mday = self.zfill(str(datetimetuple[2]),2)
hour = self.zfill(str(datetimetuple[3]),2)
minute = self.zfill(str(datetimetuple[4]),2)
seconds = self.zfill(str(datetimetuple[5]),2)
wd_index = datetimetuple[6]
# Add a delimiter ("|") separating date and time
#datetime = year + "/" + month + "/" + mday + " "...
datetime = year + "/" + month + "/" + mday + " " ...
return datetime
class html_parser:
html=""
tokens=[]
reserved_words={'<':'<','>':'>','&':'&','"'...
' ':' ','\n':'<br>','\t':' &...
'\\':'\','{':'{','}':'}'...
}
a2c={' ':"+",'\n':"%0D%0A",'\r':"%0D",'\t':"%09",'!':...
'#':"%23",'$':"%24",'%':"%25",'&':"%26", '\'':"%2...
'*':"%2A",'+':"%2B",',':"%2C",'-':"%2D",'/':"%2F"...
';':"%3B",'<':"%3C",'=':"%3D",'>':"%3E",'?':"%3F",
'@':"%40",'[':"%5B",'\\':"%5C",']':"%5D",'^':"%5E",
'`':"%60",'{':"%7B",'|':"%7C",'}':"%7D",'~':"%7E"}
#Spechal char to ascii code
sc2a={'<':"<",'>':">",'&':"&",'"':'"',...
def __init__(self,html):
self.html=html
def html_tokenizer(self):
#print("html_tokenizer")
self.token_ith=0
w=""
tx=[""]
rest=[""]
inx=self.html
xlen=len(self.html)
while xlen>0:
if self.parse_tag(inx,tx,rest):
if w!="":
self.tokens.append(w)
w=""
self.tokens.append('<'+tx[0]+'>')
#print('<'+tx[0]+'>')
inx=rest[0]
xlen=len(inx)
elif self.parse_String_Constant(inx,tx,rest):
if w!="":
self.tokens.append(w)
w=""
self.tokens.append('"'+tx[0]+'"')
#print('"'+tx[0]+'"')
inx=rest[0]
xlen=len(inx)
else:
wc=inx[0]
inx=inx[1:xlen]
xlen=len(inx)
w=w+wc
if w!="":
self.tokens.append(w)
w=""
def get_html(self):
    """Return the text this parser was created with."""
    return self.html
def parse_key(self,key,inx,rest):
    """Test whether ``inx`` begins with ``key``.

    ``rest[0]`` receives the text after ``key`` on a match, otherwise the
    whole of ``inx``. Returns True on a match, False otherwise.
    """
    matched = inx.startswith(key)
    rest[0] = inx[len(key):] if matched else inx
    return matched
def p_url(self,p,url):
# Try to read an http:// or https:// URL starting at position p of
# self.html. On success url[0] receives the URL text and the position of
# the terminating character is returned; on failure p is returned.
# NOTE(review): both while-conditions are cut off in this copy, so the full
# set of terminating characters is not visible here.
# NOTE(review): when the URL runs to the end of the text (p>=xlen) the
# method reports failure instead of accepting the URL - confirm intent.
#print("p_url p="+str(p)+' '+self.html[p:p+10])
rx=[""]
xurl=""
pw=p
xlen=len(self.html)
q=self.p_key('http://',p)
if p!=q:
xurl='http://'
p=q
while not (self.html[p] in [' ','\n','\t','\r...
xurl=xurl+self.html[p]
p=p+1
if p>=xlen:
return pw
url[0]=xurl
return p
q=self.p_key('https://',p)
if p!=q:
xurl='https://'
p=q
while not (self.html[p] in [' ','\n','\t','\r...
xurl=xurl+self.html[p]
p=p+1
if p>=xlen:
return pw
url[0]=xurl
return p
return pw
def p_key(self,key,p):
    """Return the position just after ``key`` if it occurs at ``p``, else ``p``."""
    end = p + len(key)
    return end if self.html[p:end] == key else p
def parse_String_Constant(self,inx,strc,rest):
    """Parse a double-quoted string constant at the start of ``inx``.

    On success ``strc[0]`` receives the contents (without the quotes),
    ``rest[0]`` the text after the closing quote, and True is returned.
    Returns False, leaving ``strc``/``rest`` untouched, when ``inx`` does not
    start with '"' or the string is not terminated (the previous version
    raised IndexError in that case).
    """
    if not inx.startswith('"'):
        return False
    chars = []
    i = 1
    n = len(inx)
    while i < n:
        ch = inx[i]
        if ch == '"':
            strc[0] = "".join(chars)
            rest[0] = inx[i + 1:]
            return True
        if ch == '\\':
            # Kept as before: the backslash and the character after it are
            # both dropped from the result.
            i += 2
        else:
            chars.append(ch)
            i += 1
    return False
def p_String_Constant(self,p,strc):
    """Parse a double-quoted string constant at position ``p`` of ``self.html``.

    On success ``strc[0]`` receives the contents and the position after the
    closing quote is returned. Returns ``p`` unchanged when there is no
    opening quote at ``p`` or the string is not terminated (the previous
    version raised IndexError at the end of the text).
    """
    html = self.html
    if html[p:p + 1] != '"':
        return p
    chars = []
    i = p + 1
    n = len(html)
    while i < n:
        ch = html[i]
        if ch == '"':
            strc[0] = "".join(chars)
            return i + 1
        if ch == '\\':
            # Kept as before: backslash and the following character are dropped.
            i += 2
        else:
            chars.append(ch)
            i += 1
    return p
def p_Date(self,p,date_str):
# Parse "<year>/<month>/<day> <hour>:<minute>:<second>" at position p.
# On success date_str[0] is assigned and the position after the seconds is
# returned; the failure paths return the starting position pw.
# NOTE(review): the failure path after the day number returns False instead
# of pw, unlike every other failure path here - looks like a bug.
# NOTE(review): date_str must already have one element, since date_str[0]
# is assigned.
# NOTE(review): the line building date_str[0] is cut off in this copy.
#print("p_Date p="+str(p)+' '+self.html[p:p+10])
rx=[""]
xstr=""
pw=p
year=[0]
q=self.p_number(p,year)
if p==q:
return pw
p=q
q=self.p_key('/',p)
if p==q:
return pw
p=q
month=[0]
q=self.p_number(p,month)
if p==q:
return pw
p=q
q=self.p_key('/',p)
if p==q:
return pw
p=q
day=[0]
q=self.p_number(p,day)
if p==q:
return False
p=q
# At least one blank is required between the date and the time.
q=self.p_b(p)
if p==q:
return pw
p=q
hour=[0]
q=self.p_number(p,hour)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
minute=[0]
q=self.p_number(p,minute)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
second=[0]
q=self.p_number(p,second)
if p==q:
return pw
p=q
date_str[0]=str(year[0])+'/'+str(month[0])+'/'+st...
return p
def parse_tag(self,inx,tag,rest):
# Parse a "<...>" tag at the start of inx. On success tag[0] receives the
# text between the angle brackets (string constants are re-quoted),
# rest[0] the text after ">", and True is returned; otherwise False.
# NOTE(review): indentation is lost in this copy, so the nesting of the
# backslash test relative to the string-constant test cannot be confirmed.
# NOTE(review): fx=inx[0] is evaluated before any length check, so an
# unterminated "<" at the end of the text appears to raise IndexError.
rx=[""]
xstr=""
wstr=[""]
if self.parse_key('<',inx,rx):
#print("parse_tag")
inx=rx[0]
fx=inx[0]
xlen=len(inx)
while fx!='>':
if xlen<=0:
return False
if fx=='\\':
inx=inx[1:xlen]
if self.parse_String_Constant(inx,wstr,rx):
inx=rx[0]
xstr=xstr+'"'+wstr[0]+'"'
else:
xstr=xstr+fx
inx=inx[1:xlen]
fx=inx[0]
xlen=len(inx)
if self.parse_key('>',inx,rx):
tag[0]=xstr
rest[0]=rx[0]
return True
return False
def get_tokens(self):
    """Return the token list (the list object itself, not a copy)."""
    return self.tokens
def next_token(self):
    """Return the next unread token and advance the cursor; "" when exhausted."""
    idx = self.token_ith
    if idx >= len(self.tokens):
        return ""
    self.token_ith = idx + 1
    return self.tokens[idx]
def has_more_tokens(self):
    """Return True while the cursor has not reached the end of the token list."""
    return self.token_ith < len(self.tokens)
def index_from(self,i,str):
    """Return the offset of ``str`` within ``self.html[i:]`` (relative to ``i``).

    Raises ValueError when ``str`` does not occur there.
    """
    tail = self.html[i:]
    return tail.index(str)
def p_b(self,p):
    """Skip blanks (' ') starting at ``p`` and return the new position.

    Stops at the end of the text; the previous version indexed past the end
    (IndexError) on an empty string or when the blanks ran to the end.
    """
    end = len(self.html)
    while p < end and self.html[p] == ' ':
        p = p + 1
    return p
def p_name(self,p,name):
# Read a run of name characters starting at p; name[0] receives the text
# and the position after the run is returned (equal to p when no name
# character is found, in which case name[0] is "").
# NOTE(review): the set of accepted characters is cut off in this copy.
# NOTE(review): self.html[q] is read before the bounds check, so calling
# this with p at the end of the text appears to raise IndexError.
#print("p_name, self.html="+ self.html[p:p+10])
#print("p="+str(p)+"..."+self.html[p:p+10])
q=p
last_p=len(self.html)
while self.html[q] in 'abcdefghijklmnopqrstuvwxyz...
#print("self.html["+str(q)+"]="+self.html[q])
q=q+1
if q>=last_p:
break
name[0]=self.html[p:q]
#print("p="+str(p)+" q="+str(q)+" name="+name[0])
#print("name[0]= "+name[0])
return q
def p_number(self,p,num):
    """Parse an unsigned decimal integer at position ``p`` of ``self.html``.

    On success ``num[0]`` receives the value and the position after the
    last digit is returned. When no digit is found, ``p`` is returned and
    ``num`` is left untouched; callers test ``p==q`` for failure, but the
    previous version raised ValueError from ``int("")`` (or IndexError at
    the end of the text) before they could.
    """
    start = p
    end = len(self.html)
    while p < end and self.html[p] in '0123456789':
        p = p + 1
    if p == start:
        return start
    num[0] = int(self.html[start:p])
    return p
def p_an_equation(self,p,eqs):
#print("p_an_equation p="+str(p)+' '+self.html[p:...
pw=p
p=self.p_b(p)
nx=[""]
strc=[""]
q=self.p_name(p,nx)
#print("p_an_equation name="+nx[0])
if p==q:
#print("p_an_equation fail to find name")
return pw
p=q
p=self.p_b(p)
q=self.p_key('=',p)
if p==q:
#print("p_an_equation fail to find =")
return pw
p=q
p=self.p_b(p)
q=self.p_String_Constant(p,strc)
if p==q:
num=[0]
q=self.p_number(p,num)
if p==q:
return pw
else:
eqs[nx[0]]=num[0]
print("num="+str(num[0]))
return q
else:
#print("strc="+str(strc[0]))
eqs[nx[0]]=strc[0]
return q
def p_equations(self,p,eqs):
    """Parse consecutive ``name=value`` pairs from ``p`` into ``eqs``.

    Returns the position after the last pair parsed (``p`` when none).
    """
    pos = p
    nxt = self.p_an_equation(pos, eqs)
    while nxt != pos:
        pos = nxt
        nxt = self.p_an_equation(pos, eqs)
    return pos
def p_get_equations_of_the_tag(self,p,tag_name,eqs):
#print("p_get_equations_of_the_tag <"+tag_name)
#print("p="+str(p)+' '+self.html[p:p+10])
pw=p
try:
q=self.html.index('<'+tag_name,pw)
except:
#print("p_get_equation_of_the_tag fail to fin...
return p
#print("q="+str(q)+' '+self.html[q:q+10])
q=q+len('<'+tag_name)
#print("q="+str(q)+' '+self.html[q:q+10])
if pw==q:
#print("p_get_equation_of_the_tag fail to fin...
return p
pw=q
q=self.p_equations(pw,eqs)
#print("after equations q="+str(q) +' '+self.html...
if pw==q:
return p
pw=q
pw=self.p_b(pw)
#print("pw="+str(pw))
q=self.p_key('>',pw)
#print("after > q="+str(q)+' '+self.html[q:q+10])
if pw!=q:
return q
q=self.p_key('/>',pw)
if pw!=q:
return q
return p
def p_get_between_tag_with_cond(self,p,tag_name,cond,...
# Search from p for a <tag_name ...> element whose attribute cond[0] equals
# cond[1]; on success rtn[0] receives (start_p, end_p) for the text between
# the opening and closing tags and the position after "</tag_name>" is
# returned. Failure paths return the starting position.
# NOTE(review): the signature line is cut off in this copy; the body uses a
# parameter named rtn.
# NOTE(review): "if p==q: return pw" treats a tag located exactly at the
# search position as a failure - confirm intent.
# NOTE(review): the "/>" branch compares pw!=q while the surrounding code
# compares against p, and its failure path returns p rather than pw.
# NOTE(review): end_p is q-1, i.e. the index of the last character before
# "</tag_name".
#print("p_get_between_with_cond <"+tag_name+">......
#print("p="+str(p)+' '+self.html[p:p+10])
pw=p
q=pw
while True:
try:
q=self.html.index('<'+tag_name,p)
#print("p_get_between_with_cond p="+str(p...
if p==q:
return pw
q=q+len('<'+tag_name)
p=q
eqs={}
q=self.p_equations(p,eqs)
#print("after equations q="+str(q) +' '+s...
if p!=q:
if cond[0] in eqs:
#print("cond[0]="+cond[0]+" eqs[c...
if eqs[cond[0]]!=cond[1]:
continue
else:
break
except:
#print("fail to find <"+tag_name)
return pw
p=self.p_b(p)
#print("p_get_between q="+str(q)+' '+self.html[q:...
#print("pw="+str(pw))
q=self.p_key('>',p)
if p!=q:
start_p=q
else:
q=self.p_key('/>',p)
if pw!=q:
start_p=q
else:
return p
try:
q=self.html.index('</'+tag_name,start_p)
except:
return pw
end_p=q-1
p=q
q=q+len('</'+tag_name)
p=self.p_b(q)
#print("pw="+str(pw))
q=self.p_key('>',p)
if q==p:
return pw
rtn[0]=(start_p,end_p)
#rx=self.html[start_p:end_p]
#print("rx="+rx)
return q
def p_get_between(self,p,tag_name,eqs,rtn):
#print("p_get_between <"+tag_name+">...</"+tag_na...
#print("p="+str(p)+' '+self.html[p:p+10])
pw=p
q=pw
try:
q=self.html.index('<'+tag_name,p)
except:
return pw
#print("p_get_between 2 q="+str(q)+' '+self.html[...
q=q+len('<'+tag_name)
#print("p_get_between 3 q="+str(q)+' '+self.html[...
if p==q:
return pw
p=q
q=self.p_equations(p,eqs)
#print("after equations q="+str(q) +' '+self.html...
p=q
p=self.p_b(p)
#print("p="+str(p))
q=self.p_key('>',p)
#print("after > q="+str(q)+' '+self.html[q:q+10])
if p!=q:
start_p=q
else:
q=self.p_key('/>',p)
if p!=q:
start_p=q
else:
#print("get between "+tag_name+",fail to ...
return pw
try:
q=self.html.index('</'+tag_name,start_p)
except:
#print("get between "+tag_name+",fail to find...
return pw
end_p=q-1
p=q
q=q+len('</'+tag_name)
p=self.p_b(q)
#print("p="+str(p))
q=self.p_key('>',p)
if q==p:
return pw
rtn[0]=(start_p,end_p)
#rx=self.html[start_p:end_p+1]
#print("get between "+tag_name+" rx="+rx)
return q
def code_convert(self,code):
    """Return the ``a2c`` replacement for ``code``, or ``code`` itself if none."""
    return self.a2c.get(code, code)
def text_to_post_form(self,text):
    """Convert ``text`` character by character with ``code_convert``."""
    return "".join([self.code_convert(ch) for ch in text])
def special_char_to_text(self,text):
    """Decode the basic HTML character references in ``text``.

    Handles ``&lt;``, ``&gt;``, ``&quot;``, the apostrophe forms
    ``&#039;``/``&#39;``/``&apos;`` and ``&amp;``. In this copy of the source
    the entity names had been decoded away, which left the replacements as
    no-ops and one line syntactically invalid. ``&amp;`` is decoded last so
    that text such as ``&amp;quot;`` becomes ``&quot;`` and is not decoded
    a second time.
    """
    text = text.replace("&lt;", "<")
    text = text.replace("&gt;", ">")
    text = text.replace("&quot;", '"')
    for apostrophe in ("&#039;", "&#39;", "&apos;"):
        text = text.replace(apostrophe, "'")
    text = text.replace("&amp;", "&")
    return text
def post(self,url,body):
# POST the already-encoded form text ``body`` to ``url`` and return the
# response text.
# NOTE(review): the headers literal and the urequests.post call are cut off
# in this copy.
# NOTE(review): unlike pico_http.post there is no exception handling here,
# and the response is not closed if reading .text raises.
#payload is a dictionary
headers={'Content-Type': 'application/x-www-form-...
#def http_post(url, value):
# https://docs.openmv.io/library/urequests.html
#body = "value=%s" % (value)
#headers = {'Content-Type': 'application/x-www-fo...
response = urequests.post(url, data=body, headers...
r_text=response.text
#print(r_text)
response.close()
#print(r_json)
return r_text
class pico_wiki_driver:
url=""
def __init__(self,url,page):
    """Store the wiki ``url``/``page``, create the helpers and go online.

    Connects to Wi-Fi and prints the current time as a side effect.
    """
    print('pico_wiki_driver')
    self.url = url
    self.page = page
    self.Pico_Net = pico_net()
    self.Pico_Time = pico_time()
    self.Pico_Http = pico_http()
    self.Pico_Net.connect()
    now_text = self.Pico_Time.get_now()
    print('time now=' + now_text)
def set_url(self,url):
    """Replace the base URL of the wiki."""
    self.url = url
def set_page(self,page):
    """Replace the name of the wiki page to work on."""
    self.page = page
def get_url(self):
    """Return the base URL of the wiki."""
    return self.url
def get_html(self):
    """Fetch ``<url>index.php?<page>`` and return the page HTML as text."""
    return self.Pico_Http.get(self.url + "index.php?" + self.page)
def get_wiki_page(self):
# Fetch the page HTML and return the text of a <div> selected by an
# attribute condition, with HTML character references decoded.
# NOTE(review): the condition argument is cut off in this copy.
# NOTE(review): body is only assigned when the <div> is found; with
# indentation lost it is unclear whether "return body" is inside that
# branch. If it is not, a missing <div> would raise on the return.
# NOTE(review): the slice [start:end] excludes the character at end_p,
# which p_get_between_tag_with_cond computes as an inclusive index.
uri = self.url+"index.php?"+self.page
all_page=self.Pico_Http.get(uri)
print("get_wiki_page...")
#print(all_page)
parser=html_parser(all_page)
p=0
rtn=[(0,0)]
q=parser.p_get_between_tag_with_cond(p,"div",('id...
if p!=q:
#print("get_wiki_page id=body..."+all_page[rt...
body_x=all_page[rtn[0][0]:rtn[0][1]]
body=parser.special_char_to_text(body_x)
return body
def replace_wiki_page(self,new_body):
# Replace the wiki page source with new_body:
#   1. request the edit form (cmd=edit) for the page,
#   2. collect the attributes of every <input> tag and copy selected ones
#      into payload,
#   3. take the two <textarea> elements as the message and original text,
#   4. build a form body by hand and POST it.
# NOTE(review): many lines are cut off in this copy, including the
# comparisons that select which <input> feeds which payload key.
# NOTE(review): payload['encode_hint'], ['digest'], ['write'], ['msg'] and
# ['original'] are read unconditionally when building body; if the edit form
# lacks one of them this raises KeyError.
print("pico_wiki_driver.replace_wiki_page...")
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
array_of_equations=[]
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'input',...
while p!=q:
#print('p='+str(p)+' q='+str(q)+' <input....>')
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
for i in range(len(array_of_equations)):
#print('i='+str(i))
eqs=array_of_equations[i]
for key in eqs:
#print(key+'='+array_of_equations[i][key])
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['write']=array_of_equations[i...
#tokens=edit_html.get_tokens()
#for i in range(len(tokens)):_
# print(str(i)+'-'+tokens[i])
eqs_msg={}
rtn=[(0,0)]
# First <textarea>: the editable message area.
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
body=r[0:start_p]+new_body+r[end_p+1:]
#print("body=......")
#print(new_body)
payload['msg']=edit_html.text_to_post_form(ne...
#print('msg=......')
#print(payload['msg'])
eqs_original={}
rtn=[(0,0)]
# Second <textarea>, searched from where the first one ended.
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
payload['original']=edit_html.text_to_post_fo...
body=""
body=body+'encode_hint='+payload['encode_hint']+'&'
body=body+'template_page=&'
body=body+'cmd='+payload['cmd']+'&'
body=body+'page='+payload['page']+'&'
body=body+'digest='+payload['digest']+'&'
body=body+'msg='+payload['msg']+'&'
body=body+'write='+payload['write']+'&'
body=body+'original='+payload['original']
#print('body='+body)
edit_html.post(self.url,body)
#print(r)
def get_wiki_source(self):
# Return the wiki source of the page: request the edit form, take the text
# of the first <textarea>, then POST a "cancel" form built from the form's
# <input> values.
# NOTE(review): many lines are cut off in this copy, including the
# comparisons that select which <input> feeds which payload key.
# NOTE(review): the slice r[start_p:end_p] excludes the character at end_p,
# whereas replace_wiki_page uses end_p+1 for the same kind of range.
# NOTE(review): payload['encode_hint'] and payload['cancel'] are read
# unconditionally; a form without them raises KeyError.
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
array_of_equations=[]
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'input',...
while p!=q:
#print('p='+str(p)+' q='+str(q)+' <input....>')
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
for i in range(len(array_of_equations)):
#print('i='+str(i))
eqs=array_of_equations[i]
for key in eqs:
#print(key+'='+array_of_equations[i][key])
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['cancel']=array_of_equations[...
#tokens=edit_html.get_tokens()
#for i in range(len(tokens)):_
# print(str(i)+'-'+tokens[i])
eqs_msg={}
rtn=[(0,0)]
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
wiki_source=""
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
wiki_source=r[start_p:end_p]
body=""
body=body+'encode_hint='+payload['encode_hint']+'&'
body=body+'cmd='+payload['cmd']+'&'
body=body+'page='+payload['page']+'&'
body=body+'cancel='+payload['cancel']
#print('body='+body)
r=edit_html.post(self.url,body)
return wiki_source
class pico_wiki_bot:
result_lines=[]
script_lines=[]
urls=[]
programs={}
environments={}
time_millis=0
timer=None
current_program_name=""
current_program=""
report=[]
#reportLength=120
current_url=""
current_page=""
next_url_index=""
def __init__(self,url,page):
    """Bring the network up, load the initial settings and create the wiki driver.

    ``url``/``page`` become the current wiki location. A ``Timer`` object is
    created only when the driver reports a non-empty URL.

    ``__init__`` must return None: the former ``return ""`` made
    construction fail whenever the driver URL was empty.
    """
    print('pico_wiki_bot')
    self.Pico_Net=pico_net()
    self.Pico_Time=pico_time()
    self.Pico_Http=pico_http()
    self.Pico_Net.connect()
    t=self.Pico_Time.get_now()
    print('time now='+t)
    self.read_initials()
    self.current_url=url
    self.current_page=page
    self.next_url_index=0
    # pico_wiki_driver.__init__ connects and reads the time again by itself.
    self.Pico_Wiki_Driver=pico_wiki_driver(url,page)
    self.all_text=""
    if self.Pico_Wiki_Driver.get_url()=="":
        return
    self.timer=Timer()
    #all_text=self.Pico_Wiki_Driver.get_wiki_page()
    #if all_text==None:
    #    return None
def read_initials(self):
# Load "name=value" settings from initials.txt into self.initials; when
# anything raises, fall back to built-in url/page defaults. Then set the
# default intervals and report length in self.environments and copy
# url/page onto the instance.
# NOTE(review): the default URL line is cut off in this copy.
# NOTE(review): p is carried over from one line to the next (p=q), but each
# line gets a fresh parser, so from the second line on parsing starts at an
# offset taken from the previous line - looks like a bug.
# NOTE(review): the bare except reports "not found" for every error,
# including parse errors, and the file stays open if readlines() raises.
# NOTE(review): with indentation lost it is unclear which of the trailing
# assignments belong to the except branch.
print("read_initials")
try:
f=open('initials.txt','r')
print("initials.txt is found")
lines=f.readlines()
f.close()
self.initials={}
p=0
for i in range(len(lines)):
print(str(i)+' - '+lines[i])
parser=html_parser(lines[i])
q=parser.p_an_equation(p,self.initials)
if p!=q:
p=q
else:
break
print("-----initials-----")
for key in self.initials:
print(key+'='+self.initials[key])
except:
print("initials.txt is not found")
self.initials={}
self.initials['url']="http://www.yama-lab.org...
self.initials['page']="work202401"
self.environments['read_interval']=20000
self.environments['exec_interval']=0
self.environments['write_interval']=0
self.environments['report_length']=120
self.url=self.initials['url']
self.page=self.initials['page']
def set_current_page(self,page):
    """Set the page name used for the next read."""
    self.current_page = page
def set_current_url(self,url):
    """Set the base URL used for the next read."""
    self.current_url = url
def clear_report(self):
    """Discard all report lines by binding a new empty list."""
    self.report = []
def add_report_line(self,send):
# Append one line to the report and drop the oldest lines while the report
# is longer than a limit read from self.environments.
# NOTE(review): the limit expression is cut off in this copy; it is
# presumably environments['report_length'] - confirm.
self.report.append(send)
while len(self.report)>self.environments['report_...
self.report.pop(0)
def add_one_second_to_time_millis(self,timer):
    """Advance ``self.time_millis`` by 1000; ``timer`` is not used."""
    self.time_millis += 1000
def stop_timer(self):
    """Stop the timer, if any, and forget it.

    Only dropping the reference would leave an initialised timer running,
    so it is deinitialised first. Safe to call when no timer exists.
    """
    if self.timer is not None:
        self.timer.deinit()
    self.timer = None
def get_script_and_result(self):
# Fetch the current page and return the text of its first <pre> element
# with HTML character references decoded; returns "" when the page could
# not be fetched.
# NOTE(review): the return expression is cut off in this copy.
# NOTE(review): when no <pre> is found rtn[0] stays (0,0), so the extracted
# text is the empty slice html[0:0].
# NOTE(review): the slice [be[0]:be[1]] excludes the character at be[1],
# which p_get_between computes as an inclusive index.
#print("pico_wiki_bot.get_script_and_result")
self.Pico_Wiki_Driver.set_url(self.current_url)
self.Pico_Wiki_Driver.set_page(self.current_page)
html=self.Pico_Wiki_Driver.get_html()
if html=="":
return ""
parser=html_parser(html)
p=0
eqs={}
rtn=[(0,0)]
q=parser.p_get_between(p,'pre',eqs,rtn)
be=rtn[0]
command_and_result=parser.html[be[0]:be[1]]
return parser.special_char_to_text(command_and_re...
def get_result(self):
    """Return the text between "result:" and "current_device=" on the page.

    Returns None/"" when the page text is None/"". Returns "" when there is
    no "result:" marker. When "current_device=" is missing, the result
    extends to the end of the text.

    ``str.find`` is used because ``str.index`` raises instead of returning
    -1, so the former ``q<0`` fallback could never run and a missing marker
    crashed the caller.
    """
    all_text = self.get_script_and_result()
    if all_text is None:
        return None
    if all_text == "":
        return ""
    marker = "result:"
    start = all_text.find(marker)
    if start < 0:
        return ""
    start = start + len(marker)
    end = all_text.find("current_device=", start)
    if end < 0:
        end = len(all_text)
    else:
        # Drop the character just before "current_device=", as before.
        end = end - 1
    print('pico_wiki_bot.get_result return')
    # start+1 skips the character right after "result:".
    return all_text[start + 1:end]
def get_script(self):
    """Return the script part of the page: everything before "result:".

    The character just before "result:" is dropped as well. Without a
    "result:" marker the whole text is the script. Returns None/"" when the
    page text is None/"".

    When the text starts with "result:" the script is empty; the previous
    slice end of -1 returned almost the whole text in that case.
    """
    all_text = self.get_script_and_result()
    if all_text is None:
        return None
    if all_text == "":
        return ""
    cut = all_text.find("result:")
    if cut < 0:
        cut = len(all_text)
    else:
        cut = max(cut - 1, 0)
    return all_text[0:cut]
def get_current_device_line(self):
# Return the text after "current_device=" up to the end of that line.
# NOTE(review): the return expression is cut off in this copy.
# NOTE(review): the parser is created before the None check on
# command_and_result.
# NOTE(review): index("current_device=") raises ValueError when the marker
# is missing; only the newline search is guarded.
command_and_result=self.get_script_and_result()
parser=html_parser(command_and_result)
if command_and_result==None:
return None
p=command_and_result.index("current_device=")
p=p+len("current_device=")
try:
q=command_and_result.index("\n",p)
except:
q=len(command_and_result)
return parser.special_char_to_text(command_and_re...
def replace_wiki(self,new_wiki):
    """Write ``new_wiki`` as the new source of the driver's current page."""
    driver = self.Pico_Wiki_Driver
    driver.replace_wiki_page(new_wiki)
def replace_result(self,new_result):
# Rewrite the result area of the page: fetch the edit form, keep the page
# source up to and including "result:", append new_result (each line
# prefixed by one blank) and a current_device line, then write the page
# back through the wiki driver.
# NOTE(review): several lines are cut off in this copy, including the one
# that builds the current_device line.
# NOTE(review): this uses self.url/self.page (set in read_initials) rather
# than self.current_url/self.current_page - confirm this is intended.
# NOTE(review): body.index(...) raises ValueError when a marker is missing,
# so the "q<0" fallback cannot run; q is not used afterwards.
#print("pico_wiki_bot.replace_result")
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
eqs_msg={}
rtn=[(0,0)]
body=""
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
body=edit_html.special_char_to_text(r[start_p...
#print("body=......")
#print(body)
eqs_original={}
rtn=[(0,0)]
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
payload['original']=edit_html.text_to_post_fo...
#print("body="+body)
px=body.index("result:")+len("result:")
q=body.index("current_device=",px)
if q<0:
q=len(body)
head_area=body[0:px]
#tail_area=msg[q:]
#result_area=msg[px:q]
new_result_lines=new_result.split('\n')
new_area=head_area+'\n'
for i in range(len(new_result_lines)):
new_area=new_area+' '+new_result_lines[i]+'\n'
new_area=new_area+" current_device=\""+device_id+...
self.Pico_Wiki_Driver.replace_wiki_page(new_area)
def read_script(self):
    """Fetch the script text and split it into ``self.script_lines``.

    Returns True when a non-empty script was read, otherwise False with
    ``script_lines`` set to an empty list.
    """
    text = self.get_script()
    has_script = not (text is None or text == "")
    self.script_lines = text.split('\n') if has_script else []
    return has_script
def eval_script(self):
    """Interpret the script lines read most recently."""
    print("eval_script")
    self.interpret_lines()
#def print_result(self):
# print("print_result")
def is_object_page(self,line):
    """Recognise ``object_page <url> [or <url> ...]`` and store the URLs.

    On a match ``self.urls`` is replaced by the list of URLs and True is
    returned; otherwise False. As before, ``self.urls`` is cleared once the
    ``object_page`` keyword has been seen, even if no URL follows.

    The scan now continues after each parsed URL. Previously the position
    was never advanced past the first URL, so "or" alternatives were never
    read. Blanks are skipped locally with a bounds check, so a line ending
    in blanks cannot run past the end.
    """
    parser = html_parser(line)
    end = len(line)

    def skip_blanks(pos):
        while pos < end and line[pos] == ' ':
            pos = pos + 1
        return pos

    p = skip_blanks(0)
    q = parser.p_key("object_page ", p)
    if p == q:
        return False
    p = skip_blanks(q)
    self.urls = []
    xurl = [""]
    q = parser.p_url(p, xurl)
    if p == q:
        return False
    while p != q:
        self.urls.append(xurl[0])
        p = skip_blanks(q)
        q = parser.p_key("or ", p)
        if p == q:
            break
        p = skip_blanks(q)
        xurl = [""]
        q = parser.p_url(p, xurl)
    for i in range(len(self.urls)):
        print("url["+str(i)+"]="+self.urls[i])
    return True
def is_device(self,line):
    """Recognise ``current_device="<name>", Date=<date>``.

    Stores the device name in ``self.current_device`` as soon as it has been
    parsed, and returns True only when the whole line, date included, is
    well formed; otherwise returns False.

    Fixes: the date is parsed after the "Date=" keyword instead of at it,
    the output cell handed to ``p_Date`` has one element (an empty list
    cannot receive the result), and success returns True instead of falling
    off the end.
    """
    parser = html_parser(line)
    p = parser.p_b(0)
    q = parser.p_key("current_device=", p)
    if p == q:
        return False
    p = parser.p_b(q)
    xname = [""]
    q = parser.p_String_Constant(p, xname)
    if p == q:
        return False
    p = parser.p_b(q)
    self.current_device = xname[0]
    q = parser.p_key(",", p)
    if p == q:
        return False
    p = parser.p_b(q)
    q = parser.p_key("Date=", p)
    if p == q:
        return False
    xdate = [""]
    try:
        end = parser.p_Date(q, xdate)
    except (ValueError, IndexError):
        # A malformed or cut-off date is "not a device line", not a crash.
        return False
    # p_Date signals failure by returning the start position (or False).
    if not end or end == q:
        return False
    return True
def is_set_command(self,parser,p):
    """Handle ``set <name>=<value>``: store the value in ``self.environments``.

    Returns True when the command was recognised and applied, else False.
    """
    p = parser.p_b(p)
    after_keyword = parser.p_key("set ", p)
    if after_keyword == p:
        return False
    p = parser.p_b(after_keyword)
    eq = {}
    if parser.p_an_equation(p, eq) == p:
        return False
    for key in eq:
        value = eq[key]
        self.environments[key] = value
        print("set "+str(key)+"="+str(value))
    return True
def is_py_command(self,parser,p):
    """Handle ``py <name>``: start collecting a new program called ``<name>``.

    Resets ``self.current_program`` and records the program name. Returns
    True when the command was recognised (it used to fall off the end and
    return None, so ``is_command`` kept trying the other commands), else
    False.
    """
    p = parser.p_b(p)
    q = parser.p_key("py ", p)
    if p == q:
        return False
    p = parser.p_b(q)
    xname = [""]
    parser.p_name(p, xname)
    self.current_program_name = xname[0]
    self.current_program = ""
    return True
def is_end_command(self,parser,p):
# Handle "end <name>": store the program collected so far in self.programs
# under self.current_program_name.
# NOTE(review): the stored value is cut off in this copy; it is presumably
# self.current_program - confirm.
# NOTE(review): the name parsed after "end " is not used, and the method
# returns None (not True) on success.
#print("is_end_command.."+parser.html[p:p+10])
p=parser.p_b(p)
q=parser.p_key("end ",p)
if p==q:
return False
p=parser.p_b(q)
xname=[""]
q=parser.p_name(p,xname)
self.programs[self.current_program_name]=self.cur...
def is_run_command(self,parser,p):
    """Handle ``run <name>``: execute the stored program ``<name>``.

    Returns True when the line is a run command (whether or not execution
    succeeded), else False. A failure, including an unknown program name,
    is written to the report as "fail to exec:<name>". An unknown name used
    to raise KeyError outside the try block.

    SECURITY NOTE(review): the program text comes from a wiki page and is
    run with ``exec``; anyone who can edit that page can run arbitrary code
    on the device.
    """
    p = parser.p_b(p)
    q = parser.p_key("run ", p)
    if p == q:
        return False
    p = parser.p_b(q)
    xname = [""]
    parser.p_name(p, xname)
    program = self.programs.get(xname[0])
    if program is None:
        self.add_report_line("fail to exec:"+xname[0])
        return True
    try:
        exec(program)
    except Exception:
        print("fail to exec "+program)
        self.add_report_line("fail to exec:"+xname[0])
    return True
def is_command(self,line):
    """Recognise ``command: <set|py|end|run> ...`` and dispatch it.

    The handlers are tried in the order set, py, end, run. Returns True as
    soon as one reports success, otherwise False.
    """
    parser = html_parser(line)
    p = parser.p_b(0)
    q = parser.p_key("command:", p)
    if p == q:
        return False
    q = parser.p_b(q)
    handlers = (
        self.is_set_command,
        self.is_py_command,
        self.is_end_command,
        self.is_run_command,
    )
    for handler in handlers:
        if handler(parser, q):
            return True
    return False
def is_py(self,line):
    """Recognise a ``py: <code>`` line and append the code to the current program.

    Everything after "py: " plus a newline is appended to
    ``self.current_program``. Returns True on a match (it used to return
    None) and False otherwise, like the other ``is_*`` recognisers.
    """
    parser = html_parser(line)
    p = parser.p_b(0)
    q = parser.p_key("py: ", p)
    if p == q:
        return False
    self.current_program = self.current_program + line[q:] + '\n'
    return True
def interpret_a_line(self,line):
    """Offer ``line`` to each recogniser in turn; stop at the first that accepts it."""
    recognisers = (
        self.is_object_page,
        self.is_device,
        self.is_command,
        self.is_py,
    )
    for recognise in recognisers:
        if recognise(line):
            return
def interpret_lines(self):
    """Interpret every line of ``self.script_lines`` in order."""
    print("interpret_lines")
    for line in self.script_lines:
        self.interpret_a_line(line)
def next_url(self):
    """Switch to the next URL in ``self.urls`` (round robin).

    Each entry has the form ``<base>?<page>``. ``<page>`` becomes the
    current page and ``<base>`` the current URL, and both are passed to the
    wiki driver. A trailing "index.php" is removed from the base because the
    driver appends "index.php?" itself when fetching a page.

    Fixes: the base used to be cut one character short (``[0:p-1]``), and an
    entry without "?" raised ValueError; such an entry is now skipped.
    """
    print("next_url")
    if len(self.urls) == 0:
        return
    index = int(self.next_url_index)
    if index >= len(self.urls):
        index = 0
    self.next_url_index = index + 1
    urlx = self.urls[index]
    p = urlx.find("?")
    if p < 0:
        print("next_url: no page in "+urlx)
        return
    base = urlx[0:p]
    suffix = "index.php"
    if base.endswith(suffix):
        base = base[:len(base) - len(suffix)]
    self.current_page = urlx[p+1:]
    self.current_url = base
    print("current_url="+self.current_url)
    print("current_page="+self.current_page)
    self.Pico_Wiki_Driver.set_url(self.current_url)
    self.Pico_Wiki_Driver.set_page(self.current_page)
def print_result(self):
    """Send the report to the wiki, one report entry per line."""
    print("print_result")
    sending = "".join([entry + '\n' for entry in self.report])
    self.replace_result(sending)
def read_eval_print_loop(self):
# Main loop; never returns. Every 100 ms it checks three intervals taken
# from self.environments: reading the script/result from the wiki,
# evaluating the script, and writing the report back.
# NOTE(review): the interval comparisons are cut off in this copy.
# NOTE(review): elapsed time is computed by plain subtraction of ticks_ms()
# values; only last_read_time is reset when the counter wraps. MicroPython
# documents time.ticks_diff() for this purpose - consider using it.
# NOTE(review): each result line is added to the report and then the whole
# result r is added once more; with indentation lost it is unclear whether
# that second call is inside the loop - either way lines look duplicated.
print("read_eval_print_loop")
last_read_time=time.ticks_ms()
last_eval_time=last_read_time
last_write_time=last_read_time
while True:
current_time=time.ticks_ms()
if current_time<last_read_time:
last_read_time=current_time
if current_time-last_read_time>self.environme...
last_read_time=current_time
if self.read_script():
r=self.get_result()
if r!=None and r!="":
lines=r.split('\n')
for i in range(len(lines)):
self.add_report_line(lines[i])
self.add_report_line(r)
if self.environments['exec_interval']...
self.eval_script()
else:
self.next_url()
continue
if self.environments['exec_interval']>0:
if current_time-last_eval_time>self.envir...
last_eval_time=current_time
self.eval_script()
if self.environments['write_interval']>0:
if current_time-last_write_time>self.envi...
last_write_time=current_time
self.print_result()
time.sleep_ms(100)
def main():
# Entry point: create the bot for one wiki URL/page and run its loop, which
# does not return. The commented-out lines are manual checks of individual
# bot methods.
# NOTE(review): the constructor arguments are cut off in this copy.
bot=pico_wiki_bot("http://www.a_pukiwiki_site.example...
#print("-----------get_script_and_result-------------...
#print(bot.get_script_and_result())
#print("-----------get_result-----------------")
#print(bot.get_result())
#print("-----------get_script-----------------")
#print(bot.get_script())
#print("-----------get_current_device_line-----------...
#print(bot.get_current_device_line())
#print("-----------replace_result-----------------")
#bot.replace_result("Hello!\nThis is a test2....\nxxx...
#print("-----------stop_timer--------------------")
#bot.stop_timer()
bot.read_eval_print_loop()
main()
}}
----
#counter
終了行:
[[Pico_Wiki_Driver]]
#code(python){{
import machine
import time
import network
import urequests
import random
import socket
try:
import ustruct as struct
except:
import struct
from time import sleep
from machine import Timer
device_id="yama_pico_0000_0000_0000_0001"
class pico_net:
ssid = 'SSID-xxxx' # replace by the appropriate ssid
password = '12345678' # replace by the appropriate pa...
def __init__(self):
print('current ssid= '+self.ssid)
def set_access_point(self, ssid, password):
self.ssid = ssid
self.password = password
def connect(self):
self.wlan = network.WLAN(network.STA_IF)
self.wlan.active(True)
chipid = int.from_bytes(self.wlan.config('mac'), ...
self.wlan.connect(self.ssid, self.password)
max_wait = 10
while max_wait > 0:
if self.wlan.status() < 0 or self.wlan.status...
break
max_wait -= 1
print('waiting for connection...')
time.sleep(1)
if self.wlan.status() != 3:
raise RuntimeError('network connection failed')
else:
print('connected')
self.status = self.wlan.ifconfig()
print('ip = ' + self.status[0])
class pico_http:
def __init__(self):
print('pico_http')
def set_url(self, url):
self.url = url
def get(self, url):
#print('pico_http.get(urL='+url+')')
self.url=url
#print('urL='+url)
try:
r = urequests.get(url)
#print(r)
r_text=r.text
r.close()
#print('pico_http.get...return')
return r_text
except:
print('pico_http.get...exception')
return ""
def put(self, url, payload):
#print('pico_http.put(urL='+url+'payload='+str(pa...
#payload is a dictionary
uri=url+'?'
for key in payload:
#print(key+'=',end="")
#print(payload[key])
uri=uri+key+'='+payload[key]+'&'
uri=uri[:-1]
#print('uri='+uri)
#r = urequests.post(uri)
#r = urequests.post(url,data=payload)
r = urequests.get(uri)
if r=="":
print('pico_http.put...exception')
return ""
#print(r)
r_text=r.text
r.close()
#print(r_json)
return r_text
#post, ref: https://docs.micropython.org/en/latest/li...
def post(self, url, body, headers):
#print('pico_http.post(url='+url+' body='+str(bod...
try:
r = urequests.post(url,data=body,headers=head...
#print(r)
r_text=r.text
#print(r_text)
#r_json=r.json()
r.close()
#print(r_json)
#print('pico_http.post...return')
return r_text
except:
print('pico_http.post...exception')
return ""
class pico_time:
# reference: https://wisteriahill.sakura.ne.jp/CMS/Wo...
# NTPサーバー
#ntp_host = "time-c.nist.gov"
#ntp_host = "time.cloudflare.com"
#ntp_host = "time.google.com"
ntp_host = "ntp.nict.jp"
def __init__(self):
print('pico_time')
def ptime(self):
NTP_DELTA = 2208988800
NTP_QUERY = bytearray(48)
NTP_QUERY[0] = 0x1b
addr = socket.getaddrinfo(self.ntp_host, 123)[0][...
s = socket.socket(socket.AF_INET, socket.SOCK_DGR...
# s.settimeout(1)
res = s.sendto(NTP_QUERY, addr)
msg = s.recv(48)
s.close()
val = struct.unpack("!I", msg[40:44])[0]
return val - NTP_DELTA
wday = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun']
def zfill(self, s, width):
if len(s) < width:
return ("0" * (width - len(s))) + s
else:
return s
def get_now(self):
t=self.ptime()
t = t + 9*60*60
#print(t)
datetimetuple = time.gmtime(t)
year = str(datetimetuple[0])
month = self.zfill(str(datetimetuple[1]),2)
mday = self.zfill(str(datetimetuple[2]),2)
hour = self.zfill(str(datetimetuple[3]),2)
minute = self.zfill(str(datetimetuple[4]),2)
seconds = self.zfill(str(datetimetuple[5]),2)
wd_index = datetimetuple[6]
#日付と時刻を分けるデリミター("|")を付加
#datetime = year + "/" + month + "/" + mday + " "...
datetime = year + "/" + month + "/" + mday + " " ...
return datetime
class html_parser:
html=""
tokens=[]
reserved_words={'<':'<','>':'>','&':'&','"'...
' ':' ','\n':'<br>','\t':' &...
'\\':'\','{':'{','}':'}'...
}
# Character -> form-encoding table used by code_convert(): a space maps to
# "+", the other listed characters map to %XX escapes.
# NOTE(review): some rows are cut off ("...") in this listing.
a2c={' ':"+",'\n':"%0D%0A",'\r':"%0D",'\t':"%09",'!':...
'#':"%23",'$':"%24",'%':"%25",'&':"%26", '\'':"%2...
'*':"%2A",'+':"%2B",',':"%2C",'-':"%2D",'/':"%2F"...
';':"%3B",'<':"%3C",'=':"%3D",'>':"%3E",'?':"%3F",
'@':"%40",'[':"%5B",'\\':"%5C",']':"%5D",'^':"%5E",
'`':"%60",'{':"%7B",'|':"%7C",'}':"%7D",'~':"%7E"}
# Special character to ASCII code.
# NOTE(review): keys and values look identical here; presumably the keys were
# HTML entity names that the page rendering decoded - confirm against the file.
sc2a={'<':"<",'>':">",'&':"&",'"':'"',...
def __init__(self, html):
    """Create a parser for the text ``html``."""
    self.html = html
    # Per-instance token list: without this every parser appends to the
    # single class-level ``tokens`` list, so tokens of one text leak into
    # the next parser.
    self.tokens = []
    # Read cursor used by next_token()/has_more_tokens(); defined here so
    # they work even before html_tokenizer() has been called.
    self.token_ith = 0
def html_tokenizer(self):
    """Split ``self.html`` into tokens appended to ``self.tokens``.

    Three kinds of tokens are produced, in input order:
      * '<...>'  for every span accepted by parse_tag()
      * '"..."'  for every span accepted by parse_String_Constant()
      * the run of other characters found between those spans
    The token cursor ``self.token_ith`` is reset to 0.
    """
    self.token_ith = 0
    pending = ""          # plain characters collected since the last token
    found = [""]
    remainder = [""]
    text = self.html
    while len(text) > 0:
        if self.parse_tag(text, found, remainder):
            token = '<' + found[0] + '>'
        elif self.parse_String_Constant(text, found, remainder):
            token = '"' + found[0] + '"'
        else:
            # Neither a tag nor a string starts here: keep one character.
            pending = pending + text[0]
            text = text[1:]
            continue
        if pending != "":
            self.tokens.append(pending)
            pending = ""
        self.tokens.append(token)
        text = remainder[0]
    # Flush whatever plain text trails the last tag/string.
    if pending != "":
        self.tokens.append(pending)
def get_html(self):
    """Return the text held in ``self.html``."""
    text = self.html
    return text
def parse_key(self, key, inx, rest):
    """Test whether the string ``inx`` begins with ``key``.

    ``rest[0]`` receives ``inx`` with the key removed on a match, or
    ``inx`` unchanged otherwise.  Returns True on a match, else False.
    """
    matched = inx.startswith(key)
    rest[0] = inx[len(key):] if matched else inx
    return matched
# Try to read a URL starting at position p of self.html.
# A URL must begin with 'http://' or 'https://' and runs until one of the
# delimiter characters in the list below is met.
# On success url[0] receives the URL and the position of the delimiter is
# returned; on failure the original position is returned.
# NOTE(review): the delimiter list is cut off ("...") in this listing.
# NOTE(review): when the URL runs to the very end of the text (p>=xlen) the
# original position is returned, i.e. it is treated as "no URL".
def p_url(self,p,url):
#print("p_url p="+str(p)+' '+self.html[p:p+10])
rx=[""]
xurl=""
pw=p
xlen=len(self.html)
q=self.p_key('http://',p)
if p!=q:
xurl='http://'
p=q
while not (self.html[p] in [' ','\n','\t','\r...
xurl=xurl+self.html[p]
p=p+1
if p>=xlen:
return pw
url[0]=xurl
return p
# same scan for the https scheme
q=self.p_key('https://',p)
if p!=q:
xurl='https://'
p=q
while not (self.html[p] in [' ','\n','\t','\r...
xurl=xurl+self.html[p]
p=p+1
if p>=xlen:
return pw
url[0]=xurl
return p
return pw
def p_key(self, key, p):
    """Match the literal ``key`` at position ``p`` of ``self.html``.

    Returns the position just after the key when it is present at ``p``,
    otherwise ``p`` itself (callers compare the result with ``p``).
    """
    end = p + len(key)
    return end if self.html[p:end] == key else p
def parse_String_Constant(self, inx, strc, rest):
    """Try to read a double-quoted string at the start of ``inx``.

    On success ``strc[0]`` receives the text between the quotes,
    ``rest[0]`` the input after the closing quote, and True is returned.
    Returns False when ``inx`` does not start with '"' or the string is
    never closed.  (The original raised IndexError for an unterminated
    string because it indexed past the end of the input.)

    A backslash and the character following it are both skipped and do
    not appear in the result; this keeps the original behaviour.
    """
    if not inx.startswith('"'):
        return False
    chars = []
    i = 1
    n = len(inx)
    while i < n:
        ch = inx[i]
        if ch == '"':
            strc[0] = "".join(chars)
            rest[0] = inx[i + 1:]
            return True
        if ch == '\\':
            i += 2
        else:
            chars.append(ch)
            i += 1
    return False
def p_String_Constant(self, p, strc):
    """Try to read a double-quoted string at position ``p`` of ``self.html``.

    On success ``strc[0]`` receives the text between the quotes and the
    position just after the closing quote is returned.  On failure (no
    opening quote at ``p``, or the string is never closed) ``p`` is
    returned and ``strc`` is left untouched.  (The original raised
    IndexError for an unterminated string.)

    A backslash and the character following it are both skipped and do
    not appear in the result; this keeps the original behaviour.
    """
    text = self.html
    if text[p:p + 1] != '"':
        return p
    n = len(text)
    i = p + 1
    chars = []
    while i < n:
        ch = text[i]
        if ch == '"':
            strc[0] = "".join(chars)
            return i + 1
        if ch == '\\':
            i += 2
        else:
            chars.append(ch)
            i += 1
    return p
# Try to read a date/time of the form
#   <number>/<number>/<number><blanks><number>:<number>:<number>
# starting at position p of self.html.
# On success date_str[0] receives a text built from the parsed numbers and the
# position after the seconds is returned; on failure the original position pw
# is returned.
# NOTE(review): the failure branch after the day number returns False instead
# of pw, unlike every other branch.
# NOTE(review): the expression assigned to date_str[0] is cut off ("...") in
# this listing.
def p_Date(self,p,date_str):
#print("p_Date p="+str(p)+' '+self.html[p:p+10])
rx=[""]
xstr=""
pw=p
# year
year=[0]
q=self.p_number(p,year)
if p==q:
return pw
p=q
q=self.p_key('/',p)
if p==q:
return pw
p=q
# month
month=[0]
q=self.p_number(p,month)
if p==q:
return pw
p=q
q=self.p_key('/',p)
if p==q:
return pw
p=q
# day
day=[0]
q=self.p_number(p,day)
if p==q:
return False
p=q
# at least one blank must separate the date from the time
q=self.p_b(p)
if p==q:
return pw
p=q
# hour
hour=[0]
q=self.p_number(p,hour)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
# minute
minute=[0]
q=self.p_number(p,minute)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
# second
second=[0]
q=self.p_number(p,second)
if p==q:
return pw
p=q
date_str[0]=str(year[0])+'/'+str(month[0])+'/'+st...
return p
# Try to read a tag "<...>" at the start of the string inx.
# On success tag[0] receives the text between '<' and '>' (string constants
# found inside are re-emitted wrapped in double quotes), rest[0] receives the
# input after the '>', and True is returned.  Otherwise False is returned.
# NOTE(review): inx[0] is read without a length check, so an input that ends
# before a '>' is found looks like it raises IndexError - confirm.
# NOTE(review): the nesting of the if/else below cannot be recovered from this
# listing because the indentation was lost.
def parse_tag(self,inx,tag,rest):
rx=[""]
xstr=""
wstr=[""]
if self.parse_key('<',inx,rx):
#print("parse_tag")
inx=rx[0]
fx=inx[0]
xlen=len(inx)
while fx!='>':
if xlen<=0:
return False
if fx=='\\':
inx=inx[1:xlen]
if self.parse_String_Constant(inx,wstr,rx):
inx=rx[0]
xstr=xstr+'"'+wstr[0]+'"'
else:
xstr=xstr+fx
inx=inx[1:xlen]
fx=inx[0]
xlen=len(inx)
if self.parse_key('>',inx,rx):
tag[0]=xstr
rest[0]=rx[0]
return True
return False
def get_tokens(self):
    """Return the token list itself (not a copy)."""
    found = self.tokens
    return found
def next_token(self):
    """Return the token under the cursor and advance the cursor.

    Returns "" once the cursor has passed the last token.
    """
    cursor = self.token_ith
    if cursor >= len(self.tokens):
        return ""
    self.token_ith = cursor + 1
    return self.tokens[cursor]
def has_more_tokens(self):
    """Return True while next_token() still has a token to deliver."""
    return self.token_ith < len(self.tokens)
def index_from(self, i, str):
    """Return the offset of ``str`` inside ``self.html[i:]``.

    The offset is counted from ``i``, not from the start of the text.
    Raises ValueError when ``str`` does not occur.
    """
    tail = self.html[i:]
    return tail.index(str)
def p_b(self, p):
    """Skip blanks: return the first position >= ``p`` that is not a space.

    Stops at the end of ``self.html``.  The original indexed past the end
    and raised IndexError on an empty text or on trailing spaces (e.g.
    a blank script line).
    """
    text = self.html
    end = len(text)
    while p < end and text[p] == ' ':
        p += 1
    return p
# Read a name starting at position p of self.html.
# The name is the run of characters that belong to the character set in the
# while condition; name[0] receives it and the position after it is returned.
# When no such character is at p, name[0] is "" and p is returned.
# NOTE(review): the character set is cut off ("...") in this listing.
# NOTE(review): self.html[q] is read before any bounds check, so calling this
# with p at the end of the text looks like it raises IndexError - confirm.
def p_name(self,p,name):
#print("p_name, self.html="+ self.html[p:p+10])
#print("p="+str(p)+"..."+self.html[p:p+10])
q=p
last_p=len(self.html)
while self.html[q] in 'abcdefghijklmnopqrstuvwxyz...
#print("self.html["+str(q)+"]="+self.html[q])
q=q+1
if q>=last_p:
break
name[0]=self.html[p:q]
#print("p="+str(p)+" q="+str(q)+" name="+name[0])
#print("name[0]= "+name[0])
return q
def p_number(self, p, num):
    """Read an unsigned decimal integer at position ``p`` of ``self.html``.

    On success ``num[0]`` receives the value and the position after the
    last digit is returned.  When no digit is found at ``p`` the function
    returns ``p`` and leaves ``num`` untouched, which is the "no match"
    signal callers test for.  (The original called int("") in that case
    and raised ValueError, and raised IndexError at the end of the text.)
    """
    text = self.html
    end = len(text)
    q = p
    while q < end and text[q] in '0123456789':
        q += 1
    if q > p:
        num[0] = int(text[p:q])
    return q
def p_an_equation(self, p, eqs):
    """Parse one ``name = value`` pair at position ``p`` of ``self.html``.

    ``value`` is either a double-quoted string or an unsigned number.
    Blanks are allowed before the name and around the "=".  On success
    ``eqs[name]`` is set (str for a string, int for a number) and the
    position after the value is returned; otherwise ``p`` is returned.
    """
    start = p
    name_box = [""]
    name_at = self.p_b(p)
    name_end = self.p_name(name_at, name_box)
    if name_end == name_at:
        return start
    eq_at = self.p_b(name_end)
    eq_end = self.p_key('=', eq_at)
    if eq_end == eq_at:
        return start
    val_at = self.p_b(eq_end)
    # A quoted string is tried first ...
    text_box = [""]
    val_end = self.p_String_Constant(val_at, text_box)
    if val_end != val_at:
        eqs[name_box[0]] = text_box[0]
        return val_end
    # ... then a number.
    num_box = [0]
    val_end = self.p_number(val_at, num_box)
    if val_end == val_at:
        return start
    eqs[name_box[0]] = num_box[0]
    print("num="+str(num_box[0]))
    return val_end
def p_equations(self, p, eqs):
    """Parse as many consecutive ``name = value`` pairs as possible.

    Every pair is stored into ``eqs`` by p_an_equation().  Returns the
    position after the last pair, or ``p`` when none was found.
    """
    pos = p
    nxt = self.p_an_equation(pos, eqs)
    while nxt != pos:
        pos = nxt
        nxt = self.p_an_equation(pos, eqs)
    return pos
def p_get_equations_of_the_tag(self, p, tag_name, eqs):
    """Find the next ``<tag_name`` from ``p`` and collect its attributes.

    The ``name=value`` pairs following the tag name are stored into
    ``eqs`` by p_equations().  Returns the position just after the tag's
    closing ">" (or "/>").  Returns ``p`` when the tag is not found, has
    no attributes, or is not closed right after them.
    """
    opener = '<' + tag_name
    try:
        found = self.html.index(opener, p)
    except ValueError:
        # str.index signals "not found" with ValueError only; anything
        # else is a real error and must not be swallowed.
        return p
    attrs_at = found + len(opener)
    attrs_end = self.p_equations(attrs_at, eqs)
    if attrs_end == attrs_at:
        return p
    close_at = self.p_b(attrs_end)
    for closer in ('>', '/>'):
        after = self.p_key(closer, close_at)
        if after != close_at:
            return after
    return p
# Search self.html from position p for a <tag_name ...> ... </tag_name>
# element whose attributes satisfy cond: cond[0] is an attribute name and
# cond[1] the value it is compared with.
# On success rtn[0] is set to (start_p, end_p) and the position after the
# closing tag's '>' is returned; failures return a position without touching
# rtn.
# NOTE(review): the parameter list is cut off ("...") in this listing; the body
# uses rtn, so presumably the last parameter is rtn.
# NOTE(review): the nesting of the loop body (continue/break) cannot be
# recovered from this listing because the indentation was lost.
# NOTE(review): "if p==q: return pw" gives up when the tag starts exactly at p
# - confirm this is intended.
def p_get_between_tag_with_cond(self,p,tag_name,cond,...
#print("p_get_between_with_cond <"+tag_name+">......
#print("p="+str(p)+' '+self.html[p:p+10])
pw=p
q=pw
while True:
try:
q=self.html.index('<'+tag_name,p)
#print("p_get_between_with_cond p="+str(p...
if p==q:
return pw
q=q+len('<'+tag_name)
p=q
eqs={}
q=self.p_equations(p,eqs)
#print("after equations q="+str(q) +' '+s...
if p!=q:
if cond[0] in eqs:
#print("cond[0]="+cond[0]+" eqs[c...
if eqs[cond[0]]!=cond[1]:
continue
else:
break
except:
#print("fail to find <"+tag_name)
return pw
p=self.p_b(p)
#print("p_get_between q="+str(q)+' '+self.html[q:...
#print("pw="+str(pw))
q=self.p_key('>',p)
if p!=q:
start_p=q
else:
q=self.p_key('/>',p)
if pw!=q:
start_p=q
else:
return p
# locate the closing tag
try:
q=self.html.index('</'+tag_name,start_p)
except:
return pw
# end_p is the index just before the "</"
end_p=q-1
p=q
q=q+len('</'+tag_name)
p=self.p_b(q)
#print("pw="+str(pw))
q=self.p_key('>',p)
if q==p:
return pw
rtn[0]=(start_p,end_p)
#rx=self.html[start_p:end_p]
#print("rx="+rx)
return q
def p_get_between(self, p, tag_name, eqs, rtn):
    """Locate the next ``<tag_name ...> ... </tag_name>`` element from ``p``.

    Attributes of the opening tag are stored into ``eqs``.  On success
    ``rtn[0]`` is set to ``(start_p, end_p)``, where ``start_p`` is the
    index just after the opening tag and ``end_p`` is the index of the
    closing "</" minus one, and the position just after the closing tag's
    ">" is returned.  On any failure ``p`` is returned and ``rtn`` is left
    untouched.
    """
    origin = p
    opener = '<' + tag_name
    closer = '</' + tag_name
    try:
        pos = self.html.index(opener, p) + len(opener)
    except ValueError:
        # Only "substring not found" is an expected failure here.
        return origin
    pos = self.p_b(self.p_equations(pos, eqs))
    start_p = self.p_key('>', pos)
    if start_p == pos:
        start_p = self.p_key('/>', pos)
        if start_p == pos:
            return origin
    try:
        close_at = self.html.index(closer, start_p)
    except ValueError:
        return origin
    end_p = close_at - 1
    gt_at = self.p_b(close_at + len(closer))
    after = self.p_key('>', gt_at)
    if after == gt_at:
        return origin
    rtn[0] = (start_p, end_p)
    return after
def code_convert(self, code):
    """Return the ``a2c`` replacement for ``code``, or ``code`` itself
    when the table has no entry for it."""
    return self.a2c.get(code, code)
def text_to_post_form(self, text):
    """Return ``text`` with every character passed through code_convert().

    The converted characters are concatenated in their original order.
    """
    pieces = [self.code_convert(ch) for ch in text]
    return "".join(pieces)
def special_char_to_text(self, text):
    """Decode the basic HTML character entities in ``text``.

    Handles ``&lt;``, ``&gt;``, ``&quot;``, the single-quote forms
    ``&#039;`` / ``&#39;`` / ``&apos;`` and ``&amp;``.

    In the listing this was taken from, the entity names had been decoded
    by the page rendering, leaving replacements of a character by itself
    and an unterminated string literal; the names are restored here.
    ``&amp;`` is decoded last so that an escaped entity such as
    ``&amp;lt;`` becomes the text ``&lt;`` rather than being decoded
    twice into ``<``.
    """
    for entity, char in (
        ("&lt;", "<"),
        ("&gt;", ">"),
        ("&quot;", '"'),
        ("&#039;", "'"),
        ("&#39;", "'"),
        ("&apos;", "'"),
        ("&amp;", "&"),
    ):
        text = text.replace(entity, char)
    return text
# Send body to url with an HTTP POST and return the response text.
# The request carries the Content-Type header defined below.
# NOTE(review): the header literal and the urequests.post() call are cut off
# ("...") in this listing.
# NOTE(review): there is no exception handling here, unlike pico_http.post.
def post(self,url,body):
#payload is a dictionary
headers={'Content-Type': 'application/x-www-form-...
#def http_post(url, value):
# https://docs.openmv.io/library/urequests.html
#body = "value=%s" % (value)
#headers = {'Content-Type': 'application/x-www-fo...
response = urequests.post(url, data=body, headers...
r_text=response.text
#print(r_text)
# release the connection once the text has been read
response.close()
#print(r_json)
return r_text
# Reads and rewrites one wiki page, identified by a base url and a page name.
class pico_wiki_driver:
# base URL; set per instance in __init__ / set_url
url=""
# Store url and page, create the network, time and HTTP helpers, connect to
# the network and print the current time.
def __init__(self,url,page):
print('pico_wiki_driver')
self.url=url
self.page=page
self.Pico_Net=pico_net()
self.Pico_Time=pico_time()
self.Pico_Http=pico_http()
# the connection is established before get_now() is called
self.Pico_Net.connect()
t=self.Pico_Time.get_now()
print('time now='+t)
def set_url(self, url):
    """Store ``url`` as the base URL used by later requests."""
    self.url = url
def set_page(self, page):
    """Store ``page`` as the page name used by later requests."""
    self.page = page
def get_url(self):
    """Return the stored base URL."""
    current = self.url
    return current
def get_html(self):
    """Fetch ``<url>index.php?<page>`` and return what Pico_Http.get() gives."""
    return self.Pico_Http.get(self.url + "index.php?" + self.page)
# Fetch <url>index.php?<page> and return the content of a <div> element
# selected by an attribute condition, with HTML special characters converted
# by special_char_to_text().
# NOTE(review): the condition passed to p_get_between_tag_with_cond() is cut
# off ("...") in this listing; the commented print suggests id=body - confirm.
# NOTE(review): which statements belong to the "if p!=q" branch cannot be
# recovered from this listing because the indentation was lost.
def get_wiki_page(self):
uri = self.url+"index.php?"+self.page
all_page=self.Pico_Http.get(uri)
print("get_wiki_page...")
#print(all_page)
parser=html_parser(all_page)
p=0
rtn=[(0,0)]
q=parser.p_get_between_tag_with_cond(p,"div",('id...
if p!=q:
#print("get_wiki_page id=body..."+all_page[rt...
body_x=all_page[rtn[0][0]:rtn[0][1]]
body=parser.special_char_to_text(body_x)
return body
# Replace the wiki page text with new_body.
# Steps visible in the code:
#   1. request the edit form (cmd=edit, page=<page>) through Pico_Http.put()
#   2. collect the attributes of every <input ...> tag of the form
#   3. copy selected form fields into payload
#   4. take the two <textarea> elements (message and original text)
#   5. build a form-encoded body and POST it to self.url
# NOTE(review): many lines are cut off ("...") in this listing, including the
# conditions that select the form fields.
# NOTE(review): payload keys such as 'encode_hint' and 'digest' are only set
# when a matching <input> was found; a missing one looks like a KeyError when
# the body is assembled - confirm.
def replace_wiki_page(self,new_body):
print("pico_wiki_driver.replace_wiki_page...")
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
# one dict of attributes per <input> tag, in document order
array_of_equations=[]
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'input',...
while p!=q:
#print('p='+str(p)+' q='+str(q)+' <input....>')
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
# pick the form fields that have to be sent back
for i in range(len(array_of_equations)):
#print('i='+str(i))
eqs=array_of_equations[i]
for key in eqs:
#print(key+'='+array_of_equations[i][key])
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['write']=array_of_equations[i...
#tokens=edit_html.get_tokens()
#for i in range(len(tokens)):_
# print(str(i)+'-'+tokens[i])
# first <textarea>: the message area
eqs_msg={}
rtn=[(0,0)]
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
body=r[0:start_p]+new_body+r[end_p+1:]
#print("body=......")
#print(new_body)
payload['msg']=edit_html.text_to_post_form(ne...
#print('msg=......')
#print(payload['msg'])
# second <textarea>: the original text
eqs_original={}
rtn=[(0,0)]
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
payload['original']=edit_html.text_to_post_fo...
# assemble the form-encoded request body
body=""
body=body+'encode_hint='+payload['encode_hint']+'&'
body=body+'template_page=&'
body=body+'cmd='+payload['cmd']+'&'
body=body+'page='+payload['page']+'&'
body=body+'digest='+payload['digest']+'&'
body=body+'msg='+payload['msg']+'&'
body=body+'write='+payload['write']+'&'
body=body+'original='+payload['original']
#print('body='+body)
edit_html.post(self.url,body)
#print(r)
# Return the wiki source text of the page.
# The edit form is requested (cmd=edit), the content of its first <textarea>
# is taken as the source, and the form is then posted back with the 'cancel'
# field.
# NOTE(review): many lines are cut off ("...") in this listing, including the
# conditions that select the form fields.
# NOTE(review): the slice used here is r[start_p:end_p], while
# replace_wiki_page uses end_p+1 for the same kind of range - confirm which is
# intended.
def get_wiki_source(self):
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
# one dict of attributes per <input> tag, in document order
array_of_equations=[]
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'input',...
while p!=q:
#print('p='+str(p)+' q='+str(q)+' <input....>')
array_of_equations.append(equations)
p=q
equations={}
q=edit_html.p_get_equations_of_the_tag(p,'inp...
# pick the form fields that have to be sent back
for i in range(len(array_of_equations)):
#print('i='+str(i))
eqs=array_of_equations[i]
for key in eqs:
#print(key+'='+array_of_equations[i][key])
if key=='name' and array_of_equations[i][...
payload['encode_hint']=array_of_equat...
elif key=='name' and array_of_equations[i...
payload['template']=array_of_equation...
elif key=='name' and array_of_equations[i...
payload['cmd']=array_of_equations[i][...
elif key=='name' and array_of_equations[...
payload['page']=array_of_equations[i]...
elif key=='name' and array_of_equations[i...
payload['digest']=array_of_equations[...
elif key=='name' and array_of_equations[i...
payload['cancel']=array_of_equations[...
#tokens=edit_html.get_tokens()
#for i in range(len(tokens)):_
# print(str(i)+'-'+tokens[i])
# first <textarea>: the page source
eqs_msg={}
rtn=[(0,0)]
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
wiki_source=""
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
wiki_source=r[start_p:end_p]
# post the form back with the cancel field
body=""
body=body+'encode_hint='+payload['encode_hint']+'&'
body=body+'cmd='+payload['cmd']+'&'
body=body+'page='+payload['page']+'&'
body=body+'cancel='+payload['cancel']
#print('body='+body)
r=edit_html.post(self.url,body)
return wiki_source
# Bot that reads a script from a wiki page, interprets it and writes a report
# back to the page.
# NOTE(review): the lists and dicts below are class attributes; methods that
# mutate them in place (append, item assignment) act on the object shared by
# every instance unless the attribute is rebound first.
class pico_wiki_bot:
result_lines=[]
# lines of the script read by read_script()
script_lines=[]
# URLs collected by is_object_page()
urls=[]
# program name -> program text, filled by is_end_command()
programs={}
# settings changed by "set" commands and read by the main loop
environments={}
# advanced by 1000 in add_one_second_to_time_millis()
time_millis=0
timer=None
# name and text of the program currently being collected
current_program_name=""
current_program=""
# report lines, trimmed in add_report_line()
report=[]
#reportLength=120
current_url=""
current_page=""
# index into urls used by next_url(); __init__ sets it to 0
next_url_index=""
def __init__(self, url, page):
    """Connect to the network and prepare the bot for ``url`` / ``page``.

    Creates the network, time and HTTP helpers, connects, prints the
    current time, loads the initial settings with read_initials() and
    creates the pico_wiki_driver for the given page.  A Timer object is
    created only when the driver reports a non-empty URL.
    """
    print('pico_wiki_bot')
    self.Pico_Net = pico_net()
    self.Pico_Time = pico_time()
    self.Pico_Http = pico_http()
    self.Pico_Net.connect()
    t = self.Pico_Time.get_now()
    print('time now=' + t)
    self.read_initials()
    self.current_url = url
    self.current_page = page
    self.next_url_index = 0
    self.Pico_Wiki_Driver = pico_wiki_driver(url, page)
    self.all_text = ""
    if self.Pico_Wiki_Driver.get_url() == "":
        # __init__ must return None: returning "" here made constructing
        # the object fail with TypeError.
        return
    self.timer = Timer()
    #all_text=self.Pico_Wiki_Driver.get_wiki_page()
    #if all_text==None:
    # return None
    #self.timer.init(mode=Timer.PERIODIC, freq=1, cal...
# Load the start-up settings.
# 'initials.txt' is read line by line; each line is parsed as name=value with
# p_an_equation() into self.initials.  When anything in that block raises,
# default values for 'url' and 'page' are used instead.  Default environment
# values are set, and self.url / self.page are taken from self.initials.
# NOTE(review): the default URL literal is cut off ("...") in this listing.
# NOTE(review): whether the environments defaults belong to the except branch
# cannot be recovered from this listing because the indentation was lost.
# NOTE(review): p is carried over from one line to the next (p=q), so the next
# line looks like it is parsed from the previous line's end offset - confirm.
# NOTE(review): the bare except also catches errors raised while printing
# (e.g. key+'='+value with a numeric value), not only a missing file.
def read_initials(self):
print("read_initials")
try:
f=open('initials.txt','r')
print("initials.txt is found")
lines=f.readlines()
f.close()
self.initials={}
p=0
for i in range(len(lines)):
print(str(i)+' - '+lines[i])
parser=html_parser(lines[i])
q=parser.p_an_equation(p,self.initials)
if p!=q:
p=q
else:
break
print("-----initials-----")
for key in self.initials:
print(key+'='+self.initials[key])
except:
print("initials.txt is not found")
self.initials={}
self.initials['url']="http://www.yama-lab.org...
self.initials['page']="work202401"
# intervals are compared with time.ticks_ms() differences in the main loop
self.environments['read_interval']=20000
self.environments['exec_interval']=0
self.environments['write_interval']=0
self.environments['report_length']=120
self.url=self.initials['url']
self.page=self.initials['page']
def set_current_page(self, page):
    """Remember ``page`` as the page the bot currently works on."""
    self.current_page = page
def set_current_url(self, url):
    """Remember ``url`` as the base URL the bot currently works on."""
    self.current_url = url
def clear_report(self):
    """Start a new, empty report list."""
    self.report = list()
def add_report_line(self, send):
    """Append ``send`` to the report and drop the oldest entries so that
    at most ``environments['report_length']`` entries remain."""
    log = self.report
    log.append(send)
    excess = len(log) - self.environments['report_length']
    if excess > 0:
        # Remove from the front, in place, like repeated pop(0).
        del log[:excess]
def add_one_second_to_time_millis(self, timer):
    """Advance ``self.time_millis`` by 1000.

    ``timer`` is accepted but not used.
    """
    self.time_millis += 1000
def stop_timer(self):
    """Stop the timer and forget it.

    Dropping the reference alone does not stop a running timer, so it is
    deinitialised first.  Calling this when no timer exists is a no-op.
    """
    timer = self.timer
    if timer is not None:
        timer.deinit()
    self.timer = None
def get_script_and_result(self):
    """Return the text of the first ``<pre>`` element of the current page.

    The page is fetched through the driver for ``current_url`` /
    ``current_page``.  HTML special characters in the extracted text are
    converted with special_char_to_text().  Returns "" when the driver
    returns an empty page.
    """
    driver = self.Pico_Wiki_Driver
    driver.set_url(self.current_url)
    driver.set_page(self.current_page)
    html = driver.get_html()
    if html == "":
        return ""
    parser = html_parser(html)
    span = [(0, 0)]
    parser.p_get_between(0, 'pre', {}, span)
    first, last = span[0]
    return parser.special_char_to_text(parser.html[first:last])
def get_result(self):
    """Return the text between "result:" and "current_device=" of the page.

    Returns None / "" when get_script_and_result() does, and "" when the
    page has no "result:" marker.  When "current_device=" is missing the
    result runs to the end of the text.

    The original used str.index(), which raises ValueError instead of
    returning a negative value, so its ``q<0`` branch was unreachable and
    a page without the markers raised an exception.
    """
    all_text = self.get_script_and_result()
    if all_text is None:
        return None
    if all_text == "":
        return ""
    marker = "result:"
    at = all_text.find(marker)
    if at < 0:
        # No result section on the page: nothing to report.
        return ""
    p = at + len(marker)
    q = all_text.find("current_device=", p)
    if q < 0:
        q = len(all_text)
    else:
        q = q - 1
    print('pico_wiki_bot.get_result return')
    return all_text[p + 1:q]
def get_script(self):
    """Return the part of the page text that precedes "result:".

    The character just before the marker is dropped as well.  When the
    marker is absent the whole text is returned.  None / "" from
    get_script_and_result() are passed through.
    """
    all_text = self.get_script_and_result()
    if all_text is None:
        return None
    if all_text == "":
        return ""
    cut = all_text.find("result:")
    if cut < 0:
        end = len(all_text)
    else:
        end = cut - 1
    return all_text[0:end]
# Return the text that follows "current_device=" up to the end of that line
# (or the end of the text), passed through special_char_to_text().
# NOTE(review): the return expression is cut off ("...") in this listing.
# NOTE(review): html_parser(command_and_result) is created before the None
# check, and index("current_device=") raises ValueError when the marker is
# missing - confirm that callers expect this.
def get_current_device_line(self):
command_and_result=self.get_script_and_result()
parser=html_parser(command_and_result)
if command_and_result==None:
return None
p=command_and_result.index("current_device=")
p=p+len("current_device=")
# the value ends at the next newline, or at the end of the text
try:
q=command_and_result.index("\n",p)
except:
q=len(command_and_result)
return parser.special_char_to_text(command_and_re...
def replace_wiki(self, new_wiki):
    """Hand ``new_wiki`` to the driver's replace_wiki_page()."""
    driver = self.Pico_Wiki_Driver
    driver.replace_wiki_page(new_wiki)
# Rewrite the "result:" section of the page with new_result.
# The edit form is requested, the text of its first <textarea> is taken as the
# current page source, everything up to and including "result:" is kept, the
# lines of new_result are appended (each prefixed with one space), a
# current_device line is added, and the page is replaced through the driver.
# NOTE(review): several lines are cut off ("...") in this listing.
# NOTE(review): body.index(...) raises ValueError when a marker is missing, so
# the "if q<0" branch looks unreachable - confirm.
# NOTE(review): this uses self.url / self.page (set in read_initials), not
# current_url / current_page - confirm this is intended.
def replace_result(self,new_result):
#print("pico_wiki_bot.replace_result")
payload={'cmd':'edit', 'page':self.page}
#r=Pico_Http.get(url+'index.php?work202401')
#print('uri='+r)
r = self.Pico_Http.put(self.url,payload)
#print(r)
edit_html=html_parser(r)
#edit_html.html_tokenizer()
p=0
eqs_msg={}
rtn=[(0,0)]
body=""
# first <textarea>: the current page source
p=edit_html.p_get_between(0,'textarea',eqs_msg,rtn)
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
body=edit_html.special_char_to_text(r[start_p...
#print("body=......")
#print(body)
# second <textarea>: the original text
eqs_original={}
rtn=[(0,0)]
p=edit_html.p_get_between(p,'textarea',eqs_origin...
if rtn[0]!=(0,0):
start_p=rtn[0][0]
end_p=rtn[0][1]
original_pre=r[start_p:end_p+1]
original=edit_html.special_char_to_text(origi...
payload['original']=edit_html.text_to_post_fo...
#print("body="+body)
px=body.index("result:")+len("result:")
q=body.index("current_device=",px)
if q<0:
q=len(body)
# keep everything up to and including "result:"
head_area=body[0:px]
#tail_area=msg[q:]
#result_area=msg[px:q]
new_result_lines=new_result.split('\n')
new_area=head_area+'\n'
for i in range(len(new_result_lines)):
new_area=new_area+' '+new_result_lines[i]+'\n'
new_area=new_area+" current_device=\""+device_id+...
self.Pico_Wiki_Driver.replace_wiki_page(new_area)
def read_script(self):
    """Load the script of the current page into ``self.script_lines``.

    Returns True when a non-empty script was read (one list entry per
    line).  Returns False, with an empty list, when get_script() gave
    None or "".
    """
    text = self.get_script()
    has_script = text is not None and text != ""
    self.script_lines = text.split('\n') if has_script else []
    return has_script
def eval_script(self):
    """Print a trace line, then run interpret_lines()."""
    print("eval_script")
    self.interpret_lines()
#def print_result(self):
# print("print_result")
def is_object_page(self, line):
    """Recognise a line of the form ``object_page <url> [or <url> ...]``.

    On a match ``self.urls`` is replaced by the listed URLs, each URL is
    printed and True is returned.  Returns False when the line does not
    start with "object_page " or no URL follows it (``self.urls`` is then
    left empty in the latter case).
    """
    parser = html_parser(line)

    def skip_blanks(pos):
        # Bounds-checked blank skipping, so blank lines and trailing
        # spaces cannot index past the end of the line.
        while pos < len(line) and line[pos] == ' ':
            pos += 1
        return pos

    p = skip_blanks(0)
    q = parser.p_key("object_page ", p)
    if p == q:
        return False
    p = skip_blanks(q)
    self.urls = []
    xurl = [""]
    q = parser.p_url(p, xurl)
    if p == q:
        return False
    while p != q:
        self.urls.append(xurl[0])
        # Continue from the END of the URL just read.  The original
        # restarted from the URL's first character, so "or " was never
        # found and only the first URL was ever collected.
        p = skip_blanks(q)
        q = parser.p_key("or ", p)
        if p == q:
            break
        p = skip_blanks(q)
        xurl = [""]
        q = parser.p_url(p, xurl)
    for i in range(len(self.urls)):
        print("url["+str(i)+"]="+self.urls[i])
    return True
def is_device(self, line):
    """Recognise a line ``current_device="<name>", Date=<date>``.

    On a match ``self.current_device`` is set to the name and True is
    returned; otherwise False.  The date is parsed but not stored.

    Fixes over the original: the date is parsed from the position after
    "Date=" (the original parsed at the "D"), it is parsed into a
    one-element list (p_Date() assigns to index 0), and the function
    returns True on success instead of falling off the end.
    """
    parser = html_parser(line)
    p = parser.p_b(0)
    q = parser.p_key("current_device=", p)
    if p == q:
        return False
    p = parser.p_b(q)
    xname = [""]
    q = parser.p_String_Constant(p, xname)
    if p == q:
        return False
    self.current_device = xname[0]
    p = parser.p_b(q)
    q = parser.p_key(",", p)
    if p == q:
        return False
    p = parser.p_b(q)
    q = parser.p_key("Date=", p)
    if p == q:
        return False
    xdate = [""]
    try:
        parser.p_Date(q, xdate)
    except (ValueError, IndexError):
        # The date is informational only; a malformed one must not stop
        # the interpreter.
        pass
    return True
def is_set_command(self, parser, p):
    """Handle ``set <name>=<value>`` at position ``p`` of the parser text.

    The parsed pair is stored into ``self.environments`` and printed.
    Returns True when the command was recognised, else False.
    """
    start = parser.p_b(p)
    after_keyword = parser.p_key("set ", start)
    if after_keyword == start:
        return False
    eq_at = parser.p_b(after_keyword)
    parsed = {}
    if parser.p_an_equation(eq_at, parsed) == eq_at:
        return False
    for key, value in parsed.items():
        self.environments[key] = value
        print("set "+str(key)+"="+str(value))
    return True
def is_py_command(self, parser, p):
    """Handle ``py <name>``: start collecting a new program.

    Sets ``self.current_program_name`` to the name and clears
    ``self.current_program``.  Returns True when the command was
    recognised (the original fell off the end and returned None, so the
    caller went on to try the remaining commands), else False.
    """
    p = parser.p_b(p)
    q = parser.p_key("py ", p)
    if p == q:
        return False
    p = parser.p_b(q)
    xname = [""]
    parser.p_name(p, xname)
    self.current_program_name = xname[0]
    self.current_program = ""
    return True
def is_end_command(self, parser, p):
    """Handle ``end <name>``: store the program collected so far.

    The program text is stored in ``self.programs`` under
    ``self.current_program_name``; the name written after "end" is parsed
    but not used.  Returns True when the command was recognised (the
    original fell off the end and returned None), else False.
    """
    p = parser.p_b(p)
    q = parser.p_key("end ", p)
    if p == q:
        return False
    p = parser.p_b(q)
    xname = [""]
    parser.p_name(p, xname)
    self.programs[self.current_program_name] = self.current_program
    return True
def is_run_command(self, parser, p):
    """Handle ``run <name>``: execute the stored program ``name``.

    A failure (unknown program name, or an exception raised by the
    program) is printed and added to the report as "fail to exec:<name>".
    Returns True when the command was recognised, else False.
    """
    p = parser.p_b(p)
    q = parser.p_key("run ", p)
    if p == q:
        return False
    p = parser.p_b(q)
    xname = [""]
    parser.p_name(p, xname)
    name = xname[0]
    program = self.programs.get(name)
    if program is None:
        # The original indexed self.programs directly and raised KeyError
        # for a name that was never defined.
        print("fail to exec "+name)
        self.add_report_line("fail to exec:"+name)
        return True
    try:
        # SECURITY: the program text comes from the wiki page, so anyone
        # who can edit that page can run arbitrary code on this device.
        exec(program)
    except Exception:
        print("fail to exec "+program)
        self.add_report_line("fail to exec:"+name)
    return True
def is_command(self, line):
    """Recognise a ``command: ...`` line and dispatch it.

    The set, py, end and run handlers are tried in that order; the first
    one that returns a true value wins.  Returns True when one of them
    accepted the line, else False.
    """
    parser = html_parser(line)
    start = parser.p_b(0)
    after = parser.p_key("command:", start)
    if after == start:
        return False
    body_at = parser.p_b(after)
    handlers = (
        self.is_set_command,
        self.is_py_command,
        self.is_end_command,
        self.is_run_command,
    )
    for handler in handlers:
        if handler(parser, body_at):
            return True
    return False
def is_py(self, line):
    """Recognise a ``py: <code>`` line and append the code, followed by a
    newline, to ``self.current_program``.

    Returns True when the line was recognised (the original fell off the
    end and returned None), else False.
    """
    parser = html_parser(line)
    p = parser.p_b(0)
    q = parser.p_key("py: ", p)
    if p == q:
        return False
    self.current_program = self.current_program + line[q:] + '\n'
    return True
def interpret_a_line(self, line):
    """Offer ``line`` to each recogniser in turn and stop at the first
    one that returns a true value."""
    recognisers = (
        self.is_object_page,
        self.is_device,
        self.is_command,
        self.is_py,
    )
    for recognise in recognisers:
        if recognise(line):
            return
def interpret_lines(self):
    """Print a trace line, then interpret every entry of
    ``self.script_lines`` in order."""
    print("interpret_lines")
    for line in self.script_lines:
        self.interpret_a_line(line)
def next_url(self):
    """Switch the bot to the next URL collected in ``self.urls``.

    The list is walked round-robin through ``self.next_url_index``.  The
    URL is split at its first "?": the part after it becomes
    ``current_page``, the part before it ``current_url``, and both are
    passed on to the driver.  Does nothing when ``self.urls`` is empty.
    A URL without "?" is skipped (the original raised ValueError from
    str.index()).
    """
    print("next_url")
    if len(self.urls) == 0:
        return
    if int(self.next_url_index) >= len(self.urls):
        self.next_url_index = 0
    urlx = self.urls[int(self.next_url_index)]
    # Advance first so that a malformed entry is not retried forever.
    self.next_url_index = int(self.next_url_index) + 1
    p = urlx.find("?")
    if p < 0:
        print("next_url: no page name in "+urlx)
        return
    self.current_page = urlx[p+1:]
    # NOTE(review): the slice ends at p-1, so the character just before
    # "?" is dropped as well - confirm against the URL form that
    # get_html() expects.
    self.current_url = urlx[0:p-1]
    print("current_url="+self.current_url)
    print("current_page="+self.current_page)
    self.Pico_Wiki_Driver.set_url(self.current_url)
    self.Pico_Wiki_Driver.set_page(self.current_page)
def print_result(self):
    """Write the report to the page: every report entry followed by a
    newline, joined, and handed to replace_result()."""
    print("print_result")
    sending = "".join([entry + '\n' for entry in self.report])
    self.replace_result(sending)
# Main loop of the bot; never returns.
# Every pass (about 100 ms apart) it checks, against time.ticks_ms():
#   - the read interval: re-read the script, collect the result lines into the
#     report, or move to the next URL when no script could be read
#   - the exec interval (when > 0): run eval_script()
#   - the write interval (when > 0): run print_result()
# NOTE(review): several conditions are cut off ("...") in this listing.
# NOTE(review): the nesting of the branches cannot be recovered from this
# listing because the indentation was lost.
# NOTE(review): only last_read_time is adjusted when the tick counter goes
# backwards; last_eval_time and last_write_time are not - confirm.
def read_eval_print_loop(self):
print("read_eval_print_loop")
last_read_time=time.ticks_ms()
last_eval_time=last_read_time
last_write_time=last_read_time
while True:
current_time=time.ticks_ms()
# the tick counter went backwards: restart the read interval
if current_time<last_read_time:
last_read_time=current_time
if current_time-last_read_time>self.environme...
last_read_time=current_time
if self.read_script():
r=self.get_result()
if r!=None and r!="":
lines=r.split('\n')
for i in range(len(lines)):
self.add_report_line(lines[i])
self.add_report_line(r)
if self.environments['exec_interval']...
self.eval_script()
else:
self.next_url()
continue
if self.environments['exec_interval']>0:
if current_time-last_eval_time>self.envir...
last_eval_time=current_time
self.eval_script()
if self.environments['write_interval']>0:
if current_time-last_write_time>self.envi...
last_write_time=current_time
self.print_result()
time.sleep_ms(100)
# Entry point: create the bot and run its read/eval/print loop.
# The commented-out lines are manual checks of the individual bot methods.
# NOTE(review): the constructor arguments are cut off ("...") in this listing.
def main():
bot=pico_wiki_bot("http://www.a_pukiwiki_site.example...
#print("-----------get_script_and_result-------------...
#print(bot.get_script_and_result())
#print("-----------get_result-----------------")
#print(bot.get_result())
#print("-----------get_script-----------------")
#print(bot.get_script())
#print("-----------get_current_device_line-----------...
#print(bot.get_current_device_line())
#print("-----------replace_result-----------------")
#bot.replace_result("Hello!\nThis is a test2....\nxxx...
#print("-----------stop_timer--------------------")
#bot.stop_timer()
bot.read_eval_print_loop()
# runs as soon as the file is executed
main()
}}
----
#counter
ページ名: