# Last active November 22, 2016 05:01
# SoCo Events module rewritten to use twisted
# -*- coding: utf-8 -*-
"""Classes to handle Sonos UPnP Events and Subscriptions."""
#from __future__ import unicode_literals
import logging
import socket
import time
from . import config
from .compat import Queue
from .data_structures import from_didl_string
from .exceptions import SoCoException
from .utils import camel_to_underscore
from .xml import XML
from .events_twisted_utils import httpRequest
from .events_twisted_utils import serialised
from .events_twisted_utils import printError
from twisted.internet import reactor
from twisted.internet import task
from twisted.web.server import Site
from twisted.web.resource import Resource
import twisted.internet.error
log = logging.getLogger(__name__) # pylint: disable=C0103
def parse_event_xml(xml_event):
    """Parse the body of a UPnP event.

    Args:
        xml_event (bytes): bytes containing the body of the event encoded
            with utf-8.

    Returns:
        dict: A dict with keys representing the evented variables. The
        relevant value will usually be a string representation of the
        variable's value, but may on occasion be:

        * a dict (eg when the volume changes, the value will itself be a
          dict containing the volume for each channel:
          :code:`{'Volume': {'LF': '100', 'RF': '100', 'Master': '36'}}`)
        * an instance of a `DidlObject` subclass (eg if it represents
          track metadata).

    Note:
        This version has been modified to work with twisted.internet. It
        assumes the application has a running twisted reactor.

    Example:

        Run this code, and change your volume, tracks etc. (This is a
        sketch: adjust the ``event_listener`` import to wherever this
        module lives in your installation.)::

            from twisted.internet import reactor
            import soco
            from pprint import pprint
            from soco.events import event_listener

            def print_event(event):
                try:
                    pprint(event.variables)
                except Exception as e:
                    print('There was an error in print_event: %s' % e)

            def main():
                # pick a device at random and use it to get the group
                # coordinator
                device = soco.discover().pop()
                device = device.group.coordinator
                print(device.player_name)
                sub = device.renderingControl.subscribe()
                sub2 = device.avTransport.subscribe()
                sub.callback = print_event
                sub2.callback = print_event

                def before_shutdown():
                    event_listener.stop()

                reactor.addSystemEventTrigger(
                    'before', 'shutdown', before_shutdown)

            if __name__ == '__main__':
                reactor.callWhenRunning(main)
                reactor.run()
    """
    result = {}
    tree = XML.fromstring(xml_event)
    # property values are just under the propertyset, which
    # uses this namespace
    properties = tree.findall(
        '{urn:schemas-upnp-org:event-1-0}property')
    for prop in properties:
        for variable in prop:
            if variable.tag != "LastChange":
                # An ordinary evented variable: its text is the value.
                result[camel_to_underscore(variable.tag)] = variable.text
                continue

            # Special handling for a LastChange event. Its text is itself
            # an XML document (see the UPnP RenderingControl and
            # AVTransport service specifications).
            last_change_tree = XML.fromstring(
                variable.text.encode('utf-8'))
            # We assume there is only one InstanceID tag. This is true for
            # Sonos, as far as we know.
            # InstanceID can be in one of two namespaces, depending on
            # whether we are looking at an avTransport event or a
            # renderingControl event, so we need to look for both. Queue
            # events use a QueueID element instead.
            instance = last_change_tree.find(
                "{urn:schemas-upnp-org:metadata-1-0/AVT/}InstanceID")
            if instance is None:
                instance = last_change_tree.find(
                    "{urn:schemas-upnp-org:metadata-1-0/RCS/}InstanceID")
            if instance is None:
                instance = last_change_tree.find(
                    "{urn:schemas-sonos-com:metadata-1-0/Queue/}QueueID")
            if instance is None:
                # None of the known containers is present; iterating None
                # would raise TypeError, so skip this variable instead.
                log.debug("LastChange event without a known instance tag")
                continue

            # Look at each variable within the LastChange event
            for last_change_var in instance:
                tag = last_change_var.tag
                # Remove any namespaces from the tags
                if tag.startswith('{'):
                    tag = tag.split('}', 1)[1]
                # Un-camel case it
                tag = camel_to_underscore(tag)
                # Now extract the relevant value for the variable.
                # The UPnP specs suggest that the value of any variable
                # evented via a LastChange Event will be in the 'val'
                # attribute, but audio related variables may also have a
                # 'channel' attribute. In addition, it seems that Sonos
                # sometimes uses a text value instead.
                value = last_change_var.get('val')
                if value is None:
                    value = last_change_var.text
                # If DIDL metadata is returned, convert it to a music
                # library data structure. The value may still be None here
                # (no 'val' attribute and no text), hence the guard.
                if value is not None and value.startswith('<DIDL-Lite'):
                    value = from_didl_string(value)[0]
                channel = last_change_var.get('channel')
                if channel is not None:
                    # Per-channel values are collected in a nested dict
                    if result.get(tag) is None:
                        result[tag] = {}
                    result[tag][channel] = value
                else:
                    result[tag] = value
    return result
class Event(object):
    """A read-only object representing a received event.

    The values of the evented variables can be accessed via the ``variables``
    dict, or as attributes on the instance itself. You should treat all
    attributes as read-only.

    Args:
        sid (str): the subscription id.
        seq (str): the event sequence number for that subscription.
        service (str): the service which is subscribed to the event.
        timestamp (float): the time that the event was received (from
            Python's `time.time` function).
        variables (dict, optional): contains the ``{names: values}`` of the
            evented variables. Defaults to `None`, which is stored as an
            empty dict.

    Raises:
        AttributeError: Not all attributes are returned with each event. An
            `AttributeError` will be raised if you attempt to access as an
            attribute a variable which was not returned in the event.

    Example:

        >>> print(event.variables['transport_state'])
        >>> print(event.transport_state)
    """
    # pylint: disable=too-few-public-methods, too-many-arguments

    def __init__(self, sid, seq, service, timestamp, variables=None):
        # Initialisation has to be done like this, because __setattr__ is
        # overridden, and will not allow direct setting of attributes
        self.__dict__['sid'] = sid
        self.__dict__['seq'] = seq
        self.__dict__['timestamp'] = timestamp
        self.__dict__['service'] = service
        self.__dict__['variables'] = variables if variables is not None else {}

    def __getattr__(self, name):
        """Look up ``name`` among the evented variables.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: if ``name`` is not an evented variable.
        """
        # Read 'variables' straight from the instance dict. Going through
        # ``self.variables`` would re-enter __getattr__ and recurse without
        # end on an instance whose __init__ has not run (eg while being
        # copied or unpickled).
        variables = self.__dict__.get('variables', {})
        if name in variables:
            return variables[name]
        raise AttributeError('No such attribute: %s' % name)

    def __setattr__(self, name, value):
        """Disable (most) attempts to set attributes.

        This is not completely foolproof. It just acts as a warning!

        Raises:
            TypeError: always.
        """
        raise TypeError('Event object does not support attribute assignment')
class EventNotifyHandler(Resource):
    """Handles HTTP ``NOTIFY`` Verbs sent to the listener server."""

    # Twisted resource flag: this resource handles every path itself.
    isLeaf = True

    def render_NOTIFY(self, request):  # pylint: disable=invalid-name
        """Serve a ``NOTIFY`` request.

        A ``NOTIFY`` request will be sent by a Sonos device when a state
        variable changes. See the UPnP Device Architecture v1.1
        specification, section 4.3, for details.

        Args:
            request: the twisted web request. Its ``Seq`` and ``Sid``
                headers and its body are read.

        Returns:
            bytes: the response body.
        """
        timestamp = time.time()
        # getRawHeaders returns None when a header is absent, so check
        # before indexing rather than failing with a TypeError.
        seq_values = request.requestHeaders.getRawHeaders('Seq')
        sid_values = request.requestHeaders.getRawHeaders('Sid')
        if not seq_values or not sid_values:
            log.warning("NOTIFY request without Seq and Sid headers ignored")
            request.setResponseCode(400)
            return b''
        seq = seq_values[0]  # Event sequence number
        sid = sid_values[0]  # Event Subscription Identifier
        content = request.content.read()
        # find the relevant subscription from the sid
        subscription = _sid_to_subscription.get(sid)
        if subscription is not None:
            service = subscription.service
            log.info(
                "Event %s received for %s service at %s", seq,
                service.service_id, timestamp)
            log.debug("Event content: %s", content)
            variables = parse_event_xml(content)
            # Build the Event object
            event = Event(sid, seq, service, timestamp, variables)
            # pass the event details on to the service so it can update its
            # cache.
            # NOTE(review): this call and the notify call below were missing
            # from the copy of the file that was reviewed and are restored
            # from the adjacent comments -- confirm against the original.
            # pylint: disable=protected-access
            service._update_cache_on_event(event)
            # Pass the event to the subscription for handling
            subscription.notify(event)
        else:
            log.info("No service registered for %s", sid)
        # A bytes literal is a plain str on Python 2 and what twisted.web
        # requires on Python 3.
        return b'OK'

    def log_message(self, fmt, *args):
        """Write a formatted message to this module's debug log."""
        # Divert standard webserver logging to the debug log
        log.debug(fmt, *args)
class EventListener(object):
    """The Event Listener.

    Runs an http server which is an endpoint for ``NOTIFY``
    requests from Sonos devices.
    """

    def __init__(self):
        super(EventListener, self).__init__()
        #: `bool`: Indicates whether the server is currently running
        self.is_running = False
        #: `tuple`: The address (ip, port) on which the server is
        #: configured to listen.
        # Empty for the moment. (It is set in `start`)
        self.address = ()
        #: The listening port object returned by ``reactor.listenTCP``,
        #: or `None` while the listener is not running.
        self.port = None

    def start(self, any_zone):
        """Start the event listener listening on the local machine.

        Make sure that your firewall allows connections to the port used.

        Args:
            any_zone (SoCo): Any Sonos device on the network. It does not
                matter which device. It is used only to find a local IP
                address reachable by the Sonos net.

        Note:
            The port on which the event listener attempts to listen is
            configurable. See `config.EVENT_LISTENER_PORT`. If this port is
            unavailable, the event listener will attempt to listen on the
            next available port, within a range of 100 from
            config.EVENT_LISTENER_PORT. If no port in that range can be
            bound, an error is logged and the listener stays stopped.
        """
        if self.is_running:
            return
        factory = Site(EventNotifyHandler())
        first_port = config.EVENT_LISTENER_PORT
        for port in range(first_port, first_port + 100):
            try:
                listening_port = reactor.listenTCP(port, factory)
            except twisted.internet.error.CannotListenError as e:
                log.warning("Port %d is busy: %s", port, e)
                continue
            self.port = listening_port
            log.info("Event listener started")
            # Find our local network IP address which is accessible to the
            # Sonos net: connect a UDP socket towards the zone and read back
            # the local address the OS selected for it.
            temp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                temp_sock.connect((any_zone.ip_address, port))
                ip_address = temp_sock.getsockname()[0]
            finally:
                # The socket is only a probe; do not leak it.
                temp_sock.close()
            self.address = (ip_address, port)
            # Only report running once a port is bound and the address set.
            self.is_running = True
            log.info("Event listener running on %s", self.address)
            return
        log.error(
            "Event listener could not bind any port in range %d-%d",
            first_port, first_port + 99)

    def stop(self):
        """Stop the event listener."""
        if not self.is_running:
            return
        self.is_running = False
        port, self.port = self.port, None
        port.stopListening()
        log.info("Event listener stopped")
class Subscription(object):
    """A class representing the subscription to a UPnP event."""

    # pylint: disable=too-many-instance-attributes

    # NOTE(review): decorator lines were missing from the copy of this file
    # that was reviewed. ``time_left`` is restored as a property because
    # ``renew`` reads it as a plain attribute. Whether ``serialised``
    # (imported at module level) was applied to subscribe/renew/unsubscribe,
    # and whether ``printError`` was attached as an errback, could not be
    # determined -- confirm against the original.

    def __init__(self, service, event_queue=None):
        """
        Args:
            service (Service): The SoCo `Service` to which the subscription
                should be made.
            event_queue (:class:`~queue.Queue`): A queue on which received
                events will be put. If not specified, a queue will be
                created and used.
        """
        super(Subscription, self).__init__()
        self.service = service
        #: `str`: A unique ID for this subscription
        self.sid = None
        #: `int`: The amount of time in seconds until the subscription
        #: expires. `None` once subscribed means it never expires.
        self.timeout = None
        #: `bool`: An indication of whether the subscription is subscribed.
        self.is_subscribed = False
        #: :class:`~queue.Queue`: The queue on which events are placed.
        self.events = Queue() if event_queue is None else event_queue
        #: `int`: The period (seconds) for which the subscription is requested
        self.requested_timeout = None
        # A flag to make sure that an unsubscribed instance is not
        # resubscribed
        self._has_been_unsubscribed = False
        # The time when the subscription was made
        self._timestamp = None
        # Used to keep track of the auto_renew loop
        self._auto_renew_loop = None
        # A callback function to be called whenever an event is received.
        # If self.callback is set and is callable, the events queue
        # (self.events) won't be used. The callback function will be called
        # with an Event object as the only argument.
        self.callback = None

    @staticmethod
    def _parse_timeout(timeout):
        """Convert a UPnP ``TIMEOUT`` header value to seconds.

        According to the spec, timeout can be "infinite" or "second-123"
        where 123 is a number of seconds. Sonos uses "Second-123" (with a
        capital letter).

        Args:
            timeout (str): the raw header value.

        Returns:
            int: the number of seconds, or `None` for "infinite".
        """
        if timeout.lower() == 'infinite':
            return None
        # Take what follows the first '-'. (str.lstrip('Second-') strips a
        # *set of characters*, so it fails on a lower-case "second-".)
        return int(timeout.split('-', 1)[-1])

    def subscribe(self, requested_timeout=None, auto_renew=False):
        """Subscribe to the service.

        If requested_timeout is provided, a subscription valid for that number
        of seconds will be requested, but not guaranteed. Check
        `timeout` once the returned Deferred has fired to find out what
        period of validity is actually allocated.

        Note:
            SoCo will try to unsubscribe any subscriptions which are still
            subscribed on reactor shutdown, but it is good practice for
            you to clean up by making sure that you call :meth:`unsubscribe`
            yourself.

        Args:
            requested_timeout(int, optional): The timeout to be requested.
            auto_renew (bool, optional): If `True`, renew the subscription
                automatically shortly before timeout. Default `False`.

        Returns:
            The Deferred returned by `httpRequest`, with the response
            processing attached as a callback.

        Raises:
            SoCoException: if this instance has already been unsubscribed.
        """
        # TIMEOUT is provided for in the UPnP spec, but it is not clear if
        # Sonos pays any attention to it. A timeout of 86400 secs always seems
        # to be allocated
        self.requested_timeout = requested_timeout
        if self._has_been_unsubscribed:
            raise SoCoException(
                'Cannot resubscribe instance once unsubscribed')
        service = self.service
        # The event listener must be running, so start it if not
        if not event_listener.is_running:
            event_listener.start(service.soco)
        # an event subscription looks like this:
        # SUBSCRIBE publisher path HTTP/1.1
        # HOST: publisher host:publisher port
        # CALLBACK: <delivery URL>
        # NT: upnp:event
        # TIMEOUT: Second-requested subscription duration (optional)

        # pylint: disable=unbalanced-tuple-unpacking
        ip_address, port = event_listener.address
        headers = {
            'Callback': ['<http://{0}:{1}>'.format(ip_address, port)],
            'NT': ['upnp:event']
        }
        if requested_timeout is not None:
            headers["TIMEOUT"] = ["Second-{0}".format(requested_timeout)]
        event_url = service.base_url + service.event_subscription_url
        d = httpRequest('SUBSCRIBE', event_url, headers)

        def process(response):
            self.sid = response.headers.getRawHeaders('Sid')[0]
            self.timeout = self._parse_timeout(
                response.headers.getRawHeaders('Timeout')[0])
            self._timestamp = time.time()
            self.is_subscribed = True
            log.info("Subscribed to %s, sid: %s", event_url, self.sid)
            # Add the sid to the sid to subscription mapping so it can be
            # looked up by sid
            _sid_to_subscription[self.sid] = self
            # Register this subscription to be unsubscribed at exit if still
            # alive
            reactor.addSystemEventTrigger(
                'before', 'shutdown', self.unsubscribe)
            # Set up auto_renew. A subscription that never expires
            # (timeout None) has nothing to renew.
            if not auto_renew or self.timeout is None:
                return
            # Autorenew just before expiry, say at 85% of self.timeout seconds
            interval = self.timeout * 85 / 100
            self._auto_renew_loop = task.LoopingCall(self.renew)
            self._auto_renew_loop.start(interval, now=False)

        d.addCallback(process)
        return d

    def renew(self, requested_timeout=None):
        """Renew the event subscription.

        You should not try to renew a subscription which has been
        unsubscribed, or once it has expired.

        Args:
            requested_timeout (int, optional): The period for which a renewal
                request should be made. If None (the default), use the timeout
                requested on subscription.

        Returns:
            The Deferred returned by `httpRequest`, with the response
            processing attached as a callback.

        Raises:
            SoCoException: if the subscription has been unsubscribed, was
                never subscribed, or has expired.
        """
        log.info("Autorenewing subscription %s", self.sid)
        if self._has_been_unsubscribed:
            raise SoCoException(
                'Cannot renew subscription once unsubscribed')
        if not self.is_subscribed:
            raise SoCoException(
                'Cannot renew subscription before subscribing')
        if self.time_left == 0:
            raise SoCoException(
                'Cannot renew subscription after expiry')

        # SUBSCRIBE publisher path HTTP/1.1
        # HOST: publisher host:publisher port
        # SID: uuid:subscription UUID
        # TIMEOUT: Second-requested subscription duration (optional)
        headers = {
            'SID': [self.sid]
        }
        if requested_timeout is None:
            requested_timeout = self.requested_timeout
        if requested_timeout is not None:
            headers["TIMEOUT"] = ["Second-{0}".format(requested_timeout)]
        event_url = (
            self.service.base_url + self.service.event_subscription_url)
        d = httpRequest('SUBSCRIBE', event_url, headers)

        def process(response):
            self.timeout = self._parse_timeout(
                response.headers.getRawHeaders('Timeout')[0])
            self._timestamp = time.time()
            self.is_subscribed = True
            log.info(
                "Renewed subscription to %s, sid: %s", event_url, self.sid)

        d.addCallback(process)
        return d

    def unsubscribe(self):
        """Unsubscribe from the service's events.

        Once unsubscribed, a Subscription instance should not be reused.

        Returns:
            The Deferred from the UNSUBSCRIBE request, or `None` if the
            instance was not subscribed.
        """
        # Trying to unsubscribe if already unsubscribed, or not yet
        # subscribed, fails silently
        if self._has_been_unsubscribed or not self.is_subscribed:
            return
        self.is_subscribed = False
        # Cancel any auto renew. LoopingCall.stop() may only be called on a
        # running loop.
        loop, self._auto_renew_loop = self._auto_renew_loop, None
        if loop is not None and loop.running:
            loop.stop()
        self._timestamp = None
        self.events = None
        self.callback = None
        # remove this subscription from the sid to subscription mapping;
        # it is not an error if it is already gone
        _sid_to_subscription.pop(self.sid, None)
        self._has_been_unsubscribed = True
        log.info(
            "Unsubscribed from %s, sid: %s",
            self.service.base_url + self.service.event_subscription_url,
            self.sid)
        return self._cancel_subscription()

    def _cancel_subscription(self):
        """Send the UNSUBSCRIBE request to the device.

        Returns:
            The Deferred for the request.
        """
        # Send an unsubscribe request like this:
        # UNSUBSCRIBE publisher path HTTP/1.1
        # HOST: publisher host:publisher port
        # SID: uuid:subscription UUID
        headers = {
            'SID': [self.sid]
        }
        d = httpRequest(
            'UNSUBSCRIBE',
            self.service.base_url + self.service.event_subscription_url,
            headers)

        def success(result):
            # Only forget the service and sid once the device has been told
            self.service = None
            self.sid = None

        def error(e):
            # If the network is down, keep trying every minute
            if e.check(twisted.internet.error.ConnectError):
                task.deferLater(reactor, 60, self._cancel_subscription)
            else:
                # NOTE(review): how other failures were handled is not
                # visible in the reviewed copy; they are logged here so
                # they are not lost -- confirm.
                log.warning("Unsubscribe request failed: %s", e.value)

        d.addCallbacks(success, error)
        return d

    def notify(self, event):
        """Deliver a received event to the callback or the events queue.

        Args:
            event (Event): the event to deliver. It is dropped if this
                subscription has been unsubscribed.
        """
        if self._has_been_unsubscribed:
            return
        if self.callback and hasattr(self.callback, '__call__'):
            self.callback(event)
        else:
            self.events.put(event)

    @property
    def time_left(self):
        """
        `int`: The amount of time left until the subscription expires (seconds)

        If the subscription is unsubscribed (or not yet subscribed),
        `time_left` is 0. If the device granted an infinite timeout,
        `time_left` is ``float('inf')``.
        """
        if self._timestamp is None:
            return 0
        if self.timeout is None:
            # "infinite" subscription: it never runs out
            return float('inf')
        time_left = self.timeout - (time.time() - self._timestamp)
        return time_left if time_left > 0 else 0
# The single module-level EventListener instance. Subscription.subscribe
# starts it if it is not running and reads its address for the callback URL.
# pylint: disable=C0103
event_listener = EventListener()
# Used to store a mapping of sids to subscription instances. Entries are
# added when a subscription succeeds and removed on unsubscribe.
_sid_to_subscription = {}
# -*- coding: utf-8 -*-
from collections import deque
from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import Agent, BrowserLikeRedirectAgent
from twisted.web.http_headers import Headers
from twisted.internet.error import ConnectError
class Serialiser(object):
    """Run Deferred-returning calls one at a time, in submission order.

    NOTE(review): the statements that append to the queue, fire the head of
    the queue and pop it were missing from the copy of this file that was
    reviewed. They are restored here from the surrounding code -- confirm
    against the original.
    """

    def __init__(self):
        # Deferreds for calls that have been submitted but not yet finished.
        # The head of the queue is the call currently in progress.
        self.queue = deque([])

    def serialise(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` to run after all earlier calls.

        Args:
            fn (callable): the function to call.
            *args: positional arguments for ``fn``.
            **kwargs: keyword arguments for ``fn``.

        Returns:
            Deferred: fires with the result of ``fn``.
        """
        def execute(result, fn, *args, **kwargs):
            # ``result`` is the None the queued Deferred was fired with
            return fn(*args, **kwargs)

        def advance(result):
            # This call has finished (successfully or not): drop it from the
            # queue and start the next one, passing the outcome through.
            self.queue.popleft()
            self.callnext()
            return result

        d = defer.Deferred()
        d.addCallback(execute, fn, *args, **kwargs)
        d.addBoth(advance)
        self.queue.append(d)
        if len(self.queue) == 1:
            # Nothing else is in progress, so start straight away
            self.callnext()
        return d

    def callnext(self):
        """Fire the Deferred at the head of the queue, if there is one."""
        if len(self.queue):
            d = self.queue[0]
            d.callback(None)


# Module-wide serialising function, shared by everything that uses
# ``serialised``.
serialise = Serialiser().serialise
def serialised(fn):
    """Decorate a method so that its calls go through ``serialise``.

    Args:
        fn (callable): the method to wrap. It is called with the instance as
            first argument followed by the caller's arguments.

    Returns:
        callable: the wrapper, which returns whatever ``serialise`` returns.
        The undecorated function is available as its ``fn`` attribute.
    """
    # Imported here so the block does not rely on a module-level import.
    from functools import wraps

    @wraps(fn)  # keep the wrapped method's name and docstring
    def inner(self, *args, **kwargs):
        return serialise(fn, self, *args, **kwargs)

    inner.fn = fn
    return inner
def httpRequest(method, url, headers=None):
    """Issue an HTTP request through a redirect-following twisted Agent.

    Args:
        method (str): the HTTP method, eg ``'SUBSCRIBE'``.
        url (str): the absolute URL to request.
        headers (dict, optional): maps header names to lists of values.
            Defaults to no extra headers.

    Returns:
        Deferred: the Deferred returned by the agent's ``request`` method.
    """
    # A fresh dict per call: a mutable default would be shared across calls.
    if headers is None:
        headers = {}
    agent = BrowserLikeRedirectAgent(Agent(reactor))
    # NOTE(review): the argument list of this call was missing from the copy
    # of the file that was reviewed; restored from the parameters -- confirm.
    d = agent.request(method, url, Headers(headers))
    return d
def printError(e):
    """Print a failure: one line for connection errors, else a traceback.

    Args:
        e: a twisted ``Failure``; its ``check``, ``value`` and
            ``getTraceback`` members are used.
    """
    # Single-argument print calls behave the same on Python 2 and 3.
    if e.check(ConnectError):
        print('Connection error: %s' % (e.value,))
    else:
        # NOTE(review): the ``else:`` was missing from the reviewed copy and
        # is assumed here -- confirm.
        print('------------------------error.printError: traceback------------------------')
        print(e.getTraceback())
        print('---------------------------------------------------------------------------')
