-
-
Save Yatekii/f6a762b48bec9570af34c9b91fa3860a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
from bluezero.adapter import Adapter | |
from bluezero.central import Central | |
from bluezero import dbus_tools | |
from bluezero import constants | |
from bluezero import async_tools | |
import time | |
from gi.repository import GLib | |
from zigfred import Zigfred, Fleet, protocol | |
class Stopwatch:
    """Lap timer that prints and returns the time since the previous reading.

    The first lap starts when the instance is created; every call to
    :meth:`elapsed` ends the current lap and starts the next one.
    """

    def __init__(self):
        # A monotonic clock is used so that wall-clock adjustments cannot
        # produce negative or inflated laps.
        self._timestamp = time.monotonic()

    def elapsed(self, label=None):
        """Print and return the seconds elapsed since the last reading.

        :param label: Optional text printed after the elapsed time.
        :return: The elapsed time in seconds as a float.
        """
        # Read the clock exactly once: the same instant ends this lap and
        # starts the next, so no time is lost between consecutive laps.
        now = time.monotonic()
        elapsed = now - self._timestamp
        self._timestamp = now
        if label is not None:
            print("{}: {}".format(elapsed, label))
        else:
            print("{}".format(elapsed))
        return elapsed
class App:
    """Demo driver: scan for a zigfred, connect to it and exercise it once.

    Timing of every step is printed through a shared ``Stopwatch``.
    """

    def __init__(self):
        self.stopwatch = Stopwatch()
        # Both are filled in later: ``fleet`` by run(), ``zigfred`` by
        # on_device_found().
        self.zigfred = None
        self.fleet = None

    def on_message(self, message):
        """Print a message received from the zigfred and log the lap time."""
        print(message, flush=True)
        self.stopwatch.elapsed("Message received")

    def _exercise(self, zigfred):
        """Send a status request, then toggle the relay and the dimmer.

        Scheduled through ``GLib.timeout_add``; it returns None, which GLib
        treats as "do not call again", so it runs a single time.
        """
        self.stopwatch.elapsed("Ask for status")
        request = protocol.zigfred_pb2.Message()
        request.code = protocol.zigfred_pb2.Message.MessageCode.COMMAND
        request.command.id = 1
        request.command.code = protocol.zigfred_pb2.Command.CommandCode.STATUS
        payload = request.SerializeToString()
        self.stopwatch.elapsed("Serialized")
        zigfred.message(payload)
        zigfred.open_relay()
        zigfred.close_relay()
        zigfred.set_dimmer(0)
        zigfred.set_dimmer(5)
        self.stopwatch.elapsed("Sent message")

    def on_device_found(self, device):
        """Connect to a newly discovered zigfred and schedule the exercise.

        :param device: The discovered device object handed over by the fleet.
        """
        self.zigfred = device
        self.stopwatch.elapsed("Found a zigfred")
        device.on_message = self.on_message
        device.connect()
        self.stopwatch.elapsed("Connected")

        if not device.connected:
            print("We failed to connect!")
        else:
            print("We are connected!", flush=True)
            # Give the connection 300 ms to settle before talking to it.
            GLib.timeout_add(300, self._exercise, device)
        print("Status done", flush=True)

    def idle(self, eventloop):
        """Start scanning once the main loop is idle.

        :param eventloop: User data passed by ``GLib.idle_add``; unused here.
        """
        print(self.fleet.start_discovery())
        print("Started scanning")

    def run(self):
        """Create the fleet, hook up discovery and enter the event loop."""
        print("Running ...")
        fleet = Fleet("B8:27:EB:2D:36:3D")
        self.fleet = fleet
        fleet.on_device_found = self.on_device_found
        GLib.idle_add(self.idle, fleet)
        fleet.run_async()
if __name__ == "__main__":
    # Start the demo only when executed as a script, not when imported.
    App().run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import sleep | |
from bluezero import central | |
from bluezero import tools | |
from bluezero import constants | |
from bluezero import adapter | |
from .protocol.zigfred_pb2 import Message | |
# Every zigfred GATT UUID shares this base; only the four hex digits that
# replace the placeholder differ.
UUID = "34d6{}-7201-4e50-88c0-5cce4f93cbbc"

# Short ids 0001..0005, in order: the API UUID that every characteristic is
# registered under, followed by the four characteristic UUIDs.
ZIGFRED_API, TX, RX, DIMMER, RELAY = (
    UUID.format("{:04x}".format(short_id)) for short_id in range(1, 6)
)

# Longest payload (excluding the leading header byte) that Zigfred.message()
# sends; anything beyond this is cut off.
MAX_MESSAGE_LENGTH = 244
class Zigfred:
    """
    Class to communicate with Zigfred and read its data via BLE.
    """

    def __init__(self, device_address, adapter_address=None):
        """
        Initialization of an instance of a remote zigfred

        :param device_address: Bluetooth address of the discovered zigfred
        :param adapter_address: Optional unless you have more than one adapter
                                on your machine
        """
        # At the start we do not do anything when a message is received.
        self._on_message = None
        self._zigfred = central.Central(
            adapter_addr=adapter_address, device_addr=device_address
        )
        # zigfred's TX is our RX side and zigfred's RX is our TX!
        # The attribute names below follow the device's point of view, the
        # UUID constants follow ours.
        self._zigfred_rx = self._zigfred.add_characteristic(ZIGFRED_API, TX)
        self._zigfred_tx = self._zigfred.add_characteristic(ZIGFRED_API, RX)
        self._zigfred_dimmer = self._zigfred.add_characteristic(ZIGFRED_API, DIMMER)
        self._zigfred_relay = self._zigfred.add_characteristic(ZIGFRED_API, RELAY)

    @staticmethod
    def _as_payload(value):
        """Return ``value`` as a list of byte values for a GATT write.

        A single integer becomes a one-element list; any other iterable
        (list, bytes, ...) is copied into a list unchanged.
        """
        if isinstance(value, int):
            return [value]
        return list(value)

    def _uart_read(self, iface, changed_props, invalidated_props):
        """
        Property-changed handler: decode an incoming notification and hand
        the parsed protocol message to the registered callback.
        """
        # If this is not a characteristic notification, skip.
        if iface != constants.GATT_CHRC_IFACE:
            return
        # If the changed property was the "Value" property, we take the
        # received data and decode it.
        if "Value" in changed_props:
            message = Message()
            # The first byte is skipped; presumably it is the same kind of
            # header byte that message() prepends on the way out.
            message.ParseFromString(bytes(changed_props["Value"][1:]))
            if self._on_message is not None:
                self._on_message(message)

    @property
    def name(self):
        """Read the devices name"""
        return self._zigfred.rmt_device.name

    @property
    def address(self):
        """Read the devices Bluetooth address"""
        return self._zigfred.rmt_device.address

    @property
    def connected(self):
        """Indicate whether the remote device is currently connected."""
        return self._zigfred.connected

    def connect(self):
        """
        Connect to the specified zigfred for this instance

        Blocks until the outgoing characteristic has been resolved, then
        enables notifications for incoming messages.
        """
        self._zigfred.connect()
        # Resolving GATT can take a few tries, so do this spinning.
        # NOTE(review): there is no upper bound on this wait.
        while not self._zigfred_rx.resolve_gatt():
            sleep(0.1)
        # We register a callback for when we receive data and enable
        # notifications for when data comes in.
        self._zigfred_tx.add_characteristic_cb(self._uart_read)
        self._zigfred_tx.start_notify()

    def disconnect(self):
        """
        Disconnect from the zigfred
        """
        self._zigfred.disconnect()

    @property
    def on_message(self):
        """The callback invoked for every received message, or None."""
        return self._on_message

    @on_message.setter
    def on_message(self, callback):
        """
        Registers the given callback to execute when a new zigfred protocol message is received.

        Parameters
        ----------
        callback : function(message)
            The function that is called when a new message arrives.
            It is given the parsed protocol message as its only argument.
        """
        self._on_message = callback

    def message(self, message):
        """
        Send a serialized protocol message to the zigfred.

        :param message: The serialized message. Only the first
                        MAX_MESSAGE_LENGTH items are sent; anything
                        beyond that is silently dropped.
        """
        # Every outgoing frame starts with this header byte.
        data = [0x02 | 0x04]
        # Slicing never raises on short input, so no length check is needed.
        data.extend(message[:MAX_MESSAGE_LENGTH])
        self._zigfred_rx.value = data

    def open_relay(self):
        """Write 0x00 to the relay characteristic."""
        # GATT writes take an array of bytes, so the value is sent as a
        # one-element list, matching what message() does.
        self._zigfred_relay.value = [0x00]

    def close_relay(self):
        """Write 0x01 to the relay characteristic."""
        self._zigfred_relay.value = [0x01]

    def set_dimmer(self, value):
        """
        Write a value to the dimmer characteristic.

        :param value: A single integer, or an iterable of byte values.
        """
        self._zigfred_dimmer.value = self._as_payload(value)

    @property
    def on_disconnect(self):
        """Add callback for on_disconnect action"""
        return self._zigfred.dongle.on_disconnect

    @on_disconnect.setter
    def on_disconnect(self, callback):
        self._zigfred.dongle.on_disconnect = callback

    def run_async(self):
        """
        Puts the code into asynchronous mode
        """
        self._zigfred.run()

    def quit_async(self):
        """
        Stops asynchronous mode
        """
        self._zigfred.quit()
class Fleet:
    """Discovers nearby zigfred devices through one Bluetooth adapter."""

    # Advertised name that identifies a zigfred.
    DEVICE_NAME = "zigfred"

    def __init__(self, adapter_address=None):
        """
        :param adapter_address: Address of the adapter to scan with.
                                Optional unless you have more than one
                                adapter on your machine.
        """
        # Remembered so that discovered devices are opened on this adapter.
        self._adapter_address = adapter_address
        self._adapter = adapter.Adapter(adapter_addr=adapter_address)
        # Forget known-but-unpaired zigfreds; presumably this makes them
        # show up as newly found during the next discovery.
        for device in central.Central.available():
            if device.name == self.DEVICE_NAME and not device.paired:
                self._adapter.remove_device(device.remote_device_path)
        # Always show found devices that are nearby, even if it was found before.
        self._adapter.show_duplicates()

    @property
    def on_connect(self):
        """Add callback for on_connect action"""
        return self._adapter.on_connect

    @on_connect.setter
    def on_connect(self, callback):
        self._adapter.on_connect = callback

    @property
    def on_device_found(self):
        """
        Add callback for on_device_found action

        Reading the property returns the handler installed on the adapter,
        i.e. the name-filtering wrapper rather than the function passed in.
        """
        return self._adapter.on_device_found

    @on_device_found.setter
    def on_device_found(self, callback):
        def wrapper(device):
            # Devices with any other name are ignored.
            if device.name == self.DEVICE_NAME:
                # Open the device on the adapter this fleet scans with,
                # not on whichever adapter happens to be the default.
                callback(
                    Zigfred(device.address, adapter_address=self._adapter_address)
                )

        self._adapter.on_device_found = wrapper

    def start_discovery(self):
        """
        Start discovery of nearby Bluetooth devices.
        """
        self._adapter.start_discovery()

    def stop_discovery(self):
        """Stop scanning of nearby Bluetooth devices."""
        self._adapter.stop_discovery()

    def run_async(self):
        """
        Puts the code into asynchronous mode
        """
        self._adapter.run()

    def quit_async(self):
        """
        Stops asynchronous mode
        """
        self._adapter.quit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment