Skip to content

Instantly share code, notes, and snippets.

@decebal
Forked from mmellison/grpc_asyncio.py
Created February 15, 2018 13:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save decebal/0c56cf9cd8d3dd97cc150ae5b7291aad to your computer and use it in GitHub Desktop.
Save decebal/0c56cf9cd8d3dd97cc150ae5b7291aad to your computer and use it in GitHub Desktop.
gRPC Servicer with Asyncio (Python 3.6+)
import asyncio
from concurrent import futures
import functools
import inspect
import threading
from grpc import _server
def _loop_mgr(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
loop.run_forever()
# If we reach here, the loop was stopped.
# We should gather any remaining tasks and finish them.
pending = asyncio.Task.all_tasks(loop=loop)
if pending:
loop.run_until_complete(asyncio.gather(*pending))
class AsyncioExecutor(futures.Executor):
def __init__(self, *, loop=None):
super().__init__()
self._shutdown = False
self._loop = loop or asyncio.get_event_loop()
self._thread = threading.Thread(target=_loop_mgr, args=(self._loop,),
daemon=True)
self._thread.start()
def submit(self, fn, *args, **kwargs):
if self._shutdown:
raise RuntimeError('Cannot schedule new futures after shutdown')
if not self._loop.is_running():
raise RuntimeError("Loop must be started before any function can "
"be submitted")
if inspect.iscoroutinefunction(fn):
coro = fn(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coro, self._loop)
else:
func = functools.partial(fn, *args, **kwargs)
return self._loop.run_in_executor(None, func)
def shutdown(self, wait=True):
self._loop.stop()
self._shutdown = True
if wait:
self._thread.join()
# --------------------------------------------------------------------------- #
async def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
context = _server._Context(rpc_event, state, request_deserializer)
try:
return await behavior(argument, context), True
except Exception as e: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
details = 'Exception calling application: {}'.format(e)
_server.logging.exception(details)
_server._abort(state, rpc_event.operation_call,
_server.cygrpc.StatusCode.unknown, _server._common.encode(details))
return None, False
async def _take_response_from_response_iterator(rpc_event, state, response_iterator):
try:
return await response_iterator.__anext__(), True
except StopAsyncIteration:
return None, True
except Exception as e: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
details = 'Exception iterating responses: {}'.format(e)
_server.logging.exception(details)
_server._abort(state, rpc_event.operation_call,
_server.cygrpc.StatusCode.unknown, _server._common.encode(details))
return None, False
async def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
argument = argument_thunk()
if argument is not None:
response, proceed = await _call_behavior(rpc_event, state, behavior,
argument, request_deserializer)
if proceed:
serialized_response = _server._serialize_response(
rpc_event, state, response, response_serializer)
if serialized_response is not None:
_server._status(rpc_event, state, serialized_response)
async def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
argument = argument_thunk()
if argument is not None:
# Notice this calls the normal `_call_behavior` not the awaitable version.
response_iterator, proceed = _server._call_behavior(
rpc_event, state, behavior, argument, request_deserializer)
if proceed:
while True:
response, proceed = await _take_response_from_response_iterator(
rpc_event, state, response_iterator)
if proceed:
if response is None:
_server._status(rpc_event, state, None)
break
else:
serialized_response = _server._serialize_response(
rpc_event, state, response, response_serializer)
print(response)
if serialized_response is not None:
print("Serialized Correctly")
proceed = _server._send_response(rpc_event, state,
serialized_response)
if not proceed:
break
else:
break
else:
break
_server._unary_response_in_pool = _unary_response_in_pool
_server._stream_response_in_pool = _stream_response_in_pool
if __name__ == '__main__':
server = grpc.server(AsyncioExecutor())
# Add Servicer and Start Server Here
import auth_pb2_grpc, auth_pb2
# The servicer can now use async/await syntax
class AuthServiceServicer(auth_pb2_grpc.AuthServiceServicer):
async def DoSomething(self, request, context):
for i in range(10):
print("Sleeping for {} Seconds".format(i))
await asyncio.sleep(i)
yield auth_pb2.Response(num=str(i))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment