Skip to content

Instantly share code, notes, and snippets.

@oxyflour
Created January 31, 2016 08:31
Show Gist options
  • Save oxyflour/56ec9f888f37c5e56893 to your computer and use it in GitHub Desktop.
Save oxyflour/56ec9f888f37c5e56893 to your computer and use it in GitHub Desktop.
simple upnp python interface
import xml.dom.minidom
def node_to_obj(node):
obj = {}
if node.nodeType == 1:
if node.attributes.length > 0:
obj["#attr"] = {}
for i in range(0, node.attributes.length):
attr = node.attributes.item(i)
obj["#attr"][attr.name] = attr.value
elif node.nodeType == 3:
obj = node.nodeValue
if node.hasChildNodes():
for i in range(0, node.childNodes.length):
child = node.childNodes.item(i)
name = child.nodeName
if not obj.has_key(name):
obj[name] = node_to_obj(child)
else:
if not type(obj[name]) == type([]):
obj[name] = [obj[name]]
obj[name].append(node_to_obj(child))
if len(obj) == 1 and obj.has_key("#text"):
obj = obj["#text"]
if not len(obj):
obj = ""
return obj
def xml_to_obj(text):
dom = xml.dom.minidom.parseString(text)
return node_to_obj(dom.documentElement);
def soap_encode(type, name, params):
body = ""
for key in params.keys():
param = "<%s>%s</%s>" % (key, params[key], key)
body = body + param.encode("UTF-8")
return '<?xml version="1.0" encoding="UTF-8"?><s:Envelope s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Body><u:%s xmlns:u="%s">%s</u:%s></s:Body></s:Envelope>' % (name, type, body, name)
def soap_decode(text):
data = xml_to_obj(text)
return data.get("s:Body")
import httplib, urlparse
import SOAPle
type_map = {\
"A_ARG_TYPE_ObjectID":"string r", \
"A_ARG_TYPE_Result":"xml r", \
"A_ARG_TYPE_SearchCriteria":"string o", \
"A_ARG_TYPE_BrowseFlag":"BrowseDirectChildren,BrowseMetadata r", \
"A_ARG_TYPE_Filter":"CSVstring r", \
"A_ARG_TYPE_SortCriteria":"CSVstring r", \
"A_ARG_TYPE_Index":"ui4 r", \
"A_ARG_TYPE_Count":"ui4 r", \
"A_ARG_TYPE_UpdateID":"ui4 r" \
}
def type_convert(type, value):
info = type_map.get(type)
if not info:
return str(value)
st = info.split(" ")
type, req = st[0], st[1]
if type == "string" or type == "CSVstring":
if not value == None:
return str(value)
elif req == "r":
return ""
elif type == "ui4":
if not value == None:
return int(value)
elif req == "r":
return 0
elif type == "xml":
if not value == None:
try:
return SOAPle.xml_to_obj(value.encode("UTF-8"))
except:
return str(value)
elif req == "r":
return {}
else:
st = type.split(",")
for i in range(0, len(st)):
if value == st[i]:
return value
if req == "r":
return st[0]
def http_fetch(host, port, url, method="GET", header={}, body=None):
conn = httplib.HTTPConnection(host, port)
conn.request(method, url, body, header)
resp = conn.getresponse()
text = resp.read()
conn.close()
if (resp.status != 200):
raise httplib.HTTPException, "server returned " + str(resp.status)
else:
return text
service_map = {\
}
class serviceMeta(type):
def __init__(cls, name, bases, attrs):
if attrs.has_key("service_type"):
service_map[attrs["service_type"]] = cls
return type.__init__(cls, name, bases, attrs)
@staticmethod
def get(host, port, path, type, control):
if service_map.has_key(type):
return service_map[type](host, port, path, type, control)
return service(host, port, path, type, control)
class service:
__metaclass__ = serviceMeta
def __init__(self, host, port, path, type, control):
self.host, self.port, self.path, self.type, self.control = host, port, path, type, control
text = http_fetch(host, port, path)
self.desp = SOAPle.xml_to_obj(text)
def call(self, method=None, params={}):
result = []
actions = self.desp["actionList"]["action"]
if not type(actions) == type([]):
actions = [actions]
for i in range(0, len(actions)):
result.append(actions[i]["name"])
if actions[i]["name"] == method:
argdef = actions[i]["argumentList"]["argument"];
if not type(argdef) == type([]):
argdef = [argdef]
data = self.execute(method, argdef, params)
return self.process(method, argdef, data["u:"+method+"Response"])
if method == None:
return result
else:
raise Error, "no such method!"
def execute(self, method, argdef, params):
args = {}
for i in range(0, len(argdef)):
name = argdef[i]["name"]
if argdef[i]["direction"] == "in":
value = self.param_convert(argdef[i]["relatedStateVariable"], name, params.get(name))
if not value == None:
args[name] = value
header = {"Content-Type":"text/xml; charset=\"utf-8\"", \
"SOAPAction":self.type+"#"+method, \
"Connection":"close", \
"User-Agent":"chromeframe", \
"Host":self.host+":"+str(self.port)}
body = SOAPle.soap_encode(self.type, method, args)
text = http_fetch(self.host, self.port, self.control, "POST", header, body)
return SOAPle.soap_decode(text)
def process(self, method, argdef, data):
result = {}
for i in range(0, len(argdef)):
name = argdef[i]["name"]
if argdef[i]["direction"] == "out":
value = self.param_convert(argdef[i]["relatedStateVariable"], name, data[name])
result[name] = value
return result
def param_convert(self, type, name, value):
return type_convert(type, value)
class device:
def __init__(self, url_or_host):
if len(url_or_host) == 3:
self.host, self.port, self.path = url_or_host[0], url_or_host[1], url_or_host[2]
else:
param = urlparse.urlparse(url_or_host)
net = param.netloc.split(":")
self.host, self.port, self.path = net[0], int(net[1]) if len(net) > 1 else 80, param.path
text = http_fetch(self.host, self.port, self.path)
self.desp = SOAPle.xml_to_obj(text)
def getService(self, id=None):
result = []
services = self.desp["device"]["serviceList"]["service"]
if not type(services) == type([]):
services = [services]
for i in range(0, len(services)):
result.append(services[i]["serviceId"])
if services[i]["serviceId"] == id:
return serviceMeta.get(self.host, self.port, services[i]["SCPDURL"],\
services[i]["serviceType"], services[i]["controlURL"]);
if id == None:
return result
else:
raise Error, "no such service!"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment