Created
February 16, 2022 10:58
-
-
Save tomchy/04ac4ff78d6e117d33ab92d9cc1de779 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""ControllerApplication for <protocol_name> protocol based adapters.""" | |
import asyncio | |
import binascii | |
import logging | |
import re | |
from typing import Any, Dict | |
import zigpy.appdb | |
import zigpy.config | |
import zigpy.device | |
import zigpy.endpoint | |
import zigpy.exceptions | |
import zigpy.neighbor | |
import zigpy.state | |
import zigpy.types as t | |
import zigpy.util | |
from <port>.config import CONFIG_SCHEMA, SCHEMA_DEVICE | |
LOGGER = logging.getLogger(__name__) | |
class ControllerApplication(zigpy.application.ControllerApplication): | |
SCHEMA = CONFIG_SCHEMA | |
SCHEMA_DEVICE = SCHEMA_DEVICE | |
def __init__(self, config: Dict[str, Any]): | |
"""Initialize instance.""" | |
super().__init__(config=zigpy.config.ZIGPY_SCHEMA(config)) | |
self._api = None | |
# Enable if Application controller can send miltiple requests over communication channel | |
#self._pending = zigpy.util.Requests() | |
self.version = None | |
async def shutdown(self): | |
"""Perform a complete application shutdown.""" | |
self._api.close() | |
async def startup(self, auto_form=False): | |
"""Perform a complete application startup.""" | |
# Steps to be performed: | |
# 1. Initialize API serialization | |
# - Create self._api instance | |
# - Read protocol, stack and app versions | |
# - Verify protocol version | |
# - Promote self._api object to match the set of supported features | |
# 2. Construct a Zigbee device for the controller application. The following steps may be done inside the | |
# so-called ControllerZigbeeDevice class defined below. | |
# - Create a device instance for the application itself | |
# - Call ZbncpDevice.new(..) in order to create new coordinator device | |
# - Set device role (ZC) | |
# - Set Primary & Secondary channels | |
# - Set Simple Descriptor | |
# - Endpoints | |
# - Set Node descriptor | |
# - Set RX ON When Idle flag | |
# - Set MAC capabilities | |
# - Set Manufacturer code | |
# - Set power descriptor | |
# - Current and available power sources | |
coordinator = await ControllerZigbeeDevice.new( | |
self, | |
self.ieee, | |
self.nwk, | |
self.version, | |
self._config[zigpy.config.CONF_DEVICE][zigpy.config.CONF_DEVICE_PATH], | |
) | |
# 3. Add context listeners | |
# - Reset indication listener | |
# - Device Announcements listener | |
# coordinator.neighbors.add_context_listener(self._dblistener) | |
# - APSDE-Data.Indictation listener | |
# 4. Attach the newly created device to the application | |
# self.devices[self.ieee] = coordinator | |
# 5. Start BDB initialization procedure | |
# - Check if device has already been commissioned | |
# 6. Start network formation or network steering procedure | |
if auto_form: | |
await self.form_network() | |
# 7. Read current NWK data: | |
# - NWK address | |
# - PAN ID | |
# - Extended PAN ID | |
# - Channel mask | |
# - NWK key(s): seq, tx_counter, key | |
# - Read NWK frame counter | |
# - TC address | |
# - TC link key(s) | |
# - Security mode | |
# - Current channel | |
# - NWK update ID | |
# 8. Recreate neighbour table (optional) | |
# if self._api.protocol_version >= PROTO_VER_NEIGBOURS: | |
# await self.restore_neighbours() | |
# asyncio.create_task(self._delayed_neighbour_scan()) | |
async def form_network(self): | |
"""Form a new network based on network configuration from config.""" | |
# Set APS Designated Coordinator flag | |
# Wait for the formation to complete | |
raise NotImplementedError | |
async def update_network( | |
self, | |
*, | |
channel: Optional[t.uint8_t] = None, | |
channels: Optional[t.Channels] = None, | |
extended_pan_id: Optional[t.ExtendedPanId] = None, | |
network_key: Optional[t.KeyData] = None, | |
pan_id: Optional[t.PanId] = None, | |
tc_link_key: Optional[t.KeyData] = None, | |
update_id: int = 0, | |
): | |
"""Update network parameters. | |
:param channel: Radio channel | |
:param channels: Channels mask | |
:param extended_pan_id: Extended pan id | |
:param network_key: network key | |
:param pan_id: Network pan id | |
:param tc_link_key: Trust Center link key | |
:param update_id: nwk_update_id parameter | |
""" | |
raise NotImplementedError | |
async def force_remove(self, dev): | |
"""Forcibly remove device from NCP.""" | |
raise NotImplementedError | |
async def mrequest( | |
self, | |
group_id, | |
profile, | |
cluster, | |
src_ep, | |
sequence, | |
data, | |
*, | |
hops=0, | |
non_member_radius=3, | |
): | |
"""Submit and send data out as a multicast transmission. | |
:param group_id: destination multicast address | |
:param profile: Zigbee Profile ID to use for outgoing message | |
:param cluster: cluster id where the message is being sent | |
:param src_ep: source endpoint id | |
:param sequence: transaction sequence number of the message | |
:param data: Zigbee message payload | |
:param hops: the message will be delivered to all nodes within this number of | |
hops of the sender. A value of zero is converted to MAX_HOPS | |
:param non_member_radius: the number of hops that the message will be forwarded | |
by devices that are not members of the group. A value | |
of 7 or greater is treated as infinite | |
:returns: return a tuple of a status and an error_message. Original requestor | |
has more context to provide a more meaningful error message | |
""" | |
raise NotImplementedError | |
@zigpy.util.retryable_request | |
async def request( | |
self, | |
device, | |
profile, | |
cluster, | |
src_ep, | |
dst_ep, | |
sequence, | |
data, | |
expect_reply=True, | |
use_ieee=False, | |
): | |
"""Submit and send data out as an unicast transmission. | |
:param device: destination device | |
:param profile: Zigbee Profile ID to use for outgoing message | |
:param cluster: cluster id where the message is being sent | |
:param src_ep: source endpoint id | |
:param dst_ep: destination endpoint id | |
:param sequence: transaction sequence number of the message | |
:param data: Zigbee message payload | |
:param expect_reply: True if this is essentially a request | |
:param use_ieee: use EUI64 for destination addressing | |
:returns: return a tuple of a status and an error_message. Original requestor | |
has more context to provide a more meaningful error message | |
""" | |
raise NotImplementedError | |
async def broadcast( | |
self, | |
profile, | |
cluster, | |
src_ep, | |
dst_ep, | |
grpid, | |
radius, | |
sequence, | |
data, | |
broadcast_address, | |
): | |
"""Submit and send data out as an unicast transmission. | |
:param profile: Zigbee Profile ID to use for outgoing message | |
:param cluster: cluster id where the message is being sent | |
:param src_ep: source endpoint id | |
:param dst_ep: destination endpoint id | |
:param: grpid: group id to address the broadcast to | |
:param radius: max radius of the broadcast | |
:param sequence: transaction sequence number of the message | |
:param data: zigbee message payload | |
:param timeout: how long to wait for transmission ACK | |
:param broadcast_address: broadcast address. | |
:returns: return a tuple of a status and an error_message. Original requestor | |
has more context to provide a more meaningful error message | |
""" | |
raise NotImplementedError | |
def handle_tx_confirm(self, req_id, status): | |
# Enable if Application controller can send miltiple requests over communication channel | |
#try: | |
# self._pending[req_id].result.set_result(status) | |
# return | |
#except KeyError: | |
# LOGGER.warning( | |
# "Unexpected transmit confirm for request id %s, Status: %s", | |
# req_id, | |
# status, | |
# ) | |
#except asyncio.InvalidStateError as exc: | |
# LOGGER.debug( | |
# "Invalid state on future - probably duplicate response: %s", exc | |
# ) | |
raise NotImplementedError | |
async def permit_ncp(self, time_s=60): | |
"""Permit joining on NCP.""" | |
assert 0 <= time_s <= 254 | |
raise NotImplementedError | |
def permit_with_key(self, node, code, time_s=60): | |
# Consider using Install codes & zigpy.Application API: | |
# - permit(self, time_s=60, node=None): | |
raise NotImplementedError | |
async def probe( | |
cls, device_config: dict[str, Any] | |
) -> bool | dict[str, int | str | bool]: | |
"""API/Port probe method.""" | |
raise NotImplementedError | |
class ControllerZigbeeDevice(zigpy.device.Device): | |
"""Zigpy Device representing Coordinator.""" | |
def __init__(self, version: int, device_path: str, *args): | |
"""Initialize instance.""" | |
super().__init__(*args) | |
async def add_to_group(self, grp_id: int, name: str = None) -> None: | |
# Call application.multicast methods if needed | |
group = self.application.groups.add_group(grp_id, name) | |
return [0] | |
async def remove_from_group(self, grp_id: int) -> None: | |
# Call application.multicast methods if needed | |
self.application.groups[grp_id].remove_member(self.endpoints[epid]) | |
return [0] | |
@property | |
def manufacturer(self): | |
return "<zigpy Zigbee dongle manufacturer>" | |
@property | |
def model(self): | |
return "<zigpy Zigbee dongle model>" | |
@classmethod | |
async def new(cls, application, ieee, nwk, version: int, device_path: str): | |
"""Create or replace zigpy device.""" | |
dev = cls(version, device_path, application, ieee, nwk) | |
await dev.initialize() | |
return dev |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment