asyncio samples
import asyncio
import concurrent.futures
import requests
Seconds = [
("first", 5),
("second", 0),
("third", 3)
async def sleeping(order, seconds, hook=None):
await asyncio.sleep(seconds)
if hook:
return order
async def basic_async():
# the order of result is nonsequential (not depends on order, even sleeping time)
for s in Seconds:
r = await sleeping(*s)
print("{0} is finished.".format(r))
return True
async def parallel_by_gather():
# execute by parallel
def notify(order):
print(order + " has just finished.")
cors = [sleeping(s[0], s[1], hook=notify) for s in Seconds]
results = await asyncio.gather(*cors)
return results
async def parallel_by_wait():
# execute by parallel
def notify(order):
print(order + " has just finished.")
cors = [sleeping(s[0], s[1], hook=notify) for s in Seconds]
done, pending = await asyncio.wait(cors)
return done, pending
async def queue_execution(arg_urls, callback, parallel=2):
# see refs
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
for u in arg_urls:
async def fetch(q):
while not q.empty():
u = await q.get()
future = loop.run_in_executor(None, requests.get, u)
await future
tasks = [fetch(queue) for i in range(parallel)]
return await asyncio.wait(tasks)
async def limited_parallel(limit=3):
sem = asyncio.Semaphore(limit)
# function want to limit the number of parallel
async def limited_sleep(num):
with await sem:
return await sleeping(str(num), num)
import random
tasks = [limited_sleep(random.randint(0, 3)) for i in range(9)]
return await asyncio.wait(tasks)
async def future_callback(callback):
futures = []
for s in Seconds:
cor = sleeping(*s)
f = asyncio.ensure_future(cor)
await asyncio.wait(futures)
def get_async_iterator(arg_urls):
class AsyncIterator():
def __init__(self, urls):
self.urls = iter(urls)
self.__loop = None
async def __aiter__(self):
self.__loop = asyncio.get_event_loop()
return self
async def __anext__(self):
u = next(self.urls)
future = self.__loop.run_in_executor(None, requests.get, u)
resp = await future
except StopIteration:
raise StopAsyncIteration
return resp
return AsyncIterator(arg_urls)
def print_num(num):
async def async_by_process():
executor = concurrent.futures.ProcessPoolExecutor()
queue = asyncio.Queue()
for i in range(10):
async def proc(q):
while not q.empty():
i = await q.get()
future = loop.run_in_executor(executor, print_num, i)
await future
tasks = [proc(queue) for i in range(4)] #cpu core
return await asyncio.wait(tasks)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
print("@basic async ******************************************")
print("@parallel by gather ***********************************")
# the result of asyncio.gather is mysterious!
results = loop.run_until_complete(parallel_by_gather())
for r in results:
print("asyncio.gather result: {0}".format(r))
print("@parallel by wait *************************************")
done, pending = loop.run_until_complete(parallel_by_wait())
for d in done:
dr = d.result()
print("asyncio.wait result: {0}".format(dr))
print("@queue execution **************************************")
results = []
def store_result(f):
], store_result))
for r in results:
print("queue execution: {0}".format(r.url))
print("@limited parallel **************************************")
done, pending = loop.run_until_complete(limited_parallel())
for d in done:
print("limited parallel: {0}".format(d.result()))
print("@future callback **************************************")
for r in results:
print("future callback: {0}".format(r))
print("@async iterator ***************************************")
async def async_fetch(urls):
ai = get_async_iterator(urls)
async for resp in ai:
print("@async by process *************************************")
