sensing_07.py
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
検索
|
最終更新
|
ヘルプ
|
ログイン
]
開始行:
[[FrontPage]]
#code(Python){{
# -*- coding: utf-8 -*-
from machine import UART,Pin,ADC, Timer
import micropython,time,gc
# for analog co2 sensor, gravity:analog infrared co2 sensor
# 20250316
# Updated 2018 and 2020
# This module is based on the below cited resources, whic...
# based on the documentation as provided in the Bosch Dat...
# the sample implementation provided therein.
#
# Final Document: BST-BME280-DS002-15
#
# Authors: Paul Cunnane 2016, Peter Dahlberg 2016
#
# This module borrows from the Adafruit BME280 Python lib...
# Copyright notices are reproduced below.
#
# Those libraries were written for the Raspberry Pi. This...
# intended for the MicroPython and esp8266 boards.
#
# Copyright (c) 2014 Adafruit Industries
# Author: Tony DiCola
#
# Based on the BMP280 driver with BME280 changes provided...
# David J Taylor, Edinburgh (www.satsignal.eu)
#
# Based on Adafruit_I2C.py created by Kevin Townsend.
#
# Permission is hereby granted, free of charge, to any pe...
# of this software and associated documentation files (th...
# in the Software without restriction, including without ...
# to use, copy, modify, merge, publish, distribute, subli...
# copies of the Software, and to permit persons to whom t...
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice s...
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF A...
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF...
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. I...
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, D...
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OT...
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR...
# THE SOFTWARE.
#
from ustruct import unpack, unpack_from
from array import array
# BME280 default address.
BME280_I2CADDR = 0x76
# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5
BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_STATUS = 0xF3
BME280_REGISTER_CONTROL = 0xF4
MODE_SLEEP = const(0)
MODE_FORCED = const(1)
MODE_NORMAL = const(3)
BME280_TIMEOUT = const(100) # about 1 second timeout
class BME280:
def __init__(self,
mode=BME280_OSAMPLE_8,
address=BME280_I2CADDR,
i2c=None,
**kwargs):
# Check that mode is valid.
if type(mode) is tuple and len(mode) == 3:
self._mode_hum, self._mode_temp, self._mode_p...
elif type(mode) == int:
self._mode_hum, self._mode_temp, self._mode_p...
else:
raise ValueError("Wrong type for the mode par...
for mode in (self._mode_hum, self._mode_temp, sel...
if mode not in [BME280_OSAMPLE_1, BME280_OSAM...
BME280_OSAMPLE_8, BME280_OSAM...
raise ValueError(
'Unexpected mode value {0}. Set mode ...
'BME280_ULTRALOWPOWER, BME280_STANDAR...
'BME280_ULTRAHIGHRES'.format(mode))
self.address = address
if i2c is None:
raise ValueError('An I2C object is required.')
self.i2c = i2c
self.__sealevel = 101325
# load calibration data
dig_88_a1 = self.i2c.readfrom_mem(self.address, 0...
dig_e1_e7 = self.i2c.readfrom_mem(self.address, 0...
self.dig_T1, self.dig_T2, self.dig_T3, self.dig_P...
self.dig_P2, self.dig_P3, self.dig_P4, self.d...
self.dig_P6, self.dig_P7, self.dig_P8, self.d...
_, self.dig_H1 = unpack("<HhhHhhhhhhhhBB", di...
self.dig_H2, self.dig_H3, self.dig_H4,\
self.dig_H5, self.dig_H6 = unpack("<hBbhb", d...
# unfold H4, H5, keeping care of a potential sign
self.dig_H4 = (self.dig_H4 * 16) + (self.dig_H5 &...
self.dig_H5 //= 16
self.t_fine = 0
# temporary data holders which stay allocated
self._l1_barray = bytearray(1)
self._l8_barray = bytearray(8)
self._l3_resultarray = array("i", [0, 0, 0])
self._l1_barray[0] = self._mode_temp << 5 | self....
self.i2c.writeto_mem(self.address, BME280_REGISTE...
bytearray([0x3c | MODE_SLEEP...
def read_raw_data(self, result):
""" Reads the raw (uncompensated) data from the s...
Args:
result: array of length 3 or alike where ...
stored, in temperature, pressure, humidit...
Returns:
None
"""
self._l1_barray[0] = self._mode_hum
self.i2c.writeto_mem(self.address, BME280_REGISTE...
self._l1_barray)
self._l1_barray[0] = self._mode_temp << 5 | self....
self.i2c.writeto_mem(self.address, BME280_REGISTE...
self._l1_barray)
# Wait for conversion to complete
for _ in range(BME280_TIMEOUT):
if self.i2c.readfrom_mem(self.address, BME280...
time.sleep_ms(10) # still busy
else:
break # Sensor ready
else:
raise RuntimeError("Sensor BME280 not ready")
# burst readout from 0xF7 to 0xFE, recommended by...
self.i2c.readfrom_mem_into(self.address, 0xF7, se...
readout = self._l8_barray
# pressure(0xF7): ((msb << 16) | (lsb << 8) | xls...
raw_press = ((readout[0] << 16) | (readout[1] << ...
# temperature(0xFA): ((msb << 16) | (lsb << 8) | ...
raw_temp = ((readout[3] << 16) | (readout[4] << 8...
# humidity(0xFD): (msb << 8) | lsb
raw_hum = (readout[6] << 8) | readout[7]
result[0] = raw_temp
result[1] = raw_press
result[2] = raw_hum
def read_compensated_data(self, result=None):
""" Reads the data from the sensor and returns th...
Args:
result: array of length 3 or alike where ...
stored, in temperature, pressure, humidit...
this to read out the sensor without alloc...
Returns:
array with temperature, pressure, humidit...
from the result parameter if not None
"""
self.read_raw_data(self._l3_resultarray)
raw_temp, raw_press, raw_hum = self._l3_resultarray
# temperature
var1 = (((raw_temp // 8) - (self.dig_T1 * 2)) * s...
var2 = (raw_temp // 16) - self.dig_T1
var2 = (((var2 * var2) // 4096) * self.dig_T3) //...
self.t_fine = var1 + var2
temp = (self.t_fine * 5 + 128) // 256
# pressure
var1 = self.t_fine - 128000
var2 = var1 * var1 * self.dig_P6
var2 = var2 + ((var1 * self.dig_P5) << 17)
var2 = var2 + (self.dig_P4 << 35)
var1 = (((var1 * var1 * self.dig_P3) >> 8) +
((var1 * self.dig_P2) << 12))
var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
if var1 == 0:
pressure = 0
else:
p = ((((1048576 - raw_press) << 31) - var2) *...
var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) ...
var2 = (self.dig_P8 * p) >> 19
pressure = ((p + var1 + var2) >> 8) + (self.d...
# humidity
h = self.t_fine - 76800
h = (((((raw_hum << 14) - (self.dig_H4 << 20) -
(self.dig_H5 * h)) + 16384) >> 15) *
(((((((h * self.dig_H6) >> 10) *
(((h * self.dig_H3) >> 11) + 32768)) >> 1...
self.dig_H2 + 8192) >> 14))
h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.d...
h = 0 if h < 0 else h
h = 419430400 if h > 419430400 else h
humidity = h >> 12
if humidity < 0:
humidity = 0
if humidity > 100 * 1024:
humidity = 100 * 1024
if result:
result[0] = temp
result[1] = pressure
result[2] = humidity
return result
return array("i", (temp, pressure, humidity))
@property
def sealevel(self):
return self.__sealevel
@sealevel.setter
def sealevel(self, value):
if 30000 < value < 120000: # just ensure some re...
self.__sealevel = value
@property
def altitude(self):
'''
Altitude in m.
'''
from math import pow
try:
p = 44330 * (1.0 - pow((self.read_compensated...
self.__sealevel, 0.190...
except:
p = 0.0
return p
@property
def dew_point(self):
"""
Compute the dew point temperature for the current...
and Humidity measured pair
"""
from math import log
t, p, h = self.read_compensated_data()
t /= 100
h /= 1024
h = (log(h, 10) - 2) / 0.4343 + (17.62 * t) / (24...
return (243.12 * h / (17.62 - h)) * 100
@property
def values(self):
""" human readable values """
t, p, h = self.read_compensated_data()
p = p / 256
h = h / 1024
return ("{}C".format(t / 100), "{:.02f}hPa".forma...
"{:.02f}%".format(h))
class Sensing:
def get_co2(self):
return self.co2_value
def get_motion(self):
return self.motion_value
def get_lx(self):
return self.lx_value
def get_pressure(self):
return self.pressure_value
def get_temperature(self):
return self.temperature_value
def get_humidity(self):
return self.humidity_value
def co2_read(self):
#print("start read co2")
#global o_b, r_b, co2_value
self.o_b = bytearray([0xFF, 0x01, 0x86, 0x00, 0x0...
self.r_b=bytearray([0x00,0x00,0x00,0x00,0x00,0x00...
try:
self.u.write(self.o_b)
time.sleep(10)
result = self.u.readinto(self.r_b,9)
#print(result)
v=int(0x00000 | (self.r_b[2]<<8 | self.r_b[3]))
#print("co2:"+str(v))
self.co2_value = v
except Exception as e:
#print(e)
return
#adcVal = self.co2.read_u16()
#vx = float(adcVal)*(3.3/65536.0)
#if vx == 0:
# #print("A problem has occurred with the sens...
# self.co2_value=0
#elif vx < 0.4:
# #print("Pre-heating the sensor...")
# self.co2_value=0
#else:
# vd=vx-0.4
# concentration=(vd*5000.0)/1.6
# self.co2_value = int(concentration)
def get_smell(self):
return self.smell_value
def get_smell2(self):
return self.smell2_value
def get_particle(self):
self.particle_read()
return self.particle_concentration
motion_counter = 0
motion_accum = 0
def motion_read(self):
#print("start read motion")
#global motion_counter, motion_accum, motion_value
if self.motion_counter < 100:
self.motion_counter = self.motion_counter + 1
self.motion_accum = self.motion_accum + self....
else:
self.motion_value = self.motion_accum
self.motion_counter = 0
self.motion_accum = 0
#print("motion:"+str(self.motion_value))
def lx_read(self):
#global lx_value
self.lx_value = self.lx.read_u16()
#print("lx:"+str(self.lx_value))
def smell_read(self):
#global lx_value
self.smell_value = self.smell.read_u16()
#print("lx:"+str(self.lx_value))
def pressure_read(self):
#global pressure_value
self.pressure_value = self.bme.values[1]
#print("pressure:"+self.pressure_value)
def temperature_read(self):
#global temperature_value
self.temperature_value = self.bme.values[0]
#print("temperature:"+self.temperature_value)
def humidity_read(self):
#global humidity_value
self.humidity_value = self.bme.values[2]
#print("humidity:"+self.humidity_value)
def smell_capture(self):
self.smell_a0.value(1)
time.sleep(0.001)
self.smell_read()
time.sleep(0.001)
self.smell_a0.value(0)
#print("smell="+str(self.smell_value))
def smell2_capture(self):  # per the original note: called about once per second
    """Advance the smell2 sensor drive sequence by one step.

    The sequence is keyed on ``self.smell2_step``:
        step 0  -> drive ``smell2_out`` high
        step 3  -> sample ``smell2_in`` into ``self.smell2_value``
        step 5  -> drive ``smell2_out`` and ``smell2_heater`` low
        step 12 -> drive ``smell2_heater`` high again
    After 240 steps the sequence starts over.

    Returns:
        None
    """
    step = self.smell2_step
    if step == 0:
        self.smell2_out.value(1)
    elif step == 3:
        self.smell2_value = self.smell2_in.read_u16()
    elif step == 5:
        self.smell2_out.value(0)
        self.smell2_heater.value(0)
    elif step == 12:
        self.smell2_heater.value(1)
    # Wrap after incrementing. The previous code reset the counter to 0 and
    # then incremented it in the same call, so step 0 was skipped on every
    # cycle but the first and smell2_out was never driven high again.
    self.smell2_step = (step + 1) % 240
def particle_read(self):
data=[]
data=self.i2c.readfrom_mem(0x15,0x00,2)
self.particle_concentration=(data[0] & 0xff)<<8|(...
count_minutes=0
def sensor_read_one_minute(self,timer):
self.co2_read()
#self.motion_read()
self.lx_read()
self.temperature_read()
self.pressure_read()
self.humidity_read()
if self.count_minutes>=4:
#self.smell2_capture()
self.count_minutes=0
self.count_minutes=self.count_minutes+1
def sensor_read_five_minute(self,timer):
self.smell2_capture()
def sensor_read_100ms(self,timer):
if self.c100ms>=60:
self.c100ms=0
if self.c100ms%6==0:
self.motion_read()
if self.c100ms%10==0:
self.smell_capture()
self.smell2_capture()
self.c100ms=self.c100ms+1
#print("start sensor read one second")
#def sensor_read_1s(self,timer):
#
def __init__(self):
#print ("start setup")
self.u = UART(1, baudrate=9600, tx=Pin(4), rx=Pin...
#print('exception buf')
#interrupt exception
micropython.alloc_emergency_exception_buf(100)
#print('otimer')
self.one_minute_timer = Timer()
#self.timer_1s = Timer()
self.timer_100ms = Timer()
#print('adc')
self.pir=Pin(14, Pin.IN)
self.lx = ADC(Pin(27))
#self.co2 = ADC(Pin(28))
self.smell = ADC(Pin(26))
self.smell_a0=Pin(15,Pin.OUT)
self.smell2_heater=Pin(12,Pin.OUT)
self.smell2_out=Pin(13,Pin.OUT)
self.smell2_heater.value(1)
self.smell2_out.value(0)
self.smell2_in=ADC(Pin(28))
self.smell2_step=0
#print('i2c')
self.sda=Pin(0)
self.scl=Pin(1)
#print('i2c2')
self.i2c = machine.I2C(0, sda = self.sda, scl = s...
#print('bme280')
self.bme = BME280(i2c = self.i2c)
#print('x')
self.co2_value = 0
self.motion_value = 0
self.smell_value = 0
self.smell2_value = 0
self.lx_value = 0
self.pressure_value = ""
self.temperature_value = ""
self.humidity_value = ""
#self.one_hour_timer.init(period=600000,mode=Time...
self.one_minute_timer.init(period=60000, mode=Tim...
self.timer_100ms.init(period=100, mode=Timer.PERI...
#self.timer_1s.init(period=1000,mode=Timer.PERIOD...
##time.sleep(60)
#print ("end setup")
self.c100ms=0
class Parser:
def __init__(self,line,sensing):
self.line=line
self.sensing=sensing
def p_key(self, key, p):
    """Match the literal ``key`` at offset ``p`` of ``self.line``.

    Args:
        key: literal text expected at position ``p``.
        p: index into ``self.line`` at which to look.

    Returns:
        The index just past the key when it is present at ``p``;
        otherwise ``p`` unchanged. Callers detect failure by comparing
        the result with the position they passed in.
    """
    # No explicit bounds check is needed: a slice that runs past the end of
    # the line is simply shorter than the key and compares unequal. The old
    # ``p+keylen >= len(line)`` guard was off by one and rejected a key that
    # ended exactly at the end of the line.
    end = p + len(key)
    if self.line[p:end] == key:
        return end
    return p
def p_String_Constant(self, p, strc):
    """Parse a double-quoted string constant starting at ``p``.

    Args:
        p: index into ``self.line`` where the opening ``"`` is expected.
        strc: one-element list; on success ``strc[0]`` receives the text
            between the quotes (backslash escapes are kept verbatim).

    Returns:
        The index just past the closing quote on success, otherwise the
        original ``p``. Parsing fails when there is no opening quote, when
        a newline, tab or carriage return occurs inside the string, or
        when the line ends before the closing quote.
    """
    line = self.line
    end = len(line)
    start = p
    if line[p:p + 1] != '"':
        return start
    p += 1
    # Bounded scan: an unterminated string now fails cleanly instead of
    # indexing past the end of the line.
    while p < end:
        ch = line[p]
        if ch == '"':
            strc[0] = line[start + 1:p]
            return p + 1
        if ch in '\n\t\r':
            return start
        if ch == '\\':
            # Skip the escaped character so an escaped quote does not
            # terminate the string.
            p += 1
        p += 1
    return start
def p_Date(self,p,date_str):
# Parse a timestamp at position p of self.line, in this order:
#   number '/' number '/' number <blanks> number ':' number ':' number
# On success date_str[0] is assigned a string built from the parsed fields
# (the expression is cut off in this view) and the position after the last
# field is returned. On failure the starting position pw is returned, with
# one exception flagged below.
#print("p_Date p="+str(p)+' '+self.html[p:p+10])
# NOTE(review): rx and xstr are assigned but never read in this method.
rx=[""]
xstr=""
pw=p
# year
year=[0]
q=self.p_number(p,year)
if p==q:
return pw
p=q
q=self.p_key('/',p)
if p==q:
return pw
p=q
# month
month=[0]
q=self.p_number(p,month)
if p==q:
return pw
p=q
q=self.p_key('/',p)
if p==q:
return pw
p=q
# day
day=[0]
q=self.p_number(p,day)
if p==q:
# NOTE(review): every other failure path returns pw; this one returns False,
# which compares equal to position 0. Looks like it should be `return pw`.
return False
p=q
# at least one blank is required between the date and the time,
# because p_b returning p unchanged is treated as a failure here
q=self.p_b(p)
if p==q:
return pw
p=q
# hour
hour=[0]
q=self.p_number(p,hour)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
# minute
minute=[0]
q=self.p_number(p,minute)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
# second
second=[0]
q=self.p_number(p,second)
if p==q:
return pw
p=q
date_str[0]=str(year[0])+'/'+str(month[0])+'/'+st...
return p
def reset(self):
    """Reset the board by calling ``machine.reset()``."""
    # The visible file-level import is ``from machine import UART,Pin,ADC, Timer``,
    # which does not bind the module name ``machine`` itself; import it here
    # so the call does not raise NameError.
    import machine
    machine.reset()
def parse_the_line(self):
    """Execute an ``ex <expression>`` command held in ``self.line``.

    Leading blanks are skipped; if the line does not start with the key
    ``ex`` nothing happens. Otherwise the rest of the line is evaluated
    with ``self`` available as a local name, and the result (or the error)
    is printed.

    Returns:
        None
    """
    start = self.p_b(0)
    after_key = self.p_key('ex', start)
    if after_key <= start:
        return
    exp = self.line[self.p_b(after_key):]
    # SECURITY NOTE(review): this eval()s whatever text arrives on the input
    # line with full access to module globals. Acceptable only if the console
    # is trusted; otherwise replace with an explicit command table.
    try:
        rtn = eval(exp, globals(), {'self': self})
        print(rtn)
    except Exception as e:
        print("error in "+exp)
        print(e)
def p_b(self, p):
    """Skip blanks: return the index of the first non-space character at
    or after ``p`` in ``self.line``, or the line length if only spaces
    remain. A ``p`` at or beyond the end of the line is returned unchanged.
    """
    text = self.line
    limit = len(text)
    while p < limit and text[p] == ' ':
        p += 1
    return p
def p_name(self,p,name):
#print("p_name, self.html="+ self.html[p:p+10])
#print("p="+str(p)+"..."+self.html[p:p+10])
q=p
last_p=len(self.line)
while self.line[q] in 'abcdefghijklmnopqrstuvwxyz...
#print("self.html["+str(q)+"]="+self.html[q])
q=q+1
if q>=last_p:
break
name[0]=self.line[p:q]
#print("p_name p="+str(p)+" q="+str(q)+" name="+s...
#print("p="+str(p)+" q="+str(q)+" name="+name[0])
#print("name[0]= "+name[0])
return q
def p_number(self, p, num):
    """Parse a run of decimal digits starting at ``p`` of ``self.line``.

    Args:
        p: index into ``self.line`` where the number is expected.
        num: one-element list; ``num[0]`` receives the parsed integer when
            at least one digit is found, and is left untouched otherwise.

    Returns:
        The index just past the last digit, or ``p`` unchanged when there
        is no digit at ``p`` (including when ``p`` is at the end of the
        line). Callers compare the result with ``p`` to detect failure.
    """
    line = self.line
    end = len(line)
    q = p
    # Test the character at the moving cursor q. The old loop kept testing
    # the fixed position p, so one digit swallowed the rest of the line.
    while q < end and line[q] in '0123456789':
        q += 1
    if q > p:
        num[0] = int(line[p:q])
    return q
def p_an_equation(self, p, eqs):
    """Parse one ``name = value`` pair starting at ``p``.

    Blanks are allowed before the name and around ``=``. The value is
    first tried as a double-quoted string constant, then as a number.

    Args:
        p: index into ``self.line`` where parsing starts.
        eqs: mapping updated with ``eqs[name] = value`` on success.

    Returns:
        The index just past the value on success, otherwise the original
        ``p`` (nothing is stored in ``eqs`` in that case).
    """
    origin = p
    name_box = [""]
    text_box = [""]

    name_at = self.p_b(p)
    name_end = self.p_name(name_at, name_box)
    if name_end == name_at:
        return origin

    eq_at = self.p_b(name_end)
    after_eq = self.p_key('=', eq_at)
    if after_eq == eq_at:
        return origin

    value_at = self.p_b(after_eq)
    q = self.p_String_Constant(value_at, text_box)
    if q != value_at:
        eqs[name_box[0]] = text_box[0]
        return q

    # Not a string constant: fall back to a numeric value.
    num_box = [0]
    q = self.p_number(value_at, num_box)
    if q == value_at:
        return origin
    eqs[name_box[0]] = num_box[0]
    return q
def p_equations(self, p, eqs):
    """Parse as many consecutive ``name = value`` pairs as possible.

    Calls ``p_an_equation`` repeatedly from ``p`` until it stops making
    progress; each parsed pair is stored in ``eqs`` by that method.

    Returns:
        The index just past the last pair parsed (``p`` itself when no
        pair could be parsed).
    """
    q = self.p_an_equation(p, eqs)
    while q != p:
        p = q
        q = self.p_an_equation(p, eqs)
    return p
def p_get_equations_of_the_tag(self, p, tag_name, eqs):
    """Find ``<tag_name`` at or after ``p`` and parse its attributes.

    Args:
        p: index into ``self.line`` from which the tag is searched.
        tag_name: tag name without the leading ``<``.
        eqs: mapping that receives the tag's ``name = value`` pairs
            (filled in by ``p_equations``).

    Returns:
        The index just past the closing ``>`` or ``/>`` on success.
        ``p`` unchanged when the tag is not found, when it has no
        attribute pairs, or when no closing ``>`` / ``/>`` follows them.
    """
    opener = '<' + tag_name
    try:
        tag_at = self.line.index(opener, p)
    except ValueError:
        # Tag not present. Only this expected error is caught; the previous
        # bare ``except:`` also hid unrelated failures.
        return p
    attrs_at = tag_at + len(opener)
    attrs_end = self.p_equations(attrs_at, eqs)
    if attrs_end == attrs_at:
        return p
    close_at = self.p_b(attrs_end)
    for closer in ('>', '/>'):
        q = self.p_key(closer, close_at)
        if q != close_at:
            return q
    return p
if __name__ == '__main__':
#
# ex self.sensing.get_co2()
# ex self.sensing.get_motion()
# ex self.sensing.get_lx()
# ex self.sensing.get_pressure()
# ex self.sensing.get_temperature()
# ex self.sensing.get_humidity()
#
sensing=Sensing()
while True:
line=input()
parser=Parser(line,sensing)
parser.parse_the_line()
gc.collect()
time.sleep(0.1)
}}
終了行:
[[FrontPage]]
#code(Python){{
# -*- coding: utf-8 -*-
from machine import UART,Pin,ADC, Timer
import micropython,time,gc
# for analog co2 sensor, gravity:analog infrared co2 sensor
# 20250316
# Updated 2018 and 2020
# This module is based on the below cited resources, whic...
# based on the documentation as provided in the Bosch Dat...
# the sample implementation provided therein.
#
# Final Document: BST-BME280-DS002-15
#
# Authors: Paul Cunnane 2016, Peter Dahlberg 2016
#
# This module borrows from the Adafruit BME280 Python lib...
# Copyright notices are reproduced below.
#
# Those libraries were written for the Raspberry Pi. This...
# intended for the MicroPython and esp8266 boards.
#
# Copyright (c) 2014 Adafruit Industries
# Author: Tony DiCola
#
# Based on the BMP280 driver with BME280 changes provided...
# David J Taylor, Edinburgh (www.satsignal.eu)
#
# Based on Adafruit_I2C.py created by Kevin Townsend.
#
# Permission is hereby granted, free of charge, to any pe...
# of this software and associated documentation files (th...
# in the Software without restriction, including without ...
# to use, copy, modify, merge, publish, distribute, subli...
# copies of the Software, and to permit persons to whom t...
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice s...
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF A...
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF...
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. I...
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, D...
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OT...
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR...
# THE SOFTWARE.
#
from ustruct import unpack, unpack_from
from array import array
# BME280 default address.
BME280_I2CADDR = 0x76
# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5
BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_STATUS = 0xF3
BME280_REGISTER_CONTROL = 0xF4
MODE_SLEEP = const(0)
MODE_FORCED = const(1)
MODE_NORMAL = const(3)
BME280_TIMEOUT = const(100) # about 1 second timeout
class BME280:
def __init__(self,
mode=BME280_OSAMPLE_8,
address=BME280_I2CADDR,
i2c=None,
**kwargs):
# Check that mode is valid.
if type(mode) is tuple and len(mode) == 3:
self._mode_hum, self._mode_temp, self._mode_p...
elif type(mode) == int:
self._mode_hum, self._mode_temp, self._mode_p...
else:
raise ValueError("Wrong type for the mode par...
for mode in (self._mode_hum, self._mode_temp, sel...
if mode not in [BME280_OSAMPLE_1, BME280_OSAM...
BME280_OSAMPLE_8, BME280_OSAM...
raise ValueError(
'Unexpected mode value {0}. Set mode ...
'BME280_ULTRALOWPOWER, BME280_STANDAR...
'BME280_ULTRAHIGHRES'.format(mode))
self.address = address
if i2c is None:
raise ValueError('An I2C object is required.')
self.i2c = i2c
self.__sealevel = 101325
# load calibration data
dig_88_a1 = self.i2c.readfrom_mem(self.address, 0...
dig_e1_e7 = self.i2c.readfrom_mem(self.address, 0...
self.dig_T1, self.dig_T2, self.dig_T3, self.dig_P...
self.dig_P2, self.dig_P3, self.dig_P4, self.d...
self.dig_P6, self.dig_P7, self.dig_P8, self.d...
_, self.dig_H1 = unpack("<HhhHhhhhhhhhBB", di...
self.dig_H2, self.dig_H3, self.dig_H4,\
self.dig_H5, self.dig_H6 = unpack("<hBbhb", d...
# unfold H4, H5, keeping care of a potential sign
self.dig_H4 = (self.dig_H4 * 16) + (self.dig_H5 &...
self.dig_H5 //= 16
self.t_fine = 0
# temporary data holders which stay allocated
self._l1_barray = bytearray(1)
self._l8_barray = bytearray(8)
self._l3_resultarray = array("i", [0, 0, 0])
self._l1_barray[0] = self._mode_temp << 5 | self....
self.i2c.writeto_mem(self.address, BME280_REGISTE...
bytearray([0x3c | MODE_SLEEP...
def read_raw_data(self, result):
""" Reads the raw (uncompensated) data from the s...
Args:
result: array of length 3 or alike where ...
stored, in temperature, pressure, humidit...
Returns:
None
"""
self._l1_barray[0] = self._mode_hum
self.i2c.writeto_mem(self.address, BME280_REGISTE...
self._l1_barray)
self._l1_barray[0] = self._mode_temp << 5 | self....
self.i2c.writeto_mem(self.address, BME280_REGISTE...
self._l1_barray)
# Wait for conversion to complete
for _ in range(BME280_TIMEOUT):
if self.i2c.readfrom_mem(self.address, BME280...
time.sleep_ms(10) # still busy
else:
break # Sensor ready
else:
raise RuntimeError("Sensor BME280 not ready")
# burst readout from 0xF7 to 0xFE, recommended by...
self.i2c.readfrom_mem_into(self.address, 0xF7, se...
readout = self._l8_barray
# pressure(0xF7): ((msb << 16) | (lsb << 8) | xls...
raw_press = ((readout[0] << 16) | (readout[1] << ...
# temperature(0xFA): ((msb << 16) | (lsb << 8) | ...
raw_temp = ((readout[3] << 16) | (readout[4] << 8...
# humidity(0xFD): (msb << 8) | lsb
raw_hum = (readout[6] << 8) | readout[7]
result[0] = raw_temp
result[1] = raw_press
result[2] = raw_hum
def read_compensated_data(self, result=None):
""" Reads the data from the sensor and returns th...
Args:
result: array of length 3 or alike where ...
stored, in temperature, pressure, humidit...
this to read out the sensor without alloc...
Returns:
array with temperature, pressure, humidit...
from the result parameter if not None
"""
self.read_raw_data(self._l3_resultarray)
raw_temp, raw_press, raw_hum = self._l3_resultarray
# temperature
var1 = (((raw_temp // 8) - (self.dig_T1 * 2)) * s...
var2 = (raw_temp // 16) - self.dig_T1
var2 = (((var2 * var2) // 4096) * self.dig_T3) //...
self.t_fine = var1 + var2
temp = (self.t_fine * 5 + 128) // 256
# pressure
var1 = self.t_fine - 128000
var2 = var1 * var1 * self.dig_P6
var2 = var2 + ((var1 * self.dig_P5) << 17)
var2 = var2 + (self.dig_P4 << 35)
var1 = (((var1 * var1 * self.dig_P3) >> 8) +
((var1 * self.dig_P2) << 12))
var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
if var1 == 0:
pressure = 0
else:
p = ((((1048576 - raw_press) << 31) - var2) *...
var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) ...
var2 = (self.dig_P8 * p) >> 19
pressure = ((p + var1 + var2) >> 8) + (self.d...
# humidity
h = self.t_fine - 76800
h = (((((raw_hum << 14) - (self.dig_H4 << 20) -
(self.dig_H5 * h)) + 16384) >> 15) *
(((((((h * self.dig_H6) >> 10) *
(((h * self.dig_H3) >> 11) + 32768)) >> 1...
self.dig_H2 + 8192) >> 14))
h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.d...
h = 0 if h < 0 else h
h = 419430400 if h > 419430400 else h
humidity = h >> 12
if humidity < 0:
humidity = 0
if humidity > 100 * 1024:
humidity = 100 * 1024
if result:
result[0] = temp
result[1] = pressure
result[2] = humidity
return result
return array("i", (temp, pressure, humidity))
@property
def sealevel(self):
return self.__sealevel
@sealevel.setter
def sealevel(self, value):
if 30000 < value < 120000: # just ensure some re...
self.__sealevel = value
@property
def altitude(self):
'''
Altitude in m.
'''
from math import pow
try:
p = 44330 * (1.0 - pow((self.read_compensated...
self.__sealevel, 0.190...
except:
p = 0.0
return p
@property
def dew_point(self):
"""
Compute the dew point temperature for the current...
and Humidity measured pair
"""
from math import log
t, p, h = self.read_compensated_data()
t /= 100
h /= 1024
h = (log(h, 10) - 2) / 0.4343 + (17.62 * t) / (24...
return (243.12 * h / (17.62 - h)) * 100
@property
def values(self):
""" human readable values """
t, p, h = self.read_compensated_data()
p = p / 256
h = h / 1024
return ("{}C".format(t / 100), "{:.02f}hPa".forma...
"{:.02f}%".format(h))
class Sensing:
def get_co2(self):
return self.co2_value
def get_motion(self):
return self.motion_value
def get_lx(self):
return self.lx_value
def get_pressure(self):
return self.pressure_value
def get_temperature(self):
return self.temperature_value
def get_humidity(self):
return self.humidity_value
def co2_read(self):
#print("start read co2")
#global o_b, r_b, co2_value
self.o_b = bytearray([0xFF, 0x01, 0x86, 0x00, 0x0...
self.r_b=bytearray([0x00,0x00,0x00,0x00,0x00,0x00...
try:
self.u.write(self.o_b)
time.sleep(10)
result = self.u.readinto(self.r_b,9)
#print(result)
v=int(0x00000 | (self.r_b[2]<<8 | self.r_b[3]))
#print("co2:"+str(v))
self.co2_value = v
except Exception as e:
#print(e)
return
#adcVal = self.co2.read_u16()
#vx = float(adcVal)*(3.3/65536.0)
#if vx == 0:
# #print("A problem has occurred with the sens...
# self.co2_value=0
#elif vx < 0.4:
# #print("Pre-heating the sensor...")
# self.co2_value=0
#else:
# vd=vx-0.4
# concentration=(vd*5000.0)/1.6
# self.co2_value = int(concentration)
def get_smell(self):
return self.smell_value
def get_smell2(self):
return self.smell2_value
def get_particle(self):
self.particle_read()
return self.particle_concentration
motion_counter = 0
motion_accum = 0
def motion_read(self):
#print("start read motion")
#global motion_counter, motion_accum, motion_value
if self.motion_counter < 100:
self.motion_counter = self.motion_counter + 1
self.motion_accum = self.motion_accum + self....
else:
self.motion_value = self.motion_accum
self.motion_counter = 0
self.motion_accum = 0
#print("motion:"+str(self.motion_value))
def lx_read(self):
#global lx_value
self.lx_value = self.lx.read_u16()
#print("lx:"+str(self.lx_value))
def smell_read(self):
#global lx_value
self.smell_value = self.smell.read_u16()
#print("lx:"+str(self.lx_value))
def pressure_read(self):
#global pressure_value
self.pressure_value = self.bme.values[1]
#print("pressure:"+self.pressure_value)
def temperature_read(self):
#global temperature_value
self.temperature_value = self.bme.values[0]
#print("temperature:"+self.temperature_value)
def humidity_read(self):
#global humidity_value
self.humidity_value = self.bme.values[2]
#print("humidity:"+self.humidity_value)
def smell_capture(self):
self.smell_a0.value(1)
time.sleep(0.001)
self.smell_read()
time.sleep(0.001)
self.smell_a0.value(0)
#print("smell="+str(self.smell_value))
def smell2_capture(self):  # per the original note: called about once per second
    """Advance the smell2 sensor drive sequence by one step.

    The sequence is keyed on ``self.smell2_step``:
        step 0  -> drive ``smell2_out`` high
        step 3  -> sample ``smell2_in`` into ``self.smell2_value``
        step 5  -> drive ``smell2_out`` and ``smell2_heater`` low
        step 12 -> drive ``smell2_heater`` high again
    After 240 steps the sequence starts over.

    Returns:
        None
    """
    step = self.smell2_step
    if step == 0:
        self.smell2_out.value(1)
    elif step == 3:
        self.smell2_value = self.smell2_in.read_u16()
    elif step == 5:
        self.smell2_out.value(0)
        self.smell2_heater.value(0)
    elif step == 12:
        self.smell2_heater.value(1)
    # Wrap after incrementing. The previous code reset the counter to 0 and
    # then incremented it in the same call, so step 0 was skipped on every
    # cycle but the first and smell2_out was never driven high again.
    self.smell2_step = (step + 1) % 240
def particle_read(self):
data=[]
data=self.i2c.readfrom_mem(0x15,0x00,2)
self.particle_concentration=(data[0] & 0xff)<<8|(...
count_minutes=0
def sensor_read_one_minute(self,timer):
self.co2_read()
#self.motion_read()
self.lx_read()
self.temperature_read()
self.pressure_read()
self.humidity_read()
if self.count_minutes>=4:
#self.smell2_capture()
self.count_minutes=0
self.count_minutes=self.count_minutes+1
def sensor_read_five_minute(self,timer):
self.smell2_capture()
def sensor_read_100ms(self,timer):
if self.c100ms>=60:
self.c100ms=0
if self.c100ms%6==0:
self.motion_read()
if self.c100ms%10==0:
self.smell_capture()
self.smell2_capture()
self.c100ms=self.c100ms+1
#print("start sensor read one second")
#def sensor_read_1s(self,timer):
#
def __init__(self):
#print ("start setup")
self.u = UART(1, baudrate=9600, tx=Pin(4), rx=Pin...
#print('exception buf')
#interrupt exception
micropython.alloc_emergency_exception_buf(100)
#print('otimer')
self.one_minute_timer = Timer()
#self.timer_1s = Timer()
self.timer_100ms = Timer()
#print('adc')
self.pir=Pin(14, Pin.IN)
self.lx = ADC(Pin(27))
#self.co2 = ADC(Pin(28))
self.smell = ADC(Pin(26))
self.smell_a0=Pin(15,Pin.OUT)
self.smell2_heater=Pin(12,Pin.OUT)
self.smell2_out=Pin(13,Pin.OUT)
self.smell2_heater.value(1)
self.smell2_out.value(0)
self.smell2_in=ADC(Pin(28))
self.smell2_step=0
#print('i2c')
self.sda=Pin(0)
self.scl=Pin(1)
#print('i2c2')
self.i2c = machine.I2C(0, sda = self.sda, scl = s...
#print('bme280')
self.bme = BME280(i2c = self.i2c)
#print('x')
self.co2_value = 0
self.motion_value = 0
self.smell_value = 0
self.smell2_value = 0
self.lx_value = 0
self.pressure_value = ""
self.temperature_value = ""
self.humidity_value = ""
#self.one_hour_timer.init(period=600000,mode=Time...
self.one_minute_timer.init(period=60000, mode=Tim...
self.timer_100ms.init(period=100, mode=Timer.PERI...
#self.timer_1s.init(period=1000,mode=Timer.PERIOD...
##time.sleep(60)
#print ("end setup")
self.c100ms=0
class Parser:
def __init__(self,line,sensing):
self.line=line
self.sensing=sensing
def p_key(self, key, p):
    """Match the literal ``key`` at offset ``p`` of ``self.line``.

    Args:
        key: literal text expected at position ``p``.
        p: index into ``self.line`` at which to look.

    Returns:
        The index just past the key when it is present at ``p``;
        otherwise ``p`` unchanged. Callers detect failure by comparing
        the result with the position they passed in.
    """
    # No explicit bounds check is needed: a slice that runs past the end of
    # the line is simply shorter than the key and compares unequal. The old
    # ``p+keylen >= len(line)`` guard was off by one and rejected a key that
    # ended exactly at the end of the line.
    end = p + len(key)
    if self.line[p:end] == key:
        return end
    return p
def p_String_Constant(self, p, strc):
    """Parse a double-quoted string constant starting at ``p``.

    Args:
        p: index into ``self.line`` where the opening ``"`` is expected.
        strc: one-element list; on success ``strc[0]`` receives the text
            between the quotes (backslash escapes are kept verbatim).

    Returns:
        The index just past the closing quote on success, otherwise the
        original ``p``. Parsing fails when there is no opening quote, when
        a newline, tab or carriage return occurs inside the string, or
        when the line ends before the closing quote.
    """
    line = self.line
    end = len(line)
    start = p
    if line[p:p + 1] != '"':
        return start
    p += 1
    # Bounded scan: an unterminated string now fails cleanly instead of
    # indexing past the end of the line.
    while p < end:
        ch = line[p]
        if ch == '"':
            strc[0] = line[start + 1:p]
            return p + 1
        if ch in '\n\t\r':
            return start
        if ch == '\\':
            # Skip the escaped character so an escaped quote does not
            # terminate the string.
            p += 1
        p += 1
    return start
def p_Date(self,p,date_str):
#print("p_Date p="+str(p)+' '+self.html[p:p+10])
rx=[""]
xstr=""
pw=p
year=[0]
q=self.p_number(p,year)
if p==q:
return pw
p=q
q=self.p_key('/',p)
if p==q:
return pw
p=q
month=[0]
q=self.p_number(p,month)
if p==q:
return pw
p=q
q=self.p_key('/',p)
if p==q:
return pw
p=q
day=[0]
q=self.p_number(p,day)
if p==q:
return False
p=q
q=self.p_b(p)
if p==q:
return pw
p=q
hour=[0]
q=self.p_number(p,hour)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
minute=[0]
q=self.p_number(p,minute)
if p==q:
return pw
p=q
q=self.p_key(':',p)
if p==q:
return pw
p=q
second=[0]
q=self.p_number(p,second)
if p==q:
return pw
p=q
date_str[0]=str(year[0])+'/'+str(month[0])+'/'+st...
return p
def reset(self):
    """Reset the board by calling ``machine.reset()``."""
    # The visible file-level import is ``from machine import UART,Pin,ADC, Timer``,
    # which does not bind the module name ``machine`` itself; import it here
    # so the call does not raise NameError.
    import machine
    machine.reset()
def parse_the_line(self):
p=0
p=self.p_b(p)
q=self.p_key('ex',p)
if q>p:
p=q
p=self.p_b(p)
exp=self.line[p:]
#print(exp)
exec_locals={'self':self}
try:
rtn=eval(exp,globals(),exec_locals)
print(rtn)
except Exception as e:
print("error in "+exp)
print(e)
def p_b(self,p):
#print("parse_b")
#print("p="+str(p))
xlen=len(self.line)
if p>=xlen:
return p
while self.line[p]==' ':
p=p+1
if p>=xlen:
p=xlen
break
return p
def p_name(self,p,name):
#print("p_name, self.html="+ self.html[p:p+10])
#print("p="+str(p)+"..."+self.html[p:p+10])
q=p
last_p=len(self.line)
while self.line[q] in 'abcdefghijklmnopqrstuvwxyz...
#print("self.html["+str(q)+"]="+self.html[q])
q=q+1
if q>=last_p:
break
name[0]=self.line[p:q]
#print("p_name p="+str(p)+" q="+str(q)+" name="+s...
#print("p="+str(p)+" q="+str(q)+" name="+name[0])
#print("name[0]= "+name[0])
return q
def p_number(self, p, num):
    """Parse a run of decimal digits starting at ``p`` of ``self.line``.

    Args:
        p: index into ``self.line`` where the number is expected.
        num: one-element list; ``num[0]`` receives the parsed integer when
            at least one digit is found, and is left untouched otherwise.

    Returns:
        The index just past the last digit, or ``p`` unchanged when there
        is no digit at ``p`` (including when ``p`` is at the end of the
        line). Callers compare the result with ``p`` to detect failure.
    """
    line = self.line
    end = len(line)
    q = p
    # Test the character at the moving cursor q. The old loop kept testing
    # the fixed position p, so one digit swallowed the rest of the line.
    while q < end and line[q] in '0123456789':
        q += 1
    if q > p:
        num[0] = int(line[p:q])
    return q
def p_an_equation(self,p,eqs):
#print("p_an_equation p="+str(p)+' '+self.html[p:...
pw=p
p=self.p_b(p)
nx=[""]
strc=[""]
q=self.p_name(p,nx)
#print("p_an_equation name="+nx[0])
if p==q:
#print("p_an_equation fail to find name")
return pw
p=q
p=self.p_b(p)
q=self.p_key('=',p)
if p==q:
#print("p_an_equation fail to find =")
return pw
p=q
p=self.p_b(p)
q=self.p_String_Constant(p,strc)
if p==q:
num=[0]
q=self.p_number(p,num)
if p==q:
return pw
else:
eqs[nx[0]]=num[0]
#print("p_qn_equation get "+nx[0]+"="+str...
return q
else:
eqs[nx[0]]=strc[0]
#print("p_an_equation get "+nx[0]+"="+str(str...
return q
def p_equations(self,p,eqs):
#print("parse_equations")
#print("p="+str(p))
pw=p
while True:
q=self.p_an_equation(p,eqs)
if p==q:
return p
p=q
def p_get_equations_of_the_tag(self, p, tag_name, eqs):
    """Find ``<tag_name`` at or after ``p`` and parse its attributes.

    Args:
        p: index into ``self.line`` from which the tag is searched.
        tag_name: tag name without the leading ``<``.
        eqs: mapping that receives the tag's ``name = value`` pairs
            (filled in by ``p_equations``).

    Returns:
        The index just past the closing ``>`` or ``/>`` on success.
        ``p`` unchanged when the tag is not found, when it has no
        attribute pairs, or when no closing ``>`` / ``/>`` follows them.
    """
    opener = '<' + tag_name
    try:
        tag_at = self.line.index(opener, p)
    except ValueError:
        # Tag not present. Only this expected error is caught; the previous
        # bare ``except:`` also hid unrelated failures.
        return p
    attrs_at = tag_at + len(opener)
    attrs_end = self.p_equations(attrs_at, eqs)
    if attrs_end == attrs_at:
        return p
    close_at = self.p_b(attrs_end)
    for closer in ('>', '/>'):
        q = self.p_key(closer, close_at)
        if q != close_at:
            return q
    return p
if __name__ == '__main__':
#
# ex self.sensing.get_co2()
# ex self.sensing.get_motion()
# ex self.sensing.get_lx()
# ex self.sensing.get_pressure()
# ex self.sensing.get_temperature()
# ex self.sensing.get_humidity()
#
sensing=Sensing()
while True:
line=input()
parser=Parser(line,sensing)
parser.parse_the_line()
gc.collect()
time.sleep(0.1)
}}
ページ名: