Last active
November 22, 2016 05:01
-
-
Save relevitt/a918492bf4626e89a23e to your computer and use it in GitHub Desktop.
SoCo Events module rewritten to use twisted
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Classes to handle Sonos UPnP Events and Subscriptions.""" | |
#from __future__ import unicode_literals | |
import logging | |
import socket | |
import time | |
from . import config | |
from .compat import Queue | |
from .data_structures import from_didl_string | |
from .exceptions import SoCoException | |
from .utils import camel_to_underscore | |
from .xml import XML | |
from .events_twisted_utils import httpRequest | |
from .events_twisted_utils import serialised | |
from .events_twisted_utils import printError | |
from twisted.internet import reactor | |
from twisted.internet import task | |
from twisted.web.server import Site | |
from twisted.web.resource import Resource | |
import twisted.internet.error | |
log = logging.getLogger(__name__) # pylint: disable=C0103 | |
def parse_event_xml(xml_event): | |
"""Parse the body of a UPnP event. | |
Args: | |
xml_event (bytes): bytes containing the body of the event encoded | |
with utf-8. | |
Returns: | |
dict: A dict with keys representing the evented variables. The | |
relevant value will usually be a string representation of the | |
variable's value, but may on occasion be: | |
* a dict (eg when the volume changes, the value will itself be a | |
dict containing the volume for each channel: | |
:code:`{'Volume': {'LF': '100', 'RF': '100', 'Master': '36'}}`) | |
* an instance of a `DidlObject` subclass (eg if it represents | |
track metadata). | |
Example: | |
************************************************************** | |
* This version of events.py has been modified to work with * | |
* twisted.internet. It assumes the application has a running * | |
* twisted reactor. * | |
************************************************************** | |
Run this code, and change your volume, tracks etc:: | |
from twisted.internet import reactor | |
import soco | |
from pprint import pprint | |
from soco.events import event_listener | |
def print_event(event): | |
try: | |
pprint (event.variables) | |
except Exception as e: | |
print 'There was an error in print_event:', e | |
def main(): | |
# pick a device at random and use it to get the group coordinator | |
device = soco.discover().pop() | |
device = device.group.coordinator | |
print (device.player_name) | |
sub = device.renderingControl.subscribe() | |
sub2 = device.avTransport.subscribe() | |
sub.callback = print_event | |
sub2.callback = print_event | |
def before_shutdown(): | |
sub.unsubscribe() | |
sub2.unsubscribe() | |
event_listener.stop() | |
reactor.addSystemEventTrigger('before', 'shutdown', before_shutdown) | |
if __name__=='__main__': | |
reactor.callWhenRunning(main) | |
reactor.run() | |
""" | |
result = {} | |
tree = XML.fromstring(xml_event) | |
# property values are just under the propertyset, which | |
# uses this namespace | |
properties = tree.findall( | |
'{urn:schemas-upnp-org:event-1-0}property') | |
for prop in properties: | |
for variable in prop: | |
# Special handling for a LastChange event specially. For details on | |
# LastChange events, see | |
# http://upnp.org/specs/av/UPnP-av-RenderingControl-v1-Service.pdf | |
# and http://upnp.org/specs/av/UPnP-av-AVTransport-v1-Service.pdf | |
if variable.tag == "LastChange": | |
last_change_tree = XML.fromstring( | |
variable.text.encode('utf-8')) | |
# We assume there is only one InstanceID tag. This is true for | |
# Sonos, as far as we know. | |
# InstanceID can be in one of two namespaces, depending on | |
# whether we are looking at an avTransport event or a | |
# renderingControl event, so we need to look for both | |
instance = last_change_tree.find( | |
"{urn:schemas-upnp-org:metadata-1-0/AVT/}InstanceID") | |
if instance is None: | |
instance = last_change_tree.find( | |
"{urn:schemas-upnp-org:metadata-1-0/RCS/}InstanceID") | |
if instance is None: | |
instance = last_change_tree.find("{urn:schemas-sonos-com:metadata-1-0/Queue/}QueueID") | |
# Look at each variable within the LastChange event | |
for last_change_var in instance: | |
tag = last_change_var.tag | |
# Remove any namespaces from the tags | |
if tag.startswith('{'): | |
tag = tag.split('}', 1)[1] | |
# Un-camel case it | |
tag = camel_to_underscore(tag) | |
# Now extract the relevant value for the variable. | |
# The UPnP specs suggest that the value of any variable | |
# evented via a LastChange Event will be in the 'val' | |
# attribute, but audio related variables may also have a | |
# 'channel' attribute. In addition, it seems that Sonos | |
# sometimes uses a text value instead: see | |
# http://forums.sonos.com/showthread.php?t=34663 | |
value = last_change_var.get('val') | |
if value is None: | |
value = last_change_var.text | |
# If DIDL metadata is returned, convert it to a music | |
# library data structure | |
if value.startswith('<DIDL-Lite'): | |
value = from_didl_string(value)[0] | |
channel = last_change_var.get('channel') | |
if channel is not None: | |
if result.get(tag) is None: | |
result[tag] = {} | |
result[tag][channel] = value | |
else: | |
result[tag] = value | |
else: | |
result[camel_to_underscore(variable.tag)] = variable.text | |
return result | |
class Event(object): | |
"""A read-only object representing a received event. | |
The values of the evented variables can be accessed via the ``variables`` | |
dict, or as attributes on the instance itself. You should treat all | |
attributes as read-only. | |
Args: | |
sid (str): the subscription id. | |
seq (str): the event sequence number for that subscription. | |
timestamp (str): the time that the event was received (from Python's | |
`time.time` function). | |
service (str): the service which is subscribed to the event. | |
variables (dict, optional): contains the ``{names: values}`` of the | |
evented variables. Defaults to `None`. | |
Raises: | |
AttributeError: Not all attributes are returned with each event. An | |
`AttributeError` will be raised if you attempt to access as an | |
attribute a variable which was not returned in the event. | |
Example: | |
>>> print event.variables['transport_state'] | |
'STOPPED' | |
>>> print event.transport_state | |
'STOPPED' | |
""" | |
# pylint: disable=too-few-public-methods, too-many-arguments | |
def __init__(self, sid, seq, service, timestamp, variables=None): | |
# Initialisation has to be done like this, because __setattr__ is | |
# overridden, and will not allow direct setting of attributes | |
self.__dict__['sid'] = sid | |
self.__dict__['seq'] = seq | |
self.__dict__['timestamp'] = timestamp | |
self.__dict__['service'] = service | |
self.__dict__['variables'] = variables if variables is not None else {} | |
def __getattr__(self, name): | |
if name in self.variables: | |
return self.variables[name] | |
else: | |
raise AttributeError('No such attribute: %s' % name) | |
def __setattr__(self, name, value): | |
"""Disable (most) attempts to set attributes. | |
This is not completely foolproof. It just acts as a warning! See | |
`object.__setattr__`. | |
""" | |
raise TypeError('Event object does not support attribute assignment') | |
class EventNotifyHandler(Resource): | |
"""Handles HTTP ``NOTIFY`` Verbs sent to the listener server.""" | |
isLeaf = True | |
def render_NOTIFY(self, request): # pylint: disable=invalid-name | |
"""Serve a ``NOTIFY`` request. | |
A ``NOTIFY`` request will be sent by a Sonos device when a state | |
variable changes. See the `UPnP Spec §4.3 [pdf] | |
<http://upnp.org/specs/arch/UPnP-arch | |
-DeviceArchitecture-v1.1.pdf>`_ for details. | |
""" | |
timestamp = time.time() | |
seq = request.requestHeaders.getRawHeaders('Seq')[0] # Event sequence number | |
sid = request.requestHeaders.getRawHeaders('Sid')[0] # Event Subscription Identifier | |
content = request.content.read() | |
# find the relevant subscription from the sid | |
if sid in _sid_to_subscription.keys(): | |
subscription = _sid_to_subscription[sid] | |
service = subscription.service | |
log.info( | |
"Event %s received for %s service at %s", seq, | |
service.service_id, timestamp) | |
log.debug("Event content: %s", content) | |
variables = parse_event_xml(content) | |
# Build the Event object | |
event = Event(sid, seq, service, timestamp, variables) | |
# pass the event details on to the service so it can update its cache. | |
# pylint: disable=protected-access | |
service._update_cache_on_event(event) | |
# Pass the event to the subscription for handling | |
subscription.notify(event) | |
return 'OK' | |
def log_message(self, fmt, *args): | |
# Divert standard webserver logging to the debug log | |
log.debug(fmt, *args) | |
class EventListener(object): | |
"""The Event Listener. | |
Runs an http server which is an endpoint for ``NOTIFY`` | |
requests from Sonos devices. | |
""" | |
def __init__(self): | |
super(EventListener, self).__init__() | |
#: `bool`: Indicates whether the server is currently running | |
self.is_running = False | |
#: `tuple`: The address (ip, port) on which the server is | |
#: configured to listen. | |
# Empty for the moment. (It is set in `start`) | |
self.address = () | |
def start(self, any_zone): | |
"""Start the event listener listening on the local machine at port 1400 | |
(default) | |
Make sure that your firewall allows connections to this port | |
Args: | |
any_zone (SoCo): Any Sonos device on the network. It does not | |
matter which device. It is used only to find a local IP address | |
reachable by the Sonos net. | |
Note: | |
The port on which the event listener attempts to listen is configurable. | |
See `config.EVENT_LISTENER_PORT`. If this port is unavailable, the event | |
listener will attempt to listen on the next available port, within a | |
range of 100 from config.EVENT_LISTENER_PORT. | |
""" | |
self.is_running = True | |
factory = Site(EventNotifyHandler()) | |
for port in range(config.EVENT_LISTENER_PORT, config.EVENT_LISTENER_PORT + 100): | |
try: | |
self.port = reactor.listenTCP(port, factory) | |
log.info("Event listener started") | |
# Find our local network IP address which is accessible to the | |
# Sonos net, see http://stackoverflow.com/q/166506 | |
temp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
temp_sock.connect((any_zone.ip_address, port)) | |
ip_address = temp_sock.getsockname()[0] | |
temp_sock.close() | |
self.address = (ip_address, port) | |
log.info("Event listener running on %s", self.address) | |
break | |
except twisted.internet.error.CannotListenError, e: | |
print "Port %d is busy: %s" % (port, e) | |
continue | |
def stop(self): | |
"""Stop the event listener.""" | |
if not self.is_running: | |
return | |
self.is_running = False | |
port, self.port = self.port, None | |
port.stopListening() | |
log.info("Event listener stopped") | |
class Subscription(object): | |
"""A class representing the subscription to a UPnP event.""" | |
# pylint: disable=too-many-instance-attributes | |
def __init__(self, service, event_queue=None): | |
""" | |
Args: | |
service (Service): The SoCo `Service` to which the subscription | |
should be made. | |
event_queue (:class:`~queue.Queue`): A queue on which received | |
events will be put. If not specified, a queue will be | |
created and used. | |
""" | |
super(Subscription, self).__init__() | |
self.service = service | |
#: `str`: A unique ID for this subscription | |
self.sid = None | |
#: `int`: The amount of time in seconds until the subscription expires. | |
self.timeout = None | |
#: `bool`: An indication of whether the subscription is subscribed. | |
self.is_subscribed = False | |
#: :class:`~queue.Queue`: The queue on which events are placed. | |
self.events = Queue() if event_queue is None else event_queue | |
#: `int`: The period (seconds) for which the subscription is requested | |
self.requested_timeout = None | |
# A flag to make sure that an unsubscribed instance is not | |
# resubscribed | |
self._has_been_unsubscribed = False | |
# The time when the subscription was made | |
self._timestamp = None | |
# Used to keep track of the auto_renew loop | |
self._auto_renew_loop = None | |
# A callback function to be called whenever an event is received. | |
# If self.callback is set and is callable, the events queue (self.events) won't be used. | |
# The callback function will be called with an Event object as the only argument. | |
self.callback = None | |
@serialised | |
def subscribe(self, requested_timeout=None, auto_renew=False): | |
"""Subscribe to the service. | |
If requested_timeout is provided, a subscription valid for that number | |
of seconds will be requested, but not guaranteed. Check | |
`timeout` on return to find out what period of validity is | |
actually allocated. | |
Note: | |
SoCo will try to unsubscribe any subscriptions which are still | |
subscribed on program termination, but it is good practice for | |
you to clean up by making sure that you call :meth:`unsubscribe` | |
yourself. | |
Args: | |
requested_timeout(int, optional): The timeout to be requested. | |
auto_renew (bool, optional): If `True`, renew the subscription | |
automatically shortly before timeout. Default `False`. | |
""" | |
# TIMEOUT is provided for in the UPnP spec, but it is not clear if | |
# Sonos pays any attention to it. A timeout of 86400 secs always seems | |
# to be allocated | |
self.requested_timeout = requested_timeout | |
if self._has_been_unsubscribed: | |
raise SoCoException( | |
'Cannot resubscribe instance once unsubscribed') | |
service = self.service | |
# The event listener must be running, so start it if not | |
if not event_listener.is_running: | |
event_listener.start(service.soco) | |
# an event subscription looks like this: | |
# SUBSCRIBE publisher path HTTP/1.1 | |
# HOST: publisher host:publisher port | |
# CALLBACK: <delivery URL> | |
# NT: upnp:event | |
# TIMEOUT: Second-requested subscription duration (optional) | |
# pylint: disable=unbalanced-tuple-unpacking | |
ip_address, port = event_listener.address | |
headers = { | |
'Callback': ['<http://{0}:{1}>'.format(ip_address, port)], | |
'NT': ['upnp:event'] | |
} | |
if requested_timeout is not None: | |
headers["TIMEOUT"] = ["Second-{0}".format(requested_timeout)] | |
d = httpRequest( | |
'SUBSCRIBE', service.base_url + service.event_subscription_url, | |
headers) | |
def process(response): | |
self.sid = response.headers.getRawHeaders('Sid')[0] | |
timeout = response.headers.getRawHeaders('Timeout')[0] | |
# According to the spec, timeout can be "infinite" or "second-123" | |
# where 123 is a number of seconds. Sonos uses "Second-123" (with a | |
# capital letter) | |
if timeout.lower() == 'infinite': | |
self.timeout = None | |
else: | |
self.timeout = int(timeout.lstrip('Second-')) | |
self._timestamp = time.time() | |
self.is_subscribed = True | |
log.info( | |
"Subscribed to %s, sid: %s", | |
service.base_url + service.event_subscription_url, self.sid) | |
# Add the sid to the sid to subscription mapping so it can be looked up | |
# by sid | |
_sid_to_subscription[self.sid] = self | |
# Register this subscription to be unsubscribed at exit if still alive | |
reactor.addSystemEventTrigger('before', 'shutdown', self.unsubscribe) | |
# Set up auto_renew | |
if not auto_renew: | |
return | |
# Autorenew just before expiry, say at 85% of self.timeout seconds | |
interval = self.timeout * 85 / 100 | |
self._auto_renew_loop = task.LoopingCall(self.renew) | |
self._auto_renew_loop.start(interval) | |
d.addCallback(process) | |
d.addErrback(printError) | |
return d | |
@serialised | |
def renew(self, requested_timeout=None): | |
"""Renew the event subscription. | |
You should not try to renew a subscription which has been | |
unsubscribed, or once it has expired. | |
Args: | |
requested_timeout (int, optional): The period for which a renewal | |
request should be made. If None (the default), use the timeout | |
requested on subscription. | |
""" | |
log.info("Autorenewing subscription %s", self.sid) | |
if self._has_been_unsubscribed: | |
raise SoCoException( | |
'Cannot renew subscription once unsubscribed') | |
if not self.is_subscribed: | |
raise SoCoException( | |
'Cannot renew subscription before subscribing') | |
if self.time_left == 0: | |
raise SoCoException( | |
'Cannot renew subscription after expiry') | |
# SUBSCRIBE publisher path HTTP/1.1 | |
# HOST: publisher host:publisher port | |
# SID: uuid:subscription UUID | |
# TIMEOUT: Second-requested subscription duration (optional) | |
headers = { | |
'SID': [self.sid] | |
} | |
if requested_timeout is None: | |
requested_timeout = self.requested_timeout | |
if requested_timeout is not None: | |
headers["TIMEOUT"] = ["Second-{0}".format(requested_timeout)] | |
d = httpRequest( | |
'SUBSCRIBE', self.service.base_url + self.service.event_subscription_url, | |
headers) | |
def process(response): | |
timeout = response.headers.getRawHeaders('Timeout')[0] | |
# According to the spec, timeout can be "infinite" or "second-123" | |
# where 123 is a number of seconds. Sonos uses "Second-123" (with a | |
# a capital letter) | |
if timeout.lower() == 'infinite': | |
self.timeout = None | |
else: | |
self.timeout = int(timeout.lstrip('Second-')) | |
self._timestamp = time.time() | |
self.is_subscribed = True | |
log.info( | |
"Renewed subscription to %s, sid: %s", | |
self.service.base_url + self.service.event_subscription_url, | |
self.sid) | |
d.addCallback(process) | |
d.addErrback(printError) | |
return d | |
@serialised | |
def unsubscribe(self): | |
"""Unsubscribe from the service's events. | |
Once unsubscribed, a Subscription instance should not be reused | |
""" | |
# Trying to unsubscribe if already unsubscribed, or not yet | |
# subscribed, fails silently | |
if self._has_been_unsubscribed or not self.is_subscribed: | |
return | |
self.is_subscribed = False | |
# Cancel any auto renew | |
if self._auto_renew_loop: | |
self._auto_renew_loop.stop() | |
self._auto_renew_loop = None | |
self._timestamp = None | |
self.events = None | |
self.callback = None | |
# remove queue from sid to subscription mapping | |
try: | |
del _sid_to_subscription[self.sid] | |
except KeyError: | |
pass | |
self._has_been_unsubscribed = True | |
log.info( | |
"Unsubscribed from %s, sid: %s", | |
self.service.base_url + self.service.event_subscription_url, | |
self.sid) | |
return self._cancel_subscription() | |
def _cancel_subscription(self): | |
# Send an unsubscribe request like this: | |
# UNSUBSCRIBE publisher path HTTP/1.1 | |
# HOST: publisher host:publisher port | |
# SID: uuid:subscription UUID | |
headers = { | |
'SID': [self.sid] | |
} | |
d = httpRequest( | |
'UNSUBSCRIBE', self.service.base_url + self.service.event_subscription_url, | |
headers) | |
# If the network is down, keep trying every minute | |
def success(result): | |
self.service = None | |
self.sid = None | |
def error(e): | |
if e.check(twisted.internet.error.ConnectError): | |
task.deferLater(reactor, 60, self._cancel_subscription) | |
else: | |
printError(e) | |
d.addCallback(success) | |
d.addErrback(error) | |
return d | |
@serialised | |
def notify(self, event): | |
if self._has_been_unsubscribed: | |
return | |
if self.callback and hasattr(self.callback, '__call__'): | |
self.callback(event) | |
else: | |
self.events.put(event) | |
@property | |
def time_left(self): | |
""" | |
`int`: The amount of time left until the subscription expires (seconds) | |
If the subscription is unsubscribed (or not yet subscribed), | |
`time_left` is 0. | |
""" | |
if self._timestamp is None: | |
return 0 | |
else: | |
time_left = self.timeout - (time.time() - self._timestamp) | |
return time_left if time_left > 0 else 0 | |
# pylint: disable=C0103 | |
event_listener = EventListener() | |
# Used to store a mapping of sids to subsciption instances | |
_sid_to_subscription = {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from collections import deque | |
from twisted.internet import defer | |
from twisted.internet import reactor | |
from twisted.web.client import Agent, BrowserLikeRedirectAgent | |
from twisted.web.http_headers import Headers | |
from twisted.internet.error import ConnectError | |
class Serialiser(): | |
def __init__(self): | |
self.queue = deque([]) | |
def serialise(self, fn, *args, **kwargs): | |
def execute(result, fn, *args, **kwargs): | |
return fn(*args, **kwargs) | |
def next(result): | |
self.queue.popleft() | |
self.callnext() | |
return result | |
d = defer.Deferred() | |
d.addCallback(execute, fn, *args, **kwargs) | |
d.addErrback(printError) | |
d.addCallback(next) | |
self.queue.append(d) | |
if len(self.queue) == 1: | |
self.callnext() | |
return d | |
def callnext(self): | |
if len(self.queue): | |
d = self.queue[0] | |
d.callback(None) | |
serialise = Serialiser().serialise | |
def serialised(fn): | |
def inner(self, *args, **kwargs): | |
return serialise(fn, self, *args, **kwargs) | |
inner.fn = fn | |
return inner | |
def httpRequest(method, url, headers={}): | |
agent = BrowserLikeRedirectAgent(Agent(reactor)) | |
d = agent.request( | |
method, | |
url.encode('latin-1'), | |
Headers(headers) | |
) | |
return d | |
def printError(e): | |
if e.check(ConnectError): | |
print 'Connection error:', e.value | |
return | |
print '------------------------error.printError: traceback------------------------' | |
print e.getTraceback() | |
print '---------------------------------------------------------------------------' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment