Skip to content

Instantly share code, notes, and snippets.

@Zhuwenhui
Forked from rhoboro/async_sample.py
Created September 12, 2019 18:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Zhuwenhui/4f018afbbb58fe5b4a1fbd3b10dec218 to your computer and use it in GitHub Desktop.
Save Zhuwenhui/4f018afbbb58fe5b4a1fbd3b10dec218 to your computer and use it in GitHub Desktop.
asyncio, aiohttpを利用した並列処理のサンプルコード
import asyncio
import aiohttp
import async_timeout
from aiohttp import ClientError
async def _fetch(session, url, coro):
"""HTTPリソースからデータを取得しコルーチンを呼び出す
:param session: aiohttp.ClientSessionインスタンス
:param url: アクセス先のURL
:param coro: urlとaiohttp.ClientResponseを引数に取るコルーチン
:return: coroの戻り値
"""
async with async_timeout.timeout(10):
try:
response = await session.get(url)
except ClientError as e:
print(e)
response = None
return await coro(url, response)
async def _bound_fetch(semaphore, url, session, coro):
"""並列処理数を制限しながらHTTPリソースを取得するコルーチン
:param semaphore: 並列数を制御するためのSemaphore
:param session: aiohttp.ClientSessionインスタンス
:param url: アクセス先のURL
:param coro: urlとaiohttp.ClientResponseを引数に取るコルーチン
:return: coroの戻り値
"""
async with semaphore:
return await _fetch(session, url, coro)
async def _run(urls, coro, limit=1):
"""並列処理数を制限しながらHTTPリソースを取得するコルーチン
:param urls: URLの一覧
:param coro: urlとaiohttp.ClientResponseを引数に取るコルーチン
:param limit: 並列実行の最大数
:return: coroの戻り値のリスト。urlsと同順で返す
"""
tasks = []
semaphore = asyncio.Semaphore(limit)
# [SSL: CERTIFICATE_VERIFY_FAILED]エラーを回避する
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
for url in urls:
task = asyncio.ensure_future(_bound_fetch(semaphore, url, session, coro))
tasks.append(task)
responses = await asyncio.gather(*tasks)
return responses
def main(urls, coro, limit=3):
"""並列処理数を制限しながらHTTPリソースを取得し、任意の処理を行う
:param urls: URLの一覧
:param coro: urlとaiohttp.ClientResponseを引数に取る任意のコルーチン
:param limit: 並列実行の最大数
:return: coroの戻り値のリスト。urlsと同順。
"""
loop = asyncio.get_event_loop()
results = loop.run_until_complete(_run(urls, coro, limit))
return results
if __name__ == '__main__':
# url, responseを受け取る任意のコルーチン
async def coroutine(url, response):
return url, response.status, await response.text()
urls = [
'https://www.google.co.jp/',
'https://twitter.com/',
'https://www.facebook.com/',
]
results = main(urls=urls, coro=coroutine, limit=3)
for url, status, body in results:
print(url, status, body[:10])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment