Skip to content

Instantly share code, notes, and snippets.

@sgraaf
Created October 14, 2020 15:06
Show Gist options
  • Save sgraaf/81eefa6fcb1f9efee25fb28eaa379ded to your computer and use it in GitHub Desktop.
Save sgraaf/81eefa6fcb1f9efee25fb28eaa379ded to your computer and use it in GitHub Desktop.
A requests Session that can make concurrent GET and POST requests.
import os
from concurrent.futures import Future, ThreadPoolExecutor
from functools import partial
from itertools import repeat
from typing import Any, Dict, List, Optional, Union
from requests import Response, Session
class ConcurrentSession(Session):
    """A ``requests`` ``Session`` that runs GET and POST requests on a thread pool.

    ``get`` and ``post`` accept either a single URL (``str``) or a list of
    URLs.  Every request is submitted to ``self.executor``; the caller either
    receives the finished ``Response`` object(s) or, with
    ``return_futures=True``, the pending ``Future`` object(s).
    """

    def __init__(self, max_workers: Optional[int] = (os.cpu_count() or 1) * 5) -> None:
        """Create the session and its worker pool.

        Args:
            max_workers: Size of the thread pool, forwarded unchanged to
                ``ThreadPoolExecutor``.  The default is five threads per CPU;
                ``os.cpu_count()`` may return ``None``, in which case one CPU
                is assumed so the default can still be computed.
        """
        super().__init__()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def get(
        self,
        url_or_urls: Union[str, List[str]],
        return_futures: Optional[bool] = False,
        **kwargs
    ) -> Union[Response, List[Response], Future, List[Future]]:
        """Send one GET request, or one GET request per URL, via the thread pool.

        Args:
            url_or_urls: A single URL or a list of URLs.
            return_futures: When true, return ``Future`` object(s) instead of
                waiting for the ``Response`` object(s).
            **kwargs: Forwarded to ``Session.get`` for every request.

        Returns:
            A single ``Response``/``Future`` for a ``str`` input, otherwise a
            list of them in the same order as the input URLs.
        """
        # Bind the parent implementation once, at method level.
        _get = partial(super().get, **kwargs)

        if isinstance(url_or_urls, str):  # single URL
            future = self.executor.submit(_get, url_or_urls)
            return future if return_futures else future.result()

        if return_futures:
            return [self.executor.submit(_get, url) for url in url_or_urls]
        return list(self.executor.map(_get, url_or_urls))

    def post(
        self,
        url_or_urls: Union[str, List[str]],
        data: Optional[Union[Any, List[Any]]] = None,
        json: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
        return_futures: Optional[bool] = False,
        **kwargs
    ) -> Union[Response, List[Response], Future, List[Future]]:
        """Send one POST request, or one POST request per URL, via the thread pool.

        With a list of URLs, a ``list`` passed as ``data`` (checked first) or
        as ``json`` is treated as one payload per URL and must have the same
        length as the URL list.  Any other value is sent with every request.

        Args:
            url_or_urls: A single URL or a list of URLs.
            data: Request body, or a list with one body per URL.
            json: JSON payload, or a list with one payload per URL.
            return_futures: When true, return ``Future`` object(s) instead of
                waiting for the ``Response`` object(s).
            **kwargs: Forwarded to ``Session.post`` for every request.

        Returns:
            A single ``Response``/``Future`` for a ``str`` input, otherwise a
            list of them in the same order as the input URLs.

        Raises:
            ValueError: If a list payload is combined with a single URL, or
                its length differs from the number of URLs.
        """
        # Resolve ``super()`` here rather than inside a comprehension: a
        # comprehension is its own scope on Python < 3.12, where zero-argument
        # ``super()`` does not work.
        _post = partial(super().post, **kwargs)

        if isinstance(url_or_urls, str):  # single URL
            if isinstance(data, list) or isinstance(json, list):
                raise ValueError(
                    "a list of `data`/`json` payloads requires a list of URLs"
                )
            future = self.executor.submit(_post, url_or_urls, data, json)
            return future if return_futures else future.result()

        urls = list(url_or_urls)
        if isinstance(data, list):  # different data for each URL
            if len(urls) != len(data):
                raise ValueError(
                    f"got {len(urls)} URLs but {len(data)} `data` payloads"
                )
            payloads, json_payloads = data, repeat(json)
        elif isinstance(json, list):  # different JSON for each URL
            if len(urls) != len(json):
                raise ValueError(
                    f"got {len(urls)} URLs but {len(json)} `json` payloads"
                )
            # As in the original per-URL JSON path, ``data`` is sent as None.
            payloads, json_payloads = repeat(None), json
        else:  # same data and JSON for every URL
            payloads, json_payloads = repeat(data), repeat(json)

        # ``Session.post`` takes (url, data, json) positionally; the finite
        # ``urls`` list bounds the infinite ``repeat`` iterators.
        if return_futures:
            return [
                self.executor.submit(_post, url, _data, _json)
                for url, _data, _json in zip(urls, payloads, json_payloads)
            ]
        return list(self.executor.map(_post, urls, payloads, json_payloads))

    def close(self) -> None:
        """Wait for the thread pool to shut down, then close the session."""
        try:
            self.executor.shutdown(wait=True)
        finally:
            # Close the underlying session even if the shutdown raised.
            super().close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment