Skip to content

Instantly share code, notes, and snippets.

@mzizzi
Created June 16, 2015 20:24
Show Gist options
  • Save mzizzi/7e918468610acbf3c629 to your computer and use it in GitHub Desktop.
Save mzizzi/7e918468610acbf3c629 to your computer and use it in GitHub Desktop.
import asyncio
import threading
import time
class MyBlockingSubscription(object):
def __init__(self, handler):
super().__init__()
self.handler = handler
def subscribe(self):
for i in self.gen():
self.handler.handle(i)
def gen(self):
counter = 0
while True:
yield counter
counter += 1
time.sleep(1)
class MyHandlerThread(threading.Thread):
def __init__(self):
super().__init__()
self.loop = None
self.started = threading.Event()
self.stopped = threading.Event()
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.started.set()
self.loop.run_forever()
self.stopped.set()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def handle(self, data):
self.loop.call_soon_threadsafe(
asyncio.async,
self._handle(data))
@asyncio.coroutine
def _handle(self, data):
print('enter %s' % data)
yield from asyncio.sleep(2)
print('exit %s' % data)
if __name__ == '__main__':
handler = MyHandlerThread()
handler.start()
handler.started.wait()
MyBlockingSubscription(handler).subscribe()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment