Skip to content

Instantly share code, notes, and snippets.

@relevitt
Last active November 22, 2016 05:01
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save relevitt/a918492bf4626e89a23e to your computer and use it in GitHub Desktop.
Save relevitt/a918492bf4626e89a23e to your computer and use it in GitHub Desktop.
SoCo Events module rewritten to use twisted
# -*- coding: utf-8 -*-
"""Classes to handle Sonos UPnP Events and Subscriptions."""
#from __future__ import unicode_literals
import logging
import socket
import time
from . import config
from .compat import Queue
from .data_structures import from_didl_string
from .exceptions import SoCoException
from .utils import camel_to_underscore
from .xml import XML
from .events_twisted_utils import httpRequest
from .events_twisted_utils import serialised
from .events_twisted_utils import printError
from twisted.internet import reactor
from twisted.internet import task
from twisted.web.server import Site
from twisted.web.resource import Resource
import twisted.internet.error
log = logging.getLogger(__name__) # pylint: disable=C0103
def parse_event_xml(xml_event):
    """Parse the body of a UPnP event.

    Args:
        xml_event (bytes): bytes containing the body of the event encoded
            with utf-8.

    Returns:
        dict: A dict with keys representing the evented variables. The
        relevant value will usually be a string representation of the
        variable's value, but may on occasion be:

        * a dict (eg when the volume changes, the value will itself be a
          dict containing the volume for each channel:
          :code:`{'Volume': {'LF': '100', 'RF': '100', 'Master': '36'}}`)
        * an instance of a `DidlObject` subclass (eg if it represents
          track metadata).
        * `None`, when a variable inside a ``LastChange`` event carries
          neither a ``val`` attribute nor any text.

    Note:
        A ``LastChange`` variable which has no text, or whose payload
        contains none of the known InstanceID/QueueID elements, is skipped
        and contributes nothing to the result.

    Example:
        This version of events.py has been modified to work with
        twisted.internet. It assumes the application has a running
        twisted reactor.

        Run this code, and change your volume, tracks etc::

            from twisted.internet import reactor
            import soco
            from pprint import pprint
            from soco.events import event_listener

            def print_event(event):
                try:
                    pprint(event.variables)
                except Exception as e:
                    print('There was an error in print_event: %s' % e)

            def main():
                # pick a device at random and use it to get the group
                # coordinator
                device = soco.discover().pop()
                device = device.group.coordinator
                print(device.player_name)
                sub = device.renderingControl.subscribe()
                sub2 = device.avTransport.subscribe()
                sub.callback = print_event
                sub2.callback = print_event

                def before_shutdown():
                    sub.unsubscribe()
                    sub2.unsubscribe()
                    event_listener.stop()

                reactor.addSystemEventTrigger(
                    'before', 'shutdown', before_shutdown)

            if __name__ == '__main__':
                reactor.callWhenRunning(main)
                reactor.run()
    """
    # InstanceID can be in one of two namespaces, depending on whether we
    # are looking at an avTransport event or a renderingControl event; queue
    # events use a QueueID element instead. They are tried in this order.
    instance_tags = (
        "{urn:schemas-upnp-org:metadata-1-0/AVT/}InstanceID",
        "{urn:schemas-upnp-org:metadata-1-0/RCS/}InstanceID",
        "{urn:schemas-sonos-com:metadata-1-0/Queue/}QueueID",
    )

    result = {}
    tree = XML.fromstring(xml_event)
    # property values are just under the propertyset, which
    # uses this namespace
    properties = tree.findall('{urn:schemas-upnp-org:event-1-0}property')
    for prop in properties:
        for variable in prop:
            # Ordinary variables are stored as-is under their un-camel-cased
            # tag name.
            if variable.tag != "LastChange":
                result[camel_to_underscore(variable.tag)] = variable.text
                continue

            # Special handling for a LastChange event. For details on
            # LastChange events, see
            # http://upnp.org/specs/av/UPnP-av-RenderingControl-v1-Service.pdf
            # and http://upnp.org/specs/av/UPnP-av-AVTransport-v1-Service.pdf
            if variable.text is None:
                # An empty LastChange element has nothing to parse.
                continue
            last_change_tree = XML.fromstring(variable.text.encode('utf-8'))

            # We assume there is only one InstanceID tag. This is true for
            # Sonos, as far as we know.
            instance = None
            for instance_tag in instance_tags:
                instance = last_change_tree.find(instance_tag)
                if instance is not None:
                    break
            if instance is None:
                # Unknown LastChange layout: skip it rather than crash while
                # trying to iterate over None.
                continue

            # Look at each variable within the LastChange event
            for last_change_var in instance:
                tag = last_change_var.tag
                # Remove any namespaces from the tags
                if tag.startswith('{'):
                    tag = tag.split('}', 1)[1]
                # Un-camel case it
                tag = camel_to_underscore(tag)
                # Now extract the relevant value for the variable.
                # The UPnP specs suggest that the value of any variable
                # evented via a LastChange Event will be in the 'val'
                # attribute, but audio related variables may also have a
                # 'channel' attribute. In addition, it seems that Sonos
                # sometimes uses a text value instead: see
                # http://forums.sonos.com/showthread.php?t=34663
                value = last_change_var.get('val')
                if value is None:
                    value = last_change_var.text
                # If DIDL metadata is returned, convert it to a music
                # library data structure. The value may still be None here
                # (no 'val' attribute and no text), so check before calling
                # a string method on it.
                if value is not None and value.startswith('<DIDL-Lite'):
                    value = from_didl_string(value)[0]
                channel = last_change_var.get('channel')
                if channel is not None:
                    # Per-channel values are collected in a nested dict
                    # keyed by channel name.
                    if result.get(tag) is None:
                        result[tag] = {}
                    result[tag][channel] = value
                else:
                    result[tag] = value
    return result
class Event(object):
    """A read-only object representing a received event.

    The values of the evented variables can be accessed via the ``variables``
    dict, or as attributes on the instance itself. You should treat all
    attributes as read-only.

    Args:
        sid (str): the subscription id.
        seq (str): the event sequence number for that subscription.
        service: the service which is subscribed to the event.
        timestamp (float): the time that the event was received (from
            Python's `time.time` function).
        variables (dict, optional): contains the ``{names: values}`` of the
            evented variables. Defaults to `None`, which is stored as an
            empty dict.

    Raises:
        AttributeError: Not all attributes are returned with each event. An
            `AttributeError` will be raised if you attempt to access as an
            attribute a variable which was not returned in the event.
        TypeError: If you attempt to assign to any attribute.

    Example:

        >>> event.variables['transport_state']
        'STOPPED'
        >>> event.transport_state
        'STOPPED'
    """

    # pylint: disable=too-few-public-methods, too-many-arguments
    def __init__(self, sid, seq, service, timestamp, variables=None):
        # Initialisation has to be done like this, because __setattr__ is
        # overridden, and will not allow direct setting of attributes
        self.__dict__['sid'] = sid
        self.__dict__['seq'] = seq
        self.__dict__['timestamp'] = timestamp
        self.__dict__['service'] = service
        self.__dict__['variables'] = variables if variables is not None else {}

    def __getattr__(self, name):
        """Expose the evented variables as attributes.

        Only called when normal attribute lookup fails, so ``sid``, ``seq``
        etc. never reach this method once the instance is initialised.
        """
        # Read 'variables' straight from the instance dict. On an instance
        # whose __dict__ has not been populated yet (as created by copy or
        # pickle before the state is restored), ``self.variables`` would
        # re-enter __getattr__ and recurse without bound.
        variables = self.__dict__.get('variables')
        if variables is not None and name in variables:
            return variables[name]
        raise AttributeError('No such attribute: %s' % name)

    def __setattr__(self, name, value):
        """Disable (most) attempts to set attributes.

        This is not completely foolproof. It just acts as a warning! See
        `object.__setattr__`.
        """
        raise TypeError('Event object does not support attribute assignment')
class EventNotifyHandler(Resource):
    """Handles HTTP ``NOTIFY`` Verbs sent to the listener server."""

    # Every request path is handled by this resource itself.
    isLeaf = True

    def render_NOTIFY(self, request):  # pylint: disable=invalid-name
        """Serve a ``NOTIFY`` request.

        A ``NOTIFY`` request will be sent by a Sonos device when a state
        variable changes. See the `UPnP Spec §4.3 [pdf]
        <http://upnp.org/specs/arch/UPnP-arch
        -DeviceArchitecture-v1.1.pdf>`_ for details.

        Args:
            request: the incoming twisted web request.

        Returns:
            bytes: ``b'OK'`` once the request has been processed (whether or
            not the ``Sid`` matched a known subscription), or an empty body
            with a 400 response code if the ``Seq`` or ``Sid`` header is
            missing.
        """
        timestamp = time.time()
        # getRawHeaders returns None when the header is absent, so check
        # before indexing: this data comes straight off the network.
        seq_headers = request.requestHeaders.getRawHeaders('Seq')
        sid_headers = request.requestHeaders.getRawHeaders('Sid')
        if not seq_headers or not sid_headers:
            log.warning("Ignoring NOTIFY request without Seq/Sid headers")
            request.setResponseCode(400)
            return b''
        seq = seq_headers[0]  # Event sequence number
        sid = sid_headers[0]  # Event Subscription Identifier
        content = request.content.read()
        # find the relevant subscription from the sid
        subscription = _sid_to_subscription.get(sid)
        if subscription is not None:
            service = subscription.service
            log.info(
                "Event %s received for %s service at %s", seq,
                service.service_id, timestamp)
            log.debug("Event content: %s", content)
            variables = parse_event_xml(content)
            # Build the Event object
            event = Event(sid, seq, service, timestamp, variables)
            # pass the event details on to the service so it can update its
            # cache.
            # pylint: disable=protected-access
            service._update_cache_on_event(event)
            # Pass the event to the subscription for handling
            subscription.notify(event)
        return b'OK'

    def log_message(self, fmt, *args):
        """Divert standard webserver logging to the debug log."""
        log.debug(fmt, *args)
class EventListener(object):
    """The Event Listener.

    Runs an http server which is an endpoint for ``NOTIFY``
    requests from Sonos devices.
    """

    def __init__(self):
        super(EventListener, self).__init__()
        #: `bool`: Indicates whether the server is currently running
        self.is_running = False
        #: `tuple`: The address (ip, port) on which the server is
        #: configured to listen.
        # Empty for the moment. (It is set in `start`)
        self.address = ()
        #: The listening port object returned by ``reactor.listenTCP``, or
        #: `None` while the listener is not running.
        self.port = None

    def start(self, any_zone):
        """Start the event listener listening on the local machine at port
        1400 (default).

        Make sure that your firewall allows connections to this port.

        Args:
            any_zone (SoCo): Any Sonos device on the network. It does not
                matter which device. It is used only to find a local IP
                address reachable by the Sonos net.

        Note:
            The port on which the event listener attempts to listen is
            configurable. See `config.EVENT_LISTENER_PORT`. If this port is
            unavailable, the event listener will attempt to listen on the
            next available port, within a range of 100 from
            config.EVENT_LISTENER_PORT. If none of them is available, an
            error is logged and the listener is left stopped.
        """
        factory = Site(EventNotifyHandler())
        first_port = config.EVENT_LISTENER_PORT
        last_port = first_port + 99
        for port in range(first_port, first_port + 100):
            try:
                listening_port = reactor.listenTCP(port, factory)
            except twisted.internet.error.CannotListenError as error:
                log.warning("Port %d is busy: %s", port, error)
                continue
            log.info("Event listener started")
            # Find our local network IP address which is accessible to the
            # Sonos net, see http://stackoverflow.com/q/166506
            temp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                temp_sock.connect((any_zone.ip_address, port))
                ip_address = temp_sock.getsockname()[0]
            except socket.error:
                # Do not leave the freshly bound port open when the local
                # address cannot be determined.
                listening_port.stopListening()
                raise
            finally:
                temp_sock.close()
            # Only mark the listener as running once it really is.
            self.port = listening_port
            self.address = (ip_address, port)
            self.is_running = True
            log.info("Event listener running on %s", self.address)
            break
        else:
            log.error(
                "Event listener could not find a free port between %d and %d",
                first_port, last_port)

    def stop(self):
        """Stop the event listener. Does nothing if it is not running."""
        if not self.is_running:
            return
        self.is_running = False
        port, self.port = self.port, None
        if port is not None:
            port.stopListening()
        log.info("Event listener stopped")
class Subscription(object):
    """A class representing the subscription to a UPnP event."""

    # pylint: disable=too-many-instance-attributes
    def __init__(self, service, event_queue=None):
        """
        Args:
            service (Service): The SoCo `Service` to which the subscription
                should be made.
            event_queue (:class:`~queue.Queue`): A queue on which received
                events will be put. If not specified, a queue will be
                created and used.
        """
        super(Subscription, self).__init__()
        self.service = service
        #: `str`: A unique ID for this subscription
        self.sid = None
        #: `int`: The amount of time in seconds until the subscription
        #: expires, or `None` if the device granted an infinite subscription.
        self.timeout = None
        #: `bool`: An indication of whether the subscription is subscribed.
        self.is_subscribed = False
        #: :class:`~queue.Queue`: The queue on which events are placed.
        self.events = Queue() if event_queue is None else event_queue
        #: `int`: The period (seconds) for which the subscription is requested
        self.requested_timeout = None
        # A flag to make sure that an unsubscribed instance is not
        # resubscribed
        self._has_been_unsubscribed = False
        # The time when the subscription was made
        self._timestamp = None
        # Used to keep track of the auto_renew loop
        self._auto_renew_loop = None
        # A callback function to be called whenever an event is received.
        # If self.callback is set and is callable, the events queue
        # (self.events) won't be used. The callback function will be called
        # with an Event object as the only argument.
        self.callback = None

    @staticmethod
    def _parse_timeout(timeout):
        """Convert the value of a ``Timeout`` response header into seconds.

        According to the spec, timeout can be "infinite" or "second-123"
        where 123 is a number of seconds. Sonos uses "Second-123" (with a
        capital letter), so the prefix is matched case-insensitively.

        Args:
            timeout (str): the raw header value.

        Returns:
            int: the number of seconds, or `None` for an infinite timeout.

        Raises:
            ValueError: if the value is not a number of seconds.
        """
        if timeout.lower() == 'infinite':
            return None
        prefix = 'second-'
        # Remove the prefix as a prefix; str.lstrip would treat its argument
        # as a set of characters instead.
        if timeout.lower().startswith(prefix):
            timeout = timeout[len(prefix):]
        return int(timeout)

    @serialised
    def subscribe(self, requested_timeout=None, auto_renew=False):
        """Subscribe to the service.

        If requested_timeout is provided, a subscription valid for that number
        of seconds will be requested, but not guaranteed. Check
        `timeout` once the request has completed to find out what period of
        validity is actually allocated.

        Note:
            SoCo will try to unsubscribe any subscriptions which are still
            subscribed on program termination, but it is good practice for
            you to clean up by making sure that you call :meth:`unsubscribe`
            yourself.

        Args:
            requested_timeout(int, optional): The timeout to be requested.
            auto_renew (bool, optional): If `True`, renew the subscription
                automatically shortly before timeout. Default `False`.

        Returns:
            The deferred of the ``SUBSCRIBE`` request, with the response
            handling already attached.

        Raises:
            SoCoException: if this instance has already been unsubscribed.
        """
        # TIMEOUT is provided for in the UPnP spec, but it is not clear if
        # Sonos pays any attention to it. A timeout of 86400 secs always seems
        # to be allocated
        self.requested_timeout = requested_timeout
        if self._has_been_unsubscribed:
            raise SoCoException(
                'Cannot resubscribe instance once unsubscribed')
        service = self.service
        # The event listener must be running, so start it if not
        if not event_listener.is_running:
            event_listener.start(service.soco)
        # an event subscription looks like this:
        # SUBSCRIBE publisher path HTTP/1.1
        # HOST: publisher host:publisher port
        # CALLBACK: <delivery URL>
        # NT: upnp:event
        # TIMEOUT: Second-requested subscription duration (optional)

        # pylint: disable=unbalanced-tuple-unpacking
        ip_address, port = event_listener.address
        headers = {
            'Callback': ['<http://{0}:{1}>'.format(ip_address, port)],
            'NT': ['upnp:event']
        }
        if requested_timeout is not None:
            headers["TIMEOUT"] = ["Second-{0}".format(requested_timeout)]
        d = httpRequest(
            'SUBSCRIBE', service.base_url + service.event_subscription_url,
            headers)

        def process(response):
            self.sid = response.headers.getRawHeaders('Sid')[0]
            timeout = response.headers.getRawHeaders('Timeout')[0]
            self.timeout = self._parse_timeout(timeout)
            self._timestamp = time.time()
            self.is_subscribed = True
            log.info(
                "Subscribed to %s, sid: %s",
                service.base_url + service.event_subscription_url, self.sid)
            # Add the sid to the sid to subscription mapping so it can be
            # looked up by sid
            _sid_to_subscription[self.sid] = self
            # Register this subscription to be unsubscribed at exit if still
            # alive
            reactor.addSystemEventTrigger(
                'before', 'shutdown', self.unsubscribe)

            # Set up auto_renew
            if not auto_renew:
                return
            if self.timeout is None:
                # An infinite subscription never expires, so there is
                # nothing to renew.
                return
            # Autorenew just before expiry, say at 85% of self.timeout seconds
            interval = self.timeout * 85 / 100
            self._auto_renew_loop = task.LoopingCall(self.renew)
            # now=False: wait one interval before the first renewal instead
            # of renewing immediately after subscribing.
            self._auto_renew_loop.start(interval, now=False)

        d.addCallback(process)
        d.addErrback(printError)
        return d

    @serialised
    def renew(self, requested_timeout=None):
        """Renew the event subscription.

        You should not try to renew a subscription which has been
        unsubscribed, or once it has expired.

        Args:
            requested_timeout (int, optional): The period for which a renewal
                request should be made. If None (the default), use the timeout
                requested on subscription.

        Returns:
            The deferred of the renewal ``SUBSCRIBE`` request, with the
            response handling already attached.

        Raises:
            SoCoException: if the subscription has been unsubscribed, has
                not been subscribed yet, or has expired.
        """
        log.info("Autorenewing subscription %s", self.sid)
        if self._has_been_unsubscribed:
            raise SoCoException(
                'Cannot renew subscription once unsubscribed')
        if not self.is_subscribed:
            raise SoCoException(
                'Cannot renew subscription before subscribing')
        if self.time_left == 0:
            raise SoCoException(
                'Cannot renew subscription after expiry')

        # SUBSCRIBE publisher path HTTP/1.1
        # HOST: publisher host:publisher port
        # SID: uuid:subscription UUID
        # TIMEOUT: Second-requested subscription duration (optional)
        headers = {
            'SID': [self.sid]
        }
        if requested_timeout is None:
            requested_timeout = self.requested_timeout
        if requested_timeout is not None:
            headers["TIMEOUT"] = ["Second-{0}".format(requested_timeout)]
        d = httpRequest(
            'SUBSCRIBE',
            self.service.base_url + self.service.event_subscription_url,
            headers)

        def process(response):
            timeout = response.headers.getRawHeaders('Timeout')[0]
            self.timeout = self._parse_timeout(timeout)
            self._timestamp = time.time()
            self.is_subscribed = True
            log.info(
                "Renewed subscription to %s, sid: %s",
                self.service.base_url + self.service.event_subscription_url,
                self.sid)

        d.addCallback(process)
        d.addErrback(printError)
        return d

    @serialised
    def unsubscribe(self):
        """Unsubscribe from the service's events.

        Once unsubscribed, a Subscription instance should not be reused.

        Returns:
            The deferred of the ``UNSUBSCRIBE`` request, or `None` if there
            was nothing to unsubscribe.
        """
        # Trying to unsubscribe if already unsubscribed, or not yet
        # subscribed, fails silently
        if self._has_been_unsubscribed or not self.is_subscribed:
            return
        self.is_subscribed = False
        # Cancel any auto renew
        if self._auto_renew_loop:
            self._auto_renew_loop.stop()
            self._auto_renew_loop = None
        self._timestamp = None
        self.events = None
        self.callback = None
        # remove queue from sid to subscription mapping
        try:
            del _sid_to_subscription[self.sid]
        except KeyError:
            pass
        self._has_been_unsubscribed = True
        log.info(
            "Unsubscribed from %s, sid: %s",
            self.service.base_url + self.service.event_subscription_url,
            self.sid)
        return self._cancel_subscription()

    def _cancel_subscription(self):
        """Send the ``UNSUBSCRIBE`` request, retrying on connection errors."""
        # Send an unsubscribe request like this:
        # UNSUBSCRIBE publisher path HTTP/1.1
        # HOST: publisher host:publisher port
        # SID: uuid:subscription UUID
        headers = {
            'SID': [self.sid]
        }
        d = httpRequest(
            'UNSUBSCRIBE',
            self.service.base_url + self.service.event_subscription_url,
            headers)

        def success(result):
            # The service and sid are only dropped once the device has
            # answered, because a retry still needs them.
            self.service = None
            self.sid = None

        def error(e):
            # If the network is down, keep trying every minute
            if e.check(twisted.internet.error.ConnectError):
                task.deferLater(reactor, 60, self._cancel_subscription)
            else:
                printError(e)

        d.addCallback(success)
        d.addErrback(error)
        return d

    @serialised
    def notify(self, event):
        """Hand a received event to the callback or, failing that, the queue.

        Events arriving after the subscription has been unsubscribed are
        dropped.

        Args:
            event (Event): the event that was received.
        """
        if self._has_been_unsubscribed:
            return
        callback = self.callback
        if callback and callable(callback):
            callback(event)
        else:
            self.events.put(event)

    @property
    def time_left(self):
        """
        `int`: The amount of time left until the subscription expires (seconds)

        If the subscription is unsubscribed (or not yet subscribed),
        `time_left` is 0. If the device granted an infinite subscription,
        `time_left` is ``float('inf')``.
        """
        if self._timestamp is None:
            return 0
        if self.timeout is None:
            # Infinite subscription: there is no expiry to count down to.
            return float('inf')
        remaining = self.timeout - (time.time() - self._timestamp)
        return remaining if remaining > 0 else 0
# The single module-level listener: Subscription.subscribe starts it on
# demand and builds its callback URL from its address.
# pylint: disable=C0103
event_listener = EventListener()
# Used to store a mapping of sids to subscription instances, so that an
# incoming NOTIFY request can be routed to the right Subscription
_sid_to_subscription = {}
# -*- coding: utf-8 -*-
from collections import deque
from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import Agent, BrowserLikeRedirectAgent
from twisted.web.http_headers import Headers
from twisted.internet.error import ConnectError
class Serialiser():
    """Runs queued calls one at a time, in the order they were submitted.

    Every submitted call is wrapped in a Deferred which is appended to
    ``queue``. Only the Deferred at the head of the queue is fired; once its
    callback chain has completed it is removed and the next one is fired.
    """

    def __init__(self):
        # Deferreds still waiting to run; the head is the one in progress.
        self.queue = deque()

    def serialise(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` and return the Deferred wrapping it.

        Errors raised by the call are passed to ``printError`` before the
        queue moves on to the next entry.
        """
        def run(result, fn, *args, **kwargs):
            # The value the Deferred was fired with is of no interest.
            return fn(*args, **kwargs)

        def advance(outcome):
            # This entry is finished: drop it and start the one behind it.
            self.queue.popleft()
            self.callnext()
            return outcome

        pending = defer.Deferred()
        pending.addCallback(run, fn, *args, **kwargs)
        pending.addErrback(printError)
        pending.addCallback(advance)
        self.queue.append(pending)
        # Nothing else is in progress, so this entry can start straight away.
        if len(self.queue) == 1:
            self.callnext()
        return pending

    def callnext(self):
        """Fire the Deferred at the head of the queue, if there is one."""
        if not self.queue:
            return
        head = self.queue[0]
        head.callback(None)
# One shared queue for the whole module: every call made through a function
# decorated with @serialised is funnelled through this bound method.
serialise = Serialiser().serialise
def serialised(fn):
    """Decorate a method so that calls to it go through ``serialise``.

    Args:
        fn: the method to wrap. Its first positional argument is the
            instance (``self``).

    Returns:
        A wrapper which passes ``fn`` and its arguments to ``serialise`` and
        returns whatever ``serialise`` returns. The undecorated function
        remains available as the wrapper's ``fn`` attribute.
    """
    # Imported locally so that this block does not depend on a change to the
    # file's import section.
    import functools

    # Preserve the wrapped method's name and docstring for logging,
    # debugging and documentation tools.
    @functools.wraps(fn)
    def inner(self, *args, **kwargs):
        return serialise(fn, self, *args, **kwargs)

    inner.fn = fn
    return inner
def httpRequest(method, url, headers=None):
    """Issue an HTTP request through a redirect-following twisted agent.

    Args:
        method: the HTTP verb, eg ``'SUBSCRIBE'``.
        url: the target URL; it is encoded as latin-1 before being sent.
        headers (dict, optional): maps header names to lists of values.
            Defaults to no extra headers.

    Returns:
        The value returned by the agent's ``request`` method (a Deferred
        which callers attach response handlers to).
    """
    # A fresh dict per call rather than a mutable default argument shared
    # between calls.
    if headers is None:
        headers = {}
    agent = BrowserLikeRedirectAgent(Agent(reactor))
    return agent.request(
        method,
        url.encode('latin-1'),
        Headers(headers)
    )
def printError(e):
    """Print a failure to standard output.

    Connection errors are reported on a single line; any other failure is
    printed with its full traceback between two separator lines.

    Args:
        e: the failure to report. It must provide ``check``, ``value`` and
            ``getTraceback`` (as a twisted ``Failure`` does).

    Returns:
        None. NOTE(review): when used as an errback this presumably marks
        the error as handled, so later callbacks receive `None`.
    """
    # Each print call takes one pre-formatted string, so the output is the
    # same whether print is a statement (Python 2) or a function (Python 3).
    if e.check(ConnectError):
        print('Connection error: %s' % (e.value,))
        return
    print('------------------------error.printError: traceback------------------------')
    print(e.getTraceback())
    print('---------------------------------------------------------------------------')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment