Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Created August 10, 2018 12:26
Show Gist options
  • Save vxgmichel/82c3c4a683e75e89c62ae08e2cce7efd to your computer and use it in GitHub Desktop.
Save vxgmichel/82c3c4a683e75e89c62ae08e2cce7efd to your computer and use it in GitHub Desktop.
Answering on stack overflow - see the docstring
"""
A corrected version of the program in following stackoverflow question:
https://stackoverflow.com/questions/51775413/python-asyncio-run-coroutine-threadsafe-never-running-coroutine
"""
import asyncio
from threading import Thread
from contextlib import contextmanager
@contextmanager
def background_thread_loop():
def run_forever(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
loop = asyncio.new_event_loop()
try:
thread = Thread(target=run_forever, args=(loop,))
thread.start()
yield loop
finally:
loop.call_soon_threadsafe(loop.stop)
thread.join()
class Foo:
# Synchronous methods
def _run_async(self, coro):
future = asyncio.run_coroutine_threadsafe(coro, self._loop)
future.result()
def __init__(self, loop):
self._loop = loop
self._run_async(self._init())
def put(self, item):
return self._run_async(self._put(item))
def join(self):
return self._run_async(self._join())
# Async methods
async def _init(self):
self._queue = asyncio.Queue()
self._consumer_task = self._loop.create_task(self._consumer())
async def _consumer(self):
while True:
message = await self._queue.get()
if message == "END OF QUEUE":
break
print(f"Processing {message!r}...")
async def _join(self):
return await self._consumer_task
async def _put(self, item):
return await self._queue.put(item)
def main():
with background_thread_loop() as loop:
f = Foo(loop)
f.put("This is a message")
f.put("END OF QUEUE")
f.join()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment