@robertnishihara
Last active April 17, 2023 11:40
import time

import ray

# Start Ray on the local machine.
ray.init()


@ray.remote
class MessageActor(object):
    def __init__(self):
        self.messages = []

    def add_message(self, message):
        self.messages.append(message)

    def get_and_clear_messages(self):
        # Hand back everything collected so far and start a fresh list.
        messages = self.messages
        self.messages = []
        return messages


# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
    for i in range(100):
        time.sleep(1)
        message_actor.add_message.remote(
            "Message {} from worker {}.".format(i, j))


# Create a message actor.
message_actor = MessageActor.remote()

# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]

# Periodically get the messages and print them.
for _ in range(100):
    new_messages = ray.get(message_actor.get_and_clear_messages.remote())
    print("New messages:", new_messages)
    time.sleep(1)

# This script prints something like the following:
# New messages: []
# New messages: ['Message 0 from worker 1.', 'Message 0 from worker 0.']
# New messages: ['Message 0 from worker 2.', 'Message 1 from worker 1.', 'Message 1 from worker 0.', 'Message 1 from worker 2.']
# New messages: ['Message 2 from worker 1.', 'Message 2 from worker 0.', 'Message 2 from worker 2.']
# New messages: ['Message 3 from worker 2.', 'Message 3 from worker 1.', 'Message 3 from worker 0.']
# New messages: ['Message 4 from worker 2.', 'Message 4 from worker 0.', 'Message 4 from worker 1.']
# New messages: ['Message 5 from worker 2.', 'Message 5 from worker 0.', 'Message 5 from worker 1.']
@haoliangjiang

Great post on Towards Data Science! One silly question about the example code: since there is only one actor, why are several lists printed instead of a single one like ['Message 0 from worker 1.', 'Message 0 from worker 0.', 'Message 0 from worker 2.', 'Message 1 from worker 1.', 'Message 1 from worker 0.', 'Message 1 from worker 2.'....]?

My guess is that the loop that periodically gets the messages runs in another thread, so when it starts, the list comprehension is still in progress? In this case, Ray changes the way the code, originally serial, is executed. But that raises a problem: how can I predict what results I will get at the end?

Thanks in advance for sharing.

@yc2984

yc2984 commented Apr 17, 2023

Hey @haoliangjiang, interesting question.

Ray changes the way where the codes are executed serially originally

I think that's expected, since a .remote() call is async: it returns right away instead of waiting for the remote function to complete. Only the ray.get part is blocking.
If you want to make sure that the results are deterministic, do not clear the messages until the very end, i.e. once all 3 workers have finished publishing messages.
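
To make the difference concrete, here is a minimal sketch that does not use Ray at all. It is only an analogy built on Python's standard library: a thread pool stands in for Ray tasks, pool.submit(...) plays the role of worker.remote(...) (it returns immediately), and future.result() plays the role of ray.get (it blocks). The names MessageStore and run, and the shortened loop counts and delays, are made up for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class MessageStore:
    """Thread-safe stand-in for the gist's MessageActor (hypothetical name)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._messages = []

    def add_message(self, message):
        with self._lock:
            self._messages.append(message)

    def get_and_clear_messages(self):
        # Return everything collected so far and start a fresh list.
        with self._lock:
            messages, self._messages = self._messages, []
        return messages


def worker(store, j, num_messages=3, delay=0.05):
    # Same shape as the gist's worker, with fewer and faster iterations.
    for i in range(num_messages):
        time.sleep(delay)
        store.add_message("Message {} from worker {}.".format(i, j))


def run(poll_while_running):
    """Return the list of batches read from the store."""
    store = MessageStore()
    batches = []
    with ThreadPoolExecutor(max_workers=3) as pool:
        # submit() returns a Future immediately; the workers keep running.
        futures = [pool.submit(worker, store, j) for j in range(3)]
        if poll_while_running:
            # Reading while the workers are still running splits the
            # messages across several batches, as in the gist's output.
            while not all(f.done() for f in futures):
                batches.append(store.get_and_clear_messages())
                time.sleep(0.02)
        # result() blocks until each worker has finished.
        for f in futures:
            f.result()
    # One final read picks up whatever is left.
    batches.append(store.get_and_clear_messages())
    return batches


polled = run(poll_while_running=True)
single = run(poll_while_running=False)
print(len(polled), "batches when polling during the run")
print(len(single), "batch when reading once at the end")
```

Reading once at the end always yields one list holding all 9 messages, although the order of messages from different workers inside that list can still vary from run to run. In the Ray version, the corresponding change would presumably be to keep the object refs returned by worker.remote(...) and call ray.get on them before the single get_and_clear_messages call. Because add_message.remote(...) is itself asynchronous, the worker may also need to wait on those calls before returning.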
