Skip to content

Instantly share code, notes, and snippets.

@edoakes
Last active October 22, 2020 19:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save edoakes/753f5989efdf290fc7498004f705d756 to your computer and use it in GitHub Desktop.
Save edoakes/753f5989efdf290fc7498004f705d756 to your computer and use it in GitHub Desktop.
Ray Serve pass-by-reference vs. pass-by-value experiment
import time
import numpy as np
import ray
# Start Ray for this process before any actor class below is declared or
# instantiated. log_to_driver=False is passed; presumably this keeps worker
# output from being forwarded to this script's stdout, so only the benchmark
# result lines are printed -- confirm against the ray.init documentation.
ray.init(log_to_driver=False)
@ray.remote
class Actor:
    """One hop in a linear chain of Ray actors used by the pass-by-ref benchmark.

    Each actor forwards the payload it receives to ``next_actor``; the actor
    whose ``next_actor`` is ``None`` terminates the chain.
    """

    def __init__(self, next_actor, by_ref):
        """Store the downstream actor handle and the benchmark mode.

        Args:
            next_actor: Handle of the next actor in the chain, or ``None``
                if this actor is the last hop.
            by_ref: When true, ``f`` expects ``data`` to be a one-element
                list holding an object reference rather than the value.
        """
        self.next_actor = next_actor
        self.by_ref = by_ref

    async def f(self, data):
        """Forward ``data`` down the chain; materialize it at the last hop.

        Args:
            data: The payload. In by_ref mode this is a list whose first
                element is an object reference; otherwise it is whatever
                the caller passed.

        Returns:
            ``None`` at the last hop, otherwise the result of the downstream
            actor's ``f`` (which is ultimately ``None`` as well).
        """
        if self.next_actor is None:
            if self.by_ref:
                # Await the reference instead of calling the blocking
                # ray.get(): this method runs on the actor's asyncio event
                # loop, and a blocking get would stall every other in-flight
                # call on this actor while the object is being fetched.
                await data[0]
            return
        return await self.next_actor.f.remote(data)
@ray.remote
class Driver:
    """Owns the payload and a three-actor chain, and fires one request per ``run``.

    The payload is placed in Ray's object store once at construction time.
    ``run`` then sends it through ``first -> middle -> last``, either bare or
    wrapped in a list depending on ``by_ref``.
    """

    def __init__(self, by_ref, data_size):
        """Create the payload and build the actor chain back to front.

        Args:
            by_ref: Selects how ``run`` hands the payload to the chain, and
                is forwarded to every ``Actor``.
            data_size: Number of ``uint8`` zeros in the payload array.
        """
        self.by_ref = by_ref
        payload = np.zeros(data_size, dtype=np.uint8)
        self.data = ray.put(payload)

        # Each actor needs a handle to its successor, so the tail is
        # created first.
        tail = Actor.remote(None, by_ref)
        mid = Actor.remote(tail, by_ref)
        head = Actor.remote(mid, by_ref)
        self.last, self.middle, self.first = tail, mid, head

    async def run(self):
        """Send the payload through the chain once and wait for completion."""
        # Per Ray's argument-passing rules, a reference passed as a top-level
        # argument is resolved to its value before the callee runs, while one
        # nested inside a list is handed over as a reference. The list
        # wrapper is therefore what makes this call "by reference".
        arg = [self.data] if self.by_ref else self.data
        await self.first.f.remote(arg)
def run_condition(by_ref, data_size, batch_size, num_trials=10, num_warmup=5):
    """Measure ``Driver.run`` throughput for one benchmark configuration.

    A fresh ``Driver`` is created, then ``num_warmup + num_trials`` rounds are
    executed. Every round submits ``batch_size`` ``run`` calls at once and
    waits for all of them; only the rounds after the warm-up contribute to
    the statistics.

    Args:
        by_ref: Forwarded to ``Driver``.
        data_size: Forwarded to ``Driver``.
        batch_size: Number of ``run`` calls submitted together per round.
        num_trials: Number of measured rounds (default 10, matching the
            previously hard-coded behavior).
        num_warmup: Number of leading rounds whose timing is discarded
            (default 5, matching the previously hard-coded behavior).

    Returns:
        A string of the form ``"<mean> tasks/s +- <std>"`` where the standard
        deviation is rounded to two decimals.

    Raises:
        ValueError: If ``num_trials`` is less than 1 or ``num_warmup`` is
            negative.
    """
    if num_trials < 1 or num_warmup < 0:
        raise ValueError("num_trials must be >= 1 and num_warmup must be >= 0")

    d = Driver.remote(by_ref, data_size)
    stats = []
    for i in range(num_warmup + num_trials):
        # perf_counter is monotonic and high-resolution; time.time() follows
        # the wall clock, which can be adjusted mid-run and may be too coarse
        # for rounds that last only a few milliseconds.
        start = time.perf_counter()
        ray.get([d.run.remote() for _ in range(batch_size)])
        elapsed = time.perf_counter() - start
        if i >= num_warmup:
            stats.append(batch_size / elapsed)
    return f"{np.mean(stats)} tasks/s +- {round(np.std(stats), 2)}"
# Benchmark matrix: {by_ref, by_val} x {1 byte, 1 MiB} x {1 call, 1000 calls}.
# Each row is (label, by_ref, data_size, batch_size); rows run sequentially
# and each result is printed as soon as it is available.
conditions = [
    ("by_ref_small_single", True, 1, 1),
    ("by_val_small_single", False, 1, 1),
    ("by_ref_large_single", True, 1 * 1024 * 1024, 1),
    ("by_val_large_single", False, 1 * 1024 * 1024, 1),
    ("by_ref_small_batch", True, 1, 1000),
    ("by_val_small_batch", False, 1, 1000),
    ("by_ref_large_batch", True, 1 * 1024 * 1024, 1000),
    ("by_val_large_batch", False, 1 * 1024 * 1024, 1000),
]
for label, by_ref, data_size, batch_size in conditions:
    print(f"{label}: {run_condition(by_ref, data_size, batch_size)}")
import time
import numpy as np
import ray
# Start Ray for this process before any actor class below is declared or
# instantiated. log_to_driver=False is passed; presumably this keeps worker
# output from being forwarded to this script's stdout, so only the benchmark
# result lines are printed -- confirm against the ray.init documentation.
ray.init(log_to_driver=False)
@ray.remote
class Actor:
    """One hop in a linear chain of Ray actors used by the return-by-ref benchmark.

    The last actor in the chain produces the payload. Every other actor asks
    its successor for it and hands back either the pending reference
    (by_ref) or the awaited result (by value).
    """

    def __init__(self, next_actor, by_ref, data_size):
        """Record the chain position and benchmark parameters.

        Args:
            next_actor: Handle of the next actor in the chain, or ``None``
                if this actor produces the payload.
            by_ref: When true, a non-terminal actor returns the result of
                ``next_actor.f.remote()`` without awaiting it.
            data_size: Number of ``uint8`` zeros the terminal actor returns.
        """
        self.next_actor = next_actor
        self.by_ref = by_ref
        self.data_size = data_size

    async def f(self):
        """Produce the payload, or relay the downstream actor's answer."""
        downstream = self.next_actor
        if downstream is None:
            # End of the chain: this actor is the producer.
            return np.zeros(self.data_size, dtype=np.uint8)

        pending = downstream.f.remote()
        if self.by_ref:
            # Hand back the un-awaited reference itself.
            return pending
        return await pending
@ray.remote
class Driver:
    """Owns a three-actor chain and pulls one payload through it per ``run``."""

    def __init__(self, by_ref, data_size):
        """Build the actor chain back to front.

        Args:
            by_ref: Forwarded to every ``Actor``.
            data_size: Forwarded to every ``Actor``.
        """
        # Each actor needs a handle to its successor, so the last one is
        # created first.
        self.last = Actor.remote(None, by_ref, data_size)
        self.middle = Actor.remote(self.last, by_ref, data_size)
        self.first = Actor.remote(self.middle, by_ref, data_size)

    async def run(self):
        """Request the payload from the chain and wait until it is available.

        In by_ref mode a non-terminal ``Actor.f`` returns the reference of its
        downstream call instead of awaiting it, so awaiting only the first
        hop yields another reference and returns before the rest of the
        chain has run. The loop below follows such nested references until
        an actual value arrives, so both modes are timed up to the point
        where the payload has been fetched.

        Returns:
            ``None``; the payload is deliberately not sent back to the caller.
        """
        result = await self.first.f.remote()
        while isinstance(result, ray.ObjectRef):
            result = await result
def run_condition(by_ref, data_size, batch_size, num_trials=10, num_warmup=5):
    """Measure ``Driver.run`` throughput for one benchmark configuration.

    A fresh ``Driver`` is created, then ``num_warmup + num_trials`` rounds are
    executed. Every round submits ``batch_size`` ``run`` calls at once and
    waits for all of them; only the rounds after the warm-up contribute to
    the statistics.

    Args:
        by_ref: Forwarded to ``Driver``.
        data_size: Forwarded to ``Driver``.
        batch_size: Number of ``run`` calls submitted together per round.
        num_trials: Number of measured rounds (default 10, matching the
            previously hard-coded behavior).
        num_warmup: Number of leading rounds whose timing is discarded
            (default 5, matching the previously hard-coded behavior).

    Returns:
        A string of the form ``"<mean> tasks/s +- <std>"`` where the standard
        deviation is rounded to two decimals.

    Raises:
        ValueError: If ``num_trials`` is less than 1 or ``num_warmup`` is
            negative.
    """
    if num_trials < 1 or num_warmup < 0:
        raise ValueError("num_trials must be >= 1 and num_warmup must be >= 0")

    d = Driver.remote(by_ref, data_size)
    stats = []
    for i in range(num_warmup + num_trials):
        # perf_counter is monotonic and high-resolution; time.time() follows
        # the wall clock, which can be adjusted mid-run and may be too coarse
        # for rounds that last only a few milliseconds.
        start = time.perf_counter()
        ray.get([d.run.remote() for _ in range(batch_size)])
        elapsed = time.perf_counter() - start
        if i >= num_warmup:
            stats.append(batch_size / elapsed)
    return f"{np.mean(stats)} tasks/s +- {round(np.std(stats), 2)}"
# Benchmark matrix: {by_ref, by_val} x {1 byte, 1 MiB} x {1 call, 1000 calls}.
# Each row is (label, by_ref, data_size, batch_size); rows run sequentially
# and each result is printed as soon as it is available.
conditions = [
    ("by_ref_small_single", True, 1, 1),
    ("by_val_small_single", False, 1, 1),
    ("by_ref_large_single", True, 1 * 1024 * 1024, 1),
    ("by_val_large_single", False, 1 * 1024 * 1024, 1),
    ("by_ref_small_batch", True, 1, 1000),
    ("by_val_small_batch", False, 1, 1000),
    ("by_ref_large_batch", True, 1 * 1024 * 1024, 1000),
    ("by_val_large_batch", False, 1 * 1024 * 1024, 1000),
]
for label, by_ref, data_size, batch_size in conditions:
    print(f"{label}: {run_condition(by_ref, data_size, batch_size)}")
@edoakes
Copy link
Author

edoakes commented Oct 22, 2020

code/ray % python return-by-ref.py 
2020-10-22 14:54:21,365 INFO services.py:1090 -- View the Ray dashboard at http://127.0.0.1:8265
by_ref_small_single: 723.825242872019 tasks/s +- 18.96
by_val_small_single: 343.4190377994986 tasks/s +- 51.95
by_ref_large_single: 599.1383351240366 tasks/s +- 52.51
by_val_large_single: 159.4893027072748 tasks/s +- 8.46
by_ref_small_batch: 1611.8728186864605 tasks/s +- 146.8
by_val_small_batch: 2057.50264882852 tasks/s +- 177.93
by_ref_large_batch: 1560.0818102735507 tasks/s +- 77.81
by_val_large_batch: 910.9845048366773 tasks/s +- 36.67

@edoakes
Copy link
Author

edoakes commented Oct 22, 2020

code/ray % python pass-by-ref.py  
2020-10-22 14:55:24,766 INFO services.py:1090 -- View the Ray dashboard at http://127.0.0.1:8265
by_ref_small_single: 294.7036678965033 tasks/s +- 15.0
by_val_small_single: 309.26607193172833 tasks/s +- 14.66
by_ref_large_single: 290.3758920879603 tasks/s +- 7.19
by_val_large_single: 161.09007000010388 tasks/s +- 11.3
by_ref_small_batch: 1545.2030714606449 tasks/s +- 87.55
by_val_small_batch: 1585.4281270688184 tasks/s +- 39.46
by_ref_large_batch: 1561.6375755209374 tasks/s +- 59.22
by_val_large_batch: 1046.531351491924 tasks/s +- 28.23

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment