Last active
November 14, 2020 04:13
-
-
Save vpcf/e46606f8cd07ae7054d6c4525ab7d90d to your computer and use it in GitHub Desktop.
RS-BTWATTCH2用 測定クライアント試作 - Windows 10 1709以降 + Python 3 + Bleak
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time, datetime | |
from bleak import BleakClient, discover | |
from functools import reduce | |
def crc8(payload: bytearray):
    """Return the 8-bit CRC of *payload*.

    The checksum starts at 0x00, uses the polynomial 0x85 and is computed
    most-significant bit first. An empty payload yields 0.
    """
    generator = 0x85
    checksum = 0x00
    for octet in payload:
        # Fold the next byte into the running remainder, then clock out 8 bits.
        checksum ^= octet & 0xff
        for _ in range(8):
            shifted = checksum << 1
            if checksum & 0x80:
                # The bit shifted out was set: reduce by the polynomial and
                # drop the overflow bit to stay within 8 bits.
                shifted = (shifted ^ generator) & 0xff
            checksum = shifted
    return checksum
class BTWATTCH2:
    """Synchronous-style client for a BTWATTCH2 device over BLE (via Bleak).

    The constructor connects to the device, looks up the Nordic UART Tx/Rx
    characteristics and subscribes to notifications. Every public method
    drives the instance's asyncio event loop with ``run_until_complete``,
    so the methods must not be called from inside a running event loop.
    """

    def __init__(self, address=None):
        """Connect to the device and enable notifications.

        Args:
            address: Bluetooth address of the device as a string. When
                falsy, a BLE scan is performed and the first device whose
                name contains "BTWATTCH2" is used.
        """
        self.address = address
        self.client = None
        self.loop = asyncio.get_event_loop()
        self.services = self.loop.run_until_complete(self.setup())
        self.Tx = self.services.get_characteristic('6e400002-b5a3-f393-e0a9-e50e24dcca9e')  # Nordic UART Tx
        self.Rx = self.services.get_characteristic('6e400003-b5a3-f393-e0a9-e50e24dcca9e')  # Nordic UART Rx
        self.enable_notify()

    async def setup(self):
        """Create the BLE client, connect, and return the GATT services.

        Raises:
            IndexError: if no address was given and the scan found no device
                whose name contains "BTWATTCH2".
        """
        if self.address:
            self.client = BleakClient(self.address)
        else:
            ble_devices = await discover()
            # Devices may be reported without a name (None); skip those
            # instead of failing on the substring test.
            btwattch2s = [d for d in ble_devices if d.name and "BTWATTCH2" in d.name]
            if not btwattch2s:
                raise IndexError("no BTWATTCH2 device found during BLE scan")
            self.address = btwattch2s[0].address
            self.client = BleakClient(btwattch2s[0])
        await self.client.connect()
        return await self.client.get_services()

    def enable_notify(self):
        """Subscribe to Rx notifications with a fresh message formatter."""
        async def _enable_notify():
            await self.client.start_notify(self.Rx, self.format_message())
        return self.loop.run_until_complete(_enable_notify())

    def disable_notify(self):
        """Unsubscribe from Rx notifications."""
        async def _disable_notify():
            await self.client.stop_notify(self.Rx)
        return self.loop.run_until_complete(_disable_notify())

    def cmd(self, payload: bytearray):
        """Wrap *payload* in a command frame.

        The frame is: 0xAA, the payload length as 2 big-endian bytes, the
        payload itself, then the 1-byte ``crc8`` of the payload.
        """
        header = b'\xaa'
        pld_length = len(payload).to_bytes(2, 'big')
        return header + pld_length + payload + crc8(payload).to_bytes(1, 'big')

    def write(self, payload: bytearray):
        """Frame *payload* and write it to the Tx characteristic (with response)."""
        async def _write(payload):
            command = self.cmd(payload)
            await self.client.write_gatt_char(self.Tx, command, True)
        return self.loop.run_until_complete(_write(payload))

    def set_rtc(self):
        """Send the current local time to the device (command 0x01).

        Fields sent: second, minute, hour, day of month, month - 1,
        year - 1900, weekday.
        """
        d = datetime.datetime.now().timetuple()
        # NOTE(review): tm_wday here is Python's convention (Monday == 0);
        # confirm this matches what the device expects.
        payload = 0x01, d.tm_sec, d.tm_min, d.tm_hour, d.tm_mday, d.tm_mon-1, d.tm_year-1900, d.tm_wday
        self.write(bytearray(payload))

    def LED_blink(self):
        """Send payload 3e0102020f (presumably blinks the LED, going by the name)."""
        self.write(bytearray.fromhex("3e0102020f"))

    def on(self):
        """Send payload a701 (presumably switches the output on)."""
        self.write(bytearray.fromhex("a701"))

    def off(self):
        """Send payload a700 (presumably switches the output off)."""
        self.write(bytearray.fromhex("a700"))

    def unknown1(self):
        """Send a fixed sequence of five 0x07 commands; purpose unknown."""
        self.write(bytearray.fromhex("0700007800"))
        self.write(bytearray.fromhex("07f8693000"))
        self.write(bytearray.fromhex("07d0461800"))
        self.write(bytearray.fromhex("07a8231800"))
        self.write(bytearray.fromhex("0780003c00"))

    def unknown2(self):
        """Send command a0; purpose unknown."""
        self.write(bytearray.fromhex("a0"))

    def unknown3(self):
        """Send command 04; purpose unknown."""
        self.write(bytearray.fromhex("04"))

    def unknown4(self):
        """Send command 03; purpose unknown."""
        self.write(bytearray.fromhex("03"))

    def unknown5(self):
        """Send command 0200; purpose unknown."""
        self.write(bytearray.fromhex("0200"))

    def measure(self):
        """Request a measurement (command 0x08) at every wall-clock second.

        This loops forever and only returns if the write raises.
        """
        async def _measure():
            command = self.cmd(bytearray.fromhex("08"))
            while True:
                # Sleep until the next whole second. This must be a
                # non-blocking sleep: a blocking time.sleep() would stall the
                # event loop and delay handling of incoming notifications.
                await asyncio.sleep(1 - datetime.datetime.now().microsecond / 1e6)
                await self.client.write_gatt_char(self.Tx, command)
        return self.loop.run_until_complete(_measure())

    def format_message(self):
        """Return a notification callback that reassembles and prints replies.

        The returned callable takes ``(sender, data)``. A notification of
        exactly 20 bytes is treated as a partial frame and buffered; any
        other length completes the frame. A completed measurement reply
        (byte 3 == 0x08) is printed as "timestamp,W,V,mA". Frames that are
        too short to parse are dropped silently.
        """
        cache = bytearray()

        def _format_message(sender: int, data: bytearray):
            if len(data) == 20:
                # Full-size chunk: more data follows, keep buffering.
                cache.extend(data)
                return

            frame = bytes(cache) + bytes(data)
            # Reset the buffer before parsing so a bad frame can never leak
            # stale bytes into the next one.
            cache.clear()

            if len(frame) < 4:
                return  # too short to carry a command byte

            reply = frame[3]
            if reply == 0x08:
                if len(frame) < 29:
                    return  # truncated measurement reply
                voltage = int.from_bytes(frame[5:11], 'little') / (16**6)
                current = int.from_bytes(frame[11:17], 'little') / (32**6) * 1000
                wattage = int.from_bytes(frame[17:23], 'little') / (16**6)
                # Bytes 23..28 are sec, min, hour, day, month-1, year-1900;
                # the reversed slice yields day, hour, minute, second.
                timestamp = datetime.datetime(1900+frame[28], frame[27]+1, *frame[26:22:-1])
                print(
                    "{0},{1:.3f}W,{2:.3f}V,{3:.3f}mA".format(timestamp, wattage, voltage, current)
                )
            elif reply == 0x01:
                pass  # RTC timer initialized successfully
            else:
                pass  # to be implemented

        return _format_message
# Create an instance by passing the Bluetooth device address as a string.
# (With no argument, the first scanned device whose name contains
# "BTWATTCH2" is used.)
wattchecker = BTWATTCH2('dd:c8:ba:34:71:60')
# Initialize the device clock with the current local time.
wattchecker.set_rtc()
# Request a measurement every second (this call loops forever).
wattchecker.measure()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment