Last active
April 25, 2017 17:13
-
-
Save nqbao/5a1d1b212e2e0a527ef59b1560c74a74 to your computer and use it in GitHub Desktop.
bench_grpc_3.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import grpc.beta.implementations | |
import tensorflow_serving.apis.predict_pb2 as predict_pb2 | |
import tensorflow_serving.apis.prediction_service_pb2 as prediction_service_pb2 | |
import random | |
import numpy as np | |
import tensorflow as tf | |
import threading | |
import time | |
from timeit import timeit | |
# from concurrent.futures import ProcessPoolExecutor | |
class Runner:
    """Drives up to `concurrent` in-flight asynchronous requests until
    `runs` requests have completed, tracking latency and success metrics.

    `func(key)` must return a future-like object exposing
    `add_done_callback(cb)` and `result()` (e.g. a gRPC future).
    """

    def __init__(self, func, runs, concurrent=100, debug=False, outfile=None):
        # The condition guards every counter below and gates launch()
        # on the number of in-flight requests.
        self._condition = threading.Condition()
        self._concurrent = concurrent
        self._runs = runs
        self._func = func
        # tracking metrics
        self._state = {}  # key -> {"start_time", "duration", "result"}
        self._sent = 0
        self._active = 0
        self._finished = 0
        self._success = 0
        self._error = 0
        self._debug = debug
        self._outfile = outfile

    def spawn(self):
        """Issue all `runs` requests, printing a progress report every 100."""
        self._started_time = time.time()
        for i in range(self._runs):
            self.launch(i)
            if self._sent % 100 == 0 and self._sent != self._runs and self._finished > 0:
                self.print_report()

    def print_report(self):
        """Print cumulative RPS, success rate, and latency statistics."""
        if self._finished == 0:
            # Nothing completed yet (e.g. runs == 0) -- avoid dividing by zero.
            return
        durs = time.time() - self._started_time
        if durs > 1:
            rps = int(self._finished / durs)
        else:
            # Too little wall time elapsed for a meaningful RPS figure.
            rps = "??"
        # 100.0 forces float division: the counters are ints, and under
        # Python 2 `int / int` would floor the percentage.
        success_rate = round(100.0 * self._success / self._finished, 2)
        sum_latency = 0
        max_latency = 0
        for s in self._state.values():
            if 'duration' in s:
                sum_latency += s['duration']
                max_latency = max(s['duration'], max_latency)
        # Durations are stored in seconds; report in milliseconds.
        avg_latency = 1000 * round(sum_latency / self._finished, 4)
        max_latency = 1000 * round(max_latency, 4)
        print("Sent %s requests, RPS: %s, Success Rate: %s%%, Avg. Latency: %sms, Max Latency: %sms" % (
            self._sent,
            rps,
            success_rate,
            avg_latency,
            max_latency
        ))

    def write_latency_file(self):
        """Dump one CSV row per request to `outfile` (no-op when unset)."""
        if self._outfile:
            with open(self._outfile, "w+") as f:
                f.write("started_time,duration,success\n")
                for v in self._state.values():
                    f.write(
                        "%s,%s,%s\n" % (
                            v['start_time'],
                            round(1000 * v['duration'], 2),
                            'Y' if v['result'] == 'success' else 'N'
                        )
                    )

    def launch(self, key):
        """Start request `key`, blocking while the concurrency cap is reached."""
        with self._condition:
            # `while`, not `if`: wait() may wake spuriously, and several
            # pending launches can race for a single freed slot -- re-check
            # the predicate after every wakeup.
            while self._active >= self._concurrent:
                self._condition.wait()
            self._active += 1
            self._sent += 1
            self._state[key] = {
                "start_time": time.time()
            }
            f = self._func(key)
            f._key = key
            f.add_done_callback(self._done)

    def _done(self, f):
        """Completion callback: record the outcome and free a concurrency slot."""
        with self._condition:
            state = self._state[f._key]
            try:
                f.result()
                self._success += 1
                state['result'] = 'success'
            except Exception as ex:
                if self._debug:
                    print(ex)
                self._error += 1
                state['result'] = 'error'
            state['duration'] = time.time() - state['start_time']
            self._active -= 1
            self._finished += 1
            # Wake one launch() blocked on the concurrency cap (and wait()).
            self._condition.notify()

    def wait(self):
        """Block until every request has finished, then print the final report."""
        with self._condition:
            while self._finished < self._runs:
                self._condition.wait()
        self.print_report()
        self.write_latency_file()
        print("Total time: %ss" % (
            round(time.time() - self._started_time, 2)
        ))
def convert_to_request(inputs, model_name):
    """Build a TF Serving PredictRequest from a dict of input arrays.

    Args:
        inputs: mapping of input tensor name -> array-like value.
        model_name: name assigned to the request's model_spec.

    Returns:
        A populated predict_pb2.PredictRequest.
    """
    request = predict_pb2.PredictRequest()
    request.model_spec.name = model_name
    # items() instead of iteritems() keeps this Python 2/3 compatible.
    for key, value in inputs.items():
        value = np.array(value)
        # Downcast 64-bit values to 32-bit -- presumably the served model's
        # signature expects float32/int32 tensors (JSON numbers parse as
        # 64-bit). NOTE(review): confirm against the model signature.
        if value.dtype == np.float64:
            value = value.astype(np.float32)
        elif value.dtype == np.int64:
            value = value.astype(np.int32)
        request.inputs[key].CopyFrom(tf.contrib.util.make_tensor_proto(value, shape=value.shape))
    return request
def main():
    """Parse CLI args, build the request payload once, and run the benchmark."""
    import argparse
    import json

    parser = argparse.ArgumentParser(description='TF gRPC Benchmark')
    parser.add_argument("--input-file", help="Path to input file (JSON format)", required=True)
    parser.add_argument("--model-name", help="Model name", default='default')
    parser.add_argument("-n", default=1000, type=int)
    parser.add_argument("-c", default=10, type=int)
    parser.add_argument("--host", default="localhost")
    parser.add_argument("--port", default=8500, type=int)
    parser.add_argument("--max-timeout", default=1.0, type=float)
    parser.add_argument("--latency-output", type=str, help="Save latency record to a csv file")
    ns = parser.parse_args()

    with open(ns.input_file, "r") as f:
        inputs = json.load(f)

    # print(...) call form is valid under both Python 2 and 3; the file
    # previously mixed both styles.
    print("Running %s runs against %s:%s" % (
        ns.n,
        ns.host,
        ns.port
    ))

    # Build the protobuf once; every request reuses the same payload.
    request_pb2 = convert_to_request(inputs, ns.model_name)
    # One insecure channel per concurrency slot.
    channels = [
        grpc.beta.implementations.insecure_channel(ns.host, ns.port) for _ in range(ns.c)
    ]

    def predict(i):
        # Round-robin requests across the channels to spread the load.
        channel = channels[i % len(channels)]
        stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)
        return stub.Predict.future(request_pb2, ns.max_timeout)

    def run():
        runner = Runner(predict, ns.n, ns.c, outfile=ns.latency_output)
        runner.spawn()
        runner.wait()

    run()
# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment