-
-
Save cookejames/8671ab9690bf2730a28318961a2a8786 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This example finds and connects to a BLE temperature sensor (e.g. the one in ble_temperature.py).
import bluetooth | |
import random | |
import struct | |
import time | |
import micropython | |
from ble_advertising import decode_services, decode_name | |
from micropython import const | |
# Event codes compared against the `event` argument in
# BLETemperatureCentral._irq. Only a subset is handled there; the rest are
# listed for completeness.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
# Advertisement types compared against `adv_type` of a scan result; the scan
# handler only accepts _ADV_IND and _ADV_DIRECT_IND.
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_TEMP_UUID = bluetooth.UUID(0x2A6E)
# (uuid, flags) characteristic tuple and (uuid, characteristics) service tuple.
# Neither is referenced by the central class below.
_TEMP_CHAR = (
    _TEMP_UUID,
    bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_ENV_SENSE_SERVICE = (
    _ENV_SENSE_UUID,
    (_TEMP_CHAR,),
)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
class BLETemperatureCentral:
    # BLE central that scans for, connects to and reads a peripheral exposing
    # the environmental sensing service's temperature characteristic.
    #
    # Every BLE operation is asynchronous: results arrive through _irq(), which
    # updates the cached state and fires the callbacks registered through
    # scan(), connect(), read() and on_notify().

    def __init__(self, ble):
        # ble: BLE interface object; it is activated here and its IRQ handler
        # is pointed at this instance.
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        self._reset()

    def _reset(self):
        # Return to the "nothing scanned, nothing connected" state.

        # Cached name and address from a successful scan.
        self._name = None
        self._addr_type = None
        self._addr = None

        # Cached value (if we have one).
        self._value = None

        # Callbacks for completion of various operations.
        # The scan and read callbacks are cleared when they are invoked; the
        # connect callback is replaced on each connect() call.
        self._scan_callback = None
        self._conn_callback = None
        self._read_callback = None

        # Persistent callback for when new data is notified from the device.
        self._notify_callback = None

        # Connected device.
        self._conn_handle = None
        self._start_handle = None
        self._end_handle = None
        self._value_handle = None

    def _irq(self, event, data):
        # Single entry point for all BLE events; `data` is a tuple whose layout
        # depends on `event`.
        if event == _IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _ENV_SENSE_UUID in decode_services(
                adv_data
            ):
                # Found a potential device, remember it and stop scanning.
                self._addr_type = addr_type
                # Note: addr buffer is owned by caller so need to copy it.
                self._addr = bytes(addr)
                self._name = decode_name(adv_data) or "?"
                self._ble.gap_scan(None)

        elif event == _IRQ_SCAN_DONE:
            callback = self._scan_callback
            if callback:
                # Clear the one-shot callback *before* invoking it, so that a
                # callback which starts a new scan keeps the callback it
                # registers, and so a timed-out scan also clears it.
                self._scan_callback = None
                if self._addr:
                    # Found a device during the scan (and the scan was explicitly stopped).
                    callback(self._addr_type, self._addr, self._name)
                else:
                    # Scan timed out.
                    callback(None, None, None)

        elif event == _IRQ_PERIPHERAL_CONNECT:
            # Connect successful.
            conn_handle, addr_type, addr = data
            if addr_type == self._addr_type and addr == self._addr:
                self._conn_handle = conn_handle
                self._ble.gattc_discover_services(self._conn_handle)

        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            # Disconnect (either initiated by us or the remote end).
            conn_handle, _, _ = data
            if conn_handle == self._conn_handle:
                # If it was initiated by us, it'll already be reset.
                self._reset()

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            # Connected device returned a service.
            conn_handle, start_handle, end_handle, uuid = data
            if conn_handle == self._conn_handle and uuid == _ENV_SENSE_UUID:
                self._start_handle, self._end_handle = start_handle, end_handle

        elif event == _IRQ_GATTC_SERVICE_DONE:
            # Service query complete.
            if self._start_handle and self._end_handle:
                self._ble.gattc_discover_characteristics(
                    self._conn_handle, self._start_handle, self._end_handle
                )
            else:
                print("Failed to find environmental sensing service.")

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            # Connected device returned a characteristic.
            conn_handle, def_handle, value_handle, properties, uuid = data
            if conn_handle == self._conn_handle and uuid == _TEMP_UUID:
                self._value_handle = value_handle

        elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
            # Characteristic query complete.
            if self._value_handle:
                # We've finished connecting and discovering device, fire the connect callback.
                if self._conn_callback:
                    self._conn_callback()
            else:
                print("Failed to find temperature characteristic.")

        elif event == _IRQ_GATTC_READ_RESULT:
            # A read completed successfully.
            conn_handle, value_handle, char_data = data
            if conn_handle == self._conn_handle and value_handle == self._value_handle:
                self._update_value(char_data)
                callback = self._read_callback
                if callback:
                    # Clear first so the callback may chain another read().
                    self._read_callback = None
                    callback(self._value)

        elif event == _IRQ_GATTC_READ_DONE:
            # Read completed (no-op).
            pass

        elif event == _IRQ_GATTC_NOTIFY:
            # The ble_temperature.py demo periodically notifies its value.
            conn_handle, value_handle, notify_data = data
            if conn_handle == self._conn_handle and value_handle == self._value_handle:
                self._update_value(notify_data)
                if self._notify_callback:
                    self._notify_callback(self._value)

    # Returns true if we've successfully connected and discovered characteristics.
    def is_connected(self):
        return self._conn_handle is not None and self._value_handle is not None

    # Find a device advertising the environmental sensor service.
    # callback(addr_type, addr, name) is invoked once when the scan ends; all
    # three arguments are None if no matching device was seen.
    def scan(self, callback=None):
        self._addr_type = None
        self._addr = None
        self._scan_callback = callback
        self._ble.gap_scan(2000, 30000, 30000)

    # Connect to the specified device (otherwise use cached address from a scan).
    # Returns False if no address is available, True once the connect request
    # has been issued. callback() fires after characteristic discovery succeeds.
    def connect(self, addr_type=None, addr=None, callback=None):
        # An address type of 0 is a legitimate value, so only fall back to the
        # cached one when the argument was actually omitted.
        if addr_type is not None:
            self._addr_type = addr_type
        self._addr = addr or self._addr
        self._conn_callback = callback
        if self._addr_type is None or self._addr is None:
            return False
        self._ble.gap_connect(self._addr_type, self._addr)
        return True

    # Disconnect from current device.
    def disconnect(self):
        # Compare against None: a connection handle of 0 is valid and must
        # still be disconnected.
        if self._conn_handle is None:
            return
        self._ble.gap_disconnect(self._conn_handle)
        self._reset()

    # Issues an (asynchronous) read, will invoke callback with data.
    # Does nothing when not connected.
    def read(self, callback):
        if not self.is_connected():
            return
        self._read_callback = callback
        self._ble.gattc_read(self._conn_handle, self._value_handle)

    # Sets a callback to be invoked when the device notifies us.
    def on_notify(self, callback):
        self._notify_callback = callback

    def _update_value(self, data):
        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
        self._value = struct.unpack("<h", data)[0] / 100
        return self._value

    # Most recently read or notified temperature, or None if there is none yet.
    def value(self):
        return self._value
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Helpers for generating and decoding BLE advertising payloads.
from micropython import const | |
import struct | |
import bluetooth | |
# Advertising payloads are repeated packets of the following form:
# 1 byte data length (N + 1)
# 1 byte type (see constants below)
# N bytes type-specific data
# The *_MORE types are defined but not used by the helpers in this file.
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)
# Generate a payload to be passed to gap_advertise(adv_data=...).
#
# limited_disc / br_edr select the bits of the flags field, which is always
# emitted first. name, services and appearance are each appended only when
# truthy. Services whose bytes() form is not 2, 4 or 16 bytes long are skipped.
# Returns the assembled payload as a bytearray.
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
    payload = bytearray()

    def _append(adv_type, value):
        # One field: length byte (type + data), type byte, then the data.
        nonlocal payload
        payload += struct.pack("BB", len(value) + 1, adv_type) + value

    _append(
        _ADV_TYPE_FLAGS,
        struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)),
    )

    if name:
        # Encode text first so the length byte counts UTF-8 bytes rather than
        # characters (they differ for non-ASCII names).
        if isinstance(name, str):
            name = name.encode("utf-8")
        _append(_ADV_TYPE_NAME, name)

    if services:
        for uuid in services:
            b = bytes(uuid)
            if len(b) == 2:
                _append(_ADV_TYPE_UUID16_COMPLETE, b)
            elif len(b) == 4:
                _append(_ADV_TYPE_UUID32_COMPLETE, b)
            elif len(b) == 16:
                _append(_ADV_TYPE_UUID128_COMPLETE, b)

    # See org.bluetooth.characteristic.gap.appearance.xml
    if appearance:
        # Appearance is an unsigned 16-bit value, so pack it as "<H"; the signed
        # "<h" format cannot represent values of 0x8000 and above.
        _append(_ADV_TYPE_APPEARANCE, struct.pack("<H", appearance))

    return payload
# Collect the data bytes of every field in `payload` whose type byte equals
# `adv_type`. Returns a list of slices of `payload` (empty if none match).
def decode_field(payload, adv_type):
    matches = []
    total = len(payload)
    offset = 0
    # Each field is: length byte, type byte, then (length - 1) data bytes.
    while offset + 1 < total:
        next_offset = offset + 1 + payload[offset]
        if payload[offset + 1] == adv_type:
            matches.append(payload[offset + 2 : next_offset])
        offset = next_offset
    return matches
# Return the first name field of `payload` decoded as UTF-8, or "" when the
# payload carries no name field.
def decode_name(payload):
    names = decode_field(payload, _ADV_TYPE_NAME)
    if not names:
        return ""
    return str(names[0], "utf-8")
# Return a list of bluetooth.UUID objects for every complete 16-, 32- and
# 128-bit service UUID field found in `payload` (in that order).
def decode_services(payload):
    services = []
    for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
        # 16-bit UUIDs are unsigned, so unpack with "<H" (not signed "<h").
        services.append(bluetooth.UUID(struct.unpack("<H", u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
        # A 32-bit UUID field is 4 bytes; the previous "<d" format is an 8-byte
        # double and could never unpack it. Hand the raw bytes to UUID() the
        # same way the 128-bit case does.
        # NOTE(review): assumes bluetooth.UUID accepts a 4-byte buffer on the
        # target firmware - confirm.
        services.append(bluetooth.UUID(u))
    for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
        services.append(bluetooth.UUID(u))
    return services
# Build a sample payload and print it, followed by the name and the services
# decoded back out of it.
def demo():
    sample_services = [
        bluetooth.UUID(0x181A),
        bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E"),
    ]
    adv = advertising_payload(services=sample_services, name="micropython")
    print(adv)
    print(decode_name(adv))
    print(decode_services(adv))
# Run the encode/decode demo when this module is executed as a script.
if __name__ == "__main__":
    demo()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bluetooth | |
from ble import BLETemperatureCentral | |
import time | |
from umqtt.simple import MQTTClient | |
import config | |
import micropython | |
import gc | |
# Paths of the TLS credential files. KEY_PATH and CERT_PATH are read by
# Test.connect_mqtt; CACERT_PATH is not referenced in the code shown here.
CACERT_PATH = "certs/aws-root-ca.pem"
KEY_PATH = "certs/private.pem.key"
CERT_PATH = "certs/certificate.pem.crt"
class Test:
    # Demo harness: opens a TLS MQTT connection, then finds a BLE temperature
    # sensor and prints its cached value once a second while connected.

    def __init__(self) -> None:
        micropython.mem_info()
        self.mqtt = None
        self.central = None

    # Disconnect whichever of the BLE central and MQTT client were created.
    def close(self):
        print("Closing")
        micropython.mem_info()
        for link in (self.central, self.mqtt):
            if link:
                link.disconnect()

    # Connect MQTT, then BLE, then loop forever printing the sensor value.
    def run(self):
        micropython.mem_info()
        endpoint, client_id = config.IOT_ENDPOINT, config.IOT_CLIENT_ID
        self.mqtt = self.connect_mqtt(endpoint, client_id)
        micropython.mem_info()
        self.central = self.init_bt()
        while True:
            while self.central and self.central.is_connected():
                print(self.central.value())
                time.sleep(1)
            time.sleep(1)

    # Scan for a sensor and block until connected.
    # Returns the connected BLETemperatureCentral, or None if the scan ended
    # without finding a sensor.
    def init_bt(self):
        sensor = BLETemperatureCentral(bluetooth.BLE())
        scan_failed = False

        def on_scan(addr_type, addr, name):
            nonlocal scan_failed
            if addr_type is None:
                scan_failed = True
                print("No sensor found.")
                return
            print("Found sensor:", addr_type, addr, name)
            sensor.connect()

        sensor.scan(callback=on_scan)

        # Poll until discovery completes or the scan reports no device.
        while not sensor.is_connected():
            time.sleep_ms(100)
            if scan_failed:
                return None

        print("Connected")
        return sensor

    # Read the key and certificate files, connect to `endpoint` on port 8883
    # as `client_id`, and return the connected MQTTClient.
    def connect_mqtt(self, endpoint, client_id):
        with open(KEY_PATH, "r") as key_file:
            private_key = key_file.read()
        with open(CERT_PATH, "r") as cert_file:
            certificate = cert_file.read()

        tls_options = {"key": private_key, "cert": certificate, "server_side": False}
        mqtt_client = MQTTClient(
            client_id=client_id,
            server=endpoint,
            port=8883,
            keepalive=1200,
            ssl=True,
            ssl_params=tls_options,
        )

        print("Connecting to MQTT")
        mqtt_client.connect()
        print("MQTT Connected")
        return mqtt_client
# Script entry point: run the demo and always release the BLE and MQTT
# connections on the way out, even if run() raises.
if __name__ == "__main__":
    test = Test()
    try:
        test.run()
    finally:
        test.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment