Skip to content

Instantly share code, notes, and snippets.

@vpcf
Last active November 14, 2020 04:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vpcf/e46606f8cd07ae7054d6c4525ab7d90d to your computer and use it in GitHub Desktop.
Save vpcf/e46606f8cd07ae7054d6c4525ab7d90d to your computer and use it in GitHub Desktop.
RS-BTWATTCH2用 測定クライアント試作 - Windows 10 1709以降 + Python 3 + Bleak
import asyncio
import time, datetime
from bleak import BleakClient, discover
from functools import reduce
def crc8(payload: bytearray):
    """Return the 8-bit CRC of *payload*.

    The checksum is computed MSB-first with generator polynomial 0x85,
    an initial value of 0x00, and no final XOR.  Each item of *payload*
    is reduced to its low 8 bits before being folded in.  An empty
    payload yields 0.
    """
    generator = 0x85
    checksum = 0x00
    for octet in payload:
        # Fold the next byte into the running remainder ...
        checksum ^= octet & 0xff
        # ... then shift it out one bit at a time, dividing by the
        # generator whenever the top bit is about to fall off.
        for _ in range(8):
            top_bit_set = checksum & 0x80
            checksum <<= 1
            if top_bit_set:
                checksum = (checksum ^ generator) & 0xff
    return checksum
class BTWATTCH2:
    """Blocking client for an RS-BTWATTCH2 power meter over BLE (Bleak).

    The constructor connects to the device, looks up the Nordic UART
    Tx/Rx characteristics and subscribes to notifications.  Every public
    method drives the instance's asyncio event loop to completion, so the
    class is meant to be used from ordinary synchronous code.

    Commands are framed by :meth:`cmd` as
    ``0xAA | payload length (2 bytes, big-endian) | payload | CRC-8``.
    """

    # Nordic UART Service characteristic UUIDs.
    _TX_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
    _RX_UUID = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'

    def __init__(self, address=None):
        """Connect to the meter and enable notifications.

        Args:
            address: Bluetooth address of the device as a string.  When
                omitted, a BLE scan is performed and the first device
                whose name contains ``"BTWATTCH2"`` is used.

        Raises:
            IndexError: no matching device was found during the scan.
            RuntimeError: the device does not expose the expected UART
                characteristics.
        """
        self.address = address
        self.client = None
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions refuse to create a loop implicitly.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        self.services = self.loop.run_until_complete(self.setup())
        self.Tx = self.services.get_characteristic(self._TX_UUID)  # Nordic UART Tx
        self.Rx = self.services.get_characteristic(self._RX_UUID)  # Nordic UART Rx
        if self.Tx is None or self.Rx is None:
            raise RuntimeError(
                "Nordic UART Tx/Rx characteristics not found on device"
            )
        self.enable_notify()

    async def setup(self):
        """Create the Bleak client, connect, and return the GATT services.

        Raises:
            IndexError: no address was given and the scan found no device
                whose name contains ``"BTWATTCH2"``.
        """
        if self.address:
            self.client = BleakClient(self.address)
        else:
            ble_devices = await discover()
            # Devices may advertise without a name; skip those instead of
            # failing on ``"BTWATTCH2" in None``.
            candidates = [
                d for d in ble_devices if d.name and "BTWATTCH2" in d.name
            ]
            if not candidates:
                # IndexError is kept so existing callers that caught the
                # former bare ``[0]`` failure keep working.
                raise IndexError("no BTWATTCH2 device found during BLE scan")
            self.address = candidates[0].address
            self.client = BleakClient(candidates[0])
        await self.client.connect()
        return await self.client.get_services()

    def enable_notify(self):
        """Subscribe to Rx notifications, routing them to a new handler."""
        async def _enable_notify():
            await self.client.start_notify(self.Rx, self.format_message())
        return self.loop.run_until_complete(_enable_notify())

    def disable_notify(self):
        """Unsubscribe from Rx notifications."""
        async def _disable_notify():
            await self.client.stop_notify(self.Rx)
        return self.loop.run_until_complete(_disable_notify())

    def cmd(self, payload: bytearray):
        """Wrap *payload* in a command frame.

        Returns:
            ``b'\\xaa'`` + 2-byte big-endian payload length + payload +
            1-byte CRC-8 of the payload.
        """
        header = b'\xaa'
        pld_length = len(payload).to_bytes(2, 'big')
        checksum = crc8(payload).to_bytes(1, 'big')
        return header + pld_length + payload + checksum

    def write(self, payload: bytearray):
        """Frame *payload* and write it to the Tx characteristic.

        The write is issued with ``response=True`` and this call blocks
        until it completes.
        """
        async def _write(payload):
            command = self.cmd(payload)
            await self.client.write_gatt_char(self.Tx, command, True)
        return self.loop.run_until_complete(_write(payload))

    def set_rtc(self):
        """Send the current local time to the device (command 0x01)."""
        d = datetime.datetime.now().timetuple()
        # Month is sent zero-based and the year as an offset from 1900.
        payload = (
            0x01, d.tm_sec, d.tm_min, d.tm_hour,
            d.tm_mday, d.tm_mon - 1, d.tm_year - 1900, d.tm_wday,
        )
        self.write(bytearray(payload))

    def LED_blink(self):
        """Send the LED-blink command payload."""
        self.write(bytearray.fromhex("3e0102020f"))

    def on(self):
        """Send the ``a7 01`` payload (presumably switches the output on)."""
        self.write(bytearray.fromhex("a701"))

    def off(self):
        """Send the ``a7 00`` payload (presumably switches the output off)."""
        self.write(bytearray.fromhex("a700"))

    def unknown1(self):
        """Send a fixed sequence of five 0x07 commands (purpose unidentified)."""
        self.write(bytearray.fromhex("0700007800"))
        self.write(bytearray.fromhex("07f8693000"))
        self.write(bytearray.fromhex("07d0461800"))
        self.write(bytearray.fromhex("07a8231800"))
        self.write(bytearray.fromhex("0780003c00"))

    def unknown2(self):
        """Send command 0xa0 (purpose unidentified)."""
        self.write(bytearray.fromhex("a0"))

    def unknown3(self):
        """Send command 0x04 (purpose unidentified)."""
        self.write(bytearray.fromhex("04"))

    def unknown4(self):
        """Send command 0x03 (purpose unidentified)."""
        self.write(bytearray.fromhex("03"))

    def unknown5(self):
        """Send command 0x02 0x00 (purpose unidentified)."""
        self.write(bytearray.fromhex("0200"))

    def measure(self):
        """Request a measurement (command 0x08) once per second, forever.

        Each request is sent right after the next wall-clock second
        boundary.  Results arrive through the notification handler.  This
        call never returns normally; interrupt it to stop.
        """
        async def _measure():
            payload = bytearray.fromhex("08")
            command = self.cmd(payload)
            while True:
                # Sleep cooperatively so the event loop can keep
                # dispatching notification callbacks while we wait.
                delay = 1 - datetime.datetime.now().microsecond / 1e6
                await asyncio.sleep(delay)
                await self.client.write_gatt_char(self.Tx, command)
        return self.loop.run_until_complete(_measure())

    @staticmethod
    def _decode_measurement(message):
        """Decode a measurement frame of at least 29 bytes.

        Returns:
            ``(timestamp, wattage, voltage, current)`` where the three
            readings are floats (current scaled by 1000, printed as mA)
            and *timestamp* is a naive ``datetime``.
        """
        voltage = int.from_bytes(message[5:11], 'little') / (16**6)
        current = int.from_bytes(message[11:17], 'little') / (32**6) * 1000
        wattage = int.from_bytes(message[17:23], 'little') / (16**6)
        # Bytes 23..28 hold sec, min, hour, day, month-1, year-1900; the
        # reversed slice yields day, hour, minute, second.
        timestamp = datetime.datetime(
            1900 + message[28], message[27] + 1, *message[26:22:-1]
        )
        return timestamp, wattage, voltage, current

    def format_message(self):
        """Return a notification callback that reassembles and prints data.

        A notification of exactly 20 bytes is treated as a non-final
        fragment and buffered; any other length completes the message.
        Measurement messages (byte 3 == 0x08) are printed as
        ``timestamp,W,V,mA``; other message types are currently ignored,
        as are messages too short to be parsed.
        """
        pending = bytearray()

        def _format_message(sender: int, data: bytearray):
            if len(data) == 20:
                pending.extend(data)
                return
            message = pending + data
            # Reset first so a malformed message cannot poison the next one.
            pending.clear()
            if len(message) < 4:
                return  # too short to carry a command id
            kind = message[3]
            if kind == 0x08:
                if len(message) < 29:
                    return  # truncated measurement frame
                timestamp, wattage, voltage, current = (
                    self._decode_measurement(message)
                )
                print(
                    "{0},{1:.3f}W,{2:.3f}V,{3:.3f}mA".format(timestamp, wattage, voltage, current)
                )
            elif kind == 0x01:
                pass  # RTC timer initialized successfully
            else:
                pass  # to be implemented

        return _format_message
# Create an instance, passing the BD address as a string (with no argument, setup() connects to the first BTWATTCH2 it finds).
wattchecker = BTWATTCH2('dd:c8:ba:34:71:60')
# Initialise the device clock.
wattchecker.set_rtc()
# Take a measurement every second (measure() loops forever).
wattchecker.measure()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment