@deliro
Last active November 21, 2020 16:33
Async parsing of Wikipedia with a process pool
import asyncio
import os
from concurrent.futures import ProcessPoolExecutor

import aiohttp
from loguru import logger as loguru
from lxml.html import fromstring

# CPU-bound lxml parsing is offloaded to a process pool so it never
# blocks the event loop.
workers = os.cpu_count() or 1
pool = ProcessPoolExecutor(max_workers=workers)
# Allow at most one in-flight parse job per pool worker.
parser_sem = asyncio.Semaphore(workers)
loguru.info(f"CPU workers: {workers}")
host = "https://ru.wikipedia.org"
start_from = f"{host}/wiki/Заглавная_страница"

q_d = asyncio.Queue(maxsize=1024)  # URLs waiting to be downloaded
q_p = asyncio.Queue()  # HTML documents waiting to be parsed
sem = asyncio.Semaphore(100)  # cap on concurrent HTTP requests
downloaded_urls = set()
class Stats:
    """Counters displayed by the progress logger."""

    downloaded = 0
    parsed = 0
    downloading = 0
    down_pending = 0
    waiting_for_download_q = 0


stats = Stats()
async def log_printer(queue_d, queue_p):
    """Report crawl progress roughly three times per second."""
    while True:
        loguru.debug(
            f"[PRINTER] to Download: {queue_d.qsize()}, to Parse: {queue_p.qsize()},"
            f" downloaded: {stats.downloaded}, parsed: {stats.parsed},"
            f" pending: {stats.down_pending}, downloading: {stats.downloading},"
            f" waiting Q: {stats.waiting_for_download_q},"
            f" tasks: {len(asyncio.all_tasks())}"
        )
        await asyncio.sleep(0.33)
def lxml_parse(html):
    """Extract outgoing links and the page title.

    Runs inside a pool worker process, so it must remain a picklable,
    module-level function.
    """
    try:
        tree = fromstring(html)
        urls = tree.xpath("//a/@href")
        try:
            title = tree.find(".//title").text
        except AttributeError:  # page has no <title> element
            title = "<UNKNOWN>"
        new_urls = []
        for url in urls:
            # Keep same-host relative links ("/wiki/...") and absolute
            # http(s) links; skip protocol-relative "//host/..." ones.
            if url.startswith("/") and not url.startswith("//"):
                new_urls.append(f"{host}{url}")
            elif url.startswith("http"):
                new_urls.append(url)
        return new_urls, title
    except Exception as e:
        loguru.error(f"Parse error: {e}")
        return [], "<ERROR>"
async def parse(html):
    loop = asyncio.get_running_loop()
    # Hand the raw HTML to the process pool and await the result.
    urls, title = await loop.run_in_executor(pool, lxml_parse, html)
    stats.parsed += 1
    return urls, title
async def start_parse_task(content, queue_d):
    async with parser_sem:  # don't submit more jobs than pool workers
        urls, title = await parse(content)
        # loguru.debug(f"[PARSER]: Parse done {title}")
        stats.waiting_for_download_q += 1
        for url in urls:
            if url not in downloaded_urls:
                # May block while the download queue is full (backpressure).
                await queue_d.put(url)
        stats.waiting_for_download_q -= 1
        # loguru.debug(f"[PARSER]: Add {len(urls)} to download queue")
async def parser(queue_d, queue_p):
    """Fan incoming HTML out to parse tasks; parser_sem bounds concurrency."""
    while True:
        content = await queue_p.get()
        asyncio.create_task(start_parse_task(content, queue_d))
async def downloader(queue_d, queue_p, session):
    while True:
        url = await queue_d.get()
        if url in downloaded_urls:
            continue
        # Mark the URL as seen right away so other downloader tasks
        # holding the same URL don't fetch it a second time.
        downloaded_urls.add(url)
        stats.down_pending += 1
        async with sem:  # at most 100 requests in flight
            stats.down_pending -= 1
            stats.downloading += 1
            try:
                async with session.get(url) as resp:
                    # loguru.debug(f"[DOWNLOADER]: got response for {url}")
                    try:
                        text = await resp.text()
                        await queue_p.put(text)
                    except UnicodeDecodeError:
                        pass  # binary or mis-encoded body: skip it
                    stats.downloaded += 1
            except Exception as e:
                loguru.error(f"Download error: {e}")
            finally:
                stats.downloading -= 1
async def main():
    await q_d.put(start_from)
    async with aiohttp.ClientSession() as session:
        # 100 downloader tasks share one session; a single parser task
        # dispatches parse jobs, and the printer reports progress.
        ds = [
            asyncio.create_task(downloader(q_d, q_p, session))
            for _ in range(100)
        ]
        p = asyncio.create_task(parser(q_d, q_p))
        printer = asyncio.create_task(log_printer(q_d, q_p))
        await asyncio.gather(*ds, p, printer)


if __name__ == "__main__":
    # get_event_loop()/run_until_complete keeps the module-level queues
    # and semaphores attached to the same loop; asyncio.run() would spin
    # up a fresh loop, which breaks them on Python < 3.10.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
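
A quick way to sanity-check the link-extraction logic in isolation is a sketch like the one below. It exercises only lxml_parse, with no network or process pool involved; the HTML sample is invented for illustration.

# Hypothetical smoke test for lxml_parse; the HTML below is made up.
sample = (
    '<html><head><title>Test</title></head><body>'
    '<a href="/wiki/Foo">relative</a>'
    '<a href="//example.org/x">protocol-relative</a>'
    '<a href="https://example.org/y">absolute</a>'
    '</body></html>'
)
urls, title = lxml_parse(sample)
assert title == "Test"
# The relative link is absolutized against host; the protocol-relative
# link is dropped; the absolute link passes through unchanged.
assert urls == [f"{host}/wiki/Foo", "https://example.org/y"]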