Skip to content

Instantly share code, notes, and snippets.

@JacobCallahan
Created January 28, 2020 19:51
Show Gist options
  • Save JacobCallahan/9ab61a688ae87f7eaf6d32f175da0f5d to your computer and use it in GitHub Desktop.
Save JacobCallahan/9ab61a688ae87f7eaf6d32f175da0f5d to your computer and use it in GitHub Desktop.
prototype general async requests thing
import aiohttp
import asyncio
import async_timeout
import click
from logzero import logger
from pathlib import path
class AsyncRunner:
    """A general class to perform async GET requests and process the results.

    Each element of ``data`` is appended to ``host_url`` + ``base_path`` to
    form a URL, every URL is fetched concurrently (bounded by
    ``max_connections``), and the ``(element, content)`` pairs are collected
    in an internal queue for ``_handle_results`` to consume.

    Keyword Args:
        data: iterable of link elements (the trailing part of each URL).
            ``None`` is treated as "nothing to fetch".
        host_url: scheme and host prefix of every URL, or ``None``.
        base_path: path prefix inserted between host and element, or ``None``.
        max_connections: upper bound on simultaneous requests (default 100).

    Raises:
        KeyError: if ``data``, ``host_url`` or ``base_path`` is not supplied.
    """

    def __init__(self, *args, **kwargs):
        self.data = kwargs["data"]  # (rest_of_link, data)
        self.host_url = kwargs["host_url"]
        self.base_path = kwargs["base_path"]
        self._max_connections = kwargs.get("max_connections", 100)
        # Built inside the coroutine so it always belongs to the event loop
        # that is actually running the requests (see _async_loop).
        self._semaphore = None
        self._queue = []

    def _construct_link(self, data_elem):
        """Return the full URL for ``data_elem``; ``None`` parts become ''."""
        return f"{self.host_url or ''}{self.base_path or ''}{data_elem or ''}"

    async def _async_get(self, session, elem):
        """visit a page and download the content, returning the element and content"""
        # NOTE(review): ssl=False turns off TLS certificate verification.
        # Kept for compatibility with the original behaviour; confirm this is
        # acceptable for the hosts being targeted.
        async with self._semaphore, session.get(
            self._construct_link(elem), ssl=False
        ) as response:
            content = await response.read()
            logger.debug(elem)
            return (elem, content)

    async def _async_loop(self):
        """asynchronously visit each data element and store results"""
        # A fresh semaphore per attempt: each asyncio.run() call uses a new
        # event loop, and a semaphore must not be shared across loops.
        self._semaphore = asyncio.Semaphore(self._max_connections)
        async with aiohttp.ClientSession() as session:
            tasks = [
                asyncio.ensure_future(self._async_get(session, elem))
                for elem in (self.data or ())
            ]
            # Awaited while the session is still open.
            results = await asyncio.gather(*tasks)
        # Results are stored only after every request succeeded, so a failed
        # attempt followed by a retry never leaves duplicates in the queue.
        self._queue.extend(results)

    def _visit_links(self, retries=3):
        """main controller for asynchronous page visiting, will attempt 3 retries

        Args:
            retries: how many additional attempts to make after the host
                drops the connection; waits 10 seconds before each one.

        If the host is still unreachable once the retries are used up, a
        warning is logged and the method returns without raising.
        """
        import time  # local import: only needed on the retry path

        while True:
            try:
                asyncio.run(self._async_loop())
                return
            except aiohttp.ServerDisconnectedError:
                logger.warning(
                    f"Lost connection to host.{' Retrying in 10 seconds' if retries else ''}"
                )
                if not retries:
                    return
            time.sleep(10)
            retries -= 1

    def _handle_results(self):
        """perform checks or carry our operations on the results"""
        # Consumes the queue front-to-back; it is empty when this returns.
        while self._queue:
            link_comp, data = self._queue.pop(0)
            # !!!! perform your checks here !!!!

    def run(self):
        """main function for the runner class

        Fetches every link, sorts the results by link element and hands them
        to ``_handle_results``. Always returns ``True``.
        """
        logger.info(f"Starting requests against: {self.host_url}{self.base_path}")
        self._visit_links()
        # sort the results by link element, to normalize return order
        self._queue = sorted(self._queue, key=lambda x: x[0])
        self._handle_results()
        return True

    def save(self):
        """Use this method to save your results"""
        pass
@click.command()
@click.option(
    "--host",
    help="Base host url",
    default=None,
)
@click.option(
    "--base_path",
    help="Base request path",
    default=None,
)
@click.option(
    "--max_connections",
    help="Maximum number of concurrent connections",
    type=int,
    default=100,
)
def run(host, base_path, max_connections):
    """Run an AsyncRunner against HOST + BASE_PATH and save the results.

    The parameters are filled in by click from the command-line options of
    the same name.
    """
    # Link elements to request; empty by default so the skeleton runs as-is.
    data = []  # you are responsible for this
    runner = AsyncRunner(
        data=data,
        host_url=host,
        base_path=base_path,
        max_connections=max_connections,
    )
    runner.run()
    runner.save()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment