Skip to content

Instantly share code, notes, and snippets.

@danielperna84
Created May 22, 2020 20:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielperna84/28653b1ebf760064cdbecf5f4828f5f3 to your computer and use it in GitHub Desktop.
Save danielperna84/28653b1ebf760064cdbecf5f4828f5f3 to your computer and use it in GitHub Desktop.
import time
import logging
import threading
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.server import SimpleXMLRPCRequestHandler
import xmlrpc.client
LOG = logging.getLogger(__name__)
#### Configuration for CCU ####
CCU_IP = '127.0.0.1'
CCU_PORT = 2010
#### Configuration for local server ####
LOCAL_IP = '127.0.0.1'
LOCAL_PORT = 32001
INTERFACE_ID = 'testclient'
#### Device storage ####
DEVICES = []
#### XML-RPC Proxy with thread-locking support ####
class LockingServerProxy(xmlrpc.client.ServerProxy):
"""
ServerProxy implementation with lock when request is executing
"""
def __init__(self, *args, **kwargs):
"""
Initialize new proxy for server
"""
self.lock = threading.Lock()
xmlrpc.client.ServerProxy.__init__(self, encoding="ISO-8859-1", *args, **kwargs)
def __request(self, *args, **kwargs):
"""
Call method on server side
"""
with self.lock:
parent = xmlrpc.client.ServerProxy
# pylint: disable=E1101
return parent._ServerProxy__request(self, *args, **kwargs)
def __getattr__(self, *args, **kwargs):
"""
Magic method dispatcher
"""
return xmlrpc.client._Method(self.__request, *args, **kwargs)
####### The Server the CCU will communicate with #######
class RPCFunctions():
def __init__(self):
self.devices = DEVICES
def error(self, interface_id, errorcode, msg):
"""
When some error occurs the CCU will send its error message here.
"""
LOG.warning("RPCFunctions.error: interface_id = %s, errorcode = %i, message = %s",
interface_id, int(errorcode), str(msg))
return True
def event(self, interface_id, address, value_key, value):
"""
If a device emits some sort event, we will handle it here.
"""
LOG.info("RPCFunctions.event: interface_id = %s, address = %s, value_key = %s, value = %s" % (
interface_id, address, value_key.upper(), str(value)))
return True
def listDevices(self, interface_id):
"""
The CCU asks for devices known to our XML-RPC server.
We respond to that request using this method.
"""
LOG.info("RPCFunctions.listDevices: interface_id = %s, _devices_raw = %s" % (
interface_id, str(self.devices)))
return DEVICES
def newDevices(self, interface_id, dev_descriptions):
"""
The CCU informs us about newly added devices.
We react on that and add those devices as well.
"""
global DEVICES
LOG.info("RPCFunctions.newDevices: interface_id = %s, dev_descriptions = %s" % (
interface_id, str(dev_descriptions)))
for dd in dev_descriptions:
DEVICES.append(dd)
return True
class RequestHandler(SimpleXMLRPCRequestHandler):
"""We handle requests to / and /RPC2"""
rpc_paths = ('/', '/RPC2',)
class ServerThread(threading.Thread):
"""XML-RPC server thread to handle messages from CCU"""
def __init__(self,
local_ip=LOCAL_IP,
local_port=LOCAL_PORT,
ccu_ip=CCU_IP,
ccu_port=CCU_PORT,
interface_id=INTERFACE_ID):
LOG.debug("ServerThread.__init__")
threading.Thread.__init__(self)
self._local_ip = local_ip
self._local_port = local_port
self._interface_id = interface_id
self._ccu_ip = ccu_ip
self._ccu_port = int(ccu_port)
self._proxy = LockingServerProxy(
"http://%s:%i" % (self._ccu_ip, self._ccu_port))
self._rpcfunctions = RPCFunctions()
LOG.debug("ServerThread.__init__: Setting up server")
self.server = SimpleXMLRPCServer((self._local_ip, self._local_port),
requestHandler=RequestHandler,
logRequests=False)
self.server.register_introspection_functions()
self.server.register_multicall_functions()
LOG.debug("ServerThread.__init__: Registering RPC functions")
self.server.register_instance(
self._rpcfunctions, allow_dotted_names=True)
def proxyInit(self):
"""
Send init to receive events.
"""
LOG.debug("Init")
self._proxy.init("http://%s:%i" % (self._local_ip, self._local_port), self._interface_id)
def proxyDeInit(self):
"""
De-Init from CCU.
"""
LOG.debug("De-Init")
self._proxy.init(
"http://%s:%i" % (self._local_ip, self._local_port))
def run(self):
LOG.info("Starting server at http://%s:%i" % (self._local_ip, self._local_port))
self.server.serve_forever()
def stop(self):
"""
To stop the server we de-init from the CCU,
then shut down our XML-RPC server.
"""
self.proxyDeInit()
LOG.info("Shutting down server")
self.server.shutdown()
LOG.debug("ServerThread.stop: Stopping ServerThread")
self.server.server_close()
LOG.info("HomeMatic XML-RPC Server stopped")
def clearDevices():
"""
Clear device storage.
"""
global DEVICES
DEVICES.clear()
def demo():
"""
Run example server and try to initialize. Return the Server-object.
"""
s = ServerThread()
s.start()
s.proxyInit()
return s
#### Demo for the bug ####
logging.basicConfig(level=logging.DEBUG)
s = demo()
time.sleep(5)
s.proxyInit()
time.sleep(5)
s.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment