@simon-mo
Created December 23, 2022 20:02

Simple in-memory job queue in Ray Serve

Run the Serve application, then run the polling client in a separate terminal:

serve run queue_proxy:app
python client.py

Sample output:

(base) ➜  tmp python client.py
['0c675328-33a7-4adc-9531-4b8de7e01d51', 'a2d8f800-0ee8-49f1-90eb-8a690f038a0c', '88f3c26e-2202-4829-a391-85c89bae1611', 'a611f982-6076-4041-ae8c-43ab20e8abf0', '77163304-510d-4314-b094-6f48e6313fc5']
0c675328-33a7-4adc-9531-4b8de7e01d51 False
a2d8f800-0ee8-49f1-90eb-8a690f038a0c False
88f3c26e-2202-4829-a391-85c89bae1611 False
a611f982-6076-4041-ae8c-43ab20e8abf0 False
77163304-510d-4314-b094-6f48e6313fc5 False

0c675328-33a7-4adc-9531-4b8de7e01d51 True
a2d8f800-0ee8-49f1-90eb-8a690f038a0c False
88f3c26e-2202-4829-a391-85c89bae1611 False
a611f982-6076-4041-ae8c-43ab20e8abf0 False
77163304-510d-4314-b094-6f48e6313fc5 False

0c675328-33a7-4adc-9531-4b8de7e01d51 True
a2d8f800-0ee8-49f1-90eb-8a690f038a0c True
88f3c26e-2202-4829-a391-85c89bae1611 False
a611f982-6076-4041-ae8c-43ab20e8abf0 False
77163304-510d-4314-b094-6f48e6313fc5 False

0c675328-33a7-4adc-9531-4b8de7e01d51 True
a2d8f800-0ee8-49f1-90eb-8a690f038a0c True
88f3c26e-2202-4829-a391-85c89bae1611 True
a611f982-6076-4041-ae8c-43ab20e8abf0 False
77163304-510d-4314-b094-6f48e6313fc5 False

0c675328-33a7-4adc-9531-4b8de7e01d51 True
a2d8f800-0ee8-49f1-90eb-8a690f038a0c True
88f3c26e-2202-4829-a391-85c89bae1611 True
a611f982-6076-4041-ae8c-43ab20e8abf0 True
77163304-510d-4314-b094-6f48e6313fc5 False

0c675328-33a7-4adc-9531-4b8de7e01d51 True
a2d8f800-0ee8-49f1-90eb-8a690f038a0c True
88f3c26e-2202-4829-a391-85c89bae1611 True
a611f982-6076-4041-ae8c-43ab20e8abf0 True
77163304-510d-4314-b094-6f48e6313fc5 True

client.py — submits NUM_JOBS jobs to the queue, then polls /check every two seconds until all of them report done:

import requests
import random
import time

NUM_JOBS = 5

# Submit the jobs and collect the returned job ids.
ids = []
for _ in range(NUM_JOBS):
    resp = requests.post("http://localhost:8000/submit", json={"arg": str(random.random())})
    ids.append(resp.json())
print(ids)

# Poll until every job reports done.
num_done = 0
while num_done < NUM_JOBS:
    num_done = 0
    for i in ids:
        resp = requests.get("http://localhost:8000/check", params={"job_id": i})
        done = resp.json()
        print(i, done)
        if done:
            num_done += 1
    print()
    time.sleep(2)
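
The loop above polls forever if a job never completes. A possible hardening, sketched under the assumption that giving up after a fixed number of rounds is acceptable (this helper is not part of the gist and its names are invented):

import requests
import time

MAX_ROUNDS = 30  # assumption: give up after roughly a minute of polling

def wait_for_jobs(ids, poll_interval=2):
    # Poll /check for each id, skipping ids that have already reported done,
    # and return the set of ids that finished within MAX_ROUNDS rounds.
    done = set()
    for _ in range(MAX_ROUNDS):
        for job_id in ids:
            if job_id in done:
                continue
            resp = requests.get("http://localhost:8000/check", params={"job_id": job_id})
            if resp.json():
                done.add(job_id)
        if len(done) == len(ids):
            break
        time.sleep(poll_interval)
    return done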

queue_proxy.py — the Serve application: a FastAPI ingress that accepts jobs, tracks them in an in-memory dict, and fans them out to an autoscaling TrainingWorker pool:

from ray import serve
import time
from fastapi import FastAPI
import asyncio
from uuid import uuid4
import pydantic
from ray.serve.drivers import DAGDriver


@serve.deployment(
    max_concurrent_queries=2,
    autoscaling_config={"min_replicas": 1, "max_replicas": 2},
)
class TrainingWorker:
    def __init__(self, init_arg):
        self.arg = init_arg

    def __call__(self, arg):
        # Stand-in for real work: each "training job" just sleeps for two seconds.
        print("working on", arg)
        time.sleep(2)


app = FastAPI()


class JobSpec(pydantic.BaseModel):
    arg: str


@serve.deployment
@serve.ingress(app)
class APIServer:
    def __init__(self, worker_pool_handle):
        # TODO: for persistence, use the ray.storage API.
        # A dictionary of job_id -> async submission task.
        # The ordering is generally FIFO, but there is no strict guarantee.
        self.pending_jobs = dict()
        self.worker_pool_handle = worker_pool_handle

    async def run_one_job(self, spec: JobSpec):
        # TODO: you can do more fine-grained status tracking in here.
        # Submit to a worker replica.
        ref = await self.worker_pool_handle.remote(spec)
        # Wait for training to finish.
        # TODO: exception handling
        result = await ref

    @app.post("/submit")
    async def accept_job(self, spec: JobSpec):
        # Enqueue the job as an asyncio task and hand back an id the client can poll.
        job_id = str(uuid4())
        self.pending_jobs[job_id] = asyncio.get_event_loop().create_task(self.run_one_job(spec))
        return job_id

    @app.get("/check")
    async def status(self, job_id: str) -> bool:
        # TODO: handle missing id
        task: asyncio.Task = self.pending_jobs[job_id]
        # TODO: return structured output; currently just true or false
        return task.done()


# Bind the application graph: the API server fronts the worker pool.
app = APIServer.bind(TrainingWorker.bind(init_arg=1))
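
The TODOs in status leave missing-id handling and structured responses open. One way that could look, sketched as an extra method on APIServer (not part of the gist: the /status route and the response fields are invented, and it assumes "from fastapi import HTTPException" is added to the imports):

    @app.get("/status")  # hypothetical route, alongside the gist's /check
    async def job_status(self, job_id: str) -> dict:
        # Look the task up instead of indexing, so an unknown id becomes a 404
        # rather than an unhandled KeyError.
        task = self.pending_jobs.get(job_id)
        if task is None:
            raise HTTPException(status_code=404, detail=f"unknown job_id: {job_id}")
        error = None
        if task.done() and not task.cancelled() and task.exception() is not None:
            # Surface a worker exception instead of silently reporting "done".
            error = repr(task.exception())
        return {"job_id": job_id, "done": task.done(), "error": error}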