Skip to content

Instantly share code, notes, and snippets.

@osiloke
Created July 25, 2015 16:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save osiloke/f4f578e299bfc52a8083 to your computer and use it in GitHub Desktop.
Save osiloke/f4f578e299bfc52a8083 to your computer and use it in GitHub Desktop.
from collections import deque
import json
from kivy import Logger
import threading
from kivy.clock import Clock
from kivy.network.urlrequest import UrlRequest
from kivy.support import install_twisted_reactor
# from memory_profiler import profile
import zmq
from kivy.compat import queue
from threading import Thread
logger = Logger.getChild("Signage Messenger")
class MessengerBase(object):
def __init__(self):
self._loading_image = None
self._error_image = None
self._num_workers = 2
self._max_upload_per_frame = 2
self._paused = False
self._resume_cond = threading.Condition()
self._send_cond = threading.Condition()
self._q_send = deque()
self._q_done = deque()
self._client = []
self._running = False
self._start_wanted = False
self._trigger_update = Clock.create_trigger(self._update)
self.create_connection()
def create_connection(self):
self.zmq = zmq.Context()
self.sender = self.zmq.socket(zmq.REQ)
self.sender.connect("tcp://localhost:%s" % 5666)
def __del__(self):
try:
Clock.unschedule(self._update)
except Exception:
pass
def _set_num_workers(self, num):
if num < 2:
raise Exception('Must have at least 2 workers')
self._num_workers = num
def _get_num_workers(self):
return self._num_workers
num_workers = property(_get_num_workers, _set_num_workers)
def start(self):
'''Start the loader thread/process.'''
self._running = True
def run(self, *largs):
'''Main loop for the loader.'''
pass
def stop(self):
'''Stop the loader thread/process.'''
self._running = False
def pause(self):
'''Pause the loader, can be useful during interactions.
.. versionadded:: 1.6.0
'''
self._paused = True
def resume(self):
'''Resume the loader, after a :meth:`pause`.
.. versionadded:: 1.6.0
'''
self._paused = False
self._resume_cond.acquire()
self._resume_cond.notify_all()
self._resume_cond.release()
def _wait_for_resume(self):
while self._running and self._paused:
self._resume_cond.acquire()
self._resume_cond.wait(0.25)
self._resume_cond.release()
def _update(self, *largs):
if self._start_wanted:
if not self._running:
self.start()
self._start_wanted = False
# in pause mode, don't unqueue anything.
if self._paused:
self._trigger_update()
return
def send(self, msg, send_callback=None, post_callback=None,
**kwargs):
self._q_send.appendleft({
'msg': msg,
'send_callback': send_callback,
'post_callback': post_callback,
'kwargs': kwargs})
self._start_wanted = True
self._trigger_update()
# @profile
def _send_msg(self, kwargs):
self._send_cond.acquire()
msg = kwargs.get("msg")
load_callback = kwargs.get('load_callback')
post_callback = kwargs.get('post_callback')
if isinstance(msg, dict):
self.sender.send_json(msg)
else:
self.sender.send(msg)
try:
data = self.sender.recv_json()
except Exception as e:
Logger.info("Messenger: Invalid data received, %s" % str(e))
else:
if "error" in data:
logger.info("Messenger: last message sent <%s> %s" % (msg, data["msg"]))
if post_callback:
data = post_callback(data)
self._trigger_update()
self._send_cond.release()
class _Worker(Thread):
'''Thread executing tasks from a given tasks queue
'''
def __init__(self, pool, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.pool = pool
self.start()
def run(self):
while self.pool.running:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception as e:
# print(e)
pass
self.tasks.task_done()
class _ThreadPool(object):
'''Pool of threads consuming tasks from a queue
'''
def __init__(self, num_threads):
super(_ThreadPool, self).__init__()
self.running = True
self.tasks = queue.Queue()
for _ in range(num_threads):
_Worker(self, self.tasks)
def add_task(self, func, *args, **kargs):
'''Add a task to the queue
'''
self.tasks.put((func, args, kargs))
def stop(self):
self.running = False
self.tasks.join()
class ZMQMessengerThreadPool(MessengerBase):
def __init__(self):
super(ZMQMessengerThreadPool, self).__init__()
self.pool = None
def start(self):
super(ZMQMessengerThreadPool, self).start()
self.pool = _ThreadPool(self._num_workers)
Clock.schedule_interval(self.run, 0)
def stop(self):
super(ZMQMessengerThreadPool, self).stop()
Clock.unschedule(self.run)
self.pool.stop()
def run(self, *largs):
while self._running:
try:
parameters = self._q_send.pop()
# print parameters
except:
return
self.pool.add_task(self._send_msg, parameters)
class HTTPMessengerThreadPool(MessengerBase):
def __init__(self):
super(HTTPMessengerThreadPool, self).__init__()
self.pool = None
self.device_key = None
self.push_url = None
self.pull_url = None
def create_connection(self):
pass
def start(self):
super(HTTPMessengerThreadPool, self).start()
self.pool = _ThreadPool(self._num_workers)
Clock.schedule_interval(self.run, 0)
def stop(self):
super(HTTPMessengerThreadPool, self).stop()
Clock.unschedule(self.run)
self.pool.stop()
def run(self, *largs):
while self._running:
try:
parameters = self._q_send.pop()
# print parameters
except Exception:
return
self.pool.add_task(self._send_msg, parameters)
def _send_msg(self, kwargs):
self._send_cond.acquire()
msg = kwargs.get("msg")
load_callback = kwargs.get('load_callback')
post_callback = kwargs.get('post_callback')
# if isinstance(msg, dict):
msg["from_id"] = self.device_key
_kwargs = kwargs.get("kwargs")
url = _kwargs.get('url', self.push_url)
method = kwargs.get('method', 'PUT')
headers = {
'Content-type': 'application/json',
'Accept': 'text/json',
'HTTP-DEVICE-KEY': self.device_key,
}
def data_pushed(req, result):
Logger.debug("HTTP Messenger: Request to %s was successful" % url)
Logger.debug("HTTP Messenger: Request data was %s" % str(msg))
Logger.debug("HTTP Messenger: Request result was %s" % str(result))
# print result
if post_callback:
data = post_callback(json.loads(result))
self._trigger_update()
def error(req, error):
Logger.error("HTTP Messenger: An error occured while submitting request, with error %s" % error)
Logger.error("HTTP Messenger: Server responded with headers %s " % req.resp_headers)
self._trigger_update()
# def failure(req, result):
# Logger.error("HTTP Messenger: Request failed to be submitted")
# Logger.error("HTTP Messenger: Server responded with headers %s " % req.resp_headers)
# self._trigger_update()
req = UrlRequest(url,
# debug=True,
on_success=data_pushed,
on_error=error,
# on_failure=failure,
req_headers=headers,
req_body=json.dumps(msg),
method=method)
# req.wait()
self._send_cond.release()
HTTPMessenger = HTTPMessengerThreadPool()
ZMQMessenger = ZMQMessengerThreadPool()
Logger.info('Loader: using a thread pool of {} workers'.format(
ZMQMessengerThreadPool.num_workers))
if __name__ == '__main__':
from kivy.app import App
from kivy.uix.boxlayout import BoxLayout
class MessengerApp(App):
def build(self):
def post_callback(data):
print data
ZMQMessenger.send("Hello", post_callback=post_callback)
Clock.schedule_once(lambda dt: ZMQMessenger.send("Scheduled message"), 5.0)
return BoxLayout()
# Window.fullscreen = True
MessengerApp().run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment