@paretech
Created July 26, 2019 14:37
Python AsyncIO Example #1

import asyncio
import logging
import random
import sys

# Module-level logger; the NullHandler keeps "no handler" warnings away if this
# module is imported without logging configured.
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())

def config_logging():
    logging.basicConfig(
        format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
        level=logging.DEBUG,
        datefmt="%H:%M:%S",
        stream=sys.stderr,
    )

def bug(msg='Hi, this is Bug!'):
    LOGGER.warning(msg)
    raise Exception(msg)

async def heartbeat():
    # Ensure something is always running on the loop, see https://bugs.python.org/issue23057
    while True:
        LOGGER.info('heartbeat...')
        await asyncio.sleep(1)

async def worker(q, ident=None):
    LOGGER.info(f'Worker #{ident} - Starting')
    try:
        while True:
            present_page = await q.get()
            LOGGER.info(f'Worker #{ident} - Servicing Page {present_page}')
            # Produce more tasks
            if present_page == 0:
                for new_page in range(1, 10):
                    LOGGER.info(f'Worker #{ident} - Adding Page {new_page} to the queue')
                    await q.put(new_page)
            q.task_done()
            # If the bug struck before task_done(), a different strategy would be needed.
            if present_page == 5:
                bug()
            # Sleep is good for examples; otherwise the first worker may appear
            # to be doing all the work. Not exactly a picture of concurrency...
            await asyncio.sleep(random.uniform(0.75, 1.0))
    except asyncio.CancelledError:
        LOGGER.warning(f'Worker #{ident} - Cancelled')
    except Exception as e:
        LOGGER.error(f'Worker #{ident} - Raised exception: {e!r}')
    finally:
        LOGGER.info(f'Worker #{ident} - Stopped')

async def main():
    q = asyncio.Queue()
    # Place the first job on the queue
    await q.put(0)
    # Schedule the heartbeat
    heartbeat_task = asyncio.create_task(heartbeat())
    # Schedule some workers
    task_pool = [asyncio.create_task(worker(q, i)) for i in range(5)]
    LOGGER.info('Finished creating tasks, waiting on queue...')
    # join() blocks until the count of unfinished queue items reaches zero.
    # Exception handling is more challenging when using join().
    await q.join()
    LOGGER.info('Queue stopped blocking, on with the show!')
    # Cancel the still-running tasks once the queue is drained and await them,
    # otherwise asyncio complains "Task was destroyed but it is pending!"
    for task in task_pool + [heartbeat_task]:
        task.cancel()
    await asyncio.gather(*task_pool, heartbeat_task, return_exceptions=True)

if __name__ == '__main__':
    config_logging()
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        LOGGER.info("User aborted")
    finally:
        LOGGER.info("That's All Folks!")