Skip to content

Instantly share code, notes, and snippets.

@Yatekii

Yatekii/run.py Secret

Created January 21, 2021 11:43
Show Gist options
  • Save Yatekii/f6a762b48bec9570af34c9b91fa3860a to your computer and use it in GitHub Desktop.
Save Yatekii/f6a762b48bec9570af34c9b91fa3860a to your computer and use it in GitHub Desktop.
import sys
import os
from bluezero.adapter import Adapter
from bluezero.central import Central
from bluezero import dbus_tools
from bluezero import constants
from bluezero import async_tools
import time
from gi.repository import GLib
from zigfred import Zigfred, Fleet, protocol
class Stopwatch:
    """Lap timer that prints and returns the time since the previous reading.

    The reference point is taken at construction and moved forward on every
    call to :meth:`elapsed`.
    """

    def __init__(self):
        # A monotonic clock is used so wall-clock adjustments cannot produce
        # negative or inflated laps.
        self._timestamp = time.monotonic()

    def elapsed(self, label=None):
        """Print and return the seconds elapsed since the previous reading.

        :param label: Optional text printed after the elapsed time.
        :return: The elapsed time in seconds as a float.
        """
        # Read the clock once: the same instant ends this lap and starts the
        # next, so no time is lost between two separate readings.
        now = time.monotonic()
        elapsed = now - self._timestamp
        self._timestamp = now
        if label is not None:
            print("{}: {}".format(elapsed, label))
        else:
            print("{}".format(elapsed))
        return elapsed
class App:
    """Script driver: scan for a zigfred, connect to it and send it commands.

    Every step prints a Stopwatch lap so the time spent in each phase is
    visible on stdout.
    """

    def __init__(self):
        self.fleet = None
        self.zigfred = None
        self.stopwatch = Stopwatch()

    def on_message(self, message):
        """Print a message received from the device, followed by a lap time."""
        print(message, flush=True)
        self.stopwatch.elapsed("Message received")

    def _send_commands(self, zigfred):
        """Send a STATUS command, then toggle the relay and set the dimmer.

        Registered as a GLib timeout callback; it returns None.
        """
        self.stopwatch.elapsed("Ask for status")
        pb = protocol.zigfred_pb2
        request = pb.Message()
        request.code = pb.Message.MessageCode.COMMAND
        request.command.id = 1
        request.command.code = pb.Command.CommandCode.STATUS
        payload = request.SerializeToString()
        self.stopwatch.elapsed("Serialized")
        zigfred.message(payload)
        zigfred.open_relay()
        zigfred.close_relay()
        for level in (0, 5):
            zigfred.set_dimmer(level)
        self.stopwatch.elapsed("Sent message")

    def on_device_found(self, device):
        """Connect to a discovered device and schedule the command sequence.

        :param device: The object handed over by the fleet's discovery
            callback; it is stored as ``self.zigfred``.
        """
        self.zigfred = device
        self.stopwatch.elapsed("Found a zigfred")
        device.on_message = self.on_message
        device.connect()
        self.stopwatch.elapsed("Connected")

        if not device.connected:
            print("We failed to connect!")
        else:
            print("We are connected!", flush=True)
            # Defer the commands by 300 ms on the GLib main loop.
            GLib.timeout_add(300, self._send_commands, device)

        print("Status done", flush=True)

    def idle(self, eventloop):
        """GLib idle callback that starts discovery on the fleet.

        :param eventloop: Value supplied by ``GLib.idle_add``; not used here.
        """
        result = self.fleet.start_discovery()
        print(result)
        print("Started scanning")

    def run(self):
        """Create the fleet, hook up discovery and enter the event loop."""
        print("Running ...")
        fleet = Fleet("B8:27:EB:2D:36:3D")
        self.fleet = fleet
        fleet.on_device_found = self.on_device_found
        GLib.idle_add(self.idle, fleet)
        fleet.run_async()
# Script entry point: only runs when the file is executed directly, not when
# it is imported.
if __name__ == "__main__":
    app = App()
    # run() enters the fleet's event loop via Fleet.run_async().
    app.run()
from time import sleep
from bluezero import central
from bluezero import tools
from bluezero import constants
from bluezero import adapter
from .protocol.zigfred_pb2 import Message
# Template shared by every zigfred UUID; the placeholder takes the
# four-hex-digit id that distinguishes one UUID from the next.
UUID = "34d6{}-7201-4e50-88c0-5cce4f93cbbc"

# Passed as the first argument to add_characteristic() for every
# characteristic below.
ZIGFRED_API = UUID.format("0001")

# Characteristic UUIDs, numbered consecutively after the API UUID.
TX, RX, DIMMER, RELAY = (
    UUID.format(short_id) for short_id in ("0002", "0003", "0004", "0005")
)

# Zigfred.message() sends at most this many payload items per write.
MAX_MESSAGE_LENGTH = 244
class Zigfred:
    """
    Class to communicate with Zigfred and read its data via BLE.
    """

    def __init__(self, device_address, adapter_address=None):
        """
        Initialization of an instance of a remote zigfred

        :param device_address: Address of the discovered zigfred device
        :param adapter_address: Optional unless you have more than one adapter
                                on your machine
        """
        # At the start we do not do anything when a message is received.
        self._on_message = None
        self._zigfred = central.Central(
            adapter_addr=adapter_address, device_addr=device_address
        )
        # zigfred's TX is our RX side and zigfred's RX is our TX!
        self._zigfred_rx = self._zigfred.add_characteristic(ZIGFRED_API, TX)
        self._zigfred_tx = self._zigfred.add_characteristic(ZIGFRED_API, RX)
        self._zigfred_dimmer = self._zigfred.add_characteristic(ZIGFRED_API, DIMMER)
        self._zigfred_relay = self._zigfred.add_characteristic(ZIGFRED_API, RELAY)

    def _uart_read(self, iface, changed_props, invalidated_props):
        """
        Property-changed callback that decodes incoming protocol messages.

        :param iface: Interface name the change was reported on
        :param changed_props: Mapping of changed property names to new values
        :param invalidated_props: Not used by this handler
        """
        # If this is not a characteristic notification, skip.
        if iface != constants.GATT_CHRC_IFACE:
            return
        # If the changed property was the "Value" property, we take the
        # received data and decode it.
        if "Value" in changed_props:
            message = Message()
            # The first item of the value is skipped; only the remainder is
            # parsed as a protocol message.
            message.ParseFromString(bytes(changed_props["Value"][1:]))
            if self._on_message is not None:
                self._on_message(message)

    @property
    def name(self):
        """Read the device's name"""
        return self._zigfred.rmt_device.name

    @property
    def address(self):
        """Read the device's Bluetooth address"""
        return self._zigfred.rmt_device.address

    @property
    def connected(self):
        """Indicate whether the remote device is currently connected."""
        return self._zigfred.connected

    def connect(self):
        """
        Connect to the specified zigfred for this instance

        Blocks until GATT resolution succeeds; there is no timeout.
        """
        self._zigfred.connect()
        # Resolving GATT can take a few tries, so do this spinning.
        while not self._zigfred_rx.resolve_gatt():
            sleep(0.1)
        # We register a callback for when we receive data and enable
        # notifications for when data comes in.
        self._zigfred_tx.add_characteristic_cb(self._uart_read)
        self._zigfred_tx.start_notify()

    def disconnect(self):
        """
        Disconnect from the zigfred
        """
        self._zigfred.disconnect()

    @property
    def on_message(self):
        """The callback invoked for every received protocol message, or None."""
        return self._on_message

    @on_message.setter
    def on_message(self, callback):
        """
        Registers the given callback to execute when a new zigfred protocol message is received.

        Parameters
        ----------
        callback : function(message)
            The function that is called when a new message arrives.
            It is given the decoded message as its only argument.
        """
        self._on_message = callback

    def message(self, message):
        """
        Write a serialized protocol message to the zigfred.

        :param message: Serialized payload, e.g. the bytes returned by
                        ``SerializeToString()``. Anything beyond
                        MAX_MESSAGE_LENGTH items is silently dropped.
        """
        # A single header item (0x06) precedes the payload.
        data = [0x02 | 0x04]
        # Slicing leaves shorter payloads untouched, so no length check is
        # needed before truncating.
        data.extend(message[:MAX_MESSAGE_LENGTH])
        self._zigfred_rx.value = data

    # NOTE(review): the relay and dimmer writes below assign a bare int to the
    # characteristic value, whereas message() assigns a list — confirm the
    # characteristic accepts both forms.
    def open_relay(self):
        """Write 0x00 to the relay characteristic."""
        self._zigfred_relay.value = 0x00

    def close_relay(self):
        """Write 0x01 to the relay characteristic."""
        self._zigfred_relay.value = 0x01

    def set_dimmer(self, value):
        """
        Write the given value to the dimmer characteristic.

        :param value: Value to write, passed through unchanged
        """
        self._zigfred_dimmer.value = value

    @property
    def on_disconnect(self):
        """Add callback for on_disconnect action"""
        return self._zigfred.dongle.on_disconnect

    @on_disconnect.setter
    def on_disconnect(self, callback):
        self._zigfred.dongle.on_disconnect = callback

    def run_async(self):
        """
        Puts the code into asynchronous mode
        """
        self._zigfred.run()

    def quit_async(self):
        """
        Stops asynchronous mode
        """
        self._zigfred.quit()
class Fleet:
    """Scans for nearby zigfred devices through one Bluetooth adapter."""

    def __init__(self, adapter_address=None):
        """
        Set up the adapter used for scanning.

        :param adapter_address: Address of the adapter to use; passed on
                                unchanged to ``adapter.Adapter``.
        """
        # Remembered so that every Zigfred created for a discovered device is
        # opened on the same adapter that performed the scan.
        self._adapter_address = adapter_address
        self._adapter = adapter.Adapter(adapter_addr=adapter_address)
        # Remove every already-known device named "zigfred" that is not paired.
        for device in central.Central.available():
            if device.name == "zigfred" and not device.paired:
                self._adapter.remove_device(device.remote_device_path)
        # Always show found devices that are nearby, even if it was found before.
        self._adapter.show_duplicates()

    @property
    def on_connect(self):
        """Add callback for on_connect action"""
        return self._adapter.on_connect

    @on_connect.setter
    def on_connect(self, callback):
        self._adapter.on_connect = callback

    @property
    def on_device_found(self):
        """
        Add callback for on_device_found action

        Reading the property returns the handler installed on the adapter,
        which is the filtering wrapper rather than the callback itself.
        """
        return self._adapter.on_device_found

    @on_device_found.setter
    def on_device_found(self, callback):
        def wrapper(device):
            # Only devices named "zigfred" are reported, wrapped in a Zigfred
            # bound to this fleet's adapter.
            if device.name == "zigfred":
                callback(
                    Zigfred(device.address, adapter_address=self._adapter_address)
                )

        self._adapter.on_device_found = wrapper

    def start_discovery(self):
        """
        Start discovery of nearby Bluetooth devices.
        """
        self._adapter.start_discovery()

    def stop_discovery(self):
        """Stop scanning of nearby Bluetooth devices."""
        self._adapter.stop_discovery()

    def run_async(self):
        """
        Puts the code into asynchronous mode
        """
        self._adapter.run()

    def quit_async(self):
        """
        Stops asynchronous mode
        """
        self._adapter.quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment