Skip to content

Instantly share code, notes, and snippets.

@adilkhash
Created October 28, 2022 12:03
Show Gist options
  • Save adilkhash/3f8863006813676d3d31804f01ab7e9c to your computer and use it in GitHub Desktop.
Save adilkhash/3f8863006813676d3d31804f01ab7e9c to your computer and use it in GitHub Desktop.
import typing as t
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
import requests
def parallelize(f: t.Optional[t.Callable] = None, max_workers: int = 5):
    """Make a function run once per argument tuple in a thread pool.

    Works both bare (``@parallelize``) and with options
    (``@parallelize(max_workers=10)``).  The decorated function no longer
    takes its original arguments: it takes a single iterable of
    positional-argument tuples and performs one call per tuple.

    Args:
        f: The function to wrap when used as a bare decorator.  Left as
            ``None`` when the decorator is invoked with options.
        max_workers: Number of threads handed to ``ThreadPoolExecutor``.

    Returns:
        The wrapped function when ``f`` is callable, otherwise a decorator
        awaiting the function.  The wrapped function returns a dict that
        maps each tuple's position in the input to that call's return
        value, or to the exception instance if the call raised an
        ``Exception``.  Keys are in input order.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(arguments: t.List[t.Tuple[t.Any, ...]]) -> t.Dict[int, t.Any]:
            collected: t.Dict[int, t.Any] = {}
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Remember which input position each future belongs to.
                futures = {
                    executor.submit(func, *args): index
                    for index, args in enumerate(arguments)
                }
                for future in as_completed(futures):
                    index = futures[future]
                    try:
                        result = future.result()
                    except Exception as exc:
                        # Failures are reported as values so that one bad
                        # call does not discard the other results.
                        collected[index] = exc
                    else:
                        collected[index] = result
            # Futures finish in arbitrary order; re-key by input position so
            # iterating or printing the result is deterministic.
            return {index: collected[index] for index in sorted(collected)}

        return wrapper

    # Test callability only, not truthiness: a callable object may define
    # __bool__/__len__ and evaluate falsy, yet must still be wrapped.
    if callable(f):
        return decorator(f)
    return decorator
@parallelize
def get_status(url: str, timeout: float = 10.0) -> int:
    """Fetch ``url`` with a GET request and return its HTTP status code.

    Because of ``@parallelize``, callers do not pass ``url`` directly: they
    pass a list of argument tuples such as ``[(url,), (url, timeout)]`` and
    get back a dict keyed by each tuple's position in that list.

    Args:
        url: Address to request.
        timeout: Seconds to wait on the server before giving up.  Without
            a timeout ``requests`` may wait indefinitely, which would tie
            up a worker thread on an unresponsive host.

    Returns:
        The response's status code.
    """
    return requests.get(url, timeout=timeout).status_code
if __name__ == '__main__':
    # Sites to probe; each one becomes a one-element argument tuple below.
    urls = (
        'https://khashtamov.com',
        'https://ya.ru/',
        'https://tengrinews.kz/',
        'https://vlast.kz/',
        'https://google.com/',
        'https://flip.kz/',
        'https://meloman.kz/',
    )
    output = get_status([(url,) for url in urls])
    print(output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment