Skip to content

Instantly share code, notes, and snippets.

@rcarmo rcarmo/ssdp.py
Last active Sep 24, 2018

Embed
What would you like to do?
Quick and dirty SSDP/UPNP/Mediaroom discovery
from http.client import HTTPResponse
from io import BytesIO
from config import log
from struct import pack
from socket import AF_INET, SOCK_DGRAM, INADDR_ANY, IPPROTO_IP, IPPROTO_UDP, IP_ADD_MEMBERSHIP, IP_MULTICAST_TTL, SOL_SOCKET, SO_REUSEADDR, SO_REUSEPORT, getaddrinfo, socket, setdefaulttimeout, inet_pton, timeout as SocketTimeout
from utils import etree_to_dict
from xml.etree import ElementTree
from time import time
from dateutil.parser import parse as parse_date
# UDP port used for SSDP/UPnP M-SEARCH discovery.
UPNP_PORT = 1900
# UDP port on which discover() switches to passive Mediaroom listening.
MEDIAROOM_PORT = 8082
# IPv4 multicast group shared by both discovery modes.
ADDRESS_GROUP = "239.255.255.250"
class SSDPResponse(object):
    """Parsed reply to an SSDP M-SEARCH request.

    The raw datagram is an HTTP-style response, so it is parsed with
    ``http.client.HTTPResponse``.

    Attributes:
        location: value of the LOCATION header, or None if absent.
        usn: value of the USN header, or None if absent.
        st: value of the ST header, or None if absent.
        cache: text following the first ``=`` in CACHE-CONTROL (for
            ``max-age=1800`` this is ``"1800"``), or None when the header is
            missing or carries no ``=``.
    """

    class _wrapper(BytesIO):
        """In-memory buffer posing as a socket for HTTPResponse."""

        def makefile(self, *args, **kw):
            # HTTPResponse calls sock.makefile("rb"); the buffer itself is
            # already a readable binary file object.
            return self

    def __init__(self, response, addr):
        """Parse *response* (raw datagram bytes).

        *addr* is the sender address returned by ``recvfrom``; it is accepted
        so the class can be built with ``SSDPResponse(*sock.recvfrom(n))`` but
        is not stored.
        """
        r = HTTPResponse(self._wrapper(response))
        r.begin()
        self.location = r.getheader("location")
        self.usn = r.getheader("usn")
        self.st = r.getheader("st")
        # CACHE-CONTROL may be missing or lack "=" (e.g. "no-cache"); neither
        # should make an otherwise valid response unparsable.
        cache_control = r.getheader("cache-control")
        pieces = cache_control.split("=") if cache_control else []
        self.cache = pieces[1].strip() if len(pieces) > 1 else None

    def __repr__(self):
        return "<SSDPResponse({location}, {st}, {usn})>".format(**self.__dict__)
class MediaroomResponse(object):
    """One NOTIFY datagram received while listening on the Mediaroom port.

    Attributes (all but ``data`` and ``location`` stay None when the datagram
    has no matching line):
        data: the datagram bytes from ``NOTIFY`` onwards, NUL padding removed.
        location: sender address formatted as ``"host:port"``.
        type: value of the ``x-type`` line.
        filter: value of the ``x-filter`` line.
        last_user_activity: ``x-lastUserActivity`` value parsed by
            ``dateutil.parser.parse``.
        device: value of the ``x-device`` line.
        status: the ``node`` entry of the XML line converted by
            ``etree_to_dict``.
        tune: ``status['activities']['tune']`` when present.
    """

    class _wrapper(BytesIO):
        # Not used by this class; retained so the class attribute stays
        # available to any existing code that references it.
        def makefile(self, *args, **kw):
            return self

    def __init__(self, data, addr):
        """Parse *data* (raw datagram bytes) received from *addr*.

        Raises:
            ValueError: if *data* contains no ``NOTIFY`` marker.
        """
        # Defaults first, so __repr__ works even for sparse datagrams.
        self.type = None
        self.filter = None
        self.last_user_activity = None
        self.device = None
        self.status = None
        self.tune = None

        # Strip trailing NUL padding. The comparison must be against bytes:
        # comparing to the str '\0' is always False on Python 3.
        data = data.rstrip(b'\0')
        # Drop anything preceding the NOTIFY request line. find() terminates
        # on datagrams without the marker, unlike slicing byte by byte.
        start = data.find(b"NOTIFY")
        if start < 0:
            raise ValueError("datagram does not contain a NOTIFY message")
        data = data[start:]
        self.data = data
        log.debug(data)
        self.location = "{}:{}".format(*addr)

        for line in (raw.strip() for raw in data.decode().split('\n')):
            # Everything after the first colon, so values that themselves
            # contain colons (timestamps) survive, with or without a space
            # after the separator.
            value = line.partition(":")[2].strip()
            if line.startswith("x-type"):
                self.type = value
            elif line.startswith("x-filter"):
                self.filter = value
            elif line.startswith("x-lastUserActivity"):
                self.last_user_activity = parse_date(value)
            elif line.startswith("x-device"):
                self.device = value
            elif line.startswith(("x-debug", "x-location", "NOTIFY")):
                # Recognised but deliberately ignored.
                pass
            elif line.startswith("<"):
                self.status = etree_to_dict(ElementTree.fromstring(line))['node']
                # 'activities' is None when the element is empty, and may be
                # absent altogether; only look up 'tune' in a real mapping.
                activities = None
                if isinstance(self.status, dict):
                    activities = self.status.get('activities')
                self.tune = activities.get('tune') if isinstance(activities, dict) else None

    def __repr__(self):
        return "<MediaroomResponse({location}, {last_user_activity}, {tune})>".format(**self.__dict__)
def discover(service='upnp:rootdevice', timeout=5, retries=1, mx=3, port=1900):
    """Discover devices via the multicast group ``ADDRESS_GROUP``.

    When *port* equals ``MEDIAROOM_PORT`` the function joins the multicast
    group and passively collects ``MediaroomResponse`` objects for *timeout*
    seconds. For any other port it sends an SSDP M-SEARCH request and collects
    ``SSDPResponse`` objects until no reply arrives for *timeout* seconds.

    Args:
        service: value sent in the ``ST`` header of the M-SEARCH request.
        timeout: socket timeout in seconds (see above for how it is applied).
        retries: number of discovery rounds; each round uses a fresh socket.
        mx: value sent in the ``MX`` header of the M-SEARCH request.
        port: UDP port to search on / listen to.

    Returns:
        A list of response objects, de-duplicated by their ``location``.
        Datagrams that cannot be parsed are logged and skipped.
    """
    # Local import: only needed to recognise unparsable HTTP-style replies.
    from http.client import HTTPException

    # Errors a malformed datagram can trigger in the response parsers. OSError
    # is deliberately excluded so real socket failures still propagate.
    malformed = (ValueError, LookupError, AttributeError, TypeError,
                 HTTPException, ElementTree.ParseError)

    target = (ADDRESS_GROUP, port)
    request = "\r\n".join([
        'M-SEARCH * HTTP/1.1',
        'HOST:{0}:{1}',
        'MAN:"ssdp:discover"',
        'ST:{st}', 'MX:{mx}', '', '',
    ]).format(*target, st=service, mx=mx).encode('utf-8')

    responses = {}
    for _ in range(retries):
        # The context manager closes the socket after every round, including
        # when an exception escapes.
        with socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) as sock:
            # Per-socket timeout: does not alter the process-wide default
            # applied to unrelated sockets.
            sock.settimeout(timeout)
            sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            sock.setsockopt(IPPROTO_IP, IP_MULTICAST_TTL, 2)
            try:
                sock.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1)
            except OSError:
                log.debug("cannot reuse port in this OS")

            if port == MEDIAROOM_PORT:
                # Build a multicast listener instead of sending a search.
                addrinfo = getaddrinfo(ADDRESS_GROUP, None)[0]
                packed_group = inet_pton(addrinfo[0], addrinfo[4][0])
                membership = packed_group + pack('=I', INADDR_ANY)
                sock.setsockopt(IPPROTO_IP, IP_ADD_MEMBERSHIP, membership)
                sock.bind(('', port))
                deadline = time() + timeout
                while True:
                    remaining = deadline - time()
                    if remaining <= 0:
                        break
                    # Wait only for what is left of the listening window.
                    sock.settimeout(remaining)
                    try:
                        payload, sender = sock.recvfrom(1024)
                    except SocketTimeout:
                        break
                    try:
                        response = MediaroomResponse(payload, sender)
                    except malformed as exc:
                        log.debug("ignoring malformed datagram from %s: %r", sender, exc)
                        continue
                    responses[response.location] = response
            else:
                sock.sendto(request, target)
                while True:
                    try:
                        payload, sender = sock.recvfrom(1024)
                    except SocketTimeout:
                        break
                    try:
                        response = SSDPResponse(payload, sender)
                    except malformed as exc:
                        log.debug("ignoring malformed datagram from %s: %r", sender, exc)
                        continue
                    responses[response.location] = response
    return list(responses.values())
if __name__ == '__main__':
    # Demo entry point: listen for Mediaroom notifications and print them.
    # Other searches that can be substituted here:
    #   discover('urn:dial-multiscreen-org:service:dial:1')
    #   discover('upnp:rootdevice', retries=5, mx=2, port=UPNP_PORT)
    #   discover('urn:schemas-upnp-org:device:tvdevice:1')
    found = discover('upnp:rootdevice', retries=1, mx=2, port=MEDIAROOM_PORT)
    print(found)
from collections import defaultdict
def etree_to_dict(t, strip_namespaces=True):
    """Convert an ElementTree element into nested dicts.

    The result is ``{tag: content}`` where *content* is:

    * ``None`` for an element with no children, attributes or text;
    * the stripped text for an element with text only;
    * otherwise a dict holding child tags (repeated tags collapse into a
      list), attributes under ``'@name'`` keys and non-empty text under
      ``'#text'``.

    Args:
        t: the element to convert. It is not modified.
        strip_namespaces: when true, drop the ``{namespace}`` prefix from
            element tags at every level of the tree.

    Returns:
        A single-key dict mapping the element's tag to its content.
    """
    # Work on a local copy of the tag so the caller's tree is left untouched.
    tag = t.tag
    if strip_namespaces and "}" in tag:
        tag = tag.split('}', 1)[1]

    children = list(t)
    if children:
        grouped = defaultdict(list)
        for child in children:
            # Pass the flag down explicitly; otherwise children would always
            # be converted with the default setting.
            for key, value in etree_to_dict(child, strip_namespaces).items():
                grouped[key].append(value)
        content = {key: values[0] if len(values) == 1 else values
                   for key, values in grouped.items()}
    elif t.attrib:
        content = {}
    else:
        content = None

    if t.attrib:
        content.update(('@' + k, v) for k, v in t.attrib.items())

    if t.text:
        text = t.text.strip()
        if children or t.attrib:
            # Text mixed with children/attributes is kept only if non-blank.
            if text:
                content['#text'] = text
        else:
            content = text
    return {tag: content}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.