Skip to content

Instantly share code, notes, and snippets.

@toejough
Last active July 19, 2016 14:24
Show Gist options
  • Save toejough/244452a0268c8ff6afdb2776aeb717b7 to your computer and use it in GitHub Desktop.
Save toejough/244452a0268c8ff6afdb2776aeb717b7 to your computer and use it in GitHub Desktop.
exploring python asyncio
import asyncio
import functools
# [ Helpers ]
async def run_blocking(func, *args, **kwargs):
partial = functools.partial(func, *args, **kwargs)
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, partial)
def queue_async(func, *args, **kwargs):
loop = asyncio.get_event_loop()
loop.create_task(func(*args, **kwargs))
def run_async_tasks():
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(asyncio.Task.all_tasks(loop)))
loop.close()
# [ Tasks ]
RUN = True
async def stop():
await asyncio.sleep(5)
global RUN
RUN = False
async def async_main():
while RUN:
await hello_world()
# [ Async Actions ]
async def hello_world():
await run_blocking(print, 'hello world')
# [ Main ]
queue_async(async_main)
queue_async(stop)
run_async_tasks()
@toejough
Copy link
Author

should probably be simplified with a class that holds the main loop, instead of calling get_event_loop in all of the helpers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment