Skip to content

Instantly share code, notes, and snippets.

@FBosler
Last active September 1, 2022 20:40
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save FBosler/c77d28f7d719ac52364bd8420cb0880d to your computer and use it in GitHub Desktop.
ThreadPool
from datetime import datetime
import aiohttp
import asyncio
URL = "https://medium.fabianbosler.de/run"
async def sample_asyncio(samples):
    """Fire `samples` concurrent GET requests at URL with asyncio/aiohttp.

    Parameters:
        samples: number of concurrent requests to issue.

    Returns:
        (results, elapsed) — list of decoded JSON payloads and the wall-clock
        duration as a datetime.timedelta.
    """
    start = datetime.now()

    async def fetch(session):
        # Issue one GET and decode the JSON body.
        async with session.get(URL) as resp:
            return await resp.json()

    # One shared ClientSession for all requests: aiohttp's docs recommend a
    # single session so requests reuse pooled connections instead of paying
    # a fresh TCP/TLS handshake per request (the original opened a new
    # session for every single fetch).
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[fetch(session) for _ in range(samples)])
    return results, datetime.now() - start
if __name__ == '__main__':
    # Demo: 64 concurrent requests; print only the elapsed time (index 1).
    _, elapsed = asyncio.run(sample_asyncio(64)), None
    print(asyncio_result[1]) if False else print(asyncio.run(sample_asyncio(64))[1])
from datetime import datetime
from multiprocessing.pool import ThreadPool
from itertools import chain
import pandas as pd
import requests
import aiohttp
import asyncio
URL = "https://medium.fabianbosler.de/run"
def fetch_quote(*args):
    """Fetch one quote from URL and return the decoded JSON payload.

    Extra positional arguments are accepted (and ignored) so the function
    can be passed directly to ThreadPool.map over a range.

    Returns:
        Parsed JSON on success, or the sentinel string 'ERROR' on any
        request/decoding failure (callers count these markers).
    """
    try:
        # timeout= prevents a stalled connection from hanging a worker
        # thread forever (requests has no default timeout).
        res = requests.get(URL, timeout=10).json()
    except Exception:
        # Deliberate broad catch: the benchmark counts failures via the
        # 'ERROR' marker rather than aborting the whole run.
        res = 'ERROR'
    return res
def parallel_extraction(threads, samples):
    """Fetch `samples` quotes using a pool of `threads` worker threads.

    Returns:
        (results, elapsed) — list of fetch results and a datetime.timedelta.
    """
    started_at = datetime.now()
    with ThreadPool(processes=threads) as pool:
        # map blocks until every worker has returned its result.
        fetched = list(pool.map(fetch_quote, range(samples)))
    return fetched, datetime.now() - started_at
def sequential_extraction(samples):
    """Fetch `samples` quotes one after another (no-concurrency baseline).

    Returns:
        (results, elapsed) — list of fetch results and a datetime.timedelta.
    """
    started_at = datetime.now()
    collected = [fetch_quote() for _ in range(samples)]
    return collected, datetime.now() - started_at
async def sample_asyncio(samples):
    """Fire `samples` concurrent GET requests at URL with asyncio/aiohttp.

    Parameters:
        samples: number of concurrent requests to issue.

    Returns:
        (results, elapsed) — list of decoded JSON payloads and the wall-clock
        duration as a datetime.timedelta.
    """
    start = datetime.now()

    async def fetch(session):
        # Issue one GET and decode the JSON body.
        async with session.get(URL) as resp:
            return await resp.json()

    # Share a single ClientSession across all requests so connections are
    # pooled and reused; the original created a fresh session (and thus a
    # fresh TCP/TLS handshake) for every request.
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[fetch(session) for _ in range(samples)])
    return results, datetime.now() - start
if __name__ == '__main__':
    # Benchmark matrix: sample counts x worker counts.
    # num_threads == -1 -> asyncio, == 0 -> sequential, > 0 -> thread pool.
    benchmarking_res = []
    sample_sizes = chain(range(0, 10, 3), range(10, 20, 5), [50], range(100, 2100, 600))
    worker_counts = [-1, 0, 8, 16, 24, 32, 64, 128]
    for num_samples in sample_sizes:
        for num_threads in worker_counts:
            print(f'started with {num_samples} samples and {num_threads} threads')
            if num_threads == -1:
                data, time = asyncio.run(sample_asyncio(num_samples))
            elif num_threads == 0:
                data, time = sequential_extraction(num_samples)
            else:
                data, time = parallel_extraction(num_threads, num_samples)
            # Failed fetches come back as the 'ERROR' marker string.
            errors = len([x for x in data if x == 'ERROR'])
            benchmarking_res.append({
                'num_samples': num_samples,
                'num_threads': num_threads,
                'execution_time': time,
                'errors': errors,
            })
    # Persist the raw numbers, then pivot and plot execution time per
    # thread count (one line per worker configuration).
    frame = pd.DataFrame(benchmarking_res)
    frame.to_csv('results.csv', index=False)
    frame = pd.pivot(frame, index='num_samples', columns='num_threads', values='execution_time')
    frame.applymap(lambda cell: pd.to_timedelta(cell)).plot(figsize=(16, 9))
from multiprocessing.pool import ThreadPool
import requests
from datetime import datetime
URL = "https://medium.fabianbosler.de/run"
def fetch_quote(*args):
    """Fetch one quote from URL and return the decoded JSON payload.

    Extra positional arguments are accepted (and ignored) so the function
    can be passed directly to ThreadPool.map over a range.

    Returns:
        Parsed JSON on success, or the sentinel string 'ERROR' on any
        request/decoding failure.
    """
    try:
        # Explicit timeout: requests otherwise waits indefinitely on a
        # stalled connection, which would wedge a pool worker.
        res = requests.get(URL, timeout=10).json()
    except Exception:
        # Broad catch on purpose — failures are reported as the 'ERROR'
        # marker instead of crashing the pool.
        res = 'ERROR'
    return res
def parallel_extraction(threads, samples):
    """Fetch `samples` quotes concurrently on `threads` pool workers.

    Returns:
        (results, elapsed) — list of fetch results and a datetime.timedelta.
    """
    t0 = datetime.now()
    with ThreadPool(processes=threads) as pool:
        # map distributes one dummy index per request and collects in order.
        outcomes = list(pool.map(fetch_quote, range(samples)))
    return outcomes, datetime.now() - t0
if __name__ == "__main__":
    # Demo run: 16 worker threads, 64 requests; prints (results, elapsed).
    demo = parallel_extraction(16, 64)
    print(demo)
from multiprocessing import Pool
def f(x):
    """Return the square of x."""
    return x ** 2
if __name__ == '__main__':
    # Square 1..3 across a 5-process pool; the mapped list is appended as a
    # single element, so this prints a nested list.
    collected = []
    with Pool(5) as workers:
        collected.append(workers.map(f, [1, 2, 3]))
    print(collected)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment