@alexrudy
Created June 18, 2022 00:45
Parallelism Demo
"""
Python parallelism demo
Run this file with `python parallel-demo.py` to see how different
parallelism strategies fare with different kinds of parallism.
This requires aiohttp (pip install aiohttp) and requests (pip install requests)
Two environment variables control behavior:
INTERACT=1 python t.py
will stop before running each test, and wait for you to press a key to continue.
DEBUG=1 python t.py
will turn on debug logging.
"""
import multiprocessing
import concurrent.futures
import time
import requests
import logging
import functools
import asyncio
import os
import random
import io
import threading
import aiohttp
import abc
import typing as t
DEBUG = bool(os.environ.get("DEBUG", ""))
INTERACT = bool(os.environ.get("INTERACT", ""))

if DEBUG:
    logging.basicConfig(level=logging.DEBUG)


def fib(i: int) -> int:
    """Recursively compute Fibonacci numbers

    This implementation is designed to be slow to simulate CPU-bound work.
    """
    if i == 1:
        return 1
    if i == 0:
        return 0
    return fib(i - 1) + fib(i - 2)
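
# Constant mixed into the arithmetic of busyloop's spin loop below.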
N = 31


def busyloop() -> None:
    """
    Spend about 1s of CPU time computing the 32nd Fibonacci number.
    """
    # This takes about 1s of CPU time
    # and holds the GIL during most of that time.
    start = time.monotonic()
    x = fib(32) * fib(32)
    while time.monotonic() - start < 1.0:
        x += x + N
    logging.debug(f"Finished in {time.monotonic() - start:.1f}")


def msg(msg: str, i: int) -> None:
    """Print a message with the worker number prefixed"""
    print(f"[{i}] {msg}")


def do_some_work(i: int, busy: bool = False) -> int:
    """Do some work

    Runs an HTTP request (IO bound)
    and either a sleep (no GIL holding)
    or a Fibonacci calculation (holds GIL and uses CPU)
    """
    url = f"http://httpbin.org/get?i={i}"
    msg(f"Starting request to {url}", i)
    requests.get(url)
    msg("Finished request. Starting work", i)
    if busy:
        busyloop()
    else:
        time.sleep(1)
    msg("Completed work", i)
    return i
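
# requests.get and time.sleep both release the GIL while they block, so even
# thread-based pools overlap the non-busy variant of this work; only the
# busyloop portion contends for the GIL.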


async def do_some_async_work(i: int, busy: bool = False) -> int:
    """Do some work – uses cooperative multitasking

    Runs an HTTP request (IO bound)
    and either a sleep (no GIL holding)
    or a Fibonacci calculation (holds GIL and uses CPU)
    """
    url = f"http://httpbin.org/get?i={i}"
    msg(f"Starting request to {url}", i)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            await response.text()
    msg("Finished request. Starting work", i)
    if busy:
        busyloop()
    else:
        await asyncio.sleep(1)
    msg("Completed work", i)
    return i
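
# Unlike do_some_work, this coroutine yields control at every await, so an asyncio
# event loop can interleave many of these tasks; the busy variant still blocks the
# loop for roughly 1s per task while busyloop holds the CPU.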


class Context(metaclass=abc.ABCMeta):
    """Provides a simple, no-op context manager

    This makes our pool classes interchangeable with classes
    like multiprocessing.Pool so we can compare parallelism strategies.
    """

    def __enter__(self) -> "Context":
        return self

    def __exit__(self, *args):
        return None

    @abc.abstractmethod
    def map(self, func: t.Callable[[int], int], iterable: t.Iterable[int]) -> t.List[int]:
        raise NotImplementedError("Subclasses must implement .map")


class Serial(Context):
    """
    Run a sequence of tasks serially with no parallelism
    """

    def map(self, func: t.Callable[[int], int], iterable: t.Iterable[int]) -> t.List[int]:
        return list(map(func, iterable))
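

# A minimal sketch, not exercised by main(), of how a hand-rolled thread pool could
# satisfy the same Context interface using the threading module imported above.
# The demo itself uses concurrent.futures.ThreadPoolExecutor for its thread-based
# trials; the ThreadedPool name and this implementation are illustrative, not part
# of the original benchmark.
class ThreadedPool(Context):
    """Run each task in its own thread and collect results in input order."""

    def map(self, func: t.Callable[[int], int], iterable: t.Iterable[int]) -> t.List[int]:
        items = list(iterable)
        results: t.List[t.Optional[int]] = [None] * len(items)

        def run(index: int, item: int) -> None:
            # Each thread writes its result into the slot matching its input position.
            results[index] = func(item)

        threads = [
            threading.Thread(target=run, args=(index, item))
            for index, item in enumerate(items)
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        return t.cast(t.List[int], results)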


class AsyncIOPool(Context):
    """
    Run a sequence of tasks using asyncio
    """

    async def worker(self, func, item):
        if asyncio.iscoroutinefunction(func):
            return await func(item)
        else:
            return func(item)

    async def driver(self, func, iterable):
        tasks = [asyncio.create_task(self.worker(func, item)) for item in iterable]
        return await asyncio.gather(*tasks)

    def map(self, func: t.Callable[[int], int], iterable: t.Iterable[int]) -> t.List[int]:
        return asyncio.run(self.driver(func, iterable))
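
# Note that when func is a plain synchronous function (do_some_work), worker() calls
# it directly on the event loop, so each requests.get or busyloop blocks the loop and
# the tasks run one after another; real concurrency only appears with the
# do_some_async_work variant.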


class AsyncIODanglingPool(AsyncIOPool):
    """
    Run a sequence of tasks using asyncio, but don't await
    the individual tasks, just spawn them and leave them.

    This is not a good way to use asyncio, but it demos
    one of the major pitfalls – failing to await a task
    is easy and just causes tasks to never run.
    """

    async def worker(self, func, item):
        """
        Call func (async or regular) and record its result once it completes.
        """
        r = await super().worker(func, item)
        self.completed.add(r)
        return r

    async def builder(self, func, item):
        """
        Spawn the worker as a task, but never await it (the pitfall being demonstrated).
        """
        asyncio.create_task(self.worker(func, item))

    async def driver(self, func, iterable):
        # asyncio.wait requires Task objects (bare coroutines are rejected on Python 3.11+),
        # so wrap the builders; the worker tasks they spawn remain unawaited.
        tasks = [asyncio.create_task(self.builder(func, item)) for item in iterable]
        await asyncio.wait(tasks)
        return self.completed

    def map(self, func, iterable):
        self.completed = set()
        return super().map(func, iterable)
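
# Whether any of the dangling worker tasks finish before asyncio.run() returns depends
# on event-loop scheduling; anything still pending is cancelled when asyncio.run() shuts
# the loop down, which is why this pool tends to report fewer completed tasks than it
# was given.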


def driver(pool_cls, n=5, busy=False, func=do_some_work):
    """Drive a processing pool to run n tasks"""
    with pool_cls() as pool:
        return list(pool.map(functools.partial(func, busy=busy), range(n)))
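
# functools.partial around a module-level function pickles cleanly, which is what lets
# the same call work with multiprocessing.Pool and ProcessPoolExecutor as well as the
# thread- and asyncio-based pools.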


def trial(func, *args, name):
    """Run a single example, printing out information"""
    print(f" {name} ".center(70, "-"))
    if INTERACT:
        input("Start?")
    start = time.monotonic()
    finished = func(*args)
    end = time.monotonic()
    print(f"Took {end - start:.1f}s to run {name}")
    msg = ", ".join(str(i) for i in finished)
    print(f"Completed {len(finished)} tasks: {msg}")
    return (end - start, len(finished))


def main():
    """Run all the examples and show the summaries at the end"""
    n_tasks = 5
    TIMINGS = {}
    for (pool, pool_name) in [
        (Serial, "serial"),
        (multiprocessing.Pool, "multiprocessing"),
        (concurrent.futures.ThreadPoolExecutor, "concurrent.futures threads"),
        (concurrent.futures.ProcessPoolExecutor, "concurrent.futures process"),
        (AsyncIOPool, "asyncio"),
        (AsyncIODanglingPool, "asyncio dangling"),
    ]:
        for busy in (True, False):
            _busy = " busy" if busy else ""
            name = f"{pool_name}{_busy}"
            TIMINGS[name] = trial(driver, pool, n_tasks, busy, name=name)

    for busy in (True, False):
        _busy = " busy" if busy else ""
        name = f"asyncio with cooperation{_busy}"
        TIMINGS[name] = trial(driver, AsyncIOPool, n_tasks, busy, do_some_async_work, name=name)
        name = f"asyncio dangling with cooperation{_busy}"
        TIMINGS[name] = trial(driver, AsyncIODanglingPool, n_tasks, busy, do_some_async_work, name=name)

    print("")
    print(" summary ".center(70, "="))
    width = max(len(name) for name in TIMINGS)
    for name, (duration, n_finished) in TIMINGS.items():
        summary_line(name, duration, n_finished, width, n_tasks)

    print(" sorted summary ".center(70, "="))
    for name in sorted(TIMINGS.keys(), key=lambda key: TIMINGS[key][0]):
        (duration, n_finished) = TIMINGS[name]
        summary_line(name, duration, n_finished, width, n_tasks)


def summary_line(name, duration, n_finished, width, n_expected):
    """Print a summary line"""
    is_err = "!!!" if n_finished != n_expected else ""
    print(f"{name:{width}.{width}s} | {duration: 5.1f}s | {n_finished: 2d}/{n_expected} finished {is_err}")


if __name__ == "__main__":
    main()