Skip to content

Instantly share code, notes, and snippets.

@mwtian
Created August 15, 2021 19:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mwtian/625b9a8bdc907e07e175e4d40c21a4a5 to your computer and use it in GitHub Desktop.
Save mwtian/625b9a8bdc907e07e175e4d40c21a4a5 to your computer and use it in GitHub Desktop.
Slow getting / deallocating nested object refs
import datetime
import time
import ray
from ray import ray_constants
from ray import cloudpickle
OBJ_REFS = 500 # number of put_object tasks to run; also the number of top-level object refs passed to ray.get()
EMBEDDED_OBJ_REFS = 2400 # number of embedded object refs created per task (per input object)
REF_HIDING = False # when True, hide embedded refs from Ray's ref counting via cloudpickle.dumps(ray.put(input_object))
@ray.remote(num_returns=2)
def put_object(input_object):
    """Create EMBEDDED_OBJ_REFS object refs, each wrapping *input_object*.

    Returns a pair: a trivial "done" marker (1) and a dict mapping
    0..EMBEDDED_OBJ_REFS-1 to the created refs. When REF_HIDING is set,
    each ref is serialized with cloudpickle.dumps instead, which hides it
    from Ray's distributed reference counting.
    """
    embedded = {}
    for idx in range(EMBEDDED_OBJ_REFS):
        ref = ray.put(input_object)
        embedded[idx] = cloudpickle.dumps(ref) if REF_HIDING else ref
    return 1, embedded
def run():
    """Fan out OBJ_REFS put_object tasks round-robin across cluster nodes,
    then time ray.get() over all of their results.

    Returns:
        The time.perf_counter() timestamp taken immediately after the final
        ray.get() completes, so the caller can measure deallocation latency
        (everything that happens after the gets) separately.
    """
    # Discover the per-node custom resources ("node:<ip>") so each task can
    # be pinned to a specific node.
    node_idx_to_id = {}
    for resource_name in ray.cluster_resources():
        if resource_name.startswith("node"):
            node_idx_to_id[len(node_idx_to_id)] = resource_name

    print(f"putting {OBJ_REFS} objects...")
    start = time.perf_counter()
    barrier = []
    results = []
    # enumerate() replaces the original manual `i = 0; i += 1` counter, and
    # range() is iterated directly instead of materializing a list first.
    for i, input_object in enumerate(range(OBJ_REFS)):
        node_id = node_idx_to_id[i % len(node_idx_to_id)]
        # MIN_RESOURCE_GRANULARITY pins the task to the node while consuming
        # a negligible amount of its node resource.
        resources = {node_id: ray_constants.MIN_RESOURCE_GRANULARITY}
        done, promise = put_object.options(resources=resources).remote(
            input_object,
        )
        barrier.append(done)
        results.append(promise)
    # Block until every task has finished; the marker values themselves are
    # deliberately discarded (the original bound them to an unused `r`).
    ray.get(barrier)
    stop_put = time.perf_counter()
    print("put objects latency: ", stop_put - start)
    print(f"put objects end time: {datetime.datetime.now()}")

    print(f"getting {len(results)} task results...")
    start = time.perf_counter()
    # uncomment for profiling
    # prof = cProfile.Profile()
    # prof = prof.runctx(
    #     "ray.get(results)",
    #     {"ray": ray},
    #     {"results": results}
    # )
    # prof.dump_stats("/home/ubuntu/output.pstats")
    # Fetch all top-level results (each a dict holding EMBEDDED_OBJ_REFS
    # embedded refs); the fetched values are discarded, only timing matters.
    ray.get(results)
    stop_get = time.perf_counter()
    print("get objects latency: ", stop_get - start)
    print(f"get objects end time: {datetime.datetime.now()}")
    print(f"got {len(results)} task results")
    print(f"retrieved all results at: {datetime.datetime.now()}")
    return stop_get
if __name__ == '__main__':
    # End-to-end driver: start Ray, run the benchmark, then report how long
    # post-get cleanup (deallocation of the nested refs) takes.
    print(f"start time: {datetime.datetime.now()}")
    ray.init()
    bench_start = time.perf_counter()
    last_get_ts = run()
    bench_stop = time.perf_counter()
    # Dealloc latency: time between run()'s final ray.get() finishing and
    # run() returning (its local results going out of scope).
    print("dealloc latency: ", bench_stop - last_get_ts)
    print("total latency: ", bench_stop - bench_start)
    print(f"end time: {datetime.datetime.now()}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment