Skip to content

Instantly share code, notes, and snippets.

@alukach
Created May 4, 2023 05:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alukach/8c3e6c509115d8743aee1e90b4d33897 to your computer and use it in GitHub Desktop.
Save alukach/8c3e6c509115d8743aee1e90b4d33897 to your computer and use it in GitHub Desktop.
Multiprocessing + Asyncio
"""
An example of a script that does CPU-bound work (checksum calculation) followed by
IO-bound work (upload to server) in a performant manner.
Inspiration: https://stackoverflow.com/questions/21159103/what-kind-of-problems-if-any-would-there-be-combining-asyncio-with-multiproces#29147750
"""
import asyncio
import datetime
import hashlib
import multiprocessing
import random
import time
import typing
from concurrent.futures import ProcessPoolExecutor
# ANSI escape sequences used to colorize log output.
green = "\033[92m"
blue = "\033[96m"
clear = "\033[0m"


def log(filename, msg, color=clear):
    """
    Mock logger (works in multiprocessing scenario).

    Prints one timestamped line tagged with the current process name so
    output coming from worker processes is distinguishable from the parent.
    """
    # Millisecond precision: strftime gives microseconds, trim last 3 digits.
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    proc = multiprocessing.current_process().name
    print(f"{stamp:<24} {proc:<15} {filename:<8} {color}{msg}{clear}")
def generate_checksum(filepath: str) -> str:
    """
    Blocking checksum generation. Should run in its own process.

    Simulates an expensive CPU-bound hash: sleeps for a random interval,
    then returns a SHA-256 hex digest computed over random bytes.
    """
    log(filepath, "Starting checksum...", color=blue)
    # Pretend this is fast + expensive calculation: block 0.1-5.0 seconds.
    delay = random.randint(1, 50) / 10
    time.sleep(delay)
    log(filepath, f"Completed checksum after {delay} seconds.", color=blue)
    digest = hashlib.sha256(random.randbytes(100))
    return digest.hexdigest()
async def upload(filepath: str, checksum: str) -> None:
    """
    Non-blocking upload. Should run as coroutine.

    Acquires the module-level semaphore so only a bounded number of
    uploads are in flight at once, then simulates a slow transfer.
    """
    async with max_concurrent_uploads:
        log(filepath, "Starting upload...", color=green)
        # Pretend this is slow upload: yield to the loop for 1-10 seconds.
        delay = random.randint(1, 10)
        await asyncio.sleep(delay)
        log(filepath, f"Completed upload after {delay} seconds.", color=green)
async def process_file(filepath: str):
    """
    Full pipeline for one file: CPU-bound checksum in the process pool,
    then IO-bound upload as a coroutine on the event loop.
    """
    # run_in_executor keeps the event loop free while a worker process
    # does the blocking hash work.
    digest = await loop.run_in_executor(pool, generate_checksum, filepath)
    await upload(filepath, digest)
    return filepath
async def main(filenames: typing.List[str]):
    """
    Process every file concurrently and log progress as each completes.

    Args:
        filenames: paths of the files to checksum and upload.
    """
    start = time.time()
    total = len(filenames)
    for i, task in enumerate(
        asyncio.as_completed([process_file(filename) for filename in filenames])
    ):
        # get the next result (in completion order, not submission order)
        filepath = await task
        # Bug fix: i is 0-based, so i + 1 files are done once this task
        # resolves; the old `i * 100 / total` reported 0% after the first
        # file and only 99% after the last.
        log(filepath, f"{(i + 1) * 100 // total}% done")
    print(f"Complete in {time.time() - start:.1f} seconds")
if __name__ == "__main__":
    random.seed(2)  # deterministic pseudo-random delays across runs
    filecount = 100
    cpu_count = multiprocessing.cpu_count()
    # Allow more in-flight uploads than worker processes so the IO side
    # never starves the CPU side.
    max_concurrent_uploads = asyncio.Semaphore(cpu_count * 10)
    pool = ProcessPoolExecutor(max_workers=cpu_count)
    # NOTE(review): get_event_loop() is deprecated outside a running loop on
    # Python 3.10+; kept as-is because on older interpreters the Semaphore
    # above binds this same default loop at construction time.
    loop = asyncio.get_event_loop()
    # Bug fix: the list previously repeated a single literal name, producing
    # 100 identical "filenames" (the loop variable was unused); interpolate
    # the loop variable so each task gets a distinct path.
    loop.run_until_complete(main([f"{filename}.txt" for filename in range(filecount)]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment