Skip to content

Instantly share code, notes, and snippets.

@rkorsak
Last active October 1, 2020 16:14
Show Gist options
  • Save rkorsak/2af2e711cc790cb9d2f0e28302d77f58 to your computer and use it in GitHub Desktop.
Save rkorsak/2af2e711cc790cb9d2f0e28302d77f58 to your computer and use it in GitHub Desktop.
A simple reproduction of multi-processing issues when running FastAPI in Docker
__pycache__
*.pyc
*.pyo
*.pyd
version: "3.8"
services:
app:
build:
context: .
ports:
- 8000:80
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.8
COPY . /app
from fastapi import FastAPI, Depends
from functools import lru_cache
from logging import getLogger
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool
logger = getLogger(__name__)
app = FastAPI()
global_process_pool = None
@lru_cache
def get_process_pool():
return ProcessPool()
@lru_cache
def get_thread_pool():
return ThreadPool()
@app.on_event("startup")
async def on_startup():
global global_process_pool
global_process_pool = ProcessPool()
@app.on_event("shutdown")
async def on_shutdown():
global_process_pool.terminate()
@app.post("/serial")
async def run_serial():
for n in range(10):
do_work(n)
return "Complete"
@app.post("/multiproc/dependency")
async def run_multiproc(process_pool = Depends(get_process_pool)):
logger.error("Running multiprocessing-based work with a dependency-injected pool")
logger.error(f"Using pool: {process_pool}")
process_pool.starmap(do_work, [(n,) for n in range(10)])
return "Complete"
@app.post("/multiproc/context")
async def run_multiproc():
logger.error("Running multiprocessing-based work with a context-managed pool")
with ProcessPool() as process_pool:
logger.error(f"Using pool: {process_pool}")
process_pool.starmap(do_work, [(n,) for n in range(10)])
return "Complete"
@app.post("/multiproc/global")
async def run_multiproc():
logger.error("Running multiprocessing-based work with a global pool")
logger.error(f"Using pool: {global_process_pool}")
global_process_pool.starmap(do_work, [(n,) for n in range(10)])
return "Complete"
@app.post("/multiproc/ad-hoc")
async def run_multiproc():
logger.error("Running multiprocessing-based work with new pool scoped to this route")
process_pool = ProcessPool()
logger.error(f"Using pool: {process_pool}")
process_pool.starmap(do_work, [(n,) for n in range(10)])
return "Complete"
@app.post("/threaded")
async def run_threaded(thread_pool = Depends(get_thread_pool)):
logger.error("Running thread-based work")
logger.error(f"Using pool: {thread_pool}")
thread_pool.starmap(do_work, [(n,) for n in range(10)])
return "Complete"
def do_work(n: int):
logger.error(f"Doing work: {n}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment