Skip to content

Instantly share code, notes, and snippets.

@hkwi
Last active November 22, 2019 04:47
Show Gist options
  • Save hkwi/b09ccfd501c73e92c38c5a38a5196462 to your computer and use it in GitHub Desktop.
Save hkwi/b09ccfd501c73e92c38c5a38a5196462 to your computer and use it in GitHub Desktop.
weather:bit MicroPython implementation, take 1

MicroPython Micro:bit で Weather:bit のコードの実装を試みた。

weatherbit.py でメモリ溢れした。

from struct import unpack
from microbit import i2c
# Calibration cache, empty until bme280_init() runs. bme280_init() stores three
# tuples under the keys "t", "p" and "h"; the bme280_compensate_* functions
# unpack them (3, 9 and 6 coefficients respectively).
bme280_calib = {}
def bme280_read(addr, length):
    """Select register ``addr`` on the device at 0x76 and read ``length`` bytes.

    The register number is written with ``repeat=True`` and the read is
    issued immediately afterwards.

    Returns whatever ``i2c.read`` returns for the requested length.
    """
    register_select = bytes([addr])
    i2c.write(0x76, register_select, repeat=True)
    payload = i2c.read(0x76, length)
    return payload
def bme280_write(*addr_and_data):
    """Send (register, value) pairs to the device at I2C address 0x76.

    The positional arguments are a flat sequence ``reg0, val0, reg1, val1, ...``
    and are transmitted as one ``bytes`` object in a single ``i2c.write``.

    Raises AssertionError when an odd number of arguments is supplied.
    """
    # A dangling register without a value means the caller made a mistake.
    assert not len(addr_and_data) % 2
    i2c.write(0x76, bytes(addr_and_data))
def bme280_init():
    """Configure the sensor and cache its calibration data in ``bme280_calib``.

    Writes the three configuration registers, then reads the calibration
    blocks at 0x88 (24 bytes), 0xA1 (1 byte) and 0xE1 (7 bytes) and stores:

    * ``bme280_calib["t"]`` - the first three values of the 0x88 block,
    * ``bme280_calib["p"]`` - the remaining nine values of the 0x88 block,
    * ``bme280_calib["h"]`` - six values assembled from 0xA1 and 0xE1.
    """
    bme280_write(0xF2, 0x01)  # ctrlHum
    bme280_write(0xF4, 0x27)  # ctrlMeas
    bme280_write(0xF5, 0x00)  # config

    # 12 little-endian 16-bit words: unsigned, 2 signed, unsigned, 8 signed.
    t = unpack("<H2hH8h", bme280_read(0x88, 24))
    bme280_calib["t"] = t[:3]
    bme280_calib["p"] = t[3:]

    t = unpack("B", bme280_read(0xA1, 1))
    d = bme280_read(0xE1, 7)
    # "<hB" consumes exactly 3 bytes. Slice the buffer so this also works on
    # interpreters whose struct.unpack insists on an exact-size buffer.
    t += unpack("<hB", d[:3])
    # Two 12-bit values share the nibbles of d[4]: low nibble goes with d[3],
    # high nibble goes with d[5].
    # NOTE(review): both are assembled as unsigned here - confirm against the
    # datasheet whether sign extension is required.
    t += ((d[3] << 4 | d[4] & 0x0f),
          (d[4] >> 4 | d[5] << 4))
    t += unpack("b", d[6:])
    bme280_calib["h"] = t
def bme280_reset():
    """Write the value 0xB6 to register 0xE0 via ``bme280_write``."""
    reset_register, reset_value = 0xE0, 0xB6
    bme280_write(reset_register, reset_value)
def bme280_compensate_t(adc_T):
    """Convert a raw temperature reading using the cached "t" calibration.

    Returns a ``(t_fine, temperature)`` tuple of integers. ``t_fine`` is the
    intermediate value that the pressure and humidity compensation take as
    input; ``temperature`` is ``(t_fine * 5 + 128) >> 8``.
    """
    dig_t1, dig_t2, dig_t3 = bme280_calib["t"]

    # First-order term.
    coarse = (adc_T >> 3) - (dig_t1 << 1)
    linear = (coarse * dig_t2) >> 11

    # Second-order term.
    delta = (adc_T >> 4) - dig_t1
    quadratic = (((delta * delta) >> 12) * dig_t3) >> 14

    t_fine = linear + quadratic
    temperature = (t_fine * 5 + 128) >> 8
    return t_fine, temperature
def bme280_compensate_p(t_fine, adc_P):
    """Convert a raw pressure reading using the cached "p" calibration.

    ``t_fine`` is the first element returned by ``bme280_compensate_t``.
    Returns an integer, or 0 when the scaling term evaluates to zero (which
    would otherwise cause a division by zero).
    """
    p1, p2, p3, p4, p5, p6, p7, p8, p9 = bme280_calib["p"]

    dt = t_fine - 128000
    dt_squared = dt * dt

    offset = dt_squared * p6 + ((dt * p5) << 17) + (p4 << 35)
    scale = ((dt_squared * p3) >> 8) + ((dt * p2) << 12)
    scale = (((1 << 47) + scale) * p1) >> 33
    if not scale:
        return 0

    p = (((1048576 - adc_P) << 31) - offset) * 3125 // scale
    p_high = p >> 13
    second_order = (p9 * p_high * p_high) >> 25
    first_order = (p8 * p) >> 19
    return ((p + second_order + first_order) >> 8) + (p7 << 4)
def bme280_compensate_h(t_fine, adc_H):
    """Convert a raw humidity reading using the cached "h" calibration.

    ``t_fine`` is the first element returned by ``bme280_compensate_t``.
    The accumulator is clamped to the range 0..419430400 before the final
    ``>> 12``, so the result lies between 0 and 102400 inclusive.
    """
    h1, h2, h3, h4, h5, h6 = bme280_calib["h"]

    dt = t_fine - 76800
    raw_term = ((adc_H << 14) - (h4 << 20) - h5 * dt + 16384) >> 15
    temp_term = (((dt * h6) >> 10) * (((dt * h3) >> 11) + 32768)) >> 10
    gain_term = ((temp_term + 2097152) * h2 + 8192) >> 14

    acc = raw_term * gain_term
    acc_high = acc >> 15
    acc -= (((acc_high * acc_high) >> 7) * h1) >> 4

    # Clamp to the permitted range before scaling down.
    acc = min(max(acc, 0), 419430400)
    return acc >> 12
def bme280_measure():
    """Read one 8-byte sample at register 0xF7 and return compensated values.

    Bytes 0-2 form the 20-bit raw pressure, bytes 3-5 the 20-bit raw
    temperature and bytes 6-7 the 16-bit raw humidity.

    Returns a dict with the keys ``"T"``, ``"P"``, ``"H"`` and ``"t_fine"``.
    """
    raw = bme280_read(0xF7, 8)
    raw_pressure = raw[0] << 12 | raw[1] << 4 | raw[2] >> 4
    raw_temperature = raw[3] << 12 | raw[4] << 4 | raw[5] >> 4
    raw_humidity = raw[6] << 8 | raw[7]

    # Temperature first: its t_fine output feeds the other two compensations.
    t_fine, temperature = bme280_compensate_t(raw_temperature)
    pressure = bme280_compensate_p(t_fine, raw_pressure)
    humidity = bme280_compensate_h(t_fine, raw_humidity)
    return {"T": temperature, "P": pressure, "H": humidity, "t_fine": t_fine}
import utime
from struct import unpack, unpack_from
from microbit import i2c, pin1, pin2, pin8
class Bme280(object):
    """BME280 sensor accessed over the micro:bit I2C bus.

    Constructing an instance writes the three configuration registers and
    caches the calibration coefficients in ``calib_t``, ``calib_p`` and
    ``calib_h``. ``measure()`` then returns compensated readings.
    """

    def __init__(self, i2c_addr=0x76):
        """Configure the sensor at ``i2c_addr`` and load its calibration."""
        self.i2c_addr = i2c_addr
        self.write(0xF2, 0x01)  # ctrlHum
        self.write(0xF4, 0x27)  # ctrlMeas
        self.write(0xF5, 0x00)  # config
        self.prefetch_calibration()

    def read(self, addr, length):
        """Select register ``addr`` and read ``length`` bytes from the sensor."""
        i2c.write(self.i2c_addr, bytes([addr]), repeat=True)
        return i2c.read(self.i2c_addr, length)

    def write(self, *addr_and_data):
        """Send ``reg0, val0, reg1, val1, ...`` pairs in one I2C write.

        Raises AssertionError when an odd number of arguments is supplied.
        """
        assert len(addr_and_data) % 2 == 0
        i2c.write(self.i2c_addr, bytes(addr_and_data))

    def reset(self):
        """Write the value 0xB6 to register 0xE0."""
        self.write(0xE0, 0xB6)

    def prefetch_calibration(self):
        """Read the calibration blocks and cache them on the instance.

        Sets ``calib_t`` (3 values), ``calib_p`` (9 values) and ``calib_h``
        (6 values).
        """
        # 12 little-endian 16-bit words: unsigned, 2 signed, unsigned, 8 signed.
        t = unpack("<H2hH8h", self.read(0x88, 24))
        self.calib_t = t[:3]
        self.calib_p = t[3:]

        t = unpack("B", self.read(0xA1, 1))
        d = self.read(0xE1, 7)
        # "<hB" covers only the first 3 of the 7 bytes. unpack_from accepts a
        # longer buffer on every interpreter, whereas unpack may demand an
        # exact-size one.
        t += unpack_from("<hB", d)
        # Two 12-bit values share the nibbles of d[4].
        # NOTE(review): assembled as unsigned - confirm against the datasheet
        # whether sign extension is required.
        t += ((d[3] << 4 | d[4] & 0x0f),
              (d[4] >> 4 | d[5] << 4))
        t += unpack_from("b", d, 6)
        self.calib_h = t

    def compensate_t(self, adc_T):
        """Convert a raw temperature reading.

        Returns ``(t_fine, temperature)``. ``temperature`` is in units of
        0.01 DegC: "5123" equals 51.23 DegC. ``t_fine`` is the intermediate
        value consumed by ``compensate_p`` and ``compensate_h``.
        """
        t1, t2, t3 = self.calib_t
        v1 = (((adc_T >> 3) - (t1 << 1)) * t2) >> 11
        v2 = (((((adc_T >> 4) - t1) * ((adc_T >> 4) - t1)) >> 12) * t3) >> 14
        t_fine = v1 + v2
        return t_fine, (t_fine * 5 + 128) >> 8

    def compensate_p(self, t_fine, adc_P):
        """Convert a raw pressure reading.

        Returns the pressure in Q24.8 format (24 integer bits and 8
        fractional): 24674867 represents 24674867/256 = 96386.2 Pa
        (963.862 hPa). Returns 0 when the scaling term is zero, which
        would otherwise cause a division by zero.
        """
        p1, p2, p3, p4, p5, p6, p7, p8, p9 = self.calib_p
        v1 = t_fine - 128000
        v2 = v1 * v1 * p6
        v2 = v2 + ((v1 * p5) << 17)
        v2 = v2 + (p4 << 35)
        v1 = ((v1 * v1 * p3) >> 8) + ((v1 * p2) << 12)
        v1 = ((1 << 47) + v1) * p1 >> 33
        if v1 == 0:
            return 0
        p = 1048576 - adc_P
        p = ((p << 31) - v2) * 3125 // v1
        v1 = (p9 * (p >> 13) * (p >> 13)) >> 25
        v2 = (p8 * p) >> 19
        return ((p + v1 + v2) >> 8) + (p7 << 4)

    def compensate_h(self, t_fine, adc_H):
        """Convert a raw humidity reading.

        Returns humidity in %RH in Q22.10 format (22 integer and 10
        fractional bits): 47445 represents 47445/1024 = 46.333 %RH.
        """
        h1, h2, h3, h4, h5, h6 = self.calib_h
        x1 = t_fine - 76800
        x1 = (
            ((adc_H << 14) - (h4 << 20) - h5 * x1 + 16384) >> 15
        ) * ((((
            (((x1 * h6) >> 10) * (((x1 * h3) >> 11) + 32768)) >> 10
        ) + 2097152) * h2 + 8192) >> 14)
        x1 = x1 - (((((x1 >> 15) * (x1 >> 15)) >> 7) * h1) >> 4)
        # Clamp before the final scale-down.
        if x1 < 0:
            x1 = 0
        if x1 > 419430400:
            x1 = 419430400
        return x1 >> 12

    def measure(self):
        """Read one 8-byte sample at 0xF7 and return compensated values.

        Returns a dict with the keys ``"T"``, ``"P"``, ``"H"`` and
        ``"t_fine"``.
        """
        d = self.read(0xF7, 8)
        # Temperature first: its t_fine output feeds the other two.
        t_fine, T = self.compensate_t(d[3] << 12 | d[4] << 4 | d[5] >> 4)
        P = self.compensate_p(t_fine, d[0] << 12 | d[1] << 4 | d[2] >> 4)
        H = self.compensate_h(t_fine, d[6] << 8 | d[7])
        return dict(T=T, P=P, H=H, t_fine=t_fine)
class Rain(object):
    """Counts 0 -> 1 transitions seen on ``pin2``.

    Nothing is sampled in the background: ``spin()`` has to be called often
    enough that no transition on the pin is missed.
    """

    def __init__(self):
        """Remember the current pin level and start counting from zero."""
        self.pin = pin2.read_digital()
        self.dumps = 0

    def spin(self):
        """Sample ``pin2`` once and count a rising edge if one occurred."""
        level = pin2.read_digital()
        rising_edge = level == 1 and self.pin == 0
        self.pin = level
        if rising_edge:
            self.dumps += 1

    def value(self):
        """Return the accumulated count scaled by 11/1000.

        NOTE(review): presumably rainfall per bucket tip; the unit is not
        stated in this file - confirm against the rain gauge documentation.
        """
        thousandths = self.dumps * 11
        return thousandths / 1000
class Wind(object):
    """Counts 0 -> 1 transitions on ``pin8`` and derives a speed figure.

    ``spin()`` must be called frequently. It counts rising edges and, once
    the current 2000 ms window has elapsed, calls ``window_mph()`` to turn
    the count into ``self.mph`` and start a new window.

    Attributes:
        pin: last sampled level of ``pin8``.
        turns: rising edges counted in the current window.
        turn32: running total of rising edges, wrapped to 32 bits.
        mph: value computed from the most recently completed window.
        ms_old: tick value at which the current window started.
        ms_next: tick value at which the current window ends.
    """

    def __init__(self):
        """Record the current pin level and open the first window."""
        self.pin = pin8.read_digital()
        self.turns = 0
        self.turn32 = 0
        self.window_mph()

    def spin(self):
        """Sample ``pin8`` once and close the window if its time is up."""
        n = pin8.read_digital()
        if self.pin == 0 and n == 1:
            self.turns += 1
            self.turn32 = (self.turn32 + 1) & 0xFFFFFFFF
        self.pin = n
        # ticks_diff copes with the tick counter wrapping around, so no
        # separate "deadline wrapped" branch is needed.
        if utime.ticks_diff(utime.ticks_ms(), self.ms_next) > 0:
            self.window_mph()

    def window_mph(self):
        """Convert the current count into ``mph`` and begin a new window."""
        # NOTE(review): scale factor carried over unchanged - confirm it
        # against the anemometer documentation.
        self.mph = (self.turns / 2) / (1492 / 1000)
        self.turns = 0
        self.ms_old = utime.ticks_ms()
        self.ms_next = utime.ticks_add(self.ms_old, 2000)
def wind_direction():
    """Read ``pin1`` and translate the analog level into a compass label.

    Each label owns an open interval ``low < reading < high``; a reading
    outside every interval yields ``"???"``.

    Returns a dict with the raw reading under ``"winDirRaw"`` and the label
    under ``"windDirStr"``.
    """
    # NOTE(review): "winDirRaw" (no second "d") is the existing key spelling;
    # kept unchanged because callers may depend on it.
    reading = pin1.read_analog()
    bands = (
        (886, 906, "N"),
        (692, 712, "NE"),
        (395, 415, "E"),
        (478, 498, "SE"),
        (564, 584, "S"),
        (799, 819, "SW"),
        (968, 988, "W"),
        (939, 959, "NW"),
    )
    label = "???"
    for low, high, name in bands:
        if low < reading < high:
            label = name
            break
    return {"winDirRaw": reading, "windDirStr": label}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment