Skip to content

Instantly share code, notes, and snippets.

@tomchy
Created February 16, 2022 10:58
Show Gist options
  • Save tomchy/04ac4ff78d6e117d33ab92d9cc1de779 to your computer and use it in GitHub Desktop.
Save tomchy/04ac4ff78d6e117d33ab92d9cc1de779 to your computer and use it in GitHub Desktop.
"""ControllerApplication for <protocol_name> protocol based adapters."""
import asyncio
import binascii
import logging
import re
from typing import Any, Dict, Optional

import zigpy.appdb
import zigpy.application
import zigpy.config
import zigpy.device
import zigpy.endpoint
import zigpy.exceptions
import zigpy.neighbor
import zigpy.state
import zigpy.types as t
import zigpy.util

from <port>.config import CONFIG_SCHEMA, SCHEMA_DEVICE
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
class ControllerApplication(zigpy.application.ControllerApplication):
    """Template zigpy controller application for a radio adapter.

    Every radio-specific operation below is a placeholder that raises
    ``NotImplementedError`` until a concrete port implements it.
    """

    SCHEMA = CONFIG_SCHEMA
    SCHEMA_DEVICE = SCHEMA_DEVICE

    def __init__(self, config: Dict[str, Any]):
        """Initialize instance.

        :param config: application configuration; it is passed through
                       ``zigpy.config.ZIGPY_SCHEMA`` before being handed to the
                       base class
        """
        super().__init__(config=zigpy.config.ZIGPY_SCHEMA(config))
        # No API object exists until a port creates one (step 1 of startup()).
        self._api = None
        # Enable if the application controller can send multiple requests
        # over the communication channel.
        # self._pending = zigpy.util.Requests()
        self.version = None

    async def shutdown(self):
        """Perform a complete application shutdown.

        Closes the API object if one has been created; does nothing otherwise,
        so shutting down before (or after a failed) startup does not raise.
        """
        if self._api is not None:
            self._api.close()

    async def startup(self, auto_form=False):
        """Perform a complete application startup.

        :param auto_form: when true, ``form_network()`` is awaited after the
                          coordinator device has been created
        """
        # Steps to be performed:
        # 1. Initialize API serialization
        #    - Create self._api instance
        #    - Read protocol, stack and app versions
        #    - Verify protocol version
        #    - Promote self._api object to match the set of supported features
        # 2. Construct a Zigbee device for the controller application. The
        #    following steps may be done inside the so-called
        #    ControllerZigbeeDevice class defined below.
        #    - Create a device instance for the application itself
        #    - Call ZbncpDevice.new(..) in order to create new coordinator device
        #    - Set device role (ZC)
        #    - Set Primary & Secondary channels
        #    - Set Simple Descriptor
        #      - Endpoints
        #    - Set Node descriptor
        #      - Set RX ON When Idle flag
        #      - Set MAC capabilities
        #      - Set Manufacturer code
        #    - Set power descriptor
        #      - Current and available power sources
        coordinator = await ControllerZigbeeDevice.new(
            self,
            self.ieee,
            self.nwk,
            self.version,
            self._config[zigpy.config.CONF_DEVICE][zigpy.config.CONF_DEVICE_PATH],
        )
        # 3. Add context listeners
        #    - Reset indication listener
        #    - Device Announcements listener
        #      coordinator.neighbors.add_context_listener(self._dblistener)
        #    - APSDE-Data.Indication listener
        # 4. Attach the newly created device to the application
        #    self.devices[self.ieee] = coordinator
        # 5. Start BDB initialization procedure
        #    - Check if device has already been commissioned
        # 6. Start network formation or network steering procedure
        if auto_form:
            await self.form_network()
        # 7. Read current NWK data:
        #    - NWK address
        #    - PAN ID
        #    - Extended PAN ID
        #    - Channel mask
        #    - NWK key(s): seq, tx_counter, key
        #    - Read NWK frame counter
        #    - TC address
        #    - TC link key(s)
        #    - Security mode
        #    - Current channel
        #    - NWK update ID
        # 8. Recreate neighbour table (optional)
        #    if self._api.protocol_version >= PROTO_VER_NEIGBOURS:
        #        await self.restore_neighbours()
        #    asyncio.create_task(self._delayed_neighbour_scan())

    async def form_network(self):
        """Form a new network based on network configuration from config.

        :raises NotImplementedError: always, until implemented by the port
        """
        # Set APS Designated Coordinator flag
        # Wait for the formation to complete
        raise NotImplementedError

    async def update_network(
        self,
        *,
        channel: Optional[t.uint8_t] = None,
        channels: Optional[t.Channels] = None,
        extended_pan_id: Optional[t.ExtendedPanId] = None,
        network_key: Optional[t.KeyData] = None,
        pan_id: Optional[t.PanId] = None,
        tc_link_key: Optional[t.KeyData] = None,
        update_id: int = 0,
    ):
        """Update network parameters.

        :param channel: Radio channel
        :param channels: Channels mask
        :param extended_pan_id: Extended pan id
        :param network_key: network key
        :param pan_id: Network pan id
        :param tc_link_key: Trust Center link key
        :param update_id: nwk_update_id parameter
        :raises NotImplementedError: always, until implemented by the port
        """
        raise NotImplementedError

    async def force_remove(self, dev):
        """Forcibly remove device from NCP.

        :param dev: device to remove
        :raises NotImplementedError: always, until implemented by the port
        """
        raise NotImplementedError

    async def mrequest(
        self,
        group_id,
        profile,
        cluster,
        src_ep,
        sequence,
        data,
        *,
        hops=0,
        non_member_radius=3,
    ):
        """Submit and send data out as a multicast transmission.

        :param group_id: destination multicast address
        :param profile: Zigbee Profile ID to use for outgoing message
        :param cluster: cluster id where the message is being sent
        :param src_ep: source endpoint id
        :param sequence: transaction sequence number of the message
        :param data: Zigbee message payload
        :param hops: the message will be delivered to all nodes within this number of
                     hops of the sender. A value of zero is converted to MAX_HOPS
        :param non_member_radius: the number of hops that the message will be forwarded
                                  by devices that are not members of the group. A value
                                  of 7 or greater is treated as infinite
        :returns: return a tuple of a status and an error_message. Original requestor
                  has more context to provide a more meaningful error message
        :raises NotImplementedError: always, until implemented by the port
        """
        raise NotImplementedError

    @zigpy.util.retryable_request
    async def request(
        self,
        device,
        profile,
        cluster,
        src_ep,
        dst_ep,
        sequence,
        data,
        expect_reply=True,
        use_ieee=False,
    ):
        """Submit and send data out as a unicast transmission.

        :param device: destination device
        :param profile: Zigbee Profile ID to use for outgoing message
        :param cluster: cluster id where the message is being sent
        :param src_ep: source endpoint id
        :param dst_ep: destination endpoint id
        :param sequence: transaction sequence number of the message
        :param data: Zigbee message payload
        :param expect_reply: True if this is essentially a request
        :param use_ieee: use EUI64 for destination addressing
        :returns: return a tuple of a status and an error_message. Original requestor
                  has more context to provide a more meaningful error message
        :raises NotImplementedError: always, until implemented by the port
        """
        raise NotImplementedError

    async def broadcast(
        self,
        profile,
        cluster,
        src_ep,
        dst_ep,
        grpid,
        radius,
        sequence,
        data,
        broadcast_address,
    ):
        """Submit and send data out as a broadcast transmission.

        :param profile: Zigbee Profile ID to use for outgoing message
        :param cluster: cluster id where the message is being sent
        :param src_ep: source endpoint id
        :param dst_ep: destination endpoint id
        :param grpid: group id to address the broadcast to
        :param radius: max radius of the broadcast
        :param sequence: transaction sequence number of the message
        :param data: zigbee message payload
        :param broadcast_address: broadcast address.
        :returns: return a tuple of a status and an error_message. Original requestor
                  has more context to provide a more meaningful error message
        :raises NotImplementedError: always, until implemented by the port
        """
        raise NotImplementedError

    def handle_tx_confirm(self, req_id, status):
        """Handle a transmit confirmation for a previously sent request.

        :param req_id: identifier of the request being confirmed
        :param status: status reported for that request
        :raises NotImplementedError: always, until implemented by the port
        """
        # Enable if the application controller can send multiple requests
        # over the communication channel.
        # try:
        #     self._pending[req_id].result.set_result(status)
        #     return
        # except KeyError:
        #     LOGGER.warning(
        #         "Unexpected transmit confirm for request id %s, Status: %s",
        #         req_id,
        #         status,
        #     )
        # except asyncio.InvalidStateError as exc:
        #     LOGGER.debug(
        #         "Invalid state on future - probably duplicate response: %s", exc
        #     )
        raise NotImplementedError

    async def permit_ncp(self, time_s=60):
        """Permit joining on NCP.

        :param time_s: duration in seconds; must be within 0..254
        :raises NotImplementedError: always, until implemented by the port
        """
        assert 0 <= time_s <= 254
        raise NotImplementedError

    def permit_with_key(self, node, code, time_s=60):
        """Permit a specific node to join using a key.

        :param node: node that is allowed to join
        :param code: key material for that node
        :param time_s: duration in seconds
        :raises NotImplementedError: always, until implemented by the port
        """
        # Consider using Install codes & zigpy.Application API:
        # - permit(self, time_s=60, node=None):
        raise NotImplementedError

    @classmethod
    async def probe(
        cls, device_config: dict[str, Any]
    ) -> bool | dict[str, int | str | bool]:
        """API/Port probe method.

        Declared as a classmethod because the first parameter is ``cls``;
        without the decorator a call such as ``ControllerApplication.probe(cfg)``
        would bind ``cfg`` to ``cls`` and fail for a missing argument.

        :param device_config: device configuration to probe with
        :raises NotImplementedError: always, until implemented by the port
        """
        raise NotImplementedError
class ControllerZigbeeDevice(zigpy.device.Device):
    """Zigpy Device representing Coordinator."""

    def __init__(self, version: int, device_path: str, *args):
        """Initialize instance.

        :param version: adapter version; currently unused by this template
        :param device_path: path of the adapter device; currently unused by
                            this template
        :param args: positional arguments forwarded unchanged to
                     ``zigpy.device.Device``
        """
        super().__init__(*args)

    async def add_to_group(self, grp_id: int, name: str | None = None) -> list[int]:
        """Make sure the group exists in the application's group table.

        :param grp_id: group id
        :param name: optional group name
        :returns: ``[0]``
        """
        # Call application.multicast methods if needed
        # NOTE(review): only the group itself is created here; the coordinator's
        # endpoints are not registered as members - confirm whether they should be.
        self.application.groups.add_group(grp_id, name)
        return [0]

    async def remove_from_group(self, grp_id: int) -> list[int]:
        """Remove this device's endpoints from a group.

        :param grp_id: group id
        :returns: ``[0]``
        """
        # Call application.multicast methods if needed
        group = self.application.groups[grp_id]
        for epid, endpoint in self.endpoints.items():
            if not epid:
                # Endpoint id 0 is skipped - presumably the ZDO endpoint, which
                # is not a group member (verify against zigpy's Device.endpoints).
                continue
            group.remove_member(endpoint)
        return [0]

    @property
    def manufacturer(self):
        """Return the manufacturer placeholder string."""
        return "<zigpy Zigbee dongle manufacturer>"

    @property
    def model(self):
        """Return the model placeholder string."""
        return "<zigpy Zigbee dongle model>"

    @classmethod
    async def new(cls, application, ieee, nwk, version: int, device_path: str):
        """Create or replace zigpy device.

        :param application: controller application that owns the device
        :param ieee: IEEE address of the device
        :param nwk: network address of the device
        :param version: adapter version passed to the constructor
        :param device_path: adapter device path passed to the constructor
        :returns: the new device, after ``initialize()`` has been awaited
        """
        dev = cls(version, device_path, application, ieee, nwk)
        await dev.initialize()
        return dev
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment