Skip to content

Instantly share code, notes, and snippets.

@apatrushev
Last active July 27, 2018 20:07
Show Gist options
  • Save apatrushev/94ae5ce486adf2d6126c9a969eb2d520 to your computer and use it in GitHub Desktop.
Save apatrushev/94ae5ce486adf2d6126c9a969eb2d520 to your computer and use it in GitHub Desktop.
Background asyncio loop. The idea is to integrate asyncio code with synchronous code and/or other event loops. This approach works when you do not need tight integration and are happy to communicate through callbacks between your synchronous code (or your own loop) and asyncio.
import time
import threading
import asyncio
import janus
class BackgroundLoopBase(threading.Thread):
    """Thread that owns and runs an asyncio event loop in the background.

    Subclasses implement :meth:`coro_main_internal`. It is awaited over and
    over on the thread's own event loop until :meth:`stop` cancels the main
    task from another thread.
    """

    def __init__(self, *args, **kwargs):
        """Create the (not yet started) background thread.

        Arguments are forwarded to ``threading.Thread``. ``name`` defaults
        to the name of the concrete class.

        Raises:
            RuntimeError: if a ``target`` is supplied, either by keyword or
                as the second positional ``Thread`` argument; the thread
                body is fixed to the event loop runner.
        """
        if len(args) > 1 or 'target' in kwargs:
            raise RuntimeError('target override not supported')
        name = kwargs.pop('name', self.__class__.__name__)
        super().__init__(*args, name=name, **kwargs)
        # Main task and its loop; both are set only while coro_main() runs.
        self.task_main = None
        self.task_loop = None
        # Event used by start() to wait until the loop is up (or has failed).
        self.task_main_event = None

    @property
    def loop(self):
        """Return the running event loop, or ``None`` when not running."""
        return self.task_loop

    def run(self, *args, **kwargs):
        """Thread body: run :meth:`coro_main` on a fresh event loop."""
        try:
            asyncio.run(self.coro_main())
        finally:
            # If the coroutine failed before signalling readiness, start()
            # must still be released.  Snapshot the attribute because
            # start() resets it to None from the other thread.
            ready = self.task_main_event
            if ready is not None:
                ready.set()

    def start(self, *args, **kwargs):
        """Start the thread and block until the event loop is running.

        Also returns if the thread terminated before the loop came up; in
        that case :attr:`loop` is ``None`` afterwards.
        """
        self.task_main_event = threading.Event()
        super().start(*args, **kwargs)
        self.task_main_event.wait()
        self.task_main_event = None

    def stop(self):
        """Cancel the main task (if it is running) and join the thread."""
        # Snapshot both attributes: the loop thread clears them when it
        # finishes, so reading self.task_main after the check could yield
        # None.
        loop = self.task_loop
        task = self.task_main
        if loop is not None and task is not None:
            try:
                loop.call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # The loop was closed between the snapshot and the call;
                # the thread is already finishing, so there is nothing
                # left to cancel.
                pass
        self.join()

    async def coro_main_internal(self):
        """Do one unit of work on the loop; must be overridden."""
        raise NotImplementedError()

    async def coro_main(self):
        """Publish the task/loop, then await the work coroutine until cancelled."""
        # asyncio.Task.current_task() was removed in Python 3.9; the
        # module-level function is the supported spelling.
        self.task_main = asyncio.current_task()
        self.task_loop = asyncio.get_running_loop()
        try:
            # Tolerate run() being invoked without start(), where no event
            # has been created.
            ready = self.task_main_event
            if ready is not None:
                ready.set()
            while True:
                await self.coro_main_internal()
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown path requested by stop().
            pass
        finally:
            self.task_main = None
            self.task_loop = None
class BackgroundLoopSleep(BackgroundLoopBase):
    """Background loop whose unit of work is a plain ``asyncio.sleep``.

    Accepts an extra ``sleep_time`` keyword argument (default ``60``) that
    is passed to ``asyncio.sleep``; everything else goes to the base class.
    """

    def __init__(self, *args, **kwargs):
        """Strip ``sleep_time`` from ``kwargs`` and initialise the base."""
        # Stored before the base initialiser runs, which must not see the
        # extra keyword.
        interval = kwargs.pop('sleep_time', 60)
        self.sleep_time = interval
        super().__init__(*args, **kwargs)

    async def coro_main_internal(self):
        """Sleep for ``self.sleep_time`` on the background loop."""
        delay = self.sleep_time
        await asyncio.sleep(delay)
class BackgroundLoopCoroQueue(BackgroundLoopBase):
    """Background loop that runs coroutines submitted from other threads.

    Work is handed over through a ``janus.Queue``: the synchronous side is
    used by :meth:`put`, the asynchronous side by the loop thread.  The
    queue exists only while the loop is running.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base thread; the queue is created on the loop."""
        super().__init__(*args, **kwargs)
        self.queue = None
        # Strong references to in-flight tasks.  The event loop keeps only
        # weak references, so an otherwise unreferenced task may be
        # garbage-collected before it finishes.
        self._tasks = set()

    def put(self, coro, callback=None):
        """Submit ``coro`` to be run as a task on the background loop.

        Args:
            coro: coroutine object to schedule.
            callback: optional callable attached with
                ``Task.add_done_callback``; it receives the finished task
                and is invoked on the loop thread.

        Must be called while the loop is running: ``self.queue`` is
        ``None`` otherwise and the attribute access fails.
        """
        self.queue.sync_q.put((coro, callback))

    async def coro_main_internal(self):
        """Take one submission off the queue and start it as a task."""
        coro, callback = await self.queue.async_q.get()
        task = asyncio.create_task(coro)
        # Hold the task until it is done (see the note in __init__).
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        if callback is not None:
            task.add_done_callback(callback)

    async def coro_main(self):
        """Create the queue, run the base main coroutine, then close the queue."""
        queue = janus.Queue()
        self.queue = queue
        try:
            await super().coro_main()
        finally:
            self.queue = None
            # Release the queue's internal resources instead of merely
            # dropping the reference.
            queue.close()
            await queue.wait_closed()
if __name__ == '__main__':
    # Demo: run one coroutine on the background loop and wait for it.

    async def test():
        """Trivial coroutine used by the demo."""
        print(1)

    worker = BackgroundLoopCoroQueue()
    try:
        worker.start()
        finished = threading.Event()

        def on_done(_task):
            # Runs as the task's done-callback; wakes the waiting main thread.
            finished.set()

        worker.put(test(), on_done)
        finished.wait()
    finally:
        # Always shut the background thread down, even if the demo failed.
        worker.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment