Created
August 15, 2021 19:02
-
-
Save mwtian/625b9a8bdc907e07e175e4d40c21a4a5 to your computer and use it in GitHub Desktop.
Slow getting / deallocating nested object refs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import time

import ray
from ray import cloudpickle
from ray import ray_constants

# Number of put_object tasks to run, and number of top-level object refs
# passed to ray.get().
OBJ_REFS = 500
# Number of embedded object refs created per input object.
EMBEDDED_OBJ_REFS = 2400
# Whether to hide embedded object refs from Ray by serializing them with
# cloudpickle.dumps(ray.put(input_object)) instead of storing the raw ref.
REF_HIDING = False
@ray.remote(num_returns=2)
def put_object(input_object):
    """Create EMBEDDED_OBJ_REFS plasma objects each holding input_object.

    Returns two values (num_returns=2):
        1: a sentinel used by the caller as a completion barrier.
        obj_idx_to_obj_id: dict mapping each index in
            range(EMBEDDED_OBJ_REFS) to the ObjectRef from ray.put(), or —
            when REF_HIDING is set — to the cloudpickled bytes of that ref,
            which hides the reference from Ray's reference counting.
    """
    obj_idx_to_obj_id = {}
    for obj_idx in range(EMBEDDED_OBJ_REFS):
        obj_ref = ray.put(input_object)
        obj_idx_to_obj_id[obj_idx] = (
            obj_ref if not REF_HIDING else cloudpickle.dumps(obj_ref)
        )
    return 1, obj_idx_to_obj_id
def run():
    """Benchmark putting and getting tasks that embed many object refs.

    Launches one put_object task per input object, spread round-robin
    across the cluster's nodes via node resource constraints, waits for
    all tasks to finish, then times ray.get() over all the ref-heavy
    results.

    Returns:
        The time.perf_counter() timestamp taken right after
        ray.get(results) completed, so the caller can measure the
        deallocation latency that follows.
    """
    cluster_resources = ray.cluster_resources()
    # Build a dense index -> "node:<ip>" mapping for round-robin placement.
    next_node_idx = 0
    node_idx_to_id = {}
    for resource_name in cluster_resources.keys():
        if resource_name.startswith("node"):
            node_idx_to_id[next_node_idx] = resource_name
            next_node_idx += 1

    print(f"putting {OBJ_REFS} objects...")
    start = time.perf_counter()
    input_objects = list(range(OBJ_REFS))
    barrier = []
    results = []
    for i, input_object in enumerate(input_objects):
        # Pin each task to a specific node with a minimal resource request.
        node_id = node_idx_to_id[i % len(node_idx_to_id)]
        resources = {node_id: ray_constants.MIN_RESOURCE_GRANULARITY}
        done, promise = put_object.options(resources=resources).remote(
            input_object,
        )
        barrier.append(done)
        results.append(promise)
    # Wait until every task has finished its puts before timing the gets.
    ray.get(barrier)
    stop_put = time.perf_counter()
    print("put objects latency: ", stop_put - start)
    print(f"put objects end time: {datetime.datetime.now()}")

    print(f"getting {len(results)} task results...")
    start = time.perf_counter()
    # uncomment for profiling
    # prof = cProfile.Profile()
    # prof = prof.runctx(
    #     "ray.get(results)",
    #     {"ray": ray},
    #     {"results": results}
    # )
    # prof.dump_stats("/home/ubuntu/output.pstats")
    ray.get(results)
    stop_get = time.perf_counter()
    print("get objects latency: ", stop_get - start)
    print(f"get objects end time: {datetime.datetime.now()}")
    print(f"got {len(results)} task results")
    print(f"retrieved all results at: {datetime.datetime.now()}")
    return stop_get
if __name__ == '__main__':
    print(f"start time: {datetime.datetime.now()}")
    ray.init()
    start_e2e = time.perf_counter()
    stop_get = run()
    stop_e2e = time.perf_counter()
    # Time between ray.get() completing inside run() and now: the window
    # in which the task results (and their embedded refs) go out of scope
    # and are deallocated.
    print("dealloc latency: ", stop_e2e - stop_get)
    print("total latency: ", stop_e2e - start_e2e)
    print(f"end time: {datetime.datetime.now()}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment