from aiohttp import web
import argparse
import asyncio
from dataclasses import dataclass
from io import BytesIO
import numpy as np
import pyarrow
import prometheus_client as pc
import time
import torch
import uvloop


@dataclass
class InferenceRequest:
    future: asyncio.Future
    data: np.ndarray
    size: int


async def inference_coroutine(app):
    # Pre-allocated input buffer of max_batch_size rows; requests are copied into slices of it.
    x = torch.zeros([app["max_batch_size"]] + app["input_shape"], dtype=torch.float32)
    if app["device"] == "cuda": x = x.pin_memory()  # Faster cpu to gpu transfer.
    inference_request = None
    enqueued_inference_requests = []
    while True:
        x_slice = x
        inference_requests = []
        # Collect concurrent requests while the previous batch is still running.
        while True:
            await asyncio.sleep(0)  # Yield to the asyncio loop.
            if inference_request:
                inference_requests.append(inference_request)
                input_tensor = torch.from_numpy(inference_request.data)
                x_slice[:inference_request.size] = input_tensor
                x_slice = x_slice[inference_request.size:]
                inference_request = None
            if not app["queue"].empty():
                inference_request = app["queue"].get_nowait()
            batch_size = x_slice.shape[0]
            # Stop collecting when the next request no longer fits in the buffer,
            if inference_request and (batch_size - inference_request.size < 0): break
            # when there is no batch in flight but requests have been collected,
            if not enqueued_inference_requests and inference_requests: break
            # or when the batch in flight has finished on the gpu.
            if enqueued_inference_requests and (app["device"] == "cpu" or inference_done_event.query()): break
        # Set the future result for each coroutine waiting on the finished batch.
        while len(enqueued_inference_requests) > 0:
            enqueued_inference_request = enqueued_inference_requests.pop(0)
            result = y_slice[:enqueued_inference_request.size].to("cpu").numpy()
            enqueued_inference_request.future.set_result(result)
            y_slice = y_slice[enqueued_inference_request.size:]
        # Enqueue all collected inference requests as one parallel batch.
        if len(inference_requests) > 0:
            batch_size = x.shape[0] - x_slice.shape[0]
            y_slice = app["model"](x[:batch_size].to(app["device"]))
            if app["device"] == "cuda":
                inference_done_event = torch.cuda.Event()
                inference_done_event.record()  # Signals when the gpu has finished this batch.
            if batch_size == 1: y_slice = y_slice.unsqueeze(0)  # Keep batch dimension.
        enqueued_inference_requests = inference_requests


# Prometheus metrics exposed on /metrics.
REQUEST_LATENCY = pc.Histogram('latency', 'Request Latency (s)', ['app_name'])
REQUEST_COUNTER = pc.Counter('requests_total', 'Total Request Count', ['app_name'])
EXAMPLES_COUNTER = pc.Counter('examples_total', 'Total Examples Count', ['app_name'])


async def inference_request_handler(request):
    start_time = time.time()
    # Request body is a pyarrow-serialized numpy array; the first dimension is the batch.
    np_array = pyarrow.deserialize(await request.content.read())
    inference_request = InferenceRequest(future=request.app["loop"].create_future(),
                                         data=np_array,
                                         size=np_array.shape[0])
    await request.app["queue"].put(inference_request)
    # Wait for the batching coroutine to run the model and fill in the future.
    await inference_request.future
    result = inference_request.future.result()
    return_bytes = BytesIO(pyarrow.serialize(result).to_buffer())
    response = web.Response(body=return_bytes)
    REQUEST_LATENCY.labels('inference_server').observe(time.time() - start_time)
    REQUEST_COUNTER.labels('inference_server').inc()
    EXAMPLES_COUNTER.labels('inference_server').inc(amount=inference_request.size)
    return response


async def metrics_handler(request):
    return web.Response(body=pc.generate_latest())


async def start_background_tasks(app):
    app['inference_coroutine'] = asyncio.create_task(inference_coroutine(app))


async def cleanup_background_tasks(app):
    app['inference_coroutine'].cancel()
    await app['inference_coroutine']


def main():
    parser = argparse.ArgumentParser(description="Pytorch GPU model serving.")
    parser.add_argument("--max-batch-size", type=int, required=True,
                        help="Maximum inference batch size.")
    parser.add_argument("--model-definition", type=str, required=True,
                        help="Python code which sets a `model` object.")
    parser.add_argument("--input-shape", type=int, nargs="+", required=True,
                        help="Shape of the input, not including the batch dimension.")
    my_args = parser.parse_args()
    uvloop.install()  # 2x - 4x faster than the standard asyncio event loop.
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    loop = asyncio.get_event_loop()
    app = web.Application()
    for k, v in vars(my_args).items(): app[k] = v  # Add args to the app dict.
    app["loop"] = loop
    app["queue"] = asyncio.Queue()
    app["device"] = "cuda" if torch.cuda.is_available() else "cpu"
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    eval_symbols = {}
    # Decode escaped newlines so multi-line model definitions can be passed on the command line.
    exec(bytes(app["model_definition"], "utf-8").decode("unicode_escape"), eval_symbols)
    app["model"] = eval_symbols["model"].to(app["device"])
    for param in app["model"].parameters(): param.requires_grad = False
    app.router.add_post('/', inference_request_handler)
    app.router.add_get('/metrics', metrics_handler)
    web.run_app(app, port=8122)


if __name__ == '__main__':
    main()
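For reference, a minimal sketch of how this server might be launched and queried from a separate client script. The model definition string, the file names, and the `infer` helper below are illustrative assumptions rather than part of the gist; the client simply pyarrow-serializes a float32 numpy batch, POSTs it to `/`, and deserializes the response. `pyarrow.serialize`/`pyarrow.deserialize` match the pyarrow releases current when this was written and are deprecated in newer versions.

# Example launch (hypothetical model definition; any code that binds `model` works,
# and literal "\n" sequences are turned into newlines by the unicode_escape decode):
#   python inference_server.py --max-batch-size 32 \
#       --model-definition "import torch\nmodel = torch.nn.Linear(8, 2)" \
#       --input-shape 8

# client.py - minimal client sketch, assuming the server above is running locally.
import numpy as np
import pyarrow
import requests

def infer(batch: np.ndarray) -> np.ndarray:
    # Serialize the input batch the same way the server deserializes it.
    payload = pyarrow.serialize(batch).to_buffer().to_pybytes()
    response = requests.post("http://localhost:8122/", data=payload)
    response.raise_for_status()
    # The response body is a pyarrow-serialized numpy array of model outputs.
    return pyarrow.deserialize(response.content)

if __name__ == "__main__":
    predictions = infer(np.random.rand(16, 8).astype(np.float32))
    print(predictions.shape)  # (16, 2) for the Linear(8, 2) example above.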