Skip to content

Instantly share code, notes, and snippets.

@pp81381
Last active December 3, 2022 21:13
Show Gist options
  • Save pp81381/ff373ec910563a4b17cbe51a3f373ea6 to your computer and use it in GitHub Desktop.
Save pp81381/ff373ec910563a4b17cbe51a3f373ea6 to your computer and use it in GitHub Desktop.
UPnP example
import argparse
import asyncio
import attr
import logging
from typing import List
from async_upnp_client.search import async_search
from async_upnp_client.ssdp import SSDP_PORT
from async_upnp_client.utils import CaseInsensitiveDict
from async_upnp_client.aiohttp import AiohttpRequester
from async_upnp_client.client_factory import UpnpFactory
class UpnpExampleError(Exception):
    """Raised by this example when device discovery does not yield a usable result."""
@attr.s
class Collector:
    """Accumulate the response headers handed to an async search callback.

    ``example`` passes the bound ``append`` method as the ``async_callback``
    of ``async_search`` and inspects ``responses`` once the search returns.
    """

    # Headers received so far, in arrival order; each instance gets its own list.
    responses = attr.ib(type=List[CaseInsensitiveDict], factory=list)

    async def append(self, headers: CaseInsensitiveDict) -> None:
        """Record one set of response headers.

        Declared ``async`` because it is supplied as an ``async_callback``.
        """
        self.responses.append(headers)
async def example(target: str) -> None:
    """Search a single host for a UPnP root device and print its identifiers.

    Runs ``async_search`` for ``upnp:rootdevice`` against
    ``(target, SSDP_PORT)``, requires exactly one response, prints the
    ``location`` and ``_udn`` entries of the response headers, then builds a
    device object from the location URL and prints its UDN and serial number.

    Args:
        target: Host to which the search is addressed.

    Raises:
        UpnpExampleError: If there is no response, more than one response,
            or the response carries no ``location`` header.
    """
    search_target = "upnp:rootdevice"
    collector = Collector()
    await async_search(
        search_target=search_target,
        target=(target, SSDP_PORT),
        async_callback=collector.append,
    )

    # Exactly one response is expected; anything else is treated as an error.
    if not collector.responses:
        raise UpnpExampleError("No response")
    if len(collector.responses) > 1:
        raise UpnpExampleError("More than one response")

    headers = collector.responses[0]
    location = headers.get("location")
    udn = headers.get("_udn")
    # Print what was received before validating, so a missing location is
    # still visible in the output alongside the error.
    print(f"location: {location}")
    print(f"udn from header: {udn}")
    if location is None:
        raise UpnpExampleError("No device location URL available")

    requester = AiohttpRequester()  # In hass we'd probably use AiohttpSessionRequester (see samsungtv component)
    factory = UpnpFactory(requester)
    device = await factory.async_create_device(location)
    print(f"udn from device: {device.udn}")
    print(f"serial number: {device.serial_number}")
def build_parser() -> argparse.ArgumentParser:
    """Return the command-line parser for this example (one required ``--host``)."""
    parser = argparse.ArgumentParser(description='Communicate with arcam receivers.')
    parser.add_argument('--host', required=True)
    return parser


def main(argv=None) -> None:
    """Configure logging, parse the command line and run ``example``.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s:%(levelname)s:%(name)s:%(lineno)d:%(message)s')
    args = build_parser().parse_args(argv)
    asyncio.run(example(args.host))


# Guard the entry point so importing this module does not parse arguments
# or start an event loop.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment