@codemation
Last active December 25, 2020 20:16
import asyncio, json, time, uuid
from fastapi import FastAPI
from easyrpc.server import EasyRpcServer
server = FastAPI()
work_queue = asyncio.Queue()
work_results = {}
rpc_server = EasyRpcServer(server, '/ws/jobs', server_secret='abcd1234')
@rpc_server.origin(namespace='jobs')
async def process_payload(payload, job_type='http'):
    """
    loads payload into work_queue with a generated id
    returns the id to the requestor for polling
    job_types:
    - http
    - subprocess
    """
    # queue the work under a generated request_id
    request_id = str(uuid.uuid1())
    await work_queue.put((request_id, job_type, payload))
    return {"request_id": request_id}
@rpc_server.origin(namespace='jobs')
async def send_results(request_id, results):
    """
    applies results to work_results with request_id
    """
    work_results[request_id] = results
@rpc_server.origin(namespace='jobs')
async def get_results(request_id, timeout=5):
    """
    polls work_results for request_id, returning the results once available
    or an error after timeout seconds
    """
    start = time.time()
    while time.time() - start < timeout:
        if request_id not in work_results:
            await asyncio.sleep(0.5)
            continue
        return work_results.pop(request_id)
    return {"error": f"{request_id} not ready"}
async def worker():
    """
    pulls jobs from work_queue and dispatches them by job_type
    """
    while True:
        try:
            job = await work_queue.get()
            request_id, job_type, payload = job
            if job_type == 'http':
                results = await async_work_http(payload)
                await send_results(request_id, results)
            else:
                await start_subprocess(request_id, payload)
        except asyncio.CancelledError:
            break
async def start_subprocess(request_id, payload):
    """
    start a subprocess which should use an EasyRpcProxy connection
    to 'send_results' of work
    """
    # a shell pipe does not work in a subprocess argument list, so pass the
    # payload to job_subprocess.py on stdin; asyncio avoids blocking the loop
    proc = await asyncio.create_subprocess_exec(
        'python', 'job_subprocess.py', request_id,
        stdin=asyncio.subprocess.PIPE,
    )
    await proc.communicate(json.dumps(payload).encode())
async def async_work_http(payload):
    # insert an asynchronous http call here (e.g. aiohttp / httpx)
    return {'results': 'results'}
@server.on_event('startup')
async def startup():
    # create n number of workers
    for _ in range(2):
        asyncio.create_task(worker())
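The gist does not include job_subprocess.py. A minimal sketch of what it could look like follows, assuming the script reads the JSON payload from stdin, takes the request_id as its first argument, and reports back over an EasyRpcProxy connection; the host/port and the do_subprocess_work helper are illustrative assumptions, not part of the gist:

import asyncio, json, sys
from easyrpc.proxy import EasyRpcProxy

def do_subprocess_work(payload):
    # placeholder for whatever work the subprocess actually performs
    return {'processed': payload}

async def main():
    request_id = sys.argv[1]
    payload = json.loads(sys.stdin.read())
    results = do_subprocess_work(payload)
    # connect back to the 'jobs' namespace and deliver the results
    proxy = await EasyRpcProxy.create(
        '0.0.0.0', 8220, '/ws/jobs',
        server_secret='abcd1234',
        namespace='jobs'
    )
    await proxy['send_results'](request_id, results)

if __name__ == '__main__':
    asyncio.run(main())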
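From another service, a caller could submit work and poll for the result through the same 'jobs' namespace. This is a sketch under the same assumed proxy connection details; get_results returns an error dict until the worker has stored results:

import asyncio
from easyrpc.proxy import EasyRpcProxy

async def submit_and_wait():
    jobs = await EasyRpcProxy.create(
        '0.0.0.0', 8220, '/ws/jobs',
        server_secret='abcd1234',
        namespace='jobs'
    )
    # queue a job and keep the request_id for polling
    job = await jobs['process_payload']({'url': 'http://example.com'}, 'http')
    request_id = job['request_id']
    # poll until the worker has stored results for this request_id
    while True:
        results = await jobs['get_results'](request_id)
        if not (isinstance(results, dict) and 'error' in results):
            return results
        await asyncio.sleep(1)

results = asyncio.run(submit_and_wait())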