Skip to content

Instantly share code, notes, and snippets.

@nqbao
Last active April 25, 2017 17:13
Show Gist options
  • Save nqbao/5a1d1b212e2e0a527ef59b1560c74a74 to your computer and use it in GitHub Desktop.
bench_grpc_3.py
#!/usr/bin/env python
import grpc.beta.implementations
import tensorflow_serving.apis.predict_pb2 as predict_pb2
import tensorflow_serving.apis.prediction_service_pb2 as prediction_service_pb2
import random
import numpy as np
import tensorflow as tf
import threading
import time
from timeit import timeit
# from concurrent.futures import ProcessPoolExecutor
class Runner(object):
    """Drive up to `concurrent` in-flight async requests until `runs` complete.

    `func(key)` must return a future-like object exposing `add_done_callback`
    and `result()`. Completion bookkeeping happens in `_done`, which (for real
    gRPC futures) runs on another thread, so every shared counter is guarded
    by a single Condition.
    """

    def __init__(self, func, runs, concurrent=100, debug=False, outfile=None):
        self._condition = threading.Condition()
        self._concurrent = concurrent
        self._runs = runs
        self._func = func
        # tracking metrics
        self._state = {}    # key -> {"start_time": .., "duration": .., "result": ..}
        self._sent = 0
        self._active = 0    # requests currently in flight
        self._finished = 0
        self._success = 0
        self._error = 0
        self._debug = debug
        self._outfile = outfile

    def spawn(self):
        """Issue all `runs` requests, blocking whenever the concurrency cap is hit."""
        self._started_time = time.time()
        for i in range(self._runs):
            self.launch(i)
            # Periodic progress report; skipped until at least one request finished.
            if self._sent % 100 == 0 and self._sent != self._runs and self._finished > 0:
                self.print_report()

    def print_report(self):
        """Print RPS, success rate and latency stats for requests finished so far."""
        if self._finished == 0:
            # Guard: wait() calls this unconditionally; avoid division by zero
            # when zero runs were requested.
            print("Sent %s requests, none finished yet" % self._sent)
            return
        durs = time.time() - self._started_time
        if durs > 1:
            rps = int(self._finished / durs)
        else:
            rps = "??"
        # 100.0 keeps the percentage fractional under Python 2 integer division
        # (the original `100 * a / b` on ints silently truncated the decimals).
        success_rate = round(100.0 * self._success / self._finished, 2)
        sum_latency = 0.0
        max_latency = 0.0
        for s in self._state.values():
            if 'duration' in s:
                sum_latency += s['duration']
                max_latency = max(s['duration'], max_latency)
        avg_latency = 1000 * round(sum_latency / self._finished, 4)
        max_latency = 1000 * round(max_latency, 4)
        print("Sent %s requests, RPS: %s, Success Rate: %s%%, Avg. Latency: %sms, Max Latency: %sms" % (
            self._sent,
            rps,
            success_rate,
            avg_latency,
            max_latency
        ))

    def write_latency_file(self):
        """Dump one CSV row per request: start time, latency in ms, success flag."""
        if self._outfile:
            with open(self._outfile, "w+") as f:
                f.write("started_time,duration,success\n")
                for v in self._state.values():
                    f.write(
                        "%s,%s,%s\n" % (
                            v['start_time'],
                            round(1000 * v['duration'], 2),
                            'Y' if v['result'] == 'success' else 'N'
                        )
                    )

    def launch(self, key):
        """Start one request, first blocking if the concurrency limit is reached."""
        with self._condition:
            if self._active >= self._concurrent:
                self._condition.wait()
            self._active += 1
            self._sent += 1
            self._state[key] = {
                "start_time": time.time()
            }
        f = self._func(key)
        f._key = key
        f.add_done_callback(self._done)

    def _done(self, f):
        """Future completion callback: record the outcome and wake waiters."""
        with self._condition:
            state = self._state[f._key]
            try:
                f.result()
                self._success += 1
                state['result'] = 'success'
            except Exception as ex:
                if self._debug:
                    print(ex)
                self._error += 1
                state['result'] = 'error'
            state['duration'] = time.time() - state['start_time']
            self._active -= 1
            self._finished += 1
            # notify_all: both launch() (capacity wait) and wait() may be
            # blocked on this condition; notify() would wake only one of them.
            self._condition.notify_all()

    def wait(self):
        """Block until every request has finished, then print the final report."""
        with self._condition:
            while self._finished < self._runs:
                self._condition.wait()
        self.print_report()
        self.write_latency_file()
        print("Total time: %ss" % (
            round(time.time() - self._started_time, 2)
        ))
def convert_to_request(inputs, model_name):
    """Build a TF Serving PredictRequest from a dict of input name -> array-like.

    float64/int64 arrays are narrowed to float32/int32 before being packed
    into tensor protos.

    :param inputs: dict mapping input tensor names to array-like values
    :param model_name: model_spec.name to target on the serving side
    :return: a populated predict_pb2.PredictRequest
    """
    request = predict_pb2.PredictRequest()
    request.model_spec.name = model_name
    # .items() instead of the Python-2-only .iteritems(): identical behavior
    # on Python 2, and the helper also runs under Python 3.
    for key, value in inputs.items():
        value = np.array(value)
        if value.dtype == np.float64:
            value = value.astype(np.float32)
        elif value.dtype == np.int64:
            value = value.astype(np.int32)
        request.inputs[key].CopyFrom(tf.contrib.util.make_tensor_proto(value, shape=value.shape))
    return request
def main():
    """Parse CLI args, build the request once, and benchmark the gRPC endpoint."""
    import argparse
    import json

    parser = argparse.ArgumentParser(description='TF gRPC Benchmark')
    parser.add_argument("--input-file", help="Path to input file (JSON format)", required=True)
    parser.add_argument("--model-name", help="Model name", default='default')
    parser.add_argument("-n", default=1000, type=int)
    parser.add_argument("-c", default=10, type=int)
    parser.add_argument("--host", default="localhost")
    parser.add_argument("--port", default=8500, type=int)
    parser.add_argument("--max-timeout", default=1.0, type=float)
    parser.add_argument("--latency-output", type=str, help="Save latency record to a csv file")
    ns = parser.parse_args()

    with open(ns.input_file, "r") as f:
        inputs = json.load(f)

    print("Running %s runs against %s:%s" % (
        ns.n,
        ns.host,
        ns.port
    ))

    # The request payload is identical for every call -- build it once.
    request = convert_to_request(inputs, ns.model_name)

    # One channel AND one stub per concurrency slot, built once up front.
    # (The original rebuilt a stub on every request; the stub is loop-invariant.)
    channels = [
        grpc.beta.implementations.insecure_channel(ns.host, ns.port) for _ in range(ns.c)
    ]
    stubs = [
        prediction_service_pb2.beta_create_PredictionService_stub(channel)
        for channel in channels
    ]

    def predict(i):
        # Round-robin requests over the pre-built stubs.
        return stubs[i % len(stubs)].Predict.future(request, ns.max_timeout)

    runner = Runner(predict, ns.n, ns.c, outfile=ns.latency_output)
    runner.spawn()
    runner.wait()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment