Created
July 25, 2015 16:42
-
-
Save osiloke/f4f578e299bfc52a8083 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque | |
import json | |
from kivy import Logger | |
import threading | |
from kivy.clock import Clock | |
from kivy.network.urlrequest import UrlRequest | |
from kivy.support import install_twisted_reactor | |
# from memory_profiler import profile | |
import zmq | |
from kivy.compat import queue | |
from threading import Thread | |
logger = Logger.getChild("Signage Messenger") | |
class MessengerBase(object):
    '''Base class for messengers that queue outgoing messages.

    Messages passed to :meth:`send` are stored in ``_q_send``; subclasses
    override :meth:`start`, :meth:`stop` and :meth:`run` to drain that queue
    and hand each entry to :meth:`_send_msg`. The default transport created
    by :meth:`create_connection` is a ZeroMQ ``REQ`` socket.
    '''

    def __init__(self):
        # NOTE(review): the next three attributes and ``_q_done``/``_client``
        # are not referenced anywhere else in this file.
        self._loading_image = None
        self._error_image = None
        self._max_upload_per_frame = 2

        self._num_workers = 2
        self._paused = False
        self._resume_cond = threading.Condition()
        # Serialises use of ``self.sender`` between worker threads.
        self._send_cond = threading.Condition()

        self._q_send = deque()
        self._q_done = deque()
        self._client = []

        self._running = False
        self._start_wanted = False
        self._trigger_update = Clock.create_trigger(self._update)

        self.create_connection()

    def create_connection(self):
        '''Create the ZeroMQ context and connect a REQ socket to
        ``tcp://localhost:5666``.
        '''
        self.zmq = zmq.Context()
        self.sender = self.zmq.socket(zmq.REQ)
        self.sender.connect("tcp://localhost:%s" % 5666)

    def __del__(self):
        # Best effort only: the Clock may already be gone at teardown.
        try:
            Clock.unschedule(self._update)
        except Exception:
            pass

    def _set_num_workers(self, num):
        if num < 2:
            raise Exception('Must have at least 2 workers')
        self._num_workers = num

    def _get_num_workers(self):
        return self._num_workers

    # Number of worker threads to use; values below 2 are rejected.
    num_workers = property(_get_num_workers, _set_num_workers)

    def start(self):
        '''Start the loader thread/process.'''
        self._running = True

    def run(self, *largs):
        '''Main loop for the loader.'''
        pass

    def stop(self):
        '''Stop the loader thread/process.'''
        self._running = False

    def pause(self):
        '''Pause the loader, can be useful during interactions.
        '''
        self._paused = True

    def resume(self):
        '''Resume the loader, after a :meth:`pause`.
        '''
        self._paused = False
        with self._resume_cond:
            self._resume_cond.notify_all()

    def _wait_for_resume(self):
        '''Block while the messenger is both running and paused.

        Wakes up at least every 0.25 seconds to re-check the flags.
        '''
        while self._running and self._paused:
            with self._resume_cond:
                self._resume_cond.wait(0.25)

    def _update(self, *largs):
        '''Clock callback: honour a pending start request.'''
        if self._start_wanted:
            if not self._running:
                self.start()
            self._start_wanted = False

        # in pause mode, don't unqueue anything.
        if self._paused:
            self._trigger_update()
            return

    def send(self, msg, send_callback=None, post_callback=None,
             **kwargs):
        '''Queue ``msg`` for sending and request a start of the messenger.

        :param msg: the message to send.
        :param send_callback: stored with the queued entry.
        :param post_callback: called with the decoded reply once the
            message has been answered.
        :param kwargs: extra options stored with the queued entry under
            the ``'kwargs'`` key.
        '''
        self._q_send.appendleft({
            'msg': msg,
            'send_callback': send_callback,
            'post_callback': post_callback,
            'kwargs': kwargs})
        self._start_wanted = True
        self._trigger_update()

    # @profile
    def _send_msg(self, kwargs):
        '''Send one queued entry over the socket and process the reply.

        :param kwargs: the dict queued by :meth:`send`.

        Dict messages are sent with ``send_json``, anything else with
        ``send``. ``post_callback`` is only invoked when a reply could be
        decoded as JSON.
        '''
        msg = kwargs.get("msg")
        post_callback = kwargs.get('post_callback')

        # ``with`` guarantees the lock is released even if the socket or
        # the callback raises; a leaked lock would block every later send.
        with self._send_cond:
            if isinstance(msg, dict):
                self.sender.send_json(msg)
            else:
                self.sender.send(msg)

            try:
                data = self.sender.recv_json()
            except Exception as e:
                Logger.info("Messenger: Invalid data received, %s" % str(e))
            else:
                if isinstance(data, dict) and "error" in data:
                    # ``.get`` so an error reply without "msg" still logs.
                    logger.info("Messenger: last message sent <%s> %s" % (
                        msg, data.get("msg")))
                if post_callback:
                    post_callback(data)

            self._trigger_update()
class _Worker(Thread):
    '''Daemon thread executing tasks from a given tasks queue.

    A task is a ``(func, args, kargs)`` tuple. A ``None`` entry is a
    wake-up sentinel telling the worker to leave its loop.
    '''

    def __init__(self, pool, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.pool = pool
        self.start()

    def run(self):
        while self.pool.running:
            task = self.tasks.get()
            try:
                if task is None:
                    break
                func, args, kargs = task
                func(*args, **kargs)
            except Exception:
                # A failing task must not kill the worker, but it should
                # not vanish silently either.
                logger.exception('Messenger: worker task failed')
            finally:
                # Always balance the get() so Queue.join() can return.
                self.tasks.task_done()


class _ThreadPool(object):
    '''Pool of threads consuming tasks from a queue.
    '''

    def __init__(self, num_threads):
        super(_ThreadPool, self).__init__()
        self.running = True
        self.tasks = queue.Queue()
        self._workers = [_Worker(self, self.tasks)
                         for _ in range(num_threads)]

    def add_task(self, func, *args, **kargs):
        '''Add a task to the queue.
        '''
        self.tasks.put((func, args, kargs))

    def stop(self):
        '''Wait for all queued tasks to finish, then shut the workers down.

        Calling it again after the pool has stopped is a no-op.
        '''
        if not self.running:
            return
        # Drain first: clearing ``running`` before join() lets workers exit
        # while tasks are still queued, and join() would then never return.
        self.tasks.join()
        self.running = False
        # Wake workers blocked in get() so they can observe the flag.
        for _ in self._workers:
            self.tasks.put(None)
class ZMQMessengerThreadPool(MessengerBase):
    '''Messenger that sends queued messages over ZeroMQ from a thread pool.
    '''

    def __init__(self):
        super(ZMQMessengerThreadPool, self).__init__()
        # Created in start(); None until then.
        self.pool = None

    def start(self):
        '''Create the worker pool and schedule :meth:`run` on the Clock.'''
        super(ZMQMessengerThreadPool, self).start()
        self.pool = _ThreadPool(self._num_workers)
        Clock.schedule_interval(self.run, 0)

    def stop(self):
        '''Unschedule :meth:`run` and stop the worker pool, if any.'''
        super(ZMQMessengerThreadPool, self).stop()
        Clock.unschedule(self.run)
        # stop() may be called before start() ever created a pool.
        if self.pool is not None:
            self.pool.stop()

    def run(self, *largs):
        '''Hand every queued message to the pool; return once the queue
        is empty.
        '''
        while self._running:
            try:
                parameters = self._q_send.pop()
            except IndexError:
                # Queue drained; the Clock calls run() again later.
                return
            self.pool.add_task(self._send_msg, parameters)
class HTTPMessengerThreadPool(MessengerBase):
    '''Messenger that submits queued messages as JSON over HTTP.

    ``device_key`` and ``push_url`` are used by :meth:`_send_msg` and must
    be set by the application; ``pull_url`` is stored but not used in this
    class.
    '''

    def __init__(self):
        super(HTTPMessengerThreadPool, self).__init__()
        # Created in start(); None until then.
        self.pool = None
        self.device_key = None
        self.push_url = None
        self.pull_url = None

    def create_connection(self):
        # No persistent connection: every message is its own UrlRequest.
        pass

    def start(self):
        '''Create the worker pool and schedule :meth:`run` on the Clock.'''
        super(HTTPMessengerThreadPool, self).start()
        self.pool = _ThreadPool(self._num_workers)
        Clock.schedule_interval(self.run, 0)

    def stop(self):
        '''Unschedule :meth:`run` and stop the worker pool, if any.'''
        super(HTTPMessengerThreadPool, self).stop()
        Clock.unschedule(self.run)
        # stop() may be called before start() ever created a pool.
        if self.pool is not None:
            self.pool.stop()

    def run(self, *largs):
        '''Hand every queued message to the pool; return once the queue
        is empty.
        '''
        while self._running:
            try:
                parameters = self._q_send.pop()
            except IndexError:
                # Queue drained; the Clock calls run() again later.
                return
            self.pool.add_task(self._send_msg, parameters)

    def _send_msg(self, kwargs):
        '''Submit one queued entry with a :class:`UrlRequest`.

        :param kwargs: the dict queued by ``send``. Its ``'kwargs'`` entry
            may carry ``url`` (default ``push_url``) and ``method``
            (default ``'PUT'``).

        ``post_callback`` receives the decoded JSON result on success.
        '''
        msg = kwargs.get("msg")
        post_callback = kwargs.get('post_callback')
        # Caller options live under 'kwargs'; ``method`` used to be looked
        # up on the outer dict, so an override was never honoured.
        options = kwargs.get("kwargs") or {}
        url = options.get('url', self.push_url)
        method = options.get('method', 'PUT')

        def data_pushed(req, result):
            Logger.debug("HTTP Messenger: Request to %s was successful" % url)
            Logger.debug("HTTP Messenger: Request data was %s" % str(msg))
            Logger.debug("HTTP Messenger: Request result was %s" % str(result))
            if post_callback:
                # UrlRequest may already have decoded a JSON response.
                if isinstance(result, (dict, list)):
                    post_callback(result)
                else:
                    post_callback(json.loads(result))
            self._trigger_update()

        def error(req, error):
            Logger.error("HTTP Messenger: An error occured while submitting request, with error %s" % error)
            Logger.error("HTTP Messenger: Server responded with headers %s " % req.resp_headers)
            self._trigger_update()

        # ``with`` releases the lock even if building the request raises.
        with self._send_cond:
            # Only dict messages can be tagged with the sender's key.
            if isinstance(msg, dict):
                msg["from_id"] = self.device_key

            headers = {
                'Content-type': 'application/json',
                'Accept': 'text/json',
                'HTTP-DEVICE-KEY': self.device_key,
            }

            UrlRequest(url,
                       on_success=data_pushed,
                       on_error=error,
                       req_headers=headers,
                       req_body=json.dumps(msg),
                       method=method)
# Module-level singletons shared by the application.
HTTPMessenger = HTTPMessengerThreadPool()
ZMQMessenger = ZMQMessengerThreadPool()

# Read the property from the instance: on the class it evaluates to the
# property object itself rather than the worker count.
Logger.info('Loader: using a thread pool of {} workers'.format(
    ZMQMessenger.num_workers))
if __name__ == '__main__':
    # Manual smoke test: send one message at start-up and one after 5 s.
    from kivy.app import App
    from kivy.uix.boxlayout import BoxLayout

    class MessengerApp(App):
        def build(self):
            def post_callback(data):
                # Function-call form prints identically on Python 2 and 3.
                print(data)

            ZMQMessenger.send("Hello", post_callback=post_callback)
            Clock.schedule_once(
                lambda dt: ZMQMessenger.send("Scheduled message"), 5.0)
            return BoxLayout()

    # Window.fullscreen = True
    MessengerApp().run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment