@ershovio
Created December 16, 2022 12:41
Example of using Scraper API with aiohttp
import os
import asyncio
import logging

import aiohttp
import requests


class ScraperAPI:
    def __init__(
        self,
        api_key: str | None = None,
        waiting_time_coef: int = 2,
        exp_backoff_coef: float = 1.0,
        num_of_concurrent_requests: int = 5
    ):
        self._api_key = api_key or os.environ['SCRAPER_API_KEY']
        self._waiting_time = waiting_time_coef
        self._exp_backoff = exp_backoff_coef
        self._num_of_conc_req = num_of_concurrent_requests
        self._logger = logging.getLogger(__name__)

    async def scrape_urls(self, urls: list[str]) -> dict[str, list[tuple[str, dict]]]:
        # Submit the batch job, poll until every job finishes, then split the
        # results into successful and failed scrapes.
        status_urls = self._get_status_urls(urls)
        res = {
            'success': [],
            'failed': []
        }
        responses = await self._get_data_from_status_urls(status_urls)
        for response in responses.values():
            if response['status'] == 'finished':
                res['success'].append((response['url'], response['response']))
            else:
                res['failed'].append((response['url'], response['response']))
        if len(res['failed']) > 0:
            self._logger.warning(
                f'Failed to scrape {len(res["failed"])} urls. URLs:\n'
                f'{[(i[0], i[1]["status"]) for i in res["failed"]]}'
            )
        return res

    async def _get_data_from_status_urls(
        self,
        status_urls: list[str],
        res: dict | None = None,
        current_backoff: float = 1.0
    ) -> dict[str, dict]:
        # None instead of a mutable default: a shared dict would leak results
        # between calls.
        if res is None:
            res = {}
        # Scale the wait with the queue length so large batches are polled
        # less aggressively.
        waiting_time = max(
            self._waiting_time * current_backoff,
            self._waiting_time * current_backoff * len(status_urls) / self._num_of_conc_req
        )
        self._logger.info(f'Waiting for {waiting_time} sec')
        # asyncio.sleep instead of time.sleep, so the event loop is not blocked
        await asyncio.sleep(waiting_time)
        responses = await self._get_responses_from_status_urls(status_urls)
        running_jobs = [i for i in responses if i['status'] == 'running']
        completed_jobs = [i for i in responses if i['status'] != 'running']
        res.update({i['url']: i for i in completed_jobs})
        if len(running_jobs) > 0:
            self._logger.info(f'{len(running_jobs)} jobs are still running')
            # Poll again for the jobs that are still running, backing off
            # exponentially.
            return await self._get_data_from_status_urls(
                [i['statusUrl'] for i in running_jobs],
                res,
                current_backoff * self._exp_backoff
            )
        else:
            self._logger.info(f'All {len(res)} jobs are completed')
            return res

    async def _get_responses_from_status_urls(self, urls: list[str]) -> list[dict]:
        connector = aiohttp.TCPConnector(limit_per_host=self._num_of_conc_req)
        async with aiohttp.ClientSession(connector=connector) as session:
            return await asyncio.gather(*[
                self._get_response_from_status_url(url, session) for url in urls
            ])

    async def _get_response_from_status_url(self, url: str, session: aiohttp.ClientSession) -> dict:
        async with session.get(url) as response:
            return await response.json()

    def _get_status_urls(self, urls: list[str]) -> list[str]:
        # Submits all URLs as one batch job (a blocking call, made once before
        # the async polling starts); the API returns one status URL per job.
        scraping_jobs = requests.post(
            url='https://async.scraperapi.com/batchjobs',
            json={'apiKey': self._api_key, 'urls': urls}
        )
        return [i['statusUrl'] for i in scraping_jobs.json()]
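
A minimal usage sketch, assuming SCRAPER_API_KEY is set in the environment; the target URLs below are placeholders, not part of the gist:

import asyncio
import logging


async def main():
    logging.basicConfig(level=logging.INFO)
    scraper = ScraperAPI()  # reads SCRAPER_API_KEY from the environment
    results = await scraper.scrape_urls([
        'https://example.com/page-1',  # placeholder URLs
        'https://example.com/page-2',
    ])
    for url, response in results['success']:
        print(url)  # `response` holds the job's 'response' payload from the API


if __name__ == '__main__':
    asyncio.run(main())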