Skip to content

Instantly share code, notes, and snippets.

@kunev
Last active July 9, 2020 15:09
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save kunev/f83146d407c81a2d64a6 to your computer and use it in GitHub Desktop.
Save kunev/f83146d407c81a2d64a6 to your computer and use it in GitHub Desktop.
Coroutines example with python's asyncio module
import asyncio
@asyncio.coroutine
def open_file(name):
print("opening {}".format(name))
return open(name)
@asyncio.coroutine
def close_file(file):
print("closing {}".format(file.name))
file.close()
@asyncio.coroutine
def read_data(file):
print("reading {}".format(file.name))
return file.read()
@asyncio.coroutine
def process_data(filename):
# I want the result from open_file(filename)
# untill it's done don't bother calling me
file = yield from asyncio.async(open_file(filename))
print('opened {}'.format(filename))
# I want the result from read_data(file)
# untill it's done don't bother calling me
data = yield from asyncio.async(read_data(file))
print('read {}'.format(filename))
yield from close_file(file)
@asyncio.coroutine
def main_coro(loop):
# start our tasks asynchronously in futures
tasks = [
asyncio.async(process_data('/etc/passwd')),
asyncio.async(process_data('/etc/group')),
asyncio.async(process_data('/var/log/Xorg.0.log')),
]
# untill all futures are done
while not all(task.done() for task in tasks):
# take a short nap
yield from asyncio.sleep(0.01)
# we're done, so stop the event loop
loop.stop()
# get event loop
loop = asyncio.get_event_loop()
# schedule the main coroutine to start as soon as possible
loop.call_soon(asyncio.async, main_coro(loop))
# run untill explicitly stopped
loop.run_forever()
# instead of the above two lines we can also run
# loop.run_until_complete(main_coro()) and remove
# the loop parameter for main_coro and the call
# to loop.stop() at the end of it
loop.close()
@drt24
Copy link

drt24 commented Feb 12, 2015

This does not work. The IO operations are blocking IO operations. https://groups.google.com/forum/#!topic/python-tulip/MvpkQeetWZA Running with PYTHONASYNCIODEBUG=1 python3 showed me that the read_data coroutine was taking 0.1s to run as was blocking.

@chris-zen
Copy link

Hi @kunev, nice Gist idea! but as @drt24 comments there are blocking IO operations that block the loop. You can use loop.run_in_executor() to detach the blocking operations from the loop thread. Something like:

@asyncio.coroutine
def read_data(file):
  print("reading {}".format(file.name))
  loop = asyncio.get_event_loop()
  data = yield from loop.run_in_executor(None, file.read) 
  return data

And you could replace lines such as

 data = yield from asyncio.async(read_data(file))

by

 data = yield from read_data(file)

You could elaborate a bit more the gist by reading the file line by line:

@asyncio.coroutine
def read_line(file, loop):
  line = yield from loop.run_in_executor(None, file.readline)
  if len(line) == 0:
    return None
  return line.decode("utf-8").rstrip("\n")
  while True:
      line = yield from read_line(file, loop)
      if line is None:
        break
      # do something with line

@zmedico
Copy link

zmedico commented Sep 18, 2016

I've created an AsyncIteratorExecutor class, which can be a handy way to execute I/O via a thread. I release this to the public domain:

import asyncio


class AsyncIteratorExecutor:
    """
    Converts a regular iterator into an asynchronous
    iterator, by executing the iterator in a thread.
    """
    def __init__(self, iterator, loop=None, executor=None):
        self.__iterator = iterator
        self.__loop = loop or asyncio.get_event_loop()
        self.__executor = executor

    def __aiter__(self):
        return self

    async def __anext__(self):
        value = await self.__loop.run_in_executor(
            self.__executor, next, self.__iterator, self)
        if value is self:
            raise StopAsyncIteration
        return value


async def cat_file_async(filename):
    with open(filename, 'rt') as f:
        async for line in AsyncIteratorExecutor(f):
            print(line.rstrip())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(
            cat_file_async('/path/of/file.txt'))
    finally:
        loop.close()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment