Skip to content

Instantly share code, notes, and snippets.

@liviaerxin
Created March 31, 2023 08:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save liviaerxin/4de85f251941bb385b6bcc8374e471be to your computer and use it in GitHub Desktop.
Save liviaerxin/4de85f251941bb385b6bcc8374e471be to your computer and use it in GitHub Desktop.
Serving long-running/heavy-computation task via FastAPI and concurrent.futures.ProcessPoolExecutor #python #fastapi #ml
"""
Serving long-running/heavy-computation task via FastAPI and concurrent.futures.ProcessPoolExecutor
usage:
```sh
uvicorn main:app --reload
```
"""
from fastapi import FastAPI
import os
import sys
import asyncio
from typing import List
import queue
from concurrent.futures import ProcessPoolExecutor, Future
# ASGI application object; the route and startup handlers in this module
# register themselves on it through its decorators.
app = FastAPI()

# Process pool limited to a single worker process; submitted jobs are executed
# outside the web server process.
executor = ProcessPoolExecutor(max_workers=1)

# Futures returned by the executor that have been submitted but not yet
# removed by task_consumer().
task_list: List[Future] = []
def task(x: int):
    """Stand in for a heavy job by sleeping for ``x`` seconds.

    A start marker and an end marker, each tagged with the current and the
    parent process id, are printed around the sleep.

    Args:
        x: Number of seconds to sleep.

    Returns:
        A dict holding the id of the executing process under ``"pid"`` and
        ``True`` under ``"completed"``.
    """
    # Function-local imports: the job carries everything it needs itself.
    import os
    import time

    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] start task")
    time.sleep(x)
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] end task")

    outcome = dict(pid=os.getpid(), completed=True)
    return outcome
def callback(future: Future):
    """Report the outcome of a finished future on stdout.

    Intended to be registered with ``Future.add_done_callback``. A successful
    future prints its result; a cancelled or failed future prints a matching
    line instead of raising. Raising here would only surface as an
    "exception calling callback" log entry from ``concurrent.futures``.

    Args:
        future: The completed (or cancelled) future to report on.
    """
    import os

    prefix = f"PID[{os.getpid()}] Parent[{os.getppid()}] callback"

    # Check cancellation first: both exception() and result() raise
    # CancelledError on a cancelled future.
    if future.cancelled():
        print(f"{prefix} cancelled")
        return

    error = future.exception()
    if error is not None:
        print(f"{prefix} error[{error!r}]")
        return

    print(f"{prefix} result[{future.result()}]")
async def task_consumer():
    """Poll ``task_list`` forever and drop futures that have finished.

    Once per second every finished future is printed and removed from the
    module-level ``task_list``; a heartbeat line ending in ``.`` is printed
    after each sweep. The coroutine never returns on its own; it ends only
    when its task is cancelled.
    """
    while True:
        # Iterate over a snapshot: removing from a list while iterating that
        # same list skips the element following each removal, so adjacent
        # finished futures would be left behind until a later sweep.
        for future in list(task_list):
            if future.done():
                print(
                    f"PID[{os.getpid()}] Parent[{os.getppid()}] task_consumer [{future}]"
                )
                # Mutate in place so other holders of task_list see the change.
                task_list.remove(future)
        print(f"PID[{os.getpid()}] Parent[{os.getppid()}] .")
        await asyncio.sleep(1)
@app.on_event("startup")
async def startup():
    """Log application start and launch the background ``task_consumer`` loop.

    Registered as a ``startup`` event handler on ``app``.
    """
    global _consumer_task

    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] app startup")
    # The event loop keeps only a weak reference to its tasks, so a task whose
    # result is discarded may be garbage-collected mid-execution. Holding it
    # in a module-level name keeps the consumer alive.
    _consumer_task = asyncio.create_task(task_consumer())
@app.get("/")
async def root():
    """Handle ``GET /``: log the request and return a fixed greeting."""
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] root")
    greeting = {"message": "Hello World"}
    return greeting
@app.get("/analyze")
async def analyze():
    """Handle ``GET /analyze``: queue a 10-second ``task`` on the executor.

    The job is submitted without being awaited. ``callback`` is attached to
    its future and the future is appended to ``task_list``.

    Returns:
        A dict whose ``"result"`` entry is ``future.done()`` evaluated right
        after submission.
    """
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] analyze")

    pending = executor.submit(task, 10)
    pending.add_done_callback(callback)
    task_list.append(pending)

    # NOTE: the result is deliberately not waited for here. A blocking wait
    # inside this handler would block the main asyncio loop and make other
    # requests unavailable until the job finished.
    response = {"result": pending.done()}
    return response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment