|
"""Most of the code is not implemented yet. |
|
|
|
All methods decorated with @abc.abstractmethod are not implemented. |
|
""" |
|
from __future__ import annotations |
|
|
|
import abc |
|
from enum import Enum |
|
from typing import Any, Awaitable, Callable, Generic, Mapping, TypeVar |
|
|
|
from ..functional.subscription import PubSub, ServiceProtocol, SubscriptionProtocol |
|
|
|
|
|
class StationEventType(str, Enum):
    """Events an IoT station can emit.

    Members mix in ``str``, so each one compares equal to its plain string
    value; every value is identical to the member name.
    """

    # Station lifecycle.
    STARTED = "STARTED"
    STOPPED = "STOPPED"

    # Discovery state.
    DISCOVERY_ENABLED = "DISCOVERY_ENABLED"
    DISCOVERY_DISABLED = "DISCOVERY_DISABLED"
    DISCOVERY_STARTED = "DISCOVERY_STARTED"
    DISCOVERY_STOPPED = "DISCOVERY_STOPPED"

    # Devices seen by the station.
    DEVICE_DISCOVERED = "DEVICE_DISCOVERED"
    DEVICE_CONNECTED = "DEVICE_CONNECTED"
    DEVICE_DISCONNECTED = "DEVICE_DISCONNECTED"

    # Station properties.
    PROPS_CHANGED = "PROPS_CHANGED"
|
|
|
|
|
class DeviceEventType(str, Enum):
    """Events an IoT device can emit.

    Members mix in ``str`` and therefore compare equal to their string value.
    """

    # Connection state.
    CONNECTED = "CONNECTED"
    DISCONNECTED = "DISCONNECTED"

    # Device clock.
    TIME_SET = "TIME_SET"
    # NOTE(review): the value is "SYNCED", not "TIME_SYNCED" like the member
    # name. Every other member uses its own name as value; confirm this
    # asymmetry is intended before changing it (it is a runtime value).
    TIME_SYNCED = "SYNCED"

    # Notifications.
    NOTIFIED = "NOTIFIED"
    NOTIFY_STARTED = "NOTIFY_STARTED"
    NOTIFY_STOPPED = "NOTIFY_STOPPED"

    # Device properties.
    PROPS_CHANGED = "PROPS_CHANGED"
|
|
|
|
|
class DeviceRequestType(str, Enum):
    """Kinds of request which can be sent to an IoT device.

    Members mix in ``str``; every value is identical to the member name.
    """

    # Device lifecycle.
    RESET = "RESET"
    DISCONNECT = "DISCONNECT"

    # Notifications.
    START_NOTIFY = "START_NOTIFY"
    STOP_NOTIFY = "STOP_NOTIFY"

    # Device clock.
    SET_TIME = "SET_TIME"
    SYNC_TIME = "SYNC_TIME"

    # Value access.
    READ_VALUE = "READ_VALUE"
    WRITE_VALUE = "WRITE_VALUE"
|
|
|
|
|
class StationRequestType(str, Enum):
    """Kinds of request which can be sent to a station.

    Members mix in ``str``; every value is identical to the member name.
    """

    # Station lifecycle.
    START = "START"
    STOP = "STOP"

    # Device connections.
    CONNECT = "CONNECT"
    DISCONNECT = "DISCONNECT"

    # Discovery control.
    START_DISCOVERY = "START_DISCOVERY"
    STOP_DISCOVERY = "STOP_DISCOVERY"
    ENABLE_DISCOVERY = "ENABLE_DISCOVERY"
    DISABLE_DISCOVERY = "DISABLE_DISCOVERY"
|
|
|
|
|
class StationQueryType(str, Enum):
    """Kinds of query which can be sent to a station.

    Members mix in ``str``; every value is identical to the member name.
    """

    # Station properties.
    GET_PROPS = "GET_PROPS"

    # Device collections (GET_* variants).
    GET_KNOWN_DEVICES = "GET_KNOWN_DEVICES"
    GET_CONNECTED_DEVICES = "GET_CONNECTED_DEVICES"
    GET_DISCOVERED_DEVICES = "GET_DISCOVERED_DEVICES"

    # Device collections (LIST_* variants).
    LIST_KNOWN_DEVICES = "LIST_KNOWN_DEVICES"
    LIST_CONNECTED_DEVICES = "LIST_CONNECTED_DEVICES"
    LIST_DISCOVERED_DEVICES = "LIST_DISCOVERED_DEVICES"

    # Single-device lookups.
    GET_DEVICE_PROPS = "GET_DEVICE_PROPS"
    GET_DEVICE_KEYS = "GET_DEVICE_KEYS"
|
|
|
|
|
class HubQueryType(str, Enum):
    """Kinds of query accepted by the hub.

    Members mix in ``str``; every value is identical to the member name.
    """

    # Enumerate identifiers.
    LIST_DEVICES = "LIST_DEVICES"
    LIST_STATIONS = "LIST_STATIONS"

    # Filtered searches.
    FIND_DEVICES = "FIND_DEVICES"
    FIND_STATIONS = "FIND_STATIONS"

    # Single-item lookups.
    GET_DEVICE = "GET_DEVICE"
    GET_STATION = "GET_STATION"
|
|
|
|
|
class StationEvent:
    """Event emitted by an IoT station.

    Attributes:
        data: Payload mapping carried by the event, stored as given.
        type: Kind of station event, stored as given.
    """

    def __init__(self, data: Mapping[str, Any], type: StationEventType) -> None:
        # Both arguments are kept untouched; no validation or copy is made.
        self.data, self.type = data, type
|
|
|
|
|
class DeviceEvent:
    """An event emitted by an IoT device.

    Attributes:
        data: Payload mapping carried by the event, stored as given.
        type: Kind of device event, stored as given.
    """

    # The ``type`` parameter used to be annotated ``StationRequestType``;
    # device events are described by ``DeviceEventType``.
    def __init__(self, data: Mapping[str, Any], type: DeviceEventType) -> None:
        self.data = data
        self.type = type
|
|
|
|
|
class DeviceRequest:
    """Request addressed to an IoT device.

    Attributes:
        type: Kind of request.
        options: Optional mapping of request arguments; ``None`` when omitted.
        deadline: Optional deadline; ``None`` when omitted. The unit and
            reference point are not defined in this class.
    """

    def __init__(
        self,
        type: DeviceRequestType,
        options: Mapping[str, Any] | None = None,
        deadline: float | None = None,
    ) -> None:
        # Arguments are stored as given, without validation or copying.
        self.type, self.options, self.deadline = type, options, deadline
|
|
|
|
|
class StationRequest:
    """Request addressed to an IoT station.

    Attributes:
        type: Kind of request.
        options: Optional mapping of request arguments; ``None`` when omitted.
        deadline: Optional deadline; ``None`` when omitted. The unit and
            reference point are not defined in this class.
    """

    def __init__(
        self,
        type: StationRequestType,
        options: Mapping[str, Any] | None = None,
        deadline: float | None = None,
    ) -> None:
        # Arguments are stored as given, without validation or copying.
        self.type, self.options, self.deadline = type, options, deadline
|
|
|
|
|
class StationQuery:
    """Query addressed to a station.

    Attributes:
        type: Kind of query.
        options: Optional mapping of query arguments; ``None`` when omitted.
        deadline: Optional deadline; ``None`` when omitted. The unit and
            reference point are not defined in this class.
    """

    def __init__(
        self,
        type: StationQueryType,
        options: Mapping[str, Any] | None = None,
        deadline: float | None = None,
    ) -> None:
        # Arguments are stored as given, without validation or copying.
        self.type, self.options, self.deadline = type, options, deadline
|
|
|
|
|
class HubQuery:
    """Query addressed to the hub.

    Attributes:
        type: Kind of query.
        options: Optional mapping of query arguments; ``None`` when omitted.
        deadline: Optional deadline; ``None`` when omitted. The unit and
            reference point are not defined in this class.
    """

    def __init__(
        self,
        type: HubQueryType,
        options: Mapping[str, Any] | None = None,
        deadline: float | None = None,
    ) -> None:
        # Arguments are stored as given, without validation or copying.
        self.type, self.options, self.deadline = type, options, deadline
|
|
|
|
|
# Type of the properties object returned by a device backend's get_props().
DevicePropsT = TypeVar("DevicePropsT")

# Type of the properties object returned by a station backend's get_props().
StationPropsT = TypeVar("StationPropsT")
|
|
|
|
|
class IoTDeviceBackend(Generic[DevicePropsT]):
    """Interface a device backend has to provide.

    NOTE(review): the class does not derive from ``abc.ABC``, so the
    ``@abc.abstractmethod`` markers document intent but are not enforced
    when a subclass is instantiated.
    """

    @abc.abstractmethod
    def get_props(self) -> DevicePropsT:
        """Return the device properties.

        Backends are expected to refresh the properties asynchronously while
        exposing them through this synchronous accessor.
        """

    @abc.abstractmethod
    def get_keys(self) -> list[str]:
        """Return a list of key names.

        Presumably these are the keys accepted by ``read_value``,
        ``write_value`` and the notification methods; confirm with a
        concrete backend.
        """

    @abc.abstractmethod
    async def disconnect(self) -> None:
        """Force the device to disconnect from any station it is connected to."""

    @abc.abstractmethod
    async def set_time(self) -> None:
        """Set the device time to the current time.

        How this is done is left to each backend.
        """

    @abc.abstractmethod
    async def sync_time(self) -> None:
        """Synchronise the device time with the current time.

        How this is done is left to each backend.
        """

    @abc.abstractmethod
    async def read_value(self, key: str) -> bytearray:
        """Read the value stored under ``key`` and return it as a bytearray."""

    @abc.abstractmethod
    async def write_value(self, key: str, value: bytearray) -> int:
        """Write ``value`` under ``key`` and return the number of bytes written."""

    @abc.abstractmethod
    async def enable_notifications(self, key: str) -> None:
        """Start notifications for ``key``."""

    @abc.abstractmethod
    async def disable_notifications(self, key: str) -> None:
        """Stop notifications for ``key``."""

    @abc.abstractmethod
    async def reset(self) -> None:
        """Reset the device; the exact semantics are left to each backend."""
|
|
|
|
|
class IoTStationBackend(Generic[StationPropsT, DevicePropsT]):
    """Station backend interface.

    NOTE(review): the class does not derive from ``abc.ABC``, so the
    ``@abc.abstractmethod`` markers document intent but are not enforced
    when a subclass is instantiated.
    """

    @abc.abstractmethod
    def get_props(self) -> StationPropsT:
        """Return the station properties."""
        ...

    @abc.abstractmethod
    def get_device(self, device: str) -> IoTDeviceBackend[DevicePropsT]:
        """Return the device backend identified by ``device``."""
        ...

    @abc.abstractmethod
    def list_discovered_devices(self) -> list[str]:
        """Return the identifiers of discovered devices."""
        ...

    @abc.abstractmethod
    def list_connected_devices(self) -> list[str]:
        """Return the identifiers of connected devices."""
        ...

    @abc.abstractmethod
    def list_known_devices(self) -> list[str]:
        """Return the identifiers of every device known to the station.

        Declared here because ``IoTGateway.unregister_workstation`` calls it
        on the backend. Presumably "known" covers both discovered and
        connected devices; confirm with a concrete backend.
        """
        ...

    @abc.abstractmethod
    def get_discovered_devices(self) -> dict[str, IoTDeviceBackend[DevicePropsT]]:
        """Discovered devices, excluding connected devices."""
        ...

    @abc.abstractmethod
    def get_connected_devices(self) -> dict[str, IoTDeviceBackend[DevicePropsT]]:
        """Connected devices, excluding discovered devices."""
        ...

    @abc.abstractmethod
    async def start(self) -> None:
        """Start the station."""
        ...

    @abc.abstractmethod
    async def stop(self) -> None:
        """Stop the station."""
        ...

    @abc.abstractmethod
    async def connect(self, device: str) -> None:
        """Connect the station to ``device``."""
        ...

    @abc.abstractmethod
    async def disconnect(self, device: str) -> None:
        """Disconnect the station from ``device``."""
        ...

    @abc.abstractmethod
    async def start_discovery(self) -> None:
        """Start discovery."""
        ...

    @abc.abstractmethod
    async def stop_discovery(self) -> None:
        """Stop discovery."""
        ...

    @abc.abstractmethod
    async def enable_discovery(self) -> None:
        """Enable discovery.

        NOTE(review): how this differs from ``start_discovery`` is not
        defined in this interface; confirm with a concrete backend.
        """
        ...

    @abc.abstractmethod
    async def disable_discovery(self) -> None:
        """Disable discovery (see the note on ``enable_discovery``)."""
        ...

    @abc.abstractmethod
    def set_workstation_event_sink(
        self,
        type: StationEventType | None,
        sink: Callable[[StationEvent], None] | None,
    ) -> None:
        """Set or clear the callback receiving station events.

        Args:
            type: Event type the sink applies to. Presumably ``None`` means
                every type; confirm with a concrete backend.
            sink: Callback invoked with each event, or ``None`` to clear it.
        """
        ...

    @abc.abstractmethod
    def set_device_event_sink(
        self,
        type: DeviceEventType | None,
        sink: Callable[[DeviceEvent], None] | None,
    ) -> None:
        """Set or clear the callback receiving device events.

        Args:
            type: Event type the sink applies to. Presumably ``None`` means
                every type; confirm with a concrete backend.
            sink: Callback invoked with each event, or ``None`` to clear it.
        """
        ...
|
|
|
|
|
class IoTGateway(Generic[StationPropsT, DevicePropsT]):
    """Bridge between a station backend and a pub/sub transport.

    Subclasses supply the station name, the pub/sub client, the backend and
    the storage for service handles through the abstract accessors. The
    concrete methods expose backend operations as request/query handlers and
    forward backend events to the pub/sub.

    NOTE(review): the class does not derive from ``abc.ABC``, so the
    ``@abc.abstractmethod`` markers are not enforced at instantiation time.
    """

    @abc.abstractmethod
    def get_name(self) -> str:
        """Return the station name substituted into the station subject."""
        ...

    @abc.abstractmethod
    def get_pubsub(self) -> PubSub:
        """Return the pub/sub client used by the gateway."""
        ...

    @abc.abstractmethod
    def get_backend(self) -> IoTStationBackend[StationPropsT, DevicePropsT]:
        """Return the station backend driven by the gateway."""
        ...

    @abc.abstractmethod
    def get_station_service(self) -> ServiceProtocol:
        """Return the service registered for the station.

        This class expects ``KeyError`` when no service is registered.
        """
        ...

    @abc.abstractmethod
    def get_device_service(self, device: str) -> ServiceProtocol:
        """Return the service registered for ``device``.

        This class expects ``KeyError`` when no service is registered.
        """
        ...

    @abc.abstractmethod
    def set_station_service(self, service: ServiceProtocol | None) -> None:
        """Store the station service; ``None`` clears it."""
        ...

    @abc.abstractmethod
    def set_device_service(
        self, device: str, service: ServiceProtocol | None
    ) -> None:
        """Store the service of ``device``; ``None`` clears it."""
        ...

    def _find_station_service(self) -> ServiceProtocol | None:
        """Return the station service, or ``None`` when none is registered."""
        try:
            return self.get_station_service()
        except KeyError:
            return None

    def _find_device_service(self, device: str) -> ServiceProtocol | None:
        """Return the service of ``device``, or ``None`` when none is registered."""
        try:
            return self.get_device_service(device)
        except KeyError:
            return None

    def create_station_requests_handler(
        self,
    ) -> Callable[[StationRequest], Awaitable[None]]:
        """Build the coroutine function serving requests sent to the station.

        The backend is looked up on every request. The returned handler
        raises ``ValueError`` when the request type is not a valid
        ``StationRequestType`` and ``NotImplementedError`` when no branch
        handles it.
        """

        async def handler(request: StationRequest) -> None:
            """Handle a request directed to the station."""
            backend = self.get_backend()
            TYPE = StationRequestType(request.type)
            if TYPE == StationRequestType.CONNECT:
                device = request.options["device"]
                return await backend.connect(device=device)
            if TYPE == StationRequestType.DISCONNECT:
                device = request.options["device"]
                return await backend.disconnect(device=device)
            if TYPE == StationRequestType.START_DISCOVERY:
                return await backend.start_discovery()
            if TYPE == StationRequestType.STOP_DISCOVERY:
                return await backend.stop_discovery()
            if TYPE == StationRequestType.ENABLE_DISCOVERY:
                return await backend.enable_discovery()
            if TYPE == StationRequestType.DISABLE_DISCOVERY:
                return await backend.disable_discovery()
            if TYPE == StationRequestType.START:
                return await backend.start()
            if TYPE == StationRequestType.STOP:
                return await backend.stop()
            raise NotImplementedError(f"Request type not supported yet: {TYPE}")

        return handler

    def create_device_requests_handler(
        self, device: str
    ) -> Callable[[DeviceRequest], Awaitable[Any]]:
        """Build the coroutine function serving requests sent to ``device``.

        The device backend is looked up on every request. The returned
        handler returns whatever the device backend returns (the bytes read
        for ``READ_VALUE``, the byte count for ``WRITE_VALUE``, ``None``
        otherwise). It raises ``ValueError`` when the request type is not a
        valid ``DeviceRequestType`` and ``NotImplementedError`` when no
        branch handles it.
        """

        async def handler(request: DeviceRequest) -> Any:
            """Handle a request directed to a device."""
            backend = self.get_backend()
            device_backend = backend.get_device(device)
            # Normalise like the station handler does, so plain strings and
            # enum members are treated alike and unknown values fail loudly.
            TYPE = DeviceRequestType(request.type)
            if TYPE == DeviceRequestType.DISCONNECT:
                return await device_backend.disconnect()
            if TYPE == DeviceRequestType.SET_TIME:
                return await device_backend.set_time()
            if TYPE == DeviceRequestType.SYNC_TIME:
                return await device_backend.sync_time()
            if TYPE == DeviceRequestType.START_NOTIFY:
                key = request.options["key"]
                return await device_backend.enable_notifications(key)
            if TYPE == DeviceRequestType.STOP_NOTIFY:
                key = request.options["key"]
                return await device_backend.disable_notifications(key)
            if TYPE == DeviceRequestType.READ_VALUE:
                target = request.options["key"]
                return await device_backend.read_value(target)
            if TYPE == DeviceRequestType.WRITE_VALUE:
                target = request.options["key"]
                value = request.options["value"]
                return await device_backend.write_value(target, value)
            if TYPE == DeviceRequestType.RESET:
                return await device_backend.reset()
            # Previously an unhandled type fell through and returned None.
            raise NotImplementedError(f"Request type not supported yet: {TYPE}")

        return handler

    def create_station_query_handler(self) -> Callable[[StationQuery], Awaitable[Any]]:
        """Build the coroutine function answering queries sent to the station.

        The backend is looked up once, when the handler is created. The
        returned handler raises ``ValueError`` when the query type is not a
        valid ``StationQueryType`` and ``NotImplementedError`` when no branch
        handles it (currently the ``*_KNOWN_DEVICES`` queries).
        """
        backend = self.get_backend()

        async def handler(query: StationQuery) -> Any:
            """Answer a query directed to the station."""
            TYPE = StationQueryType(query.type)
            # The backend accessors below are declared as plain (non-async)
            # methods on IoTStationBackend, so their results are not awaited.
            if TYPE == StationQueryType.GET_CONNECTED_DEVICES:
                return backend.get_connected_devices()
            if TYPE == StationQueryType.GET_DISCOVERED_DEVICES:
                return backend.get_discovered_devices()
            if TYPE == StationQueryType.LIST_CONNECTED_DEVICES:
                return backend.list_connected_devices()
            if TYPE == StationQueryType.LIST_DISCOVERED_DEVICES:
                return backend.list_discovered_devices()
            if TYPE == StationQueryType.GET_PROPS:
                return backend.get_props()
            if TYPE == StationQueryType.GET_DEVICE_PROPS:
                device = backend.get_device(query.options["device"])
                return device.get_props()
            if TYPE == StationQueryType.GET_DEVICE_KEYS:
                device = backend.get_device(query.options["device"])
                return device.get_keys()
            raise NotImplementedError(f"Query type is not supported yet: {TYPE}")

        return handler

    def handle_device_event(self, event: DeviceEvent) -> None:
        """Publish a device event on the pub/sub without waiting."""
        pubsub = self.get_pubsub()
        pubsub.publish_no_wait("PUBLISH_EVENT_SUBJECT", event)

    def handle_station_event(self, event: StationEvent) -> None:
        """Publish a station event on the pub/sub without waiting."""
        pubsub = self.get_pubsub()
        pubsub.publish_no_wait("PUBLISH_STATION_SUBJECT", event)

    async def register_station(self, subject: str, device_subject: str) -> None:
        """Expose the station, its queries and its connected devices.

        Args:
            subject: Subject template formatted with ``station=<name>``.
            device_subject: Subject template handed to ``register_device``
                for each connected device.

        Raises:
            Exception: if a station service is already registered.
        """
        # A stored None counts as "not registered": unregister_workstation
        # clears the slot with set_station_service(None).
        if self._find_station_service() is not None:
            raise Exception("A subscription already exist for this station")
        pubsub = self.get_pubsub()
        backend = self.get_backend()
        # Register workstation request handler
        handler = self.create_station_requests_handler()
        subject = subject.format(station=self.get_name())
        service = await pubsub.create_service(subject, handler)
        self.set_station_service(service)
        # Register query handler
        # NOTE(review): the value returned by subscribe() is not kept, so
        # this class never stops the query subscription.
        query_handler = self.create_station_query_handler()
        await pubsub.subscribe("QUERY_SUBJECT", query_handler)
        # Register connected devices request handlers
        for device in backend.list_connected_devices():
            await self.register_device(device, device_subject)

    async def unregister_workstation(self) -> None:
        """Stop every device service, then the station service."""
        backend = self.get_backend()
        # First unregister devices
        # NOTE(review): relies on the backend providing list_known_devices().
        for device in backend.list_known_devices():
            await self.unregister_device(device)
        # Then unregister station
        service = self._find_station_service()
        if service is not None:
            await service.stop()
        self.set_station_service(None)

    async def register_device(self, device: str, subject: str) -> None:
        """Subscribe a request handler for ``device`` and store the result.

        Args:
            device: Device identifier.
            subject: Subject template formatted with ``device=<device>``.

        Raises:
            Exception: if a service is already registered for the device.
        """
        current_sid = self._find_device_service(device)
        if current_sid is not None:
            raise Exception(f"A subscription already exist for device: {current_sid}")
        pubsub = self.get_pubsub()
        handler = self.create_device_requests_handler(device)
        subject = subject.format(device=device)
        service = await pubsub.subscribe(subject, handler)
        self.set_device_service(device, service)

    async def unregister_device(self, device: str) -> None:
        """Stop and clear the service of ``device``; no-op when none exists."""
        service = self._find_device_service(device)
        if service is None:
            return
        await service.stop()
        # The device argument used to be missing from this call.
        self.set_device_service(device, None)

    async def start(self) -> None:
        """Connect the pub/sub, install the event sinks and register the station."""
        pubsub = self.get_pubsub()
        backend = self.get_backend()
        await pubsub.connect()
        # The sink setters take (type, sink); None is passed as the type.
        backend.set_device_event_sink(None, self.handle_device_event)
        backend.set_workstation_event_sink(None, self.handle_station_event)
        await self.register_station(
            "WORKSTATION_SUBJECT.{station}.>", "DEVICE_SUBJECT.{device}.>"
        )

    async def stop(self) -> None:
        """Unregister services, clear the sinks and disconnect everything."""
        pubsub = self.get_pubsub()
        backend = self.get_backend()
        await self.unregister_workstation()
        backend.set_device_event_sink(None, None)
        backend.set_workstation_event_sink(None, None)
        # The backend only declares per-device disconnect(device), so each
        # connected device is disconnected in turn. The list is copied in
        # case the backend updates it while devices disconnect.
        for device in list(backend.list_connected_devices()):
            await backend.disconnect(device)
        await pubsub.disconnect()
|
|
|
|
|
class IoTHub(Generic[StationPropsT, DevicePropsT]):
    """Hub consuming station/device events and answering hub queries.

    Subclasses supply the pub/sub client, the storage for the subscription
    handle, the lookup methods and the event processors.

    NOTE(review): the class does not derive from ``abc.ABC``, so the
    ``@abc.abstractmethod`` markers are not enforced at instantiation time.
    """

    @abc.abstractmethod
    def get_pubsub(self) -> PubSub:
        """Return the pub/sub client used by the hub."""
        ...

    @abc.abstractmethod
    def set_service(self, service: ServiceProtocol | None) -> None:
        """Store the hub subscription handle; ``None`` clears it."""
        ...

    @abc.abstractmethod
    def get_service(self) -> ServiceProtocol:
        """Return the hub subscription handle.

        This class expects ``KeyError`` when no handle is stored.
        """
        ...

    @abc.abstractmethod
    def list_stations(self) -> list[str]:
        """Return station identifiers."""
        ...

    @abc.abstractmethod
    def list_devices(self) -> list[str]:
        """Return device identifiers."""
        ...

    @abc.abstractmethod
    def find_devices(self, filter: Mapping[str, Any], limit: int) -> list[DevicePropsT]:
        """Return device properties matching ``filter``, bounded by ``limit``."""
        ...

    @abc.abstractmethod
    def find_stations(
        self, filter: Mapping[str, Any], limit: int
    ) -> list[StationPropsT]:
        """Return station properties matching ``filter``, bounded by ``limit``."""
        ...

    @abc.abstractmethod
    def get_device(self, device: str) -> DevicePropsT:
        """Return the properties of ``device``."""
        ...

    @abc.abstractmethod
    def get_station(self, station: str) -> StationPropsT:
        """Return the properties of ``station``."""
        ...

    @abc.abstractmethod
    async def process_device_event(self, event: DeviceEvent) -> None:
        """Decide what to do with a device event."""
        ...

    @abc.abstractmethod
    async def process_station_event(self, event: StationEvent) -> None:
        """Decide what to do with a station event."""
        ...

    async def process_event(self, event: StationEvent | DeviceEvent) -> None:
        """Dispatch ``event`` to the device or the station processor.

        Anything which is not a ``DeviceEvent`` goes to the station processor.
        """
        if isinstance(event, DeviceEvent):
            await self.process_device_event(event)
        else:
            await self.process_station_event(event)

    async def process_query(self, query: HubQuery) -> Any:
        """Answer a hub query by calling the matching lookup method.

        Raises:
            ValueError: if ``query.type`` is not a valid ``HubQueryType``.
            KeyError: if a required option (``device`` / ``station``) is missing.
            NotImplementedError: if no branch handles the query type.
        """
        # This is in-memory information retrieval, I.E, almost pure logic
        TYPE = HubQueryType(query.type)
        # HubQuery.options defaults to None, which can be neither expanded
        # with ** nor indexed; treat it as "no options".
        options: Mapping[str, Any] = query.options if query.options is not None else {}
        if TYPE == HubQueryType.FIND_DEVICES:
            return self.find_devices(**options)
        if TYPE == HubQueryType.FIND_STATIONS:
            return self.find_stations(**options)
        if TYPE == HubQueryType.GET_DEVICE:
            return self.get_device(options["device"])
        if TYPE == HubQueryType.GET_STATION:
            return self.get_station(options["station"])
        if TYPE == HubQueryType.LIST_DEVICES:
            return self.list_devices(**options)
        if TYPE == HubQueryType.LIST_STATIONS:
            return self.list_stations(**options)
        # Guards against members added to HubQueryType without a branch here.
        raise NotImplementedError(f"Query type is not supported yet: {TYPE}")

    async def start(self) -> None:
        """Connect the pub/sub and subscribe ``process_event``.

        Raises:
            Exception: if a subscription handle is already stored.
        """
        try:
            current = self.get_service()
        except KeyError:
            current = None
        # A stored None counts as "not started": stop() clears the slot
        # with set_service(None).
        if current is not None:
            raise Exception("A subscription already exists")
        pubsub = self.get_pubsub()
        await pubsub.connect()
        service = await pubsub.subscribe("COMMON_PREFIX", self.process_event)
        self.set_service(service)

    async def stop(self) -> None:
        """Stop the subscription, if any, and disconnect the pub/sub."""
        pubsub = self.get_pubsub()
        try:
            service = self.get_service()
        except KeyError:
            service = None
        if service is not None:
            await service.stop()
            # Clear the handle so that start() can be called again.
            self.set_service(None)
        await pubsub.disconnect()
|
|
|
|
|
class IoTHubClient:
    """Client-side entry point to devices and stations.

    Should depend only on pub/sub + subjects used in Gateway + Hub.

    Every method marked FIXME is a placeholder: its body is empty, so
    awaiting it currently returns ``None`` whatever the annotation says.
    """

    async def list_devices(self) -> list[str]:
        """List existing devices"""
        # FIXME: Implement using PUB/SUB
        ...

    async def list_stations(self) -> list[str]:
        """List existing stations"""
        # FIXME: Implement using PUB/SUB
        ...

    async def device_request(self, device: str, request: DeviceRequest) -> Any:
        """Send a request to a device"""
        # FIXME: Implement using PUB/SUB
        ...

    async def station_query(self, query: StationQuery) -> Any:
        """Send a query to a station"""
        # FIXME: Implement using PUB/SUB
        # NOTE(review): unlike station_request, this takes no station
        # argument, so the query cannot name its target station here.
        ...

    async def station_request(self, station: str, request: StationRequest) -> Any:
        """Send a request to a station"""
        # FIXME: Implement using PUB/SUB
        ...

    async def subscribe_to_device_events(
        self,
        device: str | None = None,
        type: DeviceEventType | None = None,
        key: str | None = None,
    ) -> SubscriptionProtocol[DeviceEvent]:
        """Subscribe to device events, optionally narrowed by the arguments.

        The subscription is parameterized by the event class rather than the
        event-type enum, matching ``RemoteDevice.subscribe_to_events``.
        """
        # FIXME: Implement using PUB/SUB
        ...

    async def subscribe_to_station_events(
        self, station: str | None = None, type: StationEventType | None = None
    ) -> SubscriptionProtocol[StationEvent]:
        """Subscribe to station events, optionally narrowed by the arguments.

        The subscription is parameterized by the event class rather than the
        event-type enum.
        """
        # FIXME: Implement using PUB/SUB
        ...

    def get_device(self, device: str) -> RemoteDevice:
        """Return a ``RemoteDevice`` bound to this client and ``device``."""
        return RemoteDevice(self, device)

    def get_station(self, station: str) -> RemoteStation:
        """Return a ``RemoteStation`` bound to this client and ``station``."""
        return RemoteStation(self, station)
|
|
|
|
|
|
|
class RemoteStation:
    """Handle on a station, reached through an ``IoTHubClient``.

    Attributes:
        client: Client every request and query is sent through.
        station: Identifier of the station this handle targets.
    """

    def __init__(self, client: IoTHubClient, station: str) -> None:
        self.client = client
        self.station = station

    async def _query(
        self, kind: StationQueryType, options: Mapping[str, Any] | None = None
    ) -> Any:
        """Send a station query through the client and return its answer."""
        # NOTE(review): IoTHubClient.station_query takes no station argument,
        # so self.station cannot be passed along here.
        return await self.client.station_query(StationQuery(kind, options))

    async def _request(
        self, kind: StationRequestType, options: Mapping[str, Any] | None = None
    ) -> Any:
        """Send a request to this station through the client."""
        # station_request expects (station, request); the station argument
        # used to be omitted by every caller in this class.
        return await self.client.station_request(
            self.station, StationRequest(kind, options)
        )

    async def subscribe_to_events(
        self,
        type: StationEventType | None = None,
    ) -> SubscriptionProtocol[StationEvent]:
        """Return an event subscription which can be used to iterate over station events."""
        return await self.client.subscribe_to_station_events(
            station=self.station, type=type
        )

    async def list_discovered_devices(self) -> list[str]:
        """Return the answer to a ``LIST_DISCOVERED_DEVICES`` query."""
        return await self._query(StationQueryType.LIST_DISCOVERED_DEVICES)

    async def list_connected_devices(self) -> list[str]:
        """Return the answer to a ``LIST_CONNECTED_DEVICES`` query."""
        return await self._query(StationQueryType.LIST_CONNECTED_DEVICES)

    async def get_discovered_devices(self) -> dict[str, IoTDeviceBackend[DevicePropsT]]:
        """Discovered devices, excluding connected devices."""
        return await self._query(StationQueryType.GET_DISCOVERED_DEVICES)

    async def get_connected_devices(self) -> dict[str, IoTDeviceBackend[DevicePropsT]]:
        """Connected devices, excluding discovered devices."""
        return await self._query(StationQueryType.GET_CONNECTED_DEVICES)

    async def get_device_props(self, device: str) -> DevicePropsT:
        """Return the answer to a ``GET_DEVICE_PROPS`` query for ``device``."""
        return await self._query(StationQueryType.GET_DEVICE_PROPS, {"device": device})

    async def get_device_keys(self, device: str) -> list[str]:
        """Return the answer to a ``GET_DEVICE_KEYS`` query for ``device``."""
        return await self._query(StationQueryType.GET_DEVICE_KEYS, {"device": device})

    async def start(self) -> None:
        """Send a ``START`` request to the station."""
        return await self._request(StationRequestType.START)

    async def stop(self) -> None:
        """Send a ``STOP`` request to the station."""
        return await self._request(StationRequestType.STOP)

    async def connect_device(self, device: str) -> None:
        """Send a ``CONNECT`` request for ``device`` to the station."""
        return await self._request(StationRequestType.CONNECT, {"device": device})

    async def disconnect_device(self, device: str) -> None:
        """Send a ``DISCONNECT`` request for ``device`` to the station."""
        return await self._request(StationRequestType.DISCONNECT, {"device": device})

    async def start_discovery(self) -> None:
        """Send a ``START_DISCOVERY`` request to the station."""
        return await self._request(StationRequestType.START_DISCOVERY)

    async def stop_discovery(self) -> None:
        """Send a ``STOP_DISCOVERY`` request to the station."""
        return await self._request(StationRequestType.STOP_DISCOVERY)

    async def enable_discovery(self) -> None:
        """Send an ``ENABLE_DISCOVERY`` request to the station."""
        return await self._request(StationRequestType.ENABLE_DISCOVERY)

    async def disable_discovery(self) -> None:
        """Send a ``DISABLE_DISCOVERY`` request to the station."""
        return await self._request(StationRequestType.DISABLE_DISCOVERY)
|
|
|
|
|
class RemoteDevice:
    """Handle on a device, reached through an ``IoTHubClient``.

    Attributes:
        client: Client every request is sent through.
        device: Identifier of the device this handle targets.
    """

    def __init__(self, client: IoTHubClient, device: str) -> None:
        self.client = client
        self.device = device

    async def _request(
        self, kind: DeviceRequestType, options: Mapping[str, Any] | None = None
    ) -> Any:
        """Send a request to this device and return the client's answer."""
        return await self.client.device_request(
            self.device, DeviceRequest(kind, options)
        )

    async def connect(self, station: str) -> None:
        """Connect a station to the device"""
        await self.client.station_request(
            station, StationRequest(StationRequestType.CONNECT, {"device": self.device})
        )

    async def disconnect(self) -> None:
        """Force disconnection of device to any station it might be connected to."""
        await self._request(DeviceRequestType.DISCONNECT)

    async def set_time(self) -> None:
        """Set time on device to current time.

        Implementation may vary between backends.
        """
        await self._request(DeviceRequestType.SET_TIME)

    async def sync_time(self) -> None:
        """Sync time on device to current time.

        Implementation may vary between backends.
        """
        await self._request(DeviceRequestType.SYNC_TIME)

    async def read_value(self, key: str) -> bytearray:
        """Read a value as bytearray under given key."""
        # The answer used to be awaited and dropped, so None was returned.
        return await self._request(DeviceRequestType.READ_VALUE, {"key": key})

    async def write_value(self, key: str, value: bytearray) -> int:
        """Write a value and return number of bytes written under given key."""
        # The answer used to be awaited and dropped, so None was returned.
        return await self._request(
            DeviceRequestType.WRITE_VALUE, {"key": key, "value": value}
        )

    async def enable_notifications(self, key: str) -> None:
        """Start notifications for given key."""
        await self._request(DeviceRequestType.START_NOTIFY, {"key": key})

    async def disable_notifications(self, key: str) -> None:
        """Stop notifications for given key."""
        await self._request(DeviceRequestType.STOP_NOTIFY, {"key": key})

    async def reset(self) -> None:
        """Send a ``RESET`` request to the device."""
        await self._request(DeviceRequestType.RESET)

    async def subscribe_to_events(
        self,
        type: DeviceEventType | None = None,
    ) -> SubscriptionProtocol[DeviceEvent]:
        """Return an event subscription which can be used to iterate over device events."""
        return await self.client.subscribe_to_device_events(
            device=self.device, type=type
        )

    async def subscribe_to_notifications(
        self,
        key: str,
    ) -> SubscriptionProtocol[bytearray]:
        """Return a notification subscription which can be used to iterate over received messages."""
        return await self.client.subscribe_to_device_events(
            device=self.device, type=DeviceEventType.NOTIFIED, key=key
        )