Skip to content

Instantly share code, notes, and snippets.

@architkulkarni
Last active June 1, 2023 19:16
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save architkulkarni/b08a7733157c8bb0b6a7c854064974a7 to your computer and use it in GitHub Desktop.
Save architkulkarni/b08a7733157c8bb0b6a7c854064974a7 to your computer and use it in GitHub Desktop.

job-queue-hackathon

March 2021 Ray job queue hackathon project

Running demo

  • Dependencies: fastapi, aio_pika
  • Install RabbitMQ with sudo apt-get install rabbitmq-server and runsudo service rabbitmq-server start to start the RabbitMQ server
  • python worker_async.py to start the RabbitMQ worker
  • python main.py to start FastAPI
import time
def sleep_fn(sleep_time: int):
time.sleep(sleep_time)
print(f"Finished sleeping for {sleep_time}")
import sys
import asyncio
from aio_pika import connect, Message, DeliveryMode
import pickle
import defs
async def submit_helper(func, args, loop):
# args is a list of arguments
# Usage: submit(defs.my_func, args)
# my_func needs to be imported from another file defs.py, see
# https://stackoverflow.com/questions/41385708/multiprocessing-example-giving-attributeerror.
# Perform connection
connection = await connect("amqp://guest:guest@localhost/", loop=loop)
# Creating a channel
channel = await connection.channel()
message_body = pickle.dumps([func, args])
message = Message(
message_body,
delivery_mode=DeliveryMode.PERSISTENT
)
# Sending the message
await channel.default_exchange.publish(
message, routing_key="task_queue"
)
await connection.close()
def submit_job(func, args):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(submit_helper(func, args, loop))
import asyncio
import time
from fastapi import FastAPI
import ray
import uvicorn
from defs import sleep_fn
from job_queue import submit_job
app = FastAPI()
ray.init(address="auto", dashboard_host="0.0.0.0", ignore_reinit_error=True)
@app.get("/demo_endpoint/{sleep_time}")
def demo_endpoint(sleep_time: int):
submit_job(sleep_fn, sleep_time)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5000)
import asyncio
from aio_pika import connect, IncomingMessage
import pickle
import ray
import time
loop = asyncio.get_event_loop()
ray.init()
@ray.remote(num_cpus=2)
def wrapper(pickled_fn):
[func, args] = pickle.loads(pickled_fn)
func(args)
async def on_message(message: IncomingMessage):
await wrapper.remote(message.body)
message.ack()
async def main():
# Perform connection
connection = await connect("amqp://guest:guest@localhost/", loop=loop)
# Creating a channel
channel = await connection.channel()
await channel.set_qos()
# Declaring queue
queue = await channel.declare_queue(
"task_queue",
durable=True
)
# Start listening the queue with name 'task_queue'
await queue.consume(on_message)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.create_task(main())
# we enter a never-ending loop that waits for data and runs
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment