Skip to content

Instantly share code, notes, and snippets.

@mooreniemi
Created July 20, 2023 16:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mooreniemi/ed5c468d41505ea8bc454648fe3d9b84 to your computer and use it in GitHub Desktop.
Save mooreniemi/ed5c468d41505ea8bc454648fe3d9b84 to your computer and use it in GitHub Desktop.
ray get worker progress and result
import ray
import time
import logging
from threading import Thread
# Module-level logger named after this module (``__main__`` when run as a script).
log = logging.getLogger(__name__)
@ray.remote
class Worker:
    """Ray actor that runs a simulated long job and exposes its progress.

    The job runs on a background thread started by ``process`` so that the
    actor can keep answering ``get_progress`` / ``get_result`` calls while
    the work is still in flight.

    Attributes set by ``__init__``:
        id: Identifier echoed back in the final result string.
        num: Number of one-second work steps to perform.
        progress: Index of the most recently completed step (starts at 0).
        done: ``True`` once every step has completed.
        result: ``None`` until the job is done, then ``"Finished <id>"``.
    """

    def __init__(self, id, num):
        self.id = id
        self.num = num
        self.progress = 0
        self.done = False
        self.result = None

    def process(self):
        """Start the job on a background thread and return immediately."""
        thread = Thread(target=self.process_thread)
        thread.start()

    def process_thread(self):
        """Run all work steps, updating ``progress`` after each one.

        Completion is recorded after the loop rather than inside it, so a
        job with ``num <= 0`` still finishes instead of leaving ``done``
        permanently ``False``.
        """
        for i in range(self.num):
            # some codes that consume time, and we use time.sleep() to simulate it
            time.sleep(1)
            self.progress = i
        # Publish the result before flipping ``done`` so that a caller who
        # observes ``done == True`` never reads a still-empty result.
        self.result = f"Finished {self.id}"
        self.done = True

    def get_progress(self):
        """Return a ``(progress, done)`` tuple describing the job state."""
        return (self.progress, self.done)

    def get_result(self):
        """Return the final result, or ``None`` if the job is not done."""
        return self.result
# Driver: start several Worker actors, poll them until all report done, then
# collect their results.

# Without any logging configuration the INFO records below are silently
# dropped (the root logger defaults to WARNING). basicConfig is a no-op if the
# root logger already has handlers.
logging.basicConfig(level=logging.INFO)

ray.init()
actors = 3
seconds = 10
poll_interval = 0.5  # seconds to wait between polling rounds
timeout = seconds + 60  # upper bound on the total wait, allowing for actor start-up
log.info(f"Here we generate {actors} actors, each runs {seconds} seconds.")
workers = [Worker.remote(i, seconds) for i in range(actors)]
log.info("For each worker, we call the process function to start working")
for worker in workers:
    worker.process.remote()
log.info("All workers started..")
log.info("Here we use loop to monitor the progress")
unfinished_workers = list(range(actors))
deadline = time.monotonic() + timeout
while unfinished_workers:
    if time.monotonic() > deadline:
        # Fail loudly instead of hanging forever on a worker that never finishes.
        raise TimeoutError(
            f"workers {unfinished_workers} did not finish within {timeout} seconds"
        )
    # Query every outstanding worker in one batched ray.get call.
    statuses = ray.get(
        [workers[e].get_progress.remote() for e in unfinished_workers]
    )
    # Build the next round's list instead of removing from the list being
    # iterated, which would skip the element after each removal.
    still_running = []
    for e, (progress, done) in zip(unfinished_workers, statuses):
        log.info("Retrieving Progress for {}-{}".format(e, progress))
        if done:
            log.info(f"Finished {e}")
        else:
            still_running.append(e)
    unfinished_workers = still_running
    if unfinished_workers:
        # Back off between rounds rather than busy-polling the actors.
        time.sleep(poll_interval)
results = ray.get([worker.get_result.remote() for worker in workers])
log.info(results)
@mooreniemi
Copy link
Author

mooreniemi commented Jul 20, 2023

This gist is an adaptation of this issue's example code. L51 above is, of course, very brittle: if a worker never finishes, the whole process will hang. It would need a timeout.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment