Skip to content

Instantly share code, notes, and snippets.

@liviaerxin
Created March 31, 2023 08:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save liviaerxin/ee6c1548ef739dda87f49b64890784d3 to your computer and use it in GitHub Desktop.
Save liviaerxin/ee6c1548ef739dda87f49b64890784d3 to your computer and use it in GitHub Desktop.
Serving long-running/heavy-computation task via FastAPI and multiprocessing.Process #python #fastapi #ml
"""
Serving long-running/heavy-computation task via FastAPI and multiprocessing.Process
usage:
```sh
uvicorn main:app --reload
```
"""
from fastapi import FastAPI
from multiprocessing import Pool, Process, TimeoutError, Queue
from multiprocessing.pool import AsyncResult
import os
import sys
import asyncio
from typing import List
import queue
# ASGI application object; the module docstring runs it as `uvicorn main:app`.
app = FastAPI()
# Queue shared with worker processes: analyze() passes it to task(), which
# puts a completion record on it; task_consumer() polls it for those records.
q = Queue()
def task(q: Queue, x: int):
    """Simulate a long-running job, then report completion on *q*.

    Logs a start line, sleeps for *x* seconds, logs an end line and finally
    puts ``{"pid": <this process id>, "completed": True}`` on the queue.

    Args:
        q: Queue-like object; only its ``put`` method is used.
        x: Number of seconds handed to ``time.sleep``.
    """
    import os
    from time import sleep

    own_pid = os.getpid()
    print(f"PID[{own_pid}] Parent[{os.getppid()}] start task")
    sleep(x)
    # The parent id is read again rather than cached so the log shows the
    # value at the moment the task ends.
    print(f"PID[{own_pid}] Parent[{os.getppid()}] end task")
    completion = {"pid": own_pid, "completed": True}
    q.put(completion)
def callback(result):
    """Dump *result* as JSON into ``test.txt`` and log it.

    The file is opened in ``"w"`` mode, so every call overwrites the
    previous contents. In this file the function is only referenced from
    commented-out ``Pool.apply_async`` code.

    Args:
        result: Any JSON-serialisable value.
    """
    import json
    import os

    with open("test.txt", "w") as sink:
        json.dump(result, sink)
    tag = f"PID[{os.getpid()}] Parent[{os.getppid()}]"
    print(f"{tag} callback result[{result}]")
async def task_consumer():
    """Poll the module-level queue ``q`` forever and log what arrives.

    Each iteration makes one non-blocking ``get_nowait()`` attempt: a
    retrieved item is printed, an empty queue prints a ``.`` heartbeat.
    The coroutine never returns.
    """
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            print(f"PID[{os.getpid()}] Parent[{os.getppid()}] .")
        else:
            print(f"PID[{os.getpid()}] Parent[{os.getppid()}] task_consumer [{item}]")
        # NOTE(review): the one-second pause is taken on every iteration,
        # after hits and misses alike; confirm this matches the intended
        # layout. Awaiting here hands control back to the event loop.
        await asyncio.sleep(1)
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_background_tasks = set()


@app.on_event("startup")
async def startup():
    """Log application startup and launch the queue consumer in the background.

    Schedules ``task_consumer()`` on the running event loop and keeps a
    reference to the resulting task so it stays alive; the reference is
    dropped automatically once the task is done.
    """
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] app startup")
    consumer = asyncio.create_task(task_consumer())
    _background_tasks.add(consumer)
    consumer.add_done_callback(_background_tasks.discard)
@app.get("/")
async def root():
    """Handle ``GET /``: log the call and return a fixed greeting.

    Returns:
        The dict ``{"message": "Hello World"}``.
    """
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] root")
    greeting = {"message": "Hello World"}
    return greeting
@app.get("/analyze")
async def analyze():
    """Handle ``GET /analyze``: start a 10-second ``task`` in a new process.

    The handler starts the worker and finishes immediately without waiting
    for it. The worker reports completion by putting a record on the
    module-level queue ``q``, which ``task_consumer()`` polls.

    Returns:
        None; the handler has no return statement.
    """
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] analyze")
    worker = Process(target=task, args=(q, 10))
    worker.start()
    # NOTE: the result is deliberately not awaited here. Blocking on it
    # (for example a Pool AsyncResult.get()) would block the main asyncio
    # loop and make other requests unavailable until the task finished.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment