import time

import ray

# Start Ray (not needed if Ray was already initialized earlier in the session).
ray.init()


@ray.remote
class MessageActor(object):
    def __init__(self):
        self.messages = []

    def add_message(self, message):
        self.messages.append(message)

    def get_and_clear_messages(self):
        messages = self.messages
        self.messages = []
        return messages


# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
    for i in range(100):
        time.sleep(1)
        message_actor.add_message.remote(
            "Message {} from worker {}.".format(i, j))


# Create a message actor.
message_actor = MessageActor.remote()

# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]

# Periodically get the messages and print them.
for _ in range(100):
    new_messages = ray.get(message_actor.get_and_clear_messages.remote())
    print("New messages:", new_messages)
    time.sleep(1)

# This script prints something like the following:
# New messages: []
# New messages: ['Message 0 from worker 1.', 'Message 0 from worker 0.']
# New messages: ['Message 0 from worker 2.', 'Message 1 from worker 1.', 'Message 1 from worker 0.', 'Message 1 from worker 2.']
# New messages: ['Message 2 from worker 1.', 'Message 2 from worker 0.', 'Message 2 from worker 2.']
# New messages: ['Message 3 from worker 2.', 'Message 3 from worker 1.', 'Message 3 from worker 0.']
# New messages: ['Message 4 from worker 2.', 'Message 4 from worker 0.', 'Message 4 from worker 1.']
# New messages: ['Message 5 from worker 2.', 'Message 5 from worker 0.', 'Message 5 from worker 1.']
Hey @haoliangjiang, interesting question.
You wrote: "Ray changes the way where the codes are executed serially originally".
I think that's expected. Calls made with .remote() are asynchronous: they return immediately instead of waiting for the remote function to complete. Only the ray.get call blocks.
If you want the results to be deterministic, do not clear the messages until the very end, i.e. after all 3 workers have finished publishing their messages.
Great post on Towards Data Science! One silly question about the example code: since there is only one actor, why are several lists printed instead of a single one like ['Message 0 from worker 1.', 'Message 0 from worker 0.', 'Message 0 from worker 2.', 'Message 1 from worker 1.', 'Message 1 from worker 0.', 'Message 1 from worker 2.'....]?
My guess is that the loop that periodically gets the messages runs in another thread, so that when it starts, the list comprehension is still in progress? In that case, Ray changes the way the code is executed, which was originally serial. But that raises a problem: how can I predict what results I will get at the end?
Thanks in advance for sharing.