Skip to content

Instantly share code, notes, and snippets.

@mwtian
Last active August 29, 2021 06:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mwtian/140b09ed50fc7f0de172344194b2e7c4 to your computer and use it in GitHub Desktop.
Save mwtian/140b09ed50fc7f0de172344194b2e7c4 to your computer and use it in GitHub Desktop.
Shuffling object refs with actors
import datetime
import random
import time
import ray
from ray import ray_constants
from ray import cloudpickle
from collections import defaultdict
# Initialise Ray at import time with address="auto".
# NOTE(review): per the Ray docs this attaches to an already-running cluster
# instead of starting a local one -- confirm a cluster is up before running.
ray.init(address="auto")

# Number of Map actors created by run() (one put task each); also the number
# of top-level object refs each reduce task passes to ray.get().
OBJ_REFS = 1000

# Number of objects every Map actor stores via ray.put(), i.e. the number of
# distinct group IDs (and therefore the number of reduce tasks launched).
EMBEDDED_OBJ_REFS = 600
def _pinned_resources(node_resources, index):
    """Return a resource request that targets one node, chosen round-robin."""
    node_resource = node_resources[index % len(node_resources)]
    return {node_resource: ray_constants.MIN_RESOURCE_GRANULARITY}


def run():
    """Run the map/reduce object-ref shuffle benchmark and print its timings.

    Map phase: create ``OBJ_REFS`` ``Map`` actors spread round-robin over the
    cluster's ``node:*`` resources, call ``put()`` on each, and wait for the
    group IDs they report.

    Reduce phase: for every group ID, launch one ``reduce_group`` task (also
    spread round-robin) with the actors that hold that group, and wait for
    all of them.

    Raises:
        RuntimeError: if ``ray.cluster_resources()`` contains no resource
            whose name starts with ``"node:"``, since there is then nothing
            to pin actors and tasks to.
    """
    # Resource names starting with "node:" are treated as per-node handles;
    # requesting a minimal amount of one is how work is placed on that node.
    node_resources = [
        resource_name
        for resource_name in ray.cluster_resources()
        if resource_name.startswith("node:")
    ]
    if not node_resources:
        raise RuntimeError(
            "no 'node:*' resources found in ray.cluster_resources(); "
            "cannot pin actors and tasks to nodes"
        )

    # ---- map phase ----
    map_actors = []
    put_tasks_pending = []
    start = time.perf_counter()
    for i in range(OBJ_REFS):
        map_actor = Map.options(
            resources=_pinned_resources(node_resources, i)
        ).remote(f"mapper {i}")
        map_actors.append(map_actor)
        put_tasks_pending.append(map_actor.put.remote())
    print(f"getting {len(put_tasks_pending)} put task results...")
    group_ids_list = ray.get(put_tasks_pending)
    print(f"got {len(group_ids_list)} put task results")
    print(f"retrieved all put task results at: {datetime.datetime.now()}")
    stop = time.perf_counter()
    print("total put tasks time: ", stop - start)

    # Gather, per group ID, every actor that holds an object for that group.
    group_id_to_map_actors = defaultdict(list)
    for actor, group_ids in zip(map_actors, group_ids_list):
        for group_id in group_ids:
            group_id_to_map_actors[group_id].append(actor)

    # ---- reduce phase ----
    reduce_tasks_pending = []
    start = time.perf_counter()
    for i, (group_id, actors) in enumerate(group_id_to_map_actors.items()):
        promise = reduce_group.options(
            resources=_pinned_resources(node_resources, i)
        ).remote(group_id, actors)
        reduce_tasks_pending.append(promise)
    print(f"getting {len(reduce_tasks_pending)} reduce tasks...")
    ray.get(reduce_tasks_pending)
    print(f"got all reduce tasks at: {datetime.datetime.now()}")
    stop = time.perf_counter()
    print("total reduce task time: ", stop - start)
@ray.remote
class Map:
    """Ray actor that stores one small string object per group ID.

    ``put()`` creates the objects with ``ray.put`` and remembers the
    resulting refs; ``get()`` hands a single stored ref back to a caller.
    """

    def __init__(self, name):
        """Remember the actor's display name and start with no stored refs.

        Args:
            name: label embedded in every object this actor stores.
        """
        self.name = name
        # Created here (not only in put()) so that get() before put() fails
        # with a KeyError for the missing group, not an AttributeError.
        self.group_id_to_obj_ref = {}

    def put(self):
        """Store one object per group ID and report which IDs were stored.

        Group IDs ``0 .. EMBEDDED_OBJ_REFS - 1`` are visited in shuffled
        order. Each call starts from a fresh mapping, so entries from a
        previous call are discarded.

        Returns:
            The set of group IDs for which an object ref is now held.
        """
        start = time.perf_counter()
        pseudorandom_indices = list(range(EMBEDDED_OBJ_REFS))
        random.shuffle(pseudorandom_indices)
        self.group_id_to_obj_ref = {}
        for group_id in pseudorandom_indices:
            self.group_id_to_obj_ref[group_id] = ray.put(
                f"ref to data, group {group_id} from {self.name}")
        stop = time.perf_counter()
        print(f"put {EMBEDDED_OBJ_REFS} objects latency: ", stop - start)
        print(f"put object end time: {datetime.datetime.now()}")
        # The mapping's keys are exactly the group IDs that were stored.
        return set(self.group_id_to_obj_ref)

    def get(self, group_id):
        """Return the stored object ref for ``group_id``.

        Raises:
            KeyError: if no object has been stored for ``group_id``.
        """
        return self.group_id_to_obj_ref[group_id]
@ray.remote
def reduce_group(group_id, actors):
    """Fetch every actor's object for ``group_id`` and sanity-check the result.

    Args:
        group_id: the group whose objects should be collected.
        actors: actor handles exposing ``get(group_id)``; one object is
            fetched from each.
    """
    began = time.perf_counter()
    # Two hops: each actor call yields the ref it stored, and a second
    # ray.get() resolves those refs into the stored values.
    ref_requests = [actor.get.remote(group_id) for actor in actors]
    embedded_refs = ray.get(ref_requests)
    output_objects = ray.get(embedded_refs)
    ended = time.perf_counter()

    # Sanity checks: one object per expected actor, all for this group.
    assert len(output_objects) == OBJ_REFS
    group_tag = f"group {group_id}"
    assert all(group_tag in output for output in output_objects)

    print(f"get {len(output_objects)} object refs latency: ", ended - began)
    print(f"get object end time: {datetime.datetime.now()}")
if __name__ == "__main__":
    # Script entry point: wrap the whole benchmark in wall-clock timestamps
    # and a monotonic end-to-end latency measurement.
    print(f"start time: {datetime.datetime.now()}")
    t_begin = time.perf_counter()
    run()
    elapsed_e2e = time.perf_counter() - t_begin
    print("total latency: ", elapsed_e2e)
    print(f"end time: {datetime.datetime.now()}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment