Skip to content

Instantly share code, notes, and snippets.

@r-wheeler
Forked from mmellison/grpc_asyncio.py
Created October 23, 2019 21:00
Show Gist options
  • Save r-wheeler/272ba592d2b1d440b1fe4c54330366ad to your computer and use it in GitHub Desktop.
Save r-wheeler/272ba592d2b1d440b1fe4c54330366ad to your computer and use it in GitHub Desktop.
gRPC Servicer with Asyncio (Python 3.6+)
import asyncio
from concurrent import futures
import functools
import inspect
import threading
from grpc import _server
def _loop_mgr(loop: asyncio.AbstractEventLoop):
    """Drive *loop* on the calling thread until it is stopped, then drain it.

    Meant to be used as a thread target: it installs *loop* as the current
    event loop of the calling thread, blocks in ``run_forever()``, and once
    the loop has been stopped runs the tasks still registered on it to
    completion.

    Args:
        loop: The event loop to run.
    """
    asyncio.set_event_loop(loop)
    loop.run_forever()

    # If we reach here, the loop was stopped.
    # We should gather any remaining tasks and finish them.
    # `asyncio.Task.all_tasks` was removed in Python 3.9; use the module-level
    # function and only fall back to the classmethod where it is missing (3.6).
    try:
        all_tasks = asyncio.all_tasks
    except AttributeError:
        all_tasks = asyncio.Task.all_tasks

    pending = all_tasks(loop)
    if pending:
        loop.run_until_complete(asyncio.gather(*pending))
class AsyncioExecutor(futures.Executor):
    """``concurrent.futures.Executor`` backed by an asyncio event loop.

    The loop is run by a dedicated daemon thread started in ``__init__``.
    Coroutine functions given to :meth:`submit` are scheduled on that loop;
    plain callables are run through the loop's default executor.
    """

    def __init__(self, *, loop=None):
        """Start the daemon thread that runs the event loop.

        Args:
            loop: Event loop to drive; defaults to ``asyncio.get_event_loop()``.
        """
        super().__init__()
        self._shutdown = False
        self._loop = loop or asyncio.get_event_loop()
        self._thread = threading.Thread(target=_loop_mgr, args=(self._loop,),
                                        daemon=True)
        self._thread.start()

    async def _run_blocking(self, func):
        """Await *func* on the loop's default executor and return its result."""
        return await self._loop.run_in_executor(None, func)

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` on the event loop.

        Args:
            fn: A coroutine function or a plain callable.
            *args: Positional arguments for *fn*.
            **kwargs: Keyword arguments for *fn*.

        Returns:
            A ``concurrent.futures.Future`` for the result of the call.

        Raises:
            RuntimeError: If the executor was shut down or the loop is not
                running yet.
        """
        if self._shutdown:
            raise RuntimeError('Cannot schedule new futures after shutdown')
        if not self._loop.is_running():
            raise RuntimeError("Loop must be started before any function can "
                               "be submitted")

        if inspect.iscoroutinefunction(fn):
            coro = fn(*args, **kwargs)
        else:
            # `loop.run_in_executor` is not thread-safe and hands back an
            # asyncio future; wrapping it in a coroutine lets both branches go
            # through `run_coroutine_threadsafe`, so callers on other threads
            # always get a thread-safe `concurrent.futures.Future`.
            coro = self._run_blocking(functools.partial(fn, *args, **kwargs))
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def shutdown(self, wait=True):
        """Stop the event loop and reject any further submissions.

        Args:
            wait: When true, block until the loop thread has exited.
        """
        self._shutdown = True
        # `loop.stop()` must run on the loop's own thread; scheduling it with
        # `call_soon_threadsafe` also wakes a loop that is idle in its selector.
        if not self._loop.is_closed():
            self._loop.call_soon_threadsafe(self._loop.stop)
        if wait:
            self._thread.join()
# --------------------------------------------------------------------------- #
async def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
    """Awaitable version of ``_server._call_behavior``.

    Creates a ``_server._Context`` for the RPC, awaits
    ``behavior(argument, context)`` and reports the outcome.

    Returns:
        ``(result, True)`` when the handler finishes, ``(None, False)`` when
        it raises. An exception not already listed in ``state.rpc_errors`` is
        logged and the RPC is aborted with status ``unknown``.
    """
    servicer_context = _server._Context(rpc_event, state, request_deserializer)
    try:
        result = await behavior(argument, servicer_context)
    except Exception as exc:  # pylint: disable=broad-except
        with state.condition:
            if exc not in state.rpc_errors:
                details = 'Exception calling application: {}'.format(exc)
                _server.logging.exception(details)
                _server._abort(
                    state,
                    rpc_event.operation_call,
                    _server.cygrpc.StatusCode.unknown,
                    _server._common.encode(details),
                )
        return None, False
    else:
        return result, True
async def _take_response_from_response_iterator(rpc_event, state, response_iterator):
    """Await the next item of an async response iterator.

    Returns:
        ``(item, True)`` for a produced item, ``(None, True)`` once the
        iterator is exhausted, and ``(None, False)`` when advancing it
        raises. An exception not already listed in ``state.rpc_errors`` is
        logged and the RPC is aborted with status ``unknown``.
    """
    try:
        item = await response_iterator.__anext__()
    except StopAsyncIteration:
        # Exhaustion is signalled to the caller as a `None` response.
        return None, True
    except Exception as exc:  # pylint: disable=broad-except
        with state.condition:
            if exc not in state.rpc_errors:
                details = 'Exception iterating responses: {}'.format(exc)
                _server.logging.exception(details)
                _server._abort(
                    state,
                    rpc_event.operation_call,
                    _server.cygrpc.StatusCode.unknown,
                    _server._common.encode(details),
                )
        return None, False
    else:
        return item, True
async def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
                                  request_deserializer, response_serializer):
    """Handle a unary-response RPC by awaiting its handler.

    Obtains the request from ``argument_thunk``, awaits the handler through
    ``_call_behavior``, serializes the response and passes it to
    ``_server._status``. Each step is skipped as soon as an earlier one
    yields nothing to continue with.
    """
    request = argument_thunk()
    if request is None:
        return

    response, proceed = await _call_behavior(
        rpc_event, state, behavior, request, request_deserializer)
    if not proceed:
        return

    payload = _server._serialize_response(
        rpc_event, state, response, response_serializer)
    if payload is None:
        return

    _server._status(rpc_event, state, payload)
async def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
                                   request_deserializer, response_serializer):
    """Handle a streaming-response RPC by draining an async response iterator.

    Obtains the request from ``argument_thunk``, calls the handler to get a
    response iterator, then repeatedly awaits the next response, serializes
    it and sends it. The loop ends when the iterator is exhausted (the RPC is
    then completed through ``_server._status``), or as soon as iterating,
    serializing or sending reports that the RPC should not proceed.
    """
    argument = argument_thunk()
    if argument is None:
        return

    # Notice this calls the normal `_call_behavior` not the awaitable version:
    # the handler's return value is used as the iterator and is only awaited
    # item by item below (presumably `behavior` is an async generator function).
    response_iterator, proceed = _server._call_behavior(
        rpc_event, state, behavior, argument, request_deserializer)
    if not proceed:
        return

    while True:
        response, proceed = await _take_response_from_response_iterator(
            rpc_event, state, response_iterator)
        if not proceed:
            break
        if response is None:
            # Iterator exhausted: finish the RPC without a further message.
            _server._status(rpc_event, state, None)
            break

        serialized_response = _server._serialize_response(
            rpc_event, state, response, response_serializer)
        if serialized_response is None:
            break
        if not _server._send_response(rpc_event, state, serialized_response):
            break
# Replace gRPC's response handlers with the coroutine versions defined above,
# so that they are scheduled as coroutines when submitted to AsyncioExecutor.
_server._unary_response_in_pool = _unary_response_in_pool
_server._stream_response_in_pool = _stream_response_in_pool

if __name__ == '__main__':
    # Only `grpc._server` is imported at module level, so the `grpc` package
    # name has to be bound here before `grpc.server` can be used.
    import grpc

    server = grpc.server(AsyncioExecutor())
    # Add Servicer and Start Server Here
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
import auth_pb2_grpc, auth_pb2
# The servicer can now use async/await syntax
class AuthServiceServicer(auth_pb2_grpc.AuthServiceServicer):
    """Example servicer whose RPC handler is written as an async generator."""

    async def DoSomething(self, request, context):
        """Yield ten responses, sleeping ``i`` seconds before the ``i``-th one."""
        for seconds in range(10):
            notice = "Sleeping for {} Seconds".format(seconds)
            print(notice)
            await asyncio.sleep(seconds)
            reply = auth_pb2.Response(num=str(seconds))
            yield reply
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment