Skip to content

Instantly share code, notes, and snippets.

@w4rum
Last active November 21, 2020 08:32
Show Gist options
  • Save w4rum/1f749149cafeec11c863bd625e62a5e9 to your computer and use it in GitHub Desktop.
Save w4rum/1f749149cafeec11c863bd625e62a5e9 to your computer and use it in GitHub Desktop.
syntax = "proto3";
package example_service;
// Example gRPC service declaring one RPC for each combination of
// single-message and streamed request/response.
service ExampleService {
  // One request message in, one response message out.
  rpc ExampleUnaryUnary(ExampleRequest) returns (ExampleResponse);

  // One request message in, a stream of response messages out.
  rpc ExampleUnaryStream(ExampleRequest) returns (stream ExampleResponse);

  // A stream of request messages in, one response message out.
  rpc ExampleStreamUnary(stream ExampleRequest) returns (ExampleResponse);

  // A stream of request messages in, a stream of response messages out.
  rpc ExampleStreamStream(stream ExampleRequest) returns (stream ExampleResponse);
}
// Request message accepted by every RPC of ExampleService.
message ExampleRequest {
  // String payload, field number 1.
  string example_string = 1;

  // 64-bit signed integer payload, field number 2.
  int64 example_integer = 2;
}
// Response message returned by every RPC of ExampleService.
message ExampleResponse {
  // String payload, field number 1.
  string example_string = 1;

  // 64-bit signed integer payload, field number 2.
  int64 example_integer = 2;
}
# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: example_service.proto
# plugin: python-betterproto
from dataclasses import dataclass
from typing import AsyncIterable, AsyncIterator, Dict, Iterable, Union
import betterproto
import grpclib
@dataclass(eq=False, repr=False)
class ExampleRequest(betterproto.Message):
    """Python counterpart of the ``ExampleRequest`` protobuf message."""

    # Protobuf field 1 (string).
    example_string: str = betterproto.string_field(1)
    # Protobuf field 2 (int64).
    example_integer: int = betterproto.int64_field(2)

    def __post_init__(self) -> None:
        """Delegate post-construction setup to ``betterproto.Message``."""
        super().__post_init__()
@dataclass(eq=False, repr=False)
class ExampleResponse(betterproto.Message):
    """Python counterpart of the ``ExampleResponse`` protobuf message."""

    # Protobuf field 1 (string).
    example_string: str = betterproto.string_field(1)
    # Protobuf field 2 (int64).
    example_integer: int = betterproto.int64_field(2)

    def __post_init__(self) -> None:
        """Delegate post-construction setup to ``betterproto.Message``."""
        super().__post_init__()
class ExampleServiceStub(betterproto.ServiceStub):
    """Client-side stub exposing the four ``ExampleService`` RPCs.

    Each method forwards to the matching ``_unary_unary`` /
    ``_unary_stream`` / ``_stream_unary`` / ``_stream_stream`` helper,
    passing the fully qualified RPC route and the message classes.
    """

    async def example_unary_unary(
        self, *, example_string: str = "", example_integer: int = 0
    ) -> "ExampleResponse":
        """Call ``ExampleUnaryUnary`` with a request built from the keyword
        arguments and return the single response.
        """
        route = "/example_service.ExampleService/ExampleUnaryUnary"

        # Populate the request field by field, as the generator does.
        outgoing = ExampleRequest()
        outgoing.example_string = example_string
        outgoing.example_integer = example_integer

        reply = await self._unary_unary(route, outgoing, ExampleResponse)
        return reply

    async def example_unary_stream(
        self, *, example_string: str = "", example_integer: int = 0
    ) -> AsyncIterator["ExampleResponse"]:
        """Call ``ExampleUnaryStream`` with a request built from the keyword
        arguments and yield each response as it is produced.
        """
        route = "/example_service.ExampleService/ExampleUnaryStream"

        outgoing = ExampleRequest()
        outgoing.example_string = example_string
        outgoing.example_integer = example_integer

        replies = self._unary_stream(route, outgoing, ExampleResponse)
        async for reply in replies:
            yield reply

    async def example_stream_unary(
        self,
        request_iterator: Union[
            AsyncIterable["ExampleRequest"], Iterable["ExampleRequest"]
        ],
    ) -> "ExampleResponse":
        """Call ``ExampleStreamUnary`` with the given iterable of requests
        and return the single response.
        """
        route = "/example_service.ExampleService/ExampleStreamUnary"
        reply = await self._stream_unary(
            route, request_iterator, ExampleRequest, ExampleResponse
        )
        return reply

    async def example_stream_stream(
        self,
        request_iterator: Union[
            AsyncIterable["ExampleRequest"], Iterable["ExampleRequest"]
        ],
    ) -> AsyncIterator["ExampleResponse"]:
        """Call ``ExampleStreamStream`` with the given iterable of requests
        and yield each response as it is produced.
        """
        route = "/example_service.ExampleService/ExampleStreamStream"
        replies = self._stream_stream(
            route, request_iterator, ExampleRequest, ExampleResponse
        )
        async for reply in replies:
            yield reply
class ExampleServiceImplementation(betterproto.ServiceImplementation):
    """Server-side base class for ``ExampleService``.

    The four public ``example_*`` methods are the RPC handlers; as written
    here each one raises ``GRPCError(UNIMPLEMENTED)``. A concrete service is
    presumably expected to subclass this and override them.

    The private ``__rpc_*`` methods adapt a grpclib stream to the keyword
    arguments of the matching handler, and ``__mapping__`` returns the
    route-to-handler table.
    """

    async def example_unary_unary(
        self, example_string: str, example_integer: int
    ) -> "ExampleResponse":
        """Handle ``ExampleUnaryUnary``.

        Raises:
            grpclib.GRPCError: always, with status ``UNIMPLEMENTED``.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def example_unary_stream(
        self, example_string: str, example_integer: int
    ) -> AsyncIterator["ExampleResponse"]:
        """Handle ``ExampleUnaryStream``.

        Raises:
            grpclib.GRPCError: with status ``UNIMPLEMENTED``, on the first
                iteration of the returned async generator.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Never reached. Its presence makes this an async generator function,
        # so calling it returns an async iterator (matching the annotation)
        # rather than a coroutine that cannot be used with ``async for``.
        yield ExampleResponse()

    async def example_stream_unary(
        self, example_request_iterator: AsyncIterable["ExampleRequest"]
    ) -> "ExampleResponse":
        """Handle ``ExampleStreamUnary``.

        Raises:
            grpclib.GRPCError: always, with status ``UNIMPLEMENTED``.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def example_stream_stream(
        self, example_request_iterator: AsyncIterable["ExampleRequest"]
    ) -> AsyncIterator["ExampleResponse"]:
        """Handle ``ExampleStreamStream``.

        Raises:
            grpclib.GRPCError: with status ``UNIMPLEMENTED``, on the first
                iteration of the returned async generator.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Never reached; see example_unary_stream for why it is here.
        yield ExampleResponse()

    async def __rpc_example_unary_unary(self, stream):
        """Receive one request from ``stream``, unpack its fields into keyword
        arguments and dispatch to ``example_unary_unary``.
        """
        request = await stream.recv_message()
        request_kwargs = {
            "example_string": request.example_string,
            "example_integer": request.example_integer,
        }
        await self._call_rpc_handler_server_unary(
            self.example_unary_unary,
            stream,
            request_kwargs,
        )

    async def __rpc_example_unary_stream(self, stream):
        """Receive one request from ``stream``, unpack its fields into keyword
        arguments and dispatch to ``example_unary_stream``.
        """
        request = await stream.recv_message()
        request_kwargs = {
            "example_string": request.example_string,
            "example_integer": request.example_integer,
        }
        await self._call_rpc_handler_server_stream(
            self.example_unary_stream,
            stream,
            request_kwargs,
        )

    async def __rpc_example_stream_unary(self, stream):
        """Pass the stream's async iterator to ``example_stream_unary`` as the
        request iterator.
        """
        request_kwargs = {"example_request_iterator": stream.__aiter__()}
        await self._call_rpc_handler_server_unary(
            self.example_stream_unary,
            stream,
            request_kwargs,
        )

    async def __rpc_example_stream_stream(self, stream):
        """Pass the stream's async iterator to ``example_stream_stream`` as
        the request iterator.
        """
        request_kwargs = {"example_request_iterator": stream.__aiter__()}
        await self._call_rpc_handler_server_stream(
            self.example_stream_stream,
            stream,
            request_kwargs,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the table mapping each fully qualified RPC route to a
        ``grpclib.const.Handler`` (adapter method, cardinality, request
        class, response class).
        """
        return {
            "/example_service.ExampleService/ExampleUnaryUnary": grpclib.const.Handler(
                self.__rpc_example_unary_unary,
                grpclib.const.Cardinality.UNARY_UNARY,
                ExampleRequest,
                ExampleResponse,
            ),
            "/example_service.ExampleService/ExampleUnaryStream": grpclib.const.Handler(
                self.__rpc_example_unary_stream,
                grpclib.const.Cardinality.UNARY_STREAM,
                ExampleRequest,
                ExampleResponse,
            ),
            "/example_service.ExampleService/ExampleStreamUnary": grpclib.const.Handler(
                self.__rpc_example_stream_unary,
                grpclib.const.Cardinality.STREAM_UNARY,
                ExampleRequest,
                ExampleResponse,
            ),
            "/example_service.ExampleService/ExampleStreamStream": grpclib.const.Handler(
                self.__rpc_example_stream_stream,
                grpclib.const.Cardinality.STREAM_STREAM,
                ExampleRequest,
                ExampleResponse,
            ),
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment