Created
November 29, 2023 13:26
-
-
Save TinyKitten/e685c6744c167ed4cf6c6a9ec638cf97 to your computer and use it in GitHub Desktop.
Badger 2040 W BLE Peripheral (for TrainLCD App)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from micropython import const | |
import struct | |
import bluetooth | |
# Advertising payloads are repeated packets of the following form: | |
# 1 byte data length (N + 1) | |
# 1 byte type (see constants below) | |
# N bytes type-specific data | |
# Advertising data (AD) type codes used as the second byte of each field.
# Each constant keeps the plain ``NAME = const(...)`` form.
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_UUID16_MORE = const(0x02)
_ADV_TYPE_UUID16_COMPLETE = const(0x03)
_ADV_TYPE_UUID32_MORE = const(0x04)
_ADV_TYPE_UUID32_COMPLETE = const(0x05)
_ADV_TYPE_UUID128_MORE = const(0x06)
_ADV_TYPE_UUID128_COMPLETE = const(0x07)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_APPEARANCE = const(0x19)

# Upper bound on the assembled payload, enforced by advertising_payload().
_ADV_MAX_PAYLOAD = const(31)
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
    """Generate a payload to be passed to ``gap_advertise(adv_data=...)``.

    The payload is a sequence of fields, each laid out as
    ``<length = len(data) + 1> <type> <data>``.

    Args:
        limited_disc: Selects flag bit 0x01 when true, 0x02 otherwise.
        br_edr: Selects flag bits 0x18 when true, 0x04 otherwise.
        name: Optional device name, as ``bytes`` or ``str`` (a ``str`` is
            encoded as UTF-8).
        services: Optional iterable of objects convertible with ``bytes()``
            to a 2-, 4- or 16-byte UUID. Other lengths are skipped.
        appearance: Optional 16-bit appearance value; 0 omits the field.

    Returns:
        A ``bytearray`` holding the assembled payload.

    Raises:
        ValueError: If the payload exceeds ``_ADV_MAX_PAYLOAD`` bytes.
    """
    payload = bytearray()

    def _append(adv_type, value):
        nonlocal payload
        payload += struct.pack("BB", len(value) + 1, adv_type) + value

    _append(
        _ADV_TYPE_FLAGS,
        struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)),
    )

    if name:
        # Encode explicitly rather than relying on implicit bytes + str
        # concatenation, which not every Python implementation allows.
        if isinstance(name, str):
            name = name.encode("utf-8")
        _append(_ADV_TYPE_NAME, name)

    if services:
        for uuid in services:
            b = bytes(uuid)
            if len(b) == 2:
                _append(_ADV_TYPE_UUID16_COMPLETE, b)
            elif len(b) == 4:
                _append(_ADV_TYPE_UUID32_COMPLETE, b)
            elif len(b) == 16:
                _append(_ADV_TYPE_UUID128_COMPLETE, b)

    # See org.bluetooth.characteristic.gap.appearance.xml
    if appearance:
        # Appearance is an unsigned 16-bit value; a signed format would
        # reject or mangle values of 0x8000 and above.
        _append(_ADV_TYPE_APPEARANCE, struct.pack("<H", appearance))

    if len(payload) > _ADV_MAX_PAYLOAD:
        raise ValueError("advertising payload too large")

    return payload
def decode_field(payload, adv_type):
    """Return the data of every field of type ``adv_type`` in ``payload``.

    ``payload`` is walked as a sequence of ``<length> <type> <data>`` fields,
    where ``length`` counts the type byte plus the data bytes.

    Args:
        payload: Indexable, sliceable byte sequence (e.g. ``bytes``).
        adv_type: Field type code to look for.

    Returns:
        A list of slices of ``payload``, one per matching field, in order of
        appearance. The list is empty when nothing matches.
    """
    fields = []
    offset = 0
    end = len(payload)
    while offset + 1 < end:
        length = payload[offset]
        if length == 0:
            # A zero length marks early termination of the advertising data;
            # whatever follows is padding, not a field header.
            break
        if payload[offset + 1] == adv_type:
            fields.append(payload[offset + 2 : offset + length + 1])
        offset += 1 + length
    return fields
def decode_name(payload):
    """Return the first name field of ``payload`` decoded as UTF-8.

    Returns an empty string when the payload has no ``_ADV_TYPE_NAME`` field.
    """
    names = decode_field(payload, _ADV_TYPE_NAME)
    if not names:
        return ""
    return str(names[0], "utf-8")
def decode_services(payload):
    """Return the service UUIDs listed in an advertising payload.

    Only the "complete list" field types (16-, 32- and 128-bit) are decoded.

    Args:
        payload: Advertising payload bytes.

    Returns:
        A list of ``bluetooth.UUID`` objects: 16-bit UUIDs first, then
        32-bit, then 128-bit.
    """
    services = []
    # 16-bit UUIDs are unsigned; "<h" would turn 0x8000 and above negative.
    for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
        services.append(bluetooth.UUID(struct.unpack("<H", u)[0]))
    # 32-bit UUIDs are 4-byte unsigned integers; "<d" is an 8-byte double
    # and cannot unpack a 4-byte field.
    for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
        services.append(bluetooth.UUID(struct.unpack("<I", u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
        services.append(bluetooth.UUID(u))
    return services
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bluetooth | |
import random | |
import struct | |
import time | |
from ble_advertising import advertising_payload | |
from micropython import const | |
# Event codes compared against the ``event`` argument of the BLE IRQ callback.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)

# Characteristic property flags.
_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)

# UART-style service: one characteristic the peripheral notifies on (TX) and
# one the central writes to (RX).
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"), _FLAG_READ | _FLAG_NOTIFY)
_UART_RX = (bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"), _FLAG_WRITE | _FLAG_WRITE_NO_RESPONSE)
_UART_SERVICE = (_UART_UUID, (_UART_TX, _UART_RX))

# Appearance value advertised by BLESimplePeripheral.
_ADV_APPEARANCE_GENERIC_COMPUTER = const(128)
class BLESimplePeripheral:
    """UART-style BLE peripheral with a receive buffer and notify-based send.

    Incoming writes to the RX characteristic are appended to an internal
    buffer; an optional handler registered through ``irq()`` is called after
    each accepted write. Outgoing data is sent as notifications on the TX
    characteristic to every connected central.
    """

    def __init__(self, ble, name="bgr2040", rxbuf=256):
        """Activate ``ble``, register the UART service and start advertising.

        Args:
            ble: BLE interface object (e.g. ``bluetooth.BLE()``).
            name: Device name placed in the advertising payload.
            rxbuf: Size passed to ``gatts_set_buffer`` for the RX handle.
        """
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)

        registered = self._ble.gatts_register_services((_UART_SERVICE,))
        ((self._tx_handle, self._rx_handle),) = registered

        # Increase the size of the rx buffer and enable append mode.
        self._ble.gatts_set_buffer(self._rx_handle, rxbuf, True)

        self._connections = set()
        self._rx_buffer = bytearray()
        self._handler = None

        # Optionally add services=[_UART_UUID], but this is likely to make the payload too large.
        self._payload = advertising_payload(
            name=name,
            appearance=_ADV_APPEARANCE_GENERIC_COMPUTER,
        )
        self._advertise()

    def irq(self, handler):
        """Register ``handler`` (no arguments) to run after each accepted write."""
        self._handler = handler

    def _irq(self, event, data):
        """BLE event callback: track connections and collect received data."""
        if event == _IRQ_CENTRAL_CONNECT:
            # Track connections so we can send notifications.
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
            return

        if event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            self._connections.discard(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()
            return

        if event == _IRQ_GATTS_WRITE:
            conn_handle, value_handle = data
            if conn_handle not in self._connections or value_handle != self._rx_handle:
                return
            self._rx_buffer += self._ble.gatts_read(self._rx_handle)
            if self._handler:
                self._handler()

    def any(self):
        """Return the number of buffered, unread bytes."""
        return len(self._rx_buffer)

    def read(self, sz=None):
        """Remove and return up to ``sz`` buffered bytes (all of them if falsy)."""
        count = sz if sz else len(self._rx_buffer)
        chunk = self._rx_buffer[0:count]
        self._rx_buffer = self._rx_buffer[count:]
        return chunk

    def write(self, data):
        """Notify every connected central with ``data`` on the TX handle."""
        tx_handle = self._tx_handle
        for conn_handle in self._connections:
            self._ble.gatts_notify(conn_handle, tx_handle, data)

    def close(self):
        """Disconnect every tracked central and forget the connections."""
        for conn_handle in self._connections:
            self._ble.gap_disconnect(conn_handle)
        self._connections.clear()

    def is_connected(self):
        """Return True when at least one central is connected."""
        return bool(self._connections)

    def _advertise(self, interval_us=500000):
        """Start advertising the prepared payload at ``interval_us``."""
        self._ble.gap_advertise(interval_us, adv_data=self._payload)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import micropython | |
import badger2040 | |
from badger2040 import WIDTH, HEIGHT, UPDATE_FAST | |
import bluetooth | |
from ble_peripheral import BLESimplePeripheral | |
import network | |
import time | |
# Layout metrics used by the drawing functions.
TEXT_SIZE = 2
LINE_HEIGHT = 20
SPACER_HEIGHT = 4
PADDING = 5
HEADER_HEIGHT = 44

# Mutable UI bookkeeping, all starting at zero.
choiced_station_count = required_page_count = scroll_index = 0

# What is currently shown; replaced wholesale when a payload arrives.
state = {
    "stopping_state": "Now",
    "station": {
        "station_name": "Waiting for payload",
        "station_number": "N/A",
        "lines": "N/A",
    },
    "passing_station": None,
}

# Shut the WLAN interface down, then drop the reference to it.
wlan = network.WLAN(network.STA_IF)
wlan.disconnect()
wlan.active(False)
wlan.deinit()
wlan = None

# Display setup: LED level, update speed and font.
display = badger2040.Badger2040()
display.led(128)
display.set_update_speed(UPDATE_FAST)
display.set_font("bitmap8")
def draw_header():
    """Draw the header band: stopping state, station name and number.

    Fills a ``HEADER_HEIGHT``-tall rectangle with pen 0, then writes the
    labels with pen 15. Left-hand labels start at ``PADDING``; right-hand
    values are placed so that their measured width ends ``PADDING`` short of
    the right edge.
    """
    scale = 2
    limit = WIDTH - PADDING
    station = state["station"]

    def right_x(label):
        # x position that right-aligns ``label`` at the given scale.
        return WIDTH - display.measure_text(label, scale) - PADDING

    display.set_pen(0)
    display.rectangle(0, 0, WIDTH, HEADER_HEIGHT)
    display.set_pen(15)

    # First row: stopping state on the left, station name on the right.
    display.text(state["stopping_state"], PADDING, PADDING, limit, scale)
    name = station["station_name"]
    display.text(name, right_x(name), 4, limit, scale)

    # Second row: fixed caption on the left, station number on the right.
    display.text("Number", PADDING, 24, limit, scale)
    number = station["station_number"]
    display.text(number, right_x(number), 24, limit, scale)
def init_display():
    """Clear the whole frame buffer using pen 15."""
    background_pen = 15
    display.set_pen(background_pen)
    display.clear()
def show_passing_station(passing_station):
    """Draw the "Passing" footer band along the bottom edge.

    Args:
        passing_station: Mapping with a ``"station_name"`` entry; the name is
            right-aligned inside the band.
    """
    scale = 2
    limit = WIDTH - PADDING
    panel_height = HEADER_HEIGHT // 2
    panel_top = HEIGHT - panel_height
    text_y = PADDING + HEIGHT - panel_height

    display.set_pen(0)
    display.rectangle(0, panel_top, WIDTH, panel_height)
    display.set_pen(15)

    display.text("Passing", PADDING, text_y, limit, scale)
    name = passing_station["station_name"]
    name_x = WIDTH - display.measure_text(name, scale) - PADDING
    display.text(name, name_x, text_y, limit, scale)
def update_display():
    """Redraw the whole screen from ``state`` and push it to the panel.

    Draws the "Transfers:" caption and the transfer-line text first (shifted
    according to ``scroll_index``), then the header on top, then the passing
    footer when a passing station is set.

    Side effects:
        Sets the global ``required_page_count`` when it is below 1, using the
        measured width of the transfer-line text.
    """
    global required_page_count

    init_display()
    display.set_pen(0)

    y = SPACER_HEIGHT + LINE_HEIGHT * -scroll_index
    if scroll_index == 0:
        # Unscrolled: start the body just below the header band.
        y += HEADER_HEIGHT
    else:
        y -= (scroll_index * 10) + 2

    joined_lines_str = state["station"]["lines"]

    # Only computed while unset (< 1); later redraws reuse the stored value.
    if required_page_count < 1:
        required_page_count = display.measure_text(joined_lines_str) / (HEIGHT - HEADER_HEIGHT) / 3

    display.text("Transfers:", PADDING, y, WIDTH - PADDING, TEXT_SIZE)
    y += LINE_HEIGHT
    display.text(joined_lines_str, PADDING, y, WIDTH - PADDING, TEXT_SIZE)

    # The header is drawn after the body so it covers any text scrolled under it.
    draw_header()

    if state["passing_station"] is not None:
        show_passing_station(state["passing_station"])

    display.update()
# Create the BLE interface and wrap it in the UART-style peripheral.
ble = bluetooth.BLE()
p = BLESimplePeripheral(ble=ble)
def on_rx():
    """Handle data received from the central: parse it and redraw the screen.

    The payload must be UTF-8 text made of exactly five newline-separated
    fields, in this order: stopping state, station name, station number,
    transfer lines, passing station name (empty when there is none).
    Payloads that cannot be decoded or do not have five fields are reported
    with ``print`` and otherwise ignored.
    """
    # Without this declaration the resets below would only bind locals and
    # the module-level values would never change.
    global state, scroll_index, required_page_count

    raw = p.read()
    try:
        fields = raw.decode('utf-8').split('\n')
    except UnicodeError as exc:
        print("on_rx: payload is not valid UTF-8:", exc)
        return

    if len(fields) != 5:
        print("on_rx: expected 5 fields, got", len(fields))
        return

    stopping_state, station_name, station_number, lines, passing_station_name = fields

    passing_station = None
    if passing_station_name != "":
        passing_station = {
            "station_name": passing_station_name,
            "station_number": "",
            "lines": ""
        }

    state = {
        "stopping_state": stopping_state,
        "station": {
            "station_name": station_name,
            "station_number": station_number,
            "lines": lines
        },
        "passing_station": passing_station
    }

    # New content: go back to the top and force update_display() to
    # recompute the page count for the new transfer-line text.
    scroll_index = 0
    required_page_count = 0
    update_display()
# Register the receive handler, then draw the initial screen.
p.irq(handler=on_rx)
i = 0
update_display()

# Main loop: poll the buttons, send a counter to connected centrals, and
# print any error so the loop keeps running.
# NOTE(review): the nesting below was reconstructed from a flattened paste.
# The button checks appear twice with different upper bounds
# (``required_page_count > scroll_index`` vs ``required_page_count -
# scroll_index > 1``); confirm which one is intended.
while True:
    try:
        display.keepalive()

        if p.is_connected():
            if display.pressed(badger2040.BUTTON_UP) and scroll_index < required_page_count:
                scroll_index += 1
                update_display()
            if display.pressed(badger2040.BUTTON_DOWN) and scroll_index > 0:
                scroll_index -= 1
                update_display()

            # Short burst of queued notifications.
            for _ in range(3):
                data = str(i) + "_"
                p.write(data)
                i += 1

        time.sleep_ms(100)

        if display.pressed(badger2040.BUTTON_UP) and required_page_count - scroll_index > 1:
            scroll_index += 1
            update_display()
        if display.pressed(badger2040.BUTTON_DOWN) and scroll_index > 0:
            scroll_index -= 1
            update_display()
        # display.halt()
    except Exception as exc:
        # Report the error and move on to the next iteration.
        print(exc)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment