Skip to content

Instantly share code, notes, and snippets.

@charbonnierg
Last active December 9, 2022 15:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save charbonnierg/5a2b7439051a1cd12c38667c56f063c8 to your computer and use it in GitHub Desktop.
Save charbonnierg/5a2b7439051a1cd12c38667c56f063c8 to your computer and use it in GitHub Desktop.
Python IoT Hub

Python IoT Hub

This gist is an attempt to design a Python API to implement a distributed IoT solution composed of:

  • Several IoT Device and Station backends (Examples: (BLEDevice, BLEStation), (OPCUADevice, OPCUAStation), ...)
  • A single IoT Gateway implementation, which can be deployed as different instances, potentially running different backends.
  • A single IoT Hub service running somewhere in the cloud
  • A single IoT Hub client implementation which can be used by Python developers
  • Two wrappers, RemoteDevice and RemoteStation, which make it easier to interact with IoT devices and stations through the IoT Hub client
"""Most of the code is not implemented yet.
All methods decorated with @abc.abstractmethod are not implemented.
"""
from __future__ import annotations
import abc
from enum import Enum
from typing import Any, Awaitable, Callable, Generic, Mapping, TypeVar
from ..functional.subscription import PubSub, ServiceProtocol, SubscriptionProtocol
class StationEventType(str, Enum):
    """Names of the events an IoT station can emit.

    Members also subclass ``str``, so each one compares equal to its plain
    string value.
    """

    # Station lifecycle.
    STARTED = "STARTED"
    STOPPED = "STOPPED"

    # Discovery state.
    DISCOVERY_ENABLED = "DISCOVERY_ENABLED"
    DISCOVERY_DISABLED = "DISCOVERY_DISABLED"
    DISCOVERY_STARTED = "DISCOVERY_STARTED"
    DISCOVERY_STOPPED = "DISCOVERY_STOPPED"

    # Device-related events reported by the station.
    DEVICE_DISCOVERED = "DEVICE_DISCOVERED"
    DEVICE_CONNECTED = "DEVICE_CONNECTED"
    DEVICE_DISCONNECTED = "DEVICE_DISCONNECTED"

    # Station properties.
    PROPS_CHANGED = "PROPS_CHANGED"
class DeviceEventType(str, Enum):
    """Names of the events an IoT device can emit.

    Members also subclass ``str``, so each one compares equal to its plain
    string value.
    """

    # Connection state.
    CONNECTED = "CONNECTED"
    DISCONNECTED = "DISCONNECTED"

    # Device clock.
    TIME_SET = "TIME_SET"
    # NOTE(review): the value is "SYNCED", not "TIME_SYNCED", unlike every
    # other member whose value equals its name -- confirm this is intended.
    TIME_SYNCED = "SYNCED"

    # Notifications.
    NOTIFIED = "NOTIFIED"
    NOTIFY_STARTED = "NOTIFY_STARTED"
    NOTIFY_STOPPED = "NOTIFY_STOPPED"

    # Device properties.
    PROPS_CHANGED = "PROPS_CHANGED"
class DeviceRequestType(str, Enum):
    """Kinds of requests that can be sent to an IoT device.

    Members also subclass ``str``, so each one compares equal to its plain
    string value.
    """

    RESET = "RESET"
    DISCONNECT = "DISCONNECT"

    # Notifications.
    START_NOTIFY = "START_NOTIFY"
    STOP_NOTIFY = "STOP_NOTIFY"

    # Device clock.
    SET_TIME = "SET_TIME"
    SYNC_TIME = "SYNC_TIME"

    # Keyed value access.
    READ_VALUE = "READ_VALUE"
    WRITE_VALUE = "WRITE_VALUE"
class StationRequestType(str, Enum):
    """Kinds of requests that can be sent to an IoT station.

    Members also subclass ``str``, so each one compares equal to its plain
    string value.
    """

    # Station lifecycle.
    START = "START"
    STOP = "STOP"

    # Device connections.
    CONNECT = "CONNECT"
    DISCONNECT = "DISCONNECT"

    # Discovery control.
    START_DISCOVERY = "START_DISCOVERY"
    STOP_DISCOVERY = "STOP_DISCOVERY"
    ENABLE_DISCOVERY = "ENABLE_DISCOVERY"
    DISABLE_DISCOVERY = "DISABLE_DISCOVERY"
class StationQueryType(str, Enum):
    """Kinds of queries that can be sent to an IoT station.

    Members also subclass ``str``, so each one compares equal to its plain
    string value.
    """

    # Station properties.
    GET_PROPS = "GET_PROPS"

    # Device collections (GET_* variants).
    GET_KNOWN_DEVICES = "GET_KNOWN_DEVICES"
    GET_CONNECTED_DEVICES = "GET_CONNECTED_DEVICES"
    GET_DISCOVERED_DEVICES = "GET_DISCOVERED_DEVICES"

    # Device collections (LIST_* variants).
    LIST_KNOWN_DEVICES = "LIST_KNOWN_DEVICES"
    LIST_CONNECTED_DEVICES = "LIST_CONNECTED_DEVICES"
    LIST_DISCOVERED_DEVICES = "LIST_DISCOVERED_DEVICES"

    # Per-device lookups.
    GET_DEVICE_PROPS = "GET_DEVICE_PROPS"
    GET_DEVICE_KEYS = "GET_DEVICE_KEYS"
class HubQueryType(str, Enum):
    """Kinds of queries accepted by the hub.

    Members also subclass ``str``, so each one compares equal to its plain
    string value.
    """

    # Enumerations.
    LIST_DEVICES = "LIST_DEVICES"
    LIST_STATIONS = "LIST_STATIONS"

    # Filtered searches.
    FIND_DEVICES = "FIND_DEVICES"
    FIND_STATIONS = "FIND_STATIONS"

    # Single lookups.
    GET_DEVICE = "GET_DEVICE"
    GET_STATION = "GET_STATION"
class StationEvent:
    """Event emitted by an IoT station: a payload plus the kind of event.

    Both constructor arguments are stored as given, without validation or
    copying.
    """

    def __init__(self, data: Mapping[str, Any], type: StationEventType) -> None:
        # ``type`` shadows the builtin only inside this constructor.
        self.data, self.type = data, type
class DeviceEvent:
    """Event emitted by an IoT device: a payload plus the kind of event.

    Both constructor arguments are stored as given, without validation or
    copying.
    """

    # ``type`` is a device *event* type; it was previously annotated with the
    # unrelated ``StationRequestType``.
    def __init__(self, data: Mapping[str, Any], type: DeviceEventType) -> None:
        self.data = data
        self.type = type
class DeviceRequest:
    """Request addressed to an IoT device.

    Holds the request kind, an optional mapping of options and an optional
    deadline. All three are stored as given; ``options`` and ``deadline``
    stay ``None`` when omitted.
    """

    def __init__(
        self,
        type: DeviceRequestType,
        options: Mapping[str, Any] | None = None,
        deadline: float | None = None,
    ) -> None:
        self.type, self.options, self.deadline = type, options, deadline
class StationRequest:
    """Request addressed to an IoT station.

    Holds the request kind, an optional mapping of options and an optional
    deadline. All three are stored as given; ``options`` and ``deadline``
    stay ``None`` when omitted.
    """

    def __init__(
        self,
        type: StationRequestType,
        options: Mapping[str, Any] | None = None,
        deadline: float | None = None,
    ) -> None:
        self.type, self.options, self.deadline = type, options, deadline
class StationQuery:
    """Query addressed to a station.

    Holds the query kind, an optional mapping of options and an optional
    deadline. All three are stored as given; ``options`` and ``deadline``
    stay ``None`` when omitted.
    """

    def __init__(
        self,
        type: StationQueryType,
        options: Mapping[str, Any] | None = None,
        deadline: float | None = None,
    ) -> None:
        self.type, self.options, self.deadline = type, options, deadline
class HubQuery:
    """Query addressed to the hub.

    Holds the query kind, an optional mapping of options and an optional
    deadline. All three are stored as given; ``options`` and ``deadline``
    stay ``None`` when omitted.
    """

    def __init__(
        self,
        type: HubQueryType,
        options: Mapping[str, Any] | None = None,
        deadline: float | None = None,
    ) -> None:
        self.type, self.options, self.deadline = type, options, deadline
# Type of the properties object returned by ``IoTDeviceBackend.get_props``.
DevicePropsT = TypeVar("DevicePropsT")
# Type of the properties object returned by ``IoTStationBackend.get_props``.
StationPropsT = TypeVar("StationPropsT")
class IoTDeviceBackend(abc.ABC, Generic[DevicePropsT]):
    """Device backend interface.

    Deriving from ``abc.ABC`` is what makes ``@abc.abstractmethod``
    effective: without it the decorators are inert and an incomplete backend
    could be instantiated, silently returning ``None`` from every method.
    """

    @abc.abstractmethod
    def get_props(self) -> DevicePropsT:
        """Get device properties.

        Backend implementations are expected to update properties
        asynchronously, but expose them synchronously.
        """
        ...

    @abc.abstractmethod
    def get_keys(self) -> list[str]:
        """Return the device keys as a list of strings.

        NOTE(review): presumably the keys accepted by ``read_value``,
        ``write_value`` and the notification methods -- confirm.
        """
        ...

    @abc.abstractmethod
    async def disconnect(self) -> None:
        """Force disconnection of device to any station it might be connected to."""
        ...

    @abc.abstractmethod
    async def set_time(self) -> None:
        """Set time on device to current time.

        Implementation may vary between backends.
        """
        ...

    @abc.abstractmethod
    async def sync_time(self) -> None:
        """Sync time on device to current time.

        Implementation may vary between backends.
        """
        ...

    @abc.abstractmethod
    async def read_value(self, key: str) -> bytearray:
        """Read a value as bytearray under given key."""
        ...

    @abc.abstractmethod
    async def write_value(self, key: str, value: bytearray) -> int:
        """Write a value and return number of bytes written under given key."""
        ...

    @abc.abstractmethod
    async def enable_notifications(self, key: str) -> None:
        """Start notifications for given key."""
        ...

    @abc.abstractmethod
    async def disable_notifications(self, key: str) -> None:
        """Stop notifications for given key."""
        ...

    @abc.abstractmethod
    async def reset(self) -> None:
        """Reset the device; the exact effect is left to each backend."""
        ...
class IoTStationBackend(abc.ABC, Generic[StationPropsT, DevicePropsT]):
    """Station backend interface.

    Deriving from ``abc.ABC`` is what makes ``@abc.abstractmethod``
    effective: without it the decorators are inert and an incomplete backend
    could be instantiated, silently returning ``None`` from every method.

    Getters are synchronous; lifecycle, connection and discovery operations
    are coroutines.
    """

    @abc.abstractmethod
    def get_props(self) -> StationPropsT:
        """Return the station properties."""
        ...

    @abc.abstractmethod
    def get_device(self, device: str) -> IoTDeviceBackend[DevicePropsT]:
        """Return the device backend identified by ``device``."""
        ...

    @abc.abstractmethod
    def list_discovered_devices(self) -> list[str]:
        """Return the identifiers of discovered devices."""
        ...

    @abc.abstractmethod
    def list_connected_devices(self) -> list[str]:
        """Return the identifiers of connected devices."""
        ...

    @abc.abstractmethod
    def get_discovered_devices(self) -> dict[str, IoTDeviceBackend[DevicePropsT]]:
        """Discovered devices, excluding connected devices."""
        ...

    @abc.abstractmethod
    def get_connected_devices(self) -> dict[str, IoTDeviceBackend[DevicePropsT]]:
        """Connected devices, excluding discovered devices."""
        ...

    @abc.abstractmethod
    async def start(self) -> None:
        """Start the station."""
        ...

    @abc.abstractmethod
    async def stop(self) -> None:
        """Stop the station."""
        ...

    @abc.abstractmethod
    async def connect(self, device: str) -> None:
        """Connect the station to the device identified by ``device``."""
        ...

    @abc.abstractmethod
    async def disconnect(self, device: str) -> None:
        """Disconnect the station from the device identified by ``device``."""
        ...

    @abc.abstractmethod
    async def start_discovery(self) -> None:
        """Start discovery."""
        ...

    @abc.abstractmethod
    async def stop_discovery(self) -> None:
        """Stop discovery."""
        ...

    @abc.abstractmethod
    async def enable_discovery(self) -> None:
        """Enable discovery.

        NOTE(review): how this differs from ``start_discovery`` is not
        visible in this interface -- confirm with backend implementations.
        """
        ...

    @abc.abstractmethod
    async def disable_discovery(self) -> None:
        """Disable discovery (counterpart of ``enable_discovery``)."""
        ...

    @abc.abstractmethod
    def set_workstation_event_sink(
        self,
        type: StationEventType | None,
        sink: Callable[[StationEvent], None] | None,
    ) -> None:
        """Set the synchronous callback receiving station events.

        NOTE(review): presumably ``type=None`` selects every event type and
        ``sink=None`` removes the callback -- confirm with implementations.
        """
        ...

    @abc.abstractmethod
    def set_device_event_sink(
        self,
        type: DeviceEventType | None,
        sink: Callable[[DeviceEvent], None] | None,
    ) -> None:
        """Set the synchronous callback receiving device events.

        NOTE(review): presumably ``type=None`` selects every event type and
        ``sink=None`` removes the callback -- confirm with implementations.
        """
        ...
class IoTGateway(abc.ABC, Generic[StationPropsT, DevicePropsT]):
    """Expose one station backend, and its devices, over pub/sub.

    Concrete gateways supply the station name, the pub/sub client, the
    backend and the storage for service handles through the abstract
    accessors. Request/query dispatching and the (un)registration workflow
    are implemented here.

    Deriving from ``abc.ABC`` is what makes ``@abc.abstractmethod``
    effective: a subclass leaving an accessor unimplemented cannot be
    instantiated.
    """

    @abc.abstractmethod
    def get_name(self) -> str:
        """Return the name substituted for ``{station}`` in the station subject."""
        ...

    @abc.abstractmethod
    def get_pubsub(self) -> PubSub:
        """Return the pub/sub client used by this gateway."""
        ...

    @abc.abstractmethod
    def get_backend(self) -> IoTStationBackend[StationPropsT, DevicePropsT]:
        """Return the station backend driven by this gateway."""
        ...

    @abc.abstractmethod
    def get_station_service(self) -> ServiceProtocol:
        """Return the service registered for the station.

        Implementations are expected to raise ``KeyError`` when no service is
        registered; returning ``None`` is tolerated as well.
        """
        ...

    @abc.abstractmethod
    def get_device_service(self, device: str) -> ServiceProtocol:
        """Return the service registered for ``device``.

        Implementations are expected to raise ``KeyError`` when no service is
        registered; returning ``None`` is tolerated as well.
        """
        ...

    @abc.abstractmethod
    def set_station_service(self, service: ServiceProtocol | None) -> None:
        """Store the station service, or forget it when ``service`` is ``None``."""
        ...

    @abc.abstractmethod
    def set_device_service(self, device: str, service: ServiceProtocol | None) -> None:
        """Store the service of ``device``, or forget it when ``service`` is ``None``."""
        ...

    def _current_station_service(self) -> ServiceProtocol | None:
        """Return the registered station service, or ``None`` when there is none."""
        try:
            return self.get_station_service()
        except KeyError:
            return None

    def _current_device_service(self, device: str) -> ServiceProtocol | None:
        """Return the service registered for ``device``, or ``None`` when there is none."""
        try:
            return self.get_device_service(device)
        except KeyError:
            return None

    def create_station_requests_handler(
        self,
    ) -> Callable[[StationRequest], Awaitable[None]]:
        """Build the coroutine function handling requests sent to the station.

        The handler looks up the backend on every call and maps each
        ``StationRequestType`` to the matching backend coroutine.

        Raises (from the handler):
            ValueError: ``request.type`` is not a valid ``StationRequestType``.
            KeyError: a CONNECT/DISCONNECT request carries no ``"device"`` option.
            NotImplementedError: the request type has no handler.
        """

        async def handler(request: StationRequest) -> None:
            """Handle a request directed to the station."""
            backend = self.get_backend()
            request_type = StationRequestType(request.type)
            # ``options`` defaults to None on requests; treat that as empty so
            # a missing option is reported as KeyError.
            options = request.options or {}
            if request_type == StationRequestType.CONNECT:
                return await backend.connect(device=options["device"])
            if request_type == StationRequestType.DISCONNECT:
                return await backend.disconnect(device=options["device"])
            if request_type == StationRequestType.START_DISCOVERY:
                return await backend.start_discovery()
            if request_type == StationRequestType.STOP_DISCOVERY:
                return await backend.stop_discovery()
            if request_type == StationRequestType.ENABLE_DISCOVERY:
                return await backend.enable_discovery()
            if request_type == StationRequestType.DISABLE_DISCOVERY:
                return await backend.disable_discovery()
            if request_type == StationRequestType.START:
                return await backend.start()
            if request_type == StationRequestType.STOP:
                return await backend.stop()
            raise NotImplementedError(
                f"Request type not supported yet: {request_type}"
            )

        return handler

    def create_device_requests_handler(
        self, device: str
    ) -> Callable[[DeviceRequest], Awaitable[Any]]:
        """Build the coroutine function handling requests sent to ``device``.

        The handler looks up the device backend on every call. READ_VALUE and
        WRITE_VALUE return what the backend returns; other requests return
        ``None``.

        Raises (from the handler):
            ValueError: ``request.type`` is not a valid ``DeviceRequestType``.
            KeyError: a required ``"key"``/``"value"`` option is missing.
            NotImplementedError: the request type has no handler.
        """

        async def handler(request: DeviceRequest) -> Any:
            """Handle a request directed to a device."""
            device_backend = self.get_backend().get_device(device)
            # Validate the type the same way the station handler does, so an
            # unknown value fails loudly instead of silently returning None.
            request_type = DeviceRequestType(request.type)
            options = request.options or {}
            if request_type == DeviceRequestType.DISCONNECT:
                return await device_backend.disconnect()
            if request_type == DeviceRequestType.SET_TIME:
                return await device_backend.set_time()
            if request_type == DeviceRequestType.SYNC_TIME:
                return await device_backend.sync_time()
            if request_type == DeviceRequestType.START_NOTIFY:
                return await device_backend.enable_notifications(options["key"])
            if request_type == DeviceRequestType.STOP_NOTIFY:
                return await device_backend.disable_notifications(options["key"])
            if request_type == DeviceRequestType.READ_VALUE:
                return await device_backend.read_value(options["key"])
            if request_type == DeviceRequestType.WRITE_VALUE:
                return await device_backend.write_value(
                    options["key"], options["value"]
                )
            if request_type == DeviceRequestType.RESET:
                return await device_backend.reset()
            raise NotImplementedError(
                f"Request type not supported yet: {request_type}"
            )

        return handler

    def create_station_query_handler(self) -> Callable[[StationQuery], Awaitable[Any]]:
        """Build the coroutine function answering queries sent to the station.

        The backend is looked up once, when the handler is created.

        Raises (from the handler):
            ValueError: ``query.type`` is not a valid ``StationQueryType``.
            KeyError: a per-device query carries no ``"device"`` option.
            NotImplementedError: the query type has no handler (this includes
                GET_KNOWN_DEVICES and LIST_KNOWN_DEVICES).
        """
        backend = self.get_backend()

        async def handler(query: StationQuery) -> Any:
            query_type = StationQueryType(query.type)
            options = query.options or {}
            # The backend getters are declared as plain (synchronous) methods:
            # awaiting their list/dict results would raise TypeError.
            if query_type == StationQueryType.GET_CONNECTED_DEVICES:
                return backend.get_connected_devices()
            if query_type == StationQueryType.GET_DISCOVERED_DEVICES:
                return backend.get_discovered_devices()
            if query_type == StationQueryType.LIST_CONNECTED_DEVICES:
                return backend.list_connected_devices()
            if query_type == StationQueryType.LIST_DISCOVERED_DEVICES:
                return backend.list_discovered_devices()
            if query_type == StationQueryType.GET_PROPS:
                return backend.get_props()
            if query_type == StationQueryType.GET_DEVICE_PROPS:
                return backend.get_device(options["device"]).get_props()
            if query_type == StationQueryType.GET_DEVICE_KEYS:
                return backend.get_device(options["device"]).get_keys()
            raise NotImplementedError(
                f"Query type is not supported yet: {query_type}"
            )

        return handler

    def handle_device_event(self, event: DeviceEvent) -> None:
        """Publish a device event without waiting for delivery."""
        self.get_pubsub().publish_no_wait("PUBLISH_EVENT_SUBJECT", event)

    def handle_station_event(self, event: StationEvent) -> None:
        """Publish a station event without waiting for delivery."""
        self.get_pubsub().publish_no_wait("PUBLISH_STATION_SUBJECT", event)

    async def register_station(self, subject: str, device_subject: str) -> None:
        """Register the station service, the query handler and connected devices.

        Args:
            subject: Subject template; ``{station}`` is replaced by the
                gateway name.
            device_subject: Subject template passed to ``register_device``
                for every currently connected device.

        Raises:
            Exception: A station service is already registered.
        """
        if self._current_station_service() is not None:
            raise Exception("A subscription already exist for this station")
        pubsub = self.get_pubsub()
        backend = self.get_backend()
        # Register the station request handler.
        station_subject = subject.format(station=self.get_name())
        service = await pubsub.create_service(
            station_subject, self.create_station_requests_handler()
        )
        self.set_station_service(service)
        # Register the query handler.
        # NOTE(review): the value returned by subscribe() is not kept, so
        # this subscription is never stopped explicitly -- confirm that
        # pubsub.disconnect() releases it.
        await pubsub.subscribe("QUERY_SUBJECT", self.create_station_query_handler())
        # Register request handlers for the devices connected right now.
        for device in backend.list_connected_devices():
            await self.register_device(device, device_subject)

    async def unregister_workstation(self) -> None:
        """Stop every device service, then the station service."""
        backend = self.get_backend()
        # Mirror register_station(): the backend interface declares
        # list_connected_devices(), not list_known_devices(). Devices without
        # a registered service are skipped by unregister_device().
        for device in list(backend.list_connected_devices()):
            await self.unregister_device(device)
        service = self._current_station_service()
        if service is not None:
            await service.stop()
        self.set_station_service(None)

    async def register_device(self, device: str, subject: str) -> None:
        """Subscribe a request handler for ``device`` and store its service.

        Args:
            device: Device identifier.
            subject: Subject template; ``{device}`` is replaced by ``device``.

        Raises:
            Exception: A service is already registered for this device.
        """
        current_sid = self._current_device_service(device)
        if current_sid is not None:
            raise Exception(f"A subscription already exist for device: {current_sid}")
        pubsub = self.get_pubsub()
        handler = self.create_device_requests_handler(device)
        service = await pubsub.subscribe(subject.format(device=device), handler)
        self.set_device_service(device, service)

    async def unregister_device(self, device: str) -> None:
        """Stop and forget the service of ``device``; no-op when none is registered."""
        service = self._current_device_service(device)
        if service is None:
            return
        await service.stop()
        # The device argument is required by set_device_service().
        self.set_device_service(device, None)

    async def start(self) -> None:
        """Connect to pub/sub, install the event sinks and register the station."""
        pubsub = self.get_pubsub()
        backend = self.get_backend()
        await pubsub.connect()
        # The sink setters take (type, sink). NOTE(review): ``None`` is passed
        # as type on the assumption that it selects every event type --
        # confirm with backend implementations.
        backend.set_device_event_sink(None, self.handle_device_event)
        backend.set_workstation_event_sink(None, self.handle_station_event)
        await self.register_station(
            "WORKSTATION_SUBJECT.{station}.>", "DEVICE_SUBJECT.{device}.>"
        )

    async def stop(self) -> None:
        """Unregister everything, disconnect the devices and leave pub/sub."""
        pubsub = self.get_pubsub()
        backend = self.get_backend()
        await self.unregister_workstation()
        backend.set_device_event_sink(None, None)
        backend.set_workstation_event_sink(None, None)
        # backend.disconnect() requires a device: disconnect each connected
        # device. Iterate over a copy since disconnecting may alter the list.
        for device in list(backend.list_connected_devices()):
            await backend.disconnect(device)
        await pubsub.disconnect()
class IoTHub(abc.ABC, Generic[StationPropsT, DevicePropsT]):
    """Hub service: consumes station/device events and answers hub queries.

    Concrete hubs supply the pub/sub client, the storage for the service
    handle, the lookup methods and the event processing through the abstract
    methods.

    Deriving from ``abc.ABC`` is what makes ``@abc.abstractmethod``
    effective: a subclass leaving one of them unimplemented cannot be
    instantiated.
    """

    @abc.abstractmethod
    def get_pubsub(self) -> PubSub:
        """Return the pub/sub client used by the hub."""
        ...

    @abc.abstractmethod
    def set_service(self, service: ServiceProtocol | None) -> None:
        """Store the hub service, or forget it when ``service`` is ``None``."""
        ...

    @abc.abstractmethod
    def get_service(self) -> ServiceProtocol:
        """Return the hub service.

        Implementations are expected to raise ``KeyError`` when no service is
        registered; returning ``None`` is tolerated as well.
        """
        ...

    @abc.abstractmethod
    def list_stations(self) -> list[str]:
        """Return the identifiers of the stations."""
        ...

    @abc.abstractmethod
    def list_devices(self) -> list[str]:
        """Return the identifiers of the devices."""
        ...

    @abc.abstractmethod
    def find_devices(self, filter: Mapping[str, Any], limit: int) -> list[DevicePropsT]:
        """Return the properties of at most ``limit`` devices matching ``filter``.

        NOTE(review): the filter semantics are not defined here -- confirm
        with implementations.
        """
        ...

    @abc.abstractmethod
    def find_stations(
        self, filter: Mapping[str, Any], limit: int
    ) -> list[StationPropsT]:
        """Return the properties of at most ``limit`` stations matching ``filter``.

        NOTE(review): the filter semantics are not defined here -- confirm
        with implementations.
        """
        ...

    @abc.abstractmethod
    def get_device(self, device: str) -> DevicePropsT:
        """Return the properties of ``device``."""
        ...

    @abc.abstractmethod
    def get_station(self, station: str) -> StationPropsT:
        """Return the properties of ``station``."""
        ...

    @abc.abstractmethod
    async def process_device_event(self, event: DeviceEvent) -> None:
        """Decide what to do with a device event."""
        ...

    @abc.abstractmethod
    async def process_station_event(self, event: StationEvent) -> None:
        """Decide what to do with a station event."""
        ...

    def _current_service(self) -> ServiceProtocol | None:
        """Return the registered hub service, or ``None`` when there is none."""
        try:
            return self.get_service()
        except KeyError:
            return None

    async def process_event(self, event: StationEvent | DeviceEvent) -> None:
        """Route an event to the device or the station processing method."""
        if isinstance(event, DeviceEvent):
            await self.process_device_event(event)
        else:
            await self.process_station_event(event)

    async def process_query(self, query: HubQuery) -> Any:
        """Answer a hub query from in-memory information.

        Options are forwarded as keyword arguments to the FIND_* and LIST_*
        methods; GET_* queries read the ``"device"``/``"station"`` option.

        Raises:
            ValueError: ``query.type`` is not a valid ``HubQueryType``.
            KeyError: a GET_* query lacks its ``"device"``/``"station"`` option.
            NotImplementedError: the query type has no handler.
        """
        query_type = HubQueryType(query.type)
        # ``options`` defaults to None on queries; ``**None`` would raise
        # TypeError, so treat it as an empty mapping.
        options = query.options or {}
        if query_type == HubQueryType.FIND_DEVICES:
            return self.find_devices(**options)
        if query_type == HubQueryType.FIND_STATIONS:
            return self.find_stations(**options)
        if query_type == HubQueryType.GET_DEVICE:
            return self.get_device(options["device"])
        if query_type == HubQueryType.GET_STATION:
            return self.get_station(options["station"])
        if query_type == HubQueryType.LIST_DEVICES:
            return self.list_devices(**options)
        if query_type == HubQueryType.LIST_STATIONS:
            return self.list_stations(**options)
        raise NotImplementedError(f"Query type is not supported yet: {query_type}")

    async def start(self) -> None:
        """Connect to pub/sub and subscribe ``process_event``.

        Raises:
            Exception: A service is already registered.
        """
        if self._current_service() is not None:
            raise Exception("A subscription already exists")
        pubsub = self.get_pubsub()
        await pubsub.connect()
        service = await pubsub.subscribe("COMMON_PREFIX", self.process_event)
        self.set_service(service)

    async def stop(self) -> None:
        """Stop the hub service, if any, and disconnect from pub/sub."""
        pubsub = self.get_pubsub()
        service = self._current_service()
        if service is not None:
            await service.stop()
            # Forget the stopped service so start() can be called again.
            self.set_service(None)
        await pubsub.disconnect()
class IoTHubClient:
    """Client for the hub and the gateways.

    Should depend only on pub/sub + subjects used in Gateway + Hub.

    Every coroutine below is still a stub: it does nothing and returns
    ``None`` until the pub/sub implementation is written.
    """

    async def list_devices(self) -> list[str]:
        """List existing devices"""
        # FIXME: Implement using PUB/SUB
        ...

    async def list_stations(self) -> list[str]:
        """List existing stations"""
        # FIXME: Implement using PUB/SUB
        ...

    async def device_request(self, device: str, request: DeviceRequest) -> Any:
        """Send a request to a device"""
        # FIXME: Implement using PUB/SUB
        ...

    async def station_query(self, query: StationQuery) -> Any:
        """Send a query to a station"""
        # NOTE(review): unlike station_request(), no station is given here,
        # so the target station cannot be derived from the arguments.
        # FIXME: Implement using PUB/SUB
        ...

    async def station_request(self, station: str, request: StationRequest) -> Any:
        """Send a request to a station"""
        # FIXME: Implement using PUB/SUB
        ...

    async def subscribe_to_device_events(
        self,
        device: str | None = None,
        type: DeviceEventType | None = None,
        key: str | None = None,
    ) -> SubscriptionProtocol[DeviceEvent]:
        """Subscribe to device events, optionally narrowed by device, type and key."""
        # The subscription is annotated as carrying events, not event types.
        # FIXME: Implement using PUB/SUB
        ...

    async def subscribe_to_station_events(
        self, station: str | None = None, type: StationEventType | None = None
    ) -> SubscriptionProtocol[StationEvent]:
        """Subscribe to station events, optionally narrowed by station and type."""
        # The subscription is annotated as carrying events, not event types.
        # FIXME: Implement using PUB/SUB
        ...

    def get_device(self, device: str) -> RemoteDevice:
        """Return a ``RemoteDevice`` wrapper bound to this client."""
        return RemoteDevice(self, device)

    def get_station(self, station: str) -> RemoteStation:
        """Return a ``RemoteStation`` wrapper bound to this client."""
        return RemoteStation(self, station)
class RemoteStation:
    """Handle on one station, driven through an ``IoTHubClient``.

    Queries go through ``client.station_query`` and requests through
    ``client.station_request``, which is addressed to ``self.station``.
    """

    def __init__(self, client: IoTHubClient, station: str) -> None:
        self.client = client
        self.station = station

    async def _query(
        self, kind: StationQueryType, options: Mapping[str, Any] | None = None
    ) -> Any:
        """Send a query of the given kind and return the reply."""
        # NOTE(review): IoTHubClient.station_query() takes no station
        # argument, so the query is not explicitly addressed to self.station.
        return await self.client.station_query(StationQuery(kind, options))

    async def _request(
        self, kind: StationRequestType, options: Mapping[str, Any] | None = None
    ) -> Any:
        """Send a request of the given kind to this station and return the reply."""
        # station_request() requires the station as its first argument.
        return await self.client.station_request(
            self.station, StationRequest(kind, options)
        )

    async def subscribe_to_events(
        self,
        type: StationEventType | None = None,
    ) -> SubscriptionProtocol[StationEvent]:
        """Return an event subscription which can be used to iterate over station events."""
        return await self.client.subscribe_to_station_events(
            station=self.station, type=type
        )

    async def list_discovered_devices(self) -> list[str]:
        """Return the reply to a LIST_DISCOVERED_DEVICES query."""
        return await self._query(StationQueryType.LIST_DISCOVERED_DEVICES)

    async def list_connected_devices(self) -> list[str]:
        """Return the reply to a LIST_CONNECTED_DEVICES query."""
        return await self._query(StationQueryType.LIST_CONNECTED_DEVICES)

    async def get_discovered_devices(self) -> dict[str, IoTDeviceBackend[DevicePropsT]]:
        """Discovered devices, excluding connected devices."""
        return await self._query(StationQueryType.GET_DISCOVERED_DEVICES)

    async def get_connected_devices(self) -> dict[str, IoTDeviceBackend[DevicePropsT]]:
        """Connected devices, excluding discovered devices."""
        return await self._query(StationQueryType.GET_CONNECTED_DEVICES)

    async def get_device_props(self, device: str) -> DevicePropsT:
        """Return the reply to a GET_DEVICE_PROPS query for ``device``."""
        return await self._query(StationQueryType.GET_DEVICE_PROPS, {"device": device})

    async def get_device_keys(self, device: str) -> list[str]:
        """Return the reply to a GET_DEVICE_KEYS query for ``device``."""
        return await self._query(StationQueryType.GET_DEVICE_KEYS, {"device": device})

    async def start(self) -> None:
        """Send a START request to the station."""
        return await self._request(StationRequestType.START)

    async def stop(self) -> None:
        """Send a STOP request to the station."""
        return await self._request(StationRequestType.STOP)

    async def connect_device(self, device: str) -> None:
        """Send a CONNECT request for ``device`` to the station."""
        return await self._request(StationRequestType.CONNECT, {"device": device})

    async def disconnect_device(self, device: str) -> None:
        """Send a DISCONNECT request for ``device`` to the station."""
        return await self._request(StationRequestType.DISCONNECT, {"device": device})

    async def start_discovery(self) -> None:
        """Send a START_DISCOVERY request to the station."""
        return await self._request(StationRequestType.START_DISCOVERY)

    async def stop_discovery(self) -> None:
        """Send a STOP_DISCOVERY request to the station."""
        return await self._request(StationRequestType.STOP_DISCOVERY)

    async def enable_discovery(self) -> None:
        """Send an ENABLE_DISCOVERY request to the station."""
        return await self._request(StationRequestType.ENABLE_DISCOVERY)

    async def disable_discovery(self) -> None:
        """Send a DISABLE_DISCOVERY request to the station."""
        return await self._request(StationRequestType.DISABLE_DISCOVERY)
class RemoteDevice:
    """Handle on one device, driven through an ``IoTHubClient``.

    Device operations are sent with ``client.device_request``; connecting
    goes through ``client.station_request`` because it is the station that
    connects to the device.
    """

    def __init__(self, client: IoTHubClient, device: str) -> None:
        self.client = client
        self.device = device

    async def _request(
        self, kind: DeviceRequestType, options: Mapping[str, Any] | None = None
    ) -> Any:
        """Send a request of the given kind to this device and return the reply."""
        return await self.client.device_request(
            self.device, DeviceRequest(kind, options)
        )

    async def connect(self, station: str) -> None:
        """Connect a station to the device"""
        await self.client.station_request(
            station, StationRequest(StationRequestType.CONNECT, {"device": self.device})
        )

    async def disconnect(self) -> None:
        """Force disconnection of device to any station it might be connected to."""
        await self._request(DeviceRequestType.DISCONNECT)

    async def set_time(self) -> None:
        """Set time on device to current time.

        Implementation may vary between backends.
        """
        await self._request(DeviceRequestType.SET_TIME)

    async def sync_time(self) -> None:
        """Sync time on device to current time.

        Implementation may vary between backends.
        """
        await self._request(DeviceRequestType.SYNC_TIME)

    async def read_value(self, key: str) -> bytearray:
        """Read a value as bytearray under given key."""
        # Return the reply: it was previously awaited and then dropped.
        return await self._request(DeviceRequestType.READ_VALUE, {"key": key})

    async def write_value(self, key: str, value: bytearray) -> int:
        """Write a value and return number of bytes written under given key."""
        # Return the reply: it was previously awaited and then dropped.
        return await self._request(
            DeviceRequestType.WRITE_VALUE, {"key": key, "value": value}
        )

    async def enable_notifications(self, key: str) -> None:
        """Start notifications for given key."""
        await self._request(DeviceRequestType.START_NOTIFY, {"key": key})

    async def disable_notifications(self, key: str) -> None:
        """Stop notifications for given key."""
        await self._request(DeviceRequestType.STOP_NOTIFY, {"key": key})

    async def reset(self) -> None:
        """Send a RESET request to the device."""
        await self._request(DeviceRequestType.RESET)

    async def subscribe_to_events(
        self,
        type: DeviceEventType | None = None,
    ) -> SubscriptionProtocol[DeviceEvent]:
        """Return an event subscription which can be used to iterate over device events."""
        return await self.client.subscribe_to_device_events(
            device=self.device, type=type
        )

    async def subscribe_to_notifications(
        self,
        key: str,
    ) -> SubscriptionProtocol[bytearray]:
        """Return a notification subscription which can be used to iterate over received messages."""
        # NOTE(review): this is a NOTIFIED device-event subscription; whether
        # it yields raw bytearrays or DeviceEvent objects depends on the
        # client implementation -- confirm.
        return await self.client.subscribe_to_device_events(
            device=self.device, type=DeviceEventType.NOTIFIED, key=key
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment