
@edoakes
Created June 9, 2023 21:43
Example of how to use a message passing architecture with Ray Serve.
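```python
import asyncio
from typing import Any, Dict, Optional

from ray import serve


@serve.deployment
class MessageConsumer:
    def __init__(self, topic: str, downstream_topic: Optional[str] = None):
        self._downstream_topic = downstream_topic
        # Start polling in the background; keep a reference so the task isn't garbage-collected.
        self._poll_task = asyncio.get_running_loop().create_task(
            self.poll_for_messages(topic)
        )

    def check_health(self):
        # Serve treats the replica as unhealthy (and restarts it) if this raises.
        if self._poll_task.done():
            raise RuntimeError("Message polling task has stopped.")

    async def poll_for_messages(self, topic: str):
        # `MessageQueueClient` is a placeholder for your message queue's client library.
        client = MessageQueueClient(topic)
        while True:
            message = await client.get_message()
            result = await self.process_message(message)
            # Optionally post the result to a downstream topic to chain logic together.
            if self._downstream_topic is not None:
                await client.post_message(self._downstream_topic, result)

    async def process_message(self, message):
        pass  # TODO: fill in code to do preprocessing/inference/etc.


def build_app(args: Dict[str, Any]):
    return MessageConsumer.options(
        num_replicas=args.get("num_replicas", 1)
    ).bind(args["topic"], args.get("downstream_topic"))
```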
```yaml
applications:
  - name: message-consumer-topic-topic1
    import_path: message_consumer:build_app
    args:
      topic: topic1
  - name: message-consumer-topic-topic2
    import_path: message_consumer:build_app
    args:
      topic: topic2
      num_replicas: 2
```
edoakes (author) commented Jun 9, 2023

The example above shows how you can define a set of Ray Serve deployments to process messages from a message queue (minus the relevant business logic).
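To see the poll, process, post loop run without Ray or a real broker, here is a minimal sketch that substitutes a hypothetical in-memory `InMemoryBroker` (one `asyncio.Queue` per topic) for `MessageQueueClient`. A `Consumer` reads `topic1`, uppercases each message as placeholder "processing", and posts the result to `topic2`, chaining two stages. Its `check_health` shows one check the TODO in the original could perform: the polling loop never returns on its own, so a finished background task means it crashed or was cancelled. All names here are illustrative assumptions.

```python
import asyncio
from collections import defaultdict
from typing import Optional


class InMemoryBroker:
    """Hypothetical stand-in for a real message queue: one asyncio.Queue per topic."""

    def __init__(self) -> None:
        self._topics: dict = defaultdict(asyncio.Queue)

    async def get_message(self, topic: str) -> str:
        return await self._topics[topic].get()

    async def post_message(self, topic: str, message: str) -> None:
        await self._topics[topic].put(message)


class Consumer:
    """Mirrors MessageConsumer's structure without Ray: one background polling task."""

    def __init__(self, broker: InMemoryBroker, topic: str, downstream_topic: Optional[str]):
        self._broker = broker
        self._downstream_topic = downstream_topic
        # Keep a reference so the task is not garbage-collected and can be health-checked.
        self._task = asyncio.get_running_loop().create_task(self._poll(topic))

    def check_health(self) -> None:
        # The loop below never exits normally, so "done" means crashed or cancelled.
        if self._task.done():
            raise RuntimeError("message polling task has stopped")

    def stop(self) -> None:
        self._task.cancel()

    async def _poll(self, topic: str) -> None:
        while True:
            message = await self._broker.get_message(topic)
            result = await self.process_message(message)
            if self._downstream_topic is not None:
                await self._broker.post_message(self._downstream_topic, result)

    async def process_message(self, message: str) -> str:
        return message.upper()  # placeholder for preprocessing/inference


async def main() -> list:
    broker = InMemoryBroker()
    consumer = Consumer(broker, "topic1", downstream_topic="topic2")
    for msg in ["a", "b", "c"]:
        await broker.post_message("topic1", msg)
    # Read the chained results back from the downstream topic.
    results = [await broker.get_message("topic2") for _ in range(3)]
    consumer.check_health()  # passes: the polling task is still running
    consumer.stop()
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['A', 'B', 'C']
```

In a real deployment, the in-memory queues would be replaced by the broker's own client, and each stage would typically be its own Serve application reading one topic.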

In the config file, we define two applications, each pulling from a different topic. Each can be scaled, updated, etc. independently, like any other Serve application (here, the `topic2` consumer runs with two replicas).
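Because every replica runs its own polling loop from `__init__`, `num_replicas: 2` for `topic2` means two loops compete for that topic's messages. The sketch below assumes the queue hands each message to exactly one consumer (as consumer groups or work queues in many brokers do) and models that with a shared `asyncio.Queue` per topic, a hypothetical stand-in. It starts one loop per replica from a list that mirrors the two applications in the config.

```python
import asyncio

# Mirrors the two applications in the config: each topic with its num_replicas (default 1).
APPS = [{"topic": "topic1"}, {"topic": "topic2", "num_replicas": 2}]


async def replica_loop(replica_id: str, queue: asyncio.Queue, handled: list) -> None:
    """One replica's polling loop; the shared queue gives each message to a single loop."""
    while True:
        message = await queue.get()
        handled.append((replica_id, message))
        await asyncio.sleep(0)  # yield so sibling replicas get a turn
        queue.task_done()


async def simulate(messages_per_topic: int = 4) -> list:
    handled: list = []
    tasks = []
    queues = {}
    for app in APPS:
        topic = app["topic"]
        queue: asyncio.Queue = asyncio.Queue()
        queues[topic] = queue
        for i in range(app.get("num_replicas", 1)):
            loop_task = replica_loop(f"{topic}-replica{i}", queue, handled)
            tasks.append(asyncio.create_task(loop_task))
        for n in range(messages_per_topic):
            queue.put_nowait(f"{topic}-msg{n}")
    # Wait until every message on every topic is processed, then stop the loops.
    for queue in queues.values():
        await queue.join()
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return handled
```

Running `asyncio.run(simulate())` returns `(replica, message)` pairs in which every message appears exactly once, with `topic2`'s messages split across its two replicas while `topic1` keeps a single replica. If the broker instead broadcast every message to every consumer, adding replicas would duplicate work rather than share it.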

An alternative to pulling directly from the message queue would be a separate hook that pulls from the queue and sends the requests to Ray Serve over HTTP. That avoids writing custom queue-polling logic inside Serve (along with the associated health checking and performance considerations), at the cost of running and operating another component.
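A minimal sketch of that HTTP hook, using only the standard library: `forward_messages` takes any iterable of JSON-serializable messages (a hypothetical stand-in for the queue client) and POSTs each one to a Serve HTTP endpoint. The default URL `http://localhost:8000/` is an assumption based on Serve's default HTTP port; the actual route depends on each application's route prefix.

```python
import json
import urllib.request
from typing import Iterable, List


def forward_messages(
    messages: Iterable[dict],
    serve_url: str = "http://localhost:8000/",  # assumed Serve address and route
    timeout_s: float = 10.0,
) -> List[dict]:
    """POST each message as JSON to a Serve endpoint and return the decoded responses.

    `messages` stands in for a real queue client (hypothetical). With a real broker,
    acknowledge a message only after its POST succeeds so failures can be redelivered.
    """
    results = []
    for message in messages:
        request = urllib.request.Request(
            serve_url,
            data=json.dumps(message).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # urlopen raises HTTPError on non-2xx responses, so failures are not silently dropped.
        with urllib.request.urlopen(request, timeout=timeout_s) as response:
            results.append(json.loads(response.read().decode("utf-8")))
    return results
```

This keeps the Serve deployment a plain request handler; retries, backpressure, and health checking of the pulling loop move into the forwarder process instead.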
