Skip to content

Instantly share code, notes, and snippets.

@ericl
Created May 22, 2020 21:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ericl/a19cd7d8a7eb32cddefe64b665f48dcc to your computer and use it in GitHub Desktop.
import time
import os
import random
import numpy as np
import ray
# Setting the DRY_RUN environment variable (to any value) selects a tiny
# configuration suitable for a local smoke test; otherwise the full-size
# cluster benchmark parameters are used.
FAST = "DRY_RUN" in os.environ

# Parameters that are the same in both modes.
BY_VALUE = False  # False: Frontend ray.put()s the feature row once per query.
N_Q = 100  # Queries submitted per measurement round.

if FAST:
    N_FRONTENDS, N_WORKER, K_FANOUT = 2, 2, 4
    F_SIZE = 10 * 1024  # Feature-vector length (uint8 elements).
else:
    N_FRONTENDS, N_WORKER, K_FANOUT = 16, 128, 100
    F_SIZE = 1 * 1024 * 1024
@ray.remote(num_cpus=1, max_restarts=-1, max_task_retries=-1)
class Worker:
    """Ray actor that simulates a classifier shard with random crashes.

    The decorator requests one CPU per actor and sets ``max_restarts=-1`` /
    ``max_task_retries=-1``. Per Ray's fault-tolerance docs, these allow
    unlimited actor restarts and task retries. ``process`` deliberately
    kills the process now and then to exercise that path.
    """

    def __init__(self):
        # Stand-in for a linear model: an all-ones (F_SIZE, 1) uint8 column.
        self.W = np.ones([F_SIZE, 1], dtype=np.uint8)

    def ping(self):
        """Return "ok"; callers use it to wait for this actor."""
        return "ok"

    def process(self, features):
        """Score ``features`` against ``self.W``; occasionally crash instead.

        Presumably ``features`` is one uint8 row of length F_SIZE, as sent
        by Frontend.process in this file. TODO confirm for other callers.

        Returns the (NumPy) boolean ``sum(features @ W) > 0``.
        """
        # Roughly 1 call in 1000 hard-exits the process to simulate a failure.
        crash = random.random() > 0.999
        if crash:
            print("Simulating worker failure")
            os._exit(1)
        # NOTE(review): both operands are uint8, so matmul appears to
        # accumulate modulo 256. With all-ones data and F_SIZE a multiple of
        # 256 (both configs), the product is 0 and this returns False. The
        # result is unused by this file's callers. Confirm before relying on it.
        scores = np.matmul(features, self.W)
        return np.sum(scores) > 0
@ray.remote(num_cpus=1)
class Frontend:
    """Ray actor that looks up a feature row and fans it out to workers."""

    def __init__(self, workers):
        # Handles to the Worker actors this frontend dispatches to.
        self.workers = workers
        # Stand-in for a large feature store: 10000 all-ones uint8 rows.
        self.database = np.ones([10000, F_SIZE], dtype=np.uint8)

    def process(self, fanout=K_FANOUT):
        """Pick a random row and send it to ``fanout`` random workers.

        The worker calls are fire-and-forget; their results are not kept.
        When BY_VALUE is False, the row is stored once with ``ray.put`` and
        the same object reference is passed to every worker.
        """
        # NOTE(review): randint is inclusive, so only rows 0..1000 of the
        # 10000-row table are ever read. Confirm this is intended.
        row = self.database[random.randint(0, 1000)]
        payload = row if BY_VALUE else ray.put(row)
        for _ in range(fanout):
            random.choice(self.workers).process.remote(payload)
        return "ok"

    def await_complete(self):
        """Block until every worker has answered a ping, then return "ok".

        Assumption: this relies on Ray running an actor's tasks from one
        caller in submission order. If so, a returned ping implies this
        frontend's earlier ``process`` calls on that worker have run.
        Verify against Ray's semantics.
        """
        pending = [worker.ping.remote() for worker in self.workers]
        ray.get(pending)
        return "ok"

    def ping(self):
        """Return "ok"; callers use it to wait for this actor."""
        return "ok"
def execute_workload(frontends, num_q):
    """Submit ``num_q`` queries to randomly chosen frontends and wait for them.

    Args:
        frontends: Frontend actor handles to spread queries across.
        num_q: number of ``Frontend.process`` calls to submit.

    Returns:
        None. Returns once every query has finished and every frontend's
        ``await_complete`` barrier has returned. Any exception raised by a
        query is re-raised here via ``ray.get``.
    """
    resp = [random.choice(frontends).process.remote() for _ in range(num_q)]
    print("waiting for completion")
    barriers = [f.await_complete.remote() for f in frontends]
    # The original dropped `resp`, so exceptions raised inside
    # Frontend.process were silently discarded. Fetch the query results
    # together with the barriers so failures surface.
    ray.get(resp + barriers)
if __name__ == "__main__":
    from collections import deque

    # DRY_RUN starts a local Ray instance. Otherwise this presumably attaches
    # to an already-running cluster via address="auto".
    if FAST:
        ray.init()
    else:
        ray.init(address="auto")

    print("Creating workers", N_WORKER)
    workers = [Worker.remote() for _ in range(N_WORKER)]
    # Wait until every Worker actor answers before starting.
    ray.get([w.ping.remote() for w in workers])

    print("Creating frontends", N_FRONTENDS)
    frontends = [Frontend.remote(workers) for _ in range(N_FRONTENDS)]
    ray.get([f.ping.remote() for f in frontends])

    print("Starting query sets of size", N_Q,
          "by value", BY_VALUE, "request size", F_SIZE)
    num_rounds = 15
    # Report mean/std over a moving window of the 10 most recent rounds.
    # deque(maxlen=...) evicts the oldest entry automatically.
    window = deque(maxlen=10)
    for _ in range(num_rounds):
        # perf_counter is monotonic, unlike time.time(), so interval
        # measurements are not skewed by wall-clock adjustments.
        start = time.perf_counter()
        execute_workload(frontends, N_Q)
        window.append(N_Q / (time.perf_counter() - start))
        print("queries per second", np.mean(window), "+-", np.std(window))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment