@ngoodger
Last active January 12, 2020 09:51
from aiohttp import web
import argparse
import asyncio
from dataclasses import dataclass
from io import BytesIO
import numpy as np
import pyarrow
import prometheus_client as pc
import time
import torch
import uvloop


@dataclass
class InferenceRequest:
    future: asyncio.Future
    data: np.ndarray
    size: int


async def inference_coroutine(app):
    # Dynamically batch queued requests, run the model once per batch and fan
    # the results back out to the futures the request handlers are awaiting.
    x = torch.zeros([app["max_batch_size"]] + app["input_shape"], dtype=torch.float32)
    if app["device"] == "cuda": x = x.pin_memory()  # Faster cpu to gpu transfer.
    inference_request = None
    enqueued_inference_requests = []
    while True:
        x_slice = x
        inference_requests = []
        # Collect concurrent requests.
        while True:
            await asyncio.sleep(0)  # Release to asyncio loop.
            if inference_request:
                inference_requests.append(inference_request)
                input_tensor = torch.from_numpy(inference_request.data)
                x_slice[:inference_request.size] = input_tensor
                x_slice = x_slice[inference_request.size:]
                inference_request = None
            if not app["queue"].empty():
                inference_request = app["queue"].get_nowait()
            batch_size = x_slice.shape[0]  # Remaining capacity in the batch.
            # Stop collecting when the next request would overflow the batch,
            # when nothing is in flight and there is work to run, or when the
            # gpu has finished the previously enqueued batch.
            if inference_request and (batch_size - inference_request.size < 0): break
            if not enqueued_inference_requests and inference_requests: break
            if enqueued_inference_requests and inference_done_event.query(): break
        # Set future result for each pending coroutine.
        while len(enqueued_inference_requests) > 0:
            enqueued_inference_request = enqueued_inference_requests.pop(0)
            result = y_slice[:enqueued_inference_request.size].to("cpu").numpy()
            enqueued_inference_request.future.set_result(result)
            y_slice = y_slice[enqueued_inference_request.size:]
        # Enqueue all inference requests as parallel batch.
        if len(inference_requests) > 0:
            batch_size = x.shape[0] - x_slice.shape[0]  # Number of examples filled.
            y_slice = app["model"](x[:batch_size].to(app["device"]))
            inference_done_event = torch.cuda.Event()
            inference_done_event.record()  # Check when gpu done.
            if batch_size == 1: y_slice = y_slice.unsqueeze(0)  # Keep batch dimension.
            enqueued_inference_requests = inference_requests


REQUEST_LATENCY = pc.Histogram('latency', 'Request Latency (s)', ['app_name'])
REQUEST_COUNTER = pc.Counter('requests_total', 'Total Request Count', ['app_name'])
EXAMPLES_COUNTER = pc.Counter('examples_total', 'Total Examples Count', ['app_name'])


async def inference_request_handler(request):
    start_time = time.time()
    np_array = pyarrow.deserialize(await request.content.read())
    inference_request = InferenceRequest(future=request.app["loop"].create_future(),
                                         data=np_array,
                                         size=np_array.shape[0])
    await request.app["queue"].put(inference_request)
    await inference_request.future
    result = inference_request.future.result()
    return_bytes = BytesIO(pyarrow.serialize(result).to_buffer())
    response = web.Response(body=return_bytes)
    REQUEST_LATENCY.labels('inference_server').observe(time.time() - start_time)
    REQUEST_COUNTER.labels('inference_server').inc()
    EXAMPLES_COUNTER.labels('inference_server').inc(amount=inference_request.size)
    return response


async def metrics_handler(request):
    return web.Response(body=pc.generate_latest())


async def start_background_tasks(app):
    app['inference_coroutine'] = asyncio.create_task(inference_coroutine(app))


async def cleanup_background_tasks(app):
    app['inference_coroutine'].cancel()
    await app['inference_coroutine']


def main():
    parser = argparse.ArgumentParser(description="Pytorch GPU model serving.")
    parser.add_argument("--max-batch-size", type=int, required=True,
                        help="Maximum inference batch size.")
    parser.add_argument("--model-definition", type=str, required=True,
                        help='Python code which sets `model` object')
    parser.add_argument("--input-shape", type=int, nargs="+", required=True,
                        help="Shape of input not including batch dimension.")
    my_args = parser.parse_args()
    uvloop.install()  # 2x - 4x faster than standard asyncio event loop.
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    loop = asyncio.get_event_loop()
    app = web.Application()
    for k, v in vars(my_args).items(): app[k] = v  # Add args to app dict.
    app["loop"] = loop
    app["queue"] = asyncio.Queue()
    app["device"] = "cuda" if torch.cuda.is_available() else "cpu"
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    eval_symbols = {}
    exec(bytes(app["model_definition"], "utf-8").decode("unicode_escape"), eval_symbols)
    app["model"] = eval_symbols["model"].to(app["device"])
    for param in app["model"].parameters(): param.requires_grad = False
    app.router.add_post('/', inference_request_handler)
    app.router.add_get('/metrics', metrics_handler)
    web.run_app(app, port=8122)


if __name__ == '__main__':
    main()
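
Usage sketch (not part of the original gist): it assumes the script above is saved as inference_server.py (hypothetical filename), wraps a trivial torch.nn.Linear model passed via --model-definition, and uses the requests library on the client side. The client mirrors the wire format the handler expects, a pyarrow-serialized NumPy array in the POST body and a pyarrow-serialized array in the response, and assumes the same legacy pyarrow.serialize/deserialize API the server itself calls.

# Launch the server (hypothetical filename; --model-definition is exec'd to create `model`):
#   python inference_server.py --max-batch-size 32 \
#       --model-definition "import torch; model = torch.nn.Linear(4, 2)" \
#       --input-shape 4

# Minimal client sketch.
import numpy as np
import pyarrow
import requests

batch = np.random.rand(8, 4).astype(np.float32)              # 8 examples of shape (4,).
payload = pyarrow.serialize(batch).to_buffer().to_pybytes()   # Same format the server deserializes.
response = requests.post("http://localhost:8122/", data=payload)
result = pyarrow.deserialize(response.content)                # Shape (8, 2) for the Linear(4, 2) example.

print(requests.get("http://localhost:8122/metrics").text)     # Prometheus metrics endpoint.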