# Source: GitHub Gist by @elupus (elupus/d52ef3f7c3f0706638d7beba5e7ab58a).
# Last active February 25, 2021 00:47.
import asyncio
from asyncio.events import AbstractEventLoop
import logging
import socket
import struct
from typing import Optional, Tuple
# Value sent in the USER-AGENT header of outgoing M-SEARCH requests.
USER_AGENT = "HassTest"
def create_listen_socket(addrinfo_iface, addrinfo_mcast):
    """Create a UDP socket bound to a multicast group and joined to it.

    Args:
        addrinfo_iface: One ``socket.getaddrinfo`` entry for the local
            interface. Its address selects the interface for the IPv4
            membership request, and the tail of its sockaddr (port, plus
            flowinfo and scope id for IPv6) is reused for the bind address.
        addrinfo_mcast: One ``socket.getaddrinfo`` entry for the multicast
            group. It supplies the address family and the group address.

    Returns:
        The bound socket with the group membership added.

    Raises:
        OSError: If an address cannot be packed or a socket call fails. The
            socket is closed before the error propagates.
    """
    family = addrinfo_mcast[0]
    sock = socket.socket(family, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # SO_REUSEPORT is not defined by the socket module on every platform.
        if hasattr(socket, "SO_REUSEPORT"):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

        group_bin = socket.inet_pton(family, addrinfo_mcast[4][0])
        iface_bin = socket.inet_pton(addrinfo_iface[0], addrinfo_iface[4][0])
        print(addrinfo_mcast[4])
        print(addrinfo_iface[4])

        # Bind to the group address while keeping the rest of the interface
        # sockaddr (port, and for IPv6 also flowinfo and scope id).
        bind_address = (addrinfo_mcast[4][0], *addrinfo_iface[4][1:])
        print(bind_address)
        sock.bind(bind_address)

        if family == socket.AF_INET:  # IPv4
            # Membership request: packed group address + packed interface address.
            mreq = group_bin + iface_bin
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        else:
            # Membership request: packed group address + native unsigned int
            # taken from the 4th sockaddr field (the scope id) of the interface.
            ifn = struct.pack("@I", addrinfo_iface[4][3])
            mreq = group_bin + ifn
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
    except Exception:
        # Do not leak the descriptor when configuration fails half-way.
        sock.close()
        raise
    return sock
def create_search_socket(addrinfo):
    """Create an unconnected UDP socket for sending search requests.

    Args:
        addrinfo: One ``socket.getaddrinfo`` entry; only its address family
            (index 0) is used here.

    Returns:
        A new datagram socket of that family. For IPv6 the multicast hop
        limit is set to 2.

    Raises:
        OSError: If the socket cannot be created or configured. The socket
            is closed before the error propagates.
    """
    family = addrinfo[0]
    sock = socket.socket(family, socket.SOCK_DGRAM)
    try:
        if family == socket.AF_INET6:
            # At level IPPROTO_IPV6 the multicast hop limit option is
            # IPV6_MULTICAST_HOPS; IP_MULTICAST_TTL is the IPv4 option and
            # its number means something else at this level.
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 2)
    except Exception:
        sock.close()
        raise
    # sock.connect(addrinfo[4]) stays disabled: the socket is returned
    # unconnected.
    return sock
class SsdpProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that sends M-SEARCH requests and prints what it receives.

    ``addrinfo`` is a single ``socket.getaddrinfo`` entry; its sockaddr is the
    default destination and the default ``HOST`` header for :meth:`msearch`.
    """

    def __init__(self, addrinfo):
        """Remember ``addrinfo``; the transport is attached in ``connection_made``."""
        self.addrinfo = addrinfo
        self.transport = None

    def connection_made(self, transport):
        """Store the transport handed over by the event loop."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Print a received datagram together with its sender address."""
        # Datagrams come from the network; replace undecodable bytes instead
        # of raising UnicodeDecodeError inside the event loop callback.
        text = data.decode("utf8", errors="replace")
        print('Received data from {!r}\n{}\n\n'.format(addr, text))

    def error_received(self, exc):
        """Print an error reported by the transport."""
        # print() does not apply %-formatting, so interpolate explicitly.
        print(f'Error {exc}')

    def close(self):
        """Close the transport, if one has been attached."""
        print('Closed')
        if self.transport is not None:
            self.transport.close()

    def msearch(self, st, mx: Optional[int] = None, host: Optional[str] = None, address: Optional[Tuple[str, int]] = None):
        """Build an M-SEARCH request and send it through the transport.

        Args:
            st: Value of the ``ST`` header.
            mx: Value of the ``MX`` header; the header is omitted when None.
            host: Value of the ``HOST`` header. When None it is derived from
                ``self.addrinfo`` (``[addr]:port`` for IPv6, ``addr:port``
                otherwise).
            address: Destination passed to ``sendto``. When None the sockaddr
                of ``self.addrinfo`` is used.
        """
        data = [
            'M-SEARCH * HTTP/1.1',
            f'ST: {st}',
            'MAN: "ssdp:discover"',
            f'USER-AGENT: {USER_AGENT}',
        ]
        if mx is not None:
            data.append(f'MX: {mx:d}')

        if host is None:
            sockaddr = self.addrinfo[4]
            if self.addrinfo[0] == socket.AF_INET6:
                # IPv6 literals are bracketed so the port separator is unambiguous.
                host = f"[{sockaddr[0]}]:{sockaddr[1]}"
            else:
                host = f"{sockaddr[0]}:{sockaddr[1]}"
        data.append(f'HOST: {host}')

        # Two empty entries make the joined message end with a blank line.
        data.append('')
        data.append('')
        print(data)

        if address is None:
            address = self.addrinfo[4]
        print(address)
        self.transport.sendto("\r\n".join(data).encode('utf-8'), address)

    @staticmethod
    async def create_searcher(loop: AbstractEventLoop, remote: Tuple[str, int]):
        """Create a protocol for sending searches towards ``remote``.

        Args:
            loop: Event loop that will own the datagram endpoint.
            remote: ``(host, port)`` resolved to obtain the address family
                and the default destination.

        Returns:
            The ``SsdpProtocol`` instance attached to the new endpoint.
        """
        # Resolve through the loop so the lookup does not block it.
        addrinfo = await loop.getaddrinfo(remote[0], remote[1], type=socket.SOCK_DGRAM)
        print(addrinfo)
        sock = create_search_socket(addrinfo[0])
        _, protocol = await loop.create_datagram_endpoint(
            lambda: SsdpProtocol(addrinfo[0]),
            sock=sock,
        )
        return protocol

    @staticmethod
    async def create_listener(loop: AbstractEventLoop, local: Tuple[str, int], remote: Tuple[str, int]):
        """Create a protocol listening on the multicast group ``remote``.

        Args:
            loop: Event loop that will own the datagram endpoint.
            local: ``(host, port)`` of the local interface.
            remote: ``(host, port)`` of the multicast group.

        Returns:
            The ``SsdpProtocol`` instance attached to the new endpoint.
        """
        addrinfo_mcast = await loop.getaddrinfo(
            remote[0], remote[1], flags=socket.AI_PASSIVE, type=socket.SOCK_DGRAM
        )
        # The second argument is the port: local[1], not the host again.
        addrinfo_iface = await loop.getaddrinfo(
            local[0],
            local[1],
            family=addrinfo_mcast[0][0],
            flags=socket.AI_PASSIVE,
            type=socket.SOCK_DGRAM,
        )
        sock = create_listen_socket(addrinfo_iface[0], addrinfo_mcast[0])
        _, protocol = await loop.create_datagram_endpoint(
            lambda: SsdpProtocol(addrinfo_mcast[0]),
            sock=sock,
        )
        return protocol
async def listener(loop):
    """Placeholder for the listening side; currently performs no work.

    Args:
        loop: Event loop; unused while every experiment below is disabled.

    Returns:
        None.
    """
    # Disabled experiments, kept for reference:
    #   await SsdpProtocol.create_listener(loop, ("localhost", 1900), ("239.255.255.250", 1900))
    #   protocol = await SsdpProtocol.create_listener(loop, ("192.168.16.116", 1900), ("239.255.255.250", 1900))
    #   protocol.msearch("ssdp:all", 0, "239.255.255.250:1900")
    #   await SsdpProtocol.create_listener(loop, ("::1%en0", 1900), "FF02::C")
    return None
async def searcher(loop):
    """Create a search protocol and send one ``ssdp:all`` M-SEARCH request.

    Args:
        loop: Event loop used to create the datagram endpoint.

    Returns:
        The protocol instance that sent the request.
    """
    multicast_group = ("239.255.255.250", 1900)
    unicast_target = ("192.168.16.36", 1900)

    ssdp = await SsdpProtocol.create_searcher(loop, multicast_group)
    # Alternative endpoints tried previously:
    #   SsdpProtocol.create_searcher(loop, ("FF02::C", 1900))
    #   SsdpProtocol.create_searcher(loop, ("255.255.255.255", 1900))

    ssdp.msearch("ssdp:all", 0, address=unicast_target)
    # Alternative requests tried previously:
    #   ssdp.msearch("ssdp:all", 0, address=("FF02::C", 1900, 0, 1))
    #   ssdp.msearch("ssdp:all", 5)
    #   ssdp.msearch("upnp:rootdevice", 2, "239.255.255.250:1900")
    return ssdp
def run():
    """Run the searcher on a fresh debug-enabled event loop until interrupted.

    Logging is configured at DEBUG level. After the search request has been
    sent the loop keeps running so replies can be printed. On Ctrl-C, or any
    other exit, the protocol and the loop are closed.
    """
    logging.basicConfig(level=logging.DEBUG)

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.set_debug(True)

    protocol = None
    try:
        # protocol = loop.run_until_complete(listener(loop))
        protocol = loop.run_until_complete(searcher(loop))
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop this script.
        pass
    finally:
        if protocol is not None:
            protocol.close()
            # Let the loop run once more so callbacks scheduled by the
            # transport's close() get a chance to execute.
            loop.run_until_complete(asyncio.sleep(0))
        loop.close()


if __name__ == "__main__":
    run()
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.