grpc_pass seems to cause gRPC core to encounter a TCP reset when streaming a lot of data, ostensibly while response headers are being sent.

The gRPC client receives an HTTP/2 RST_STREAM frame.
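
For reference, the failure surfaces in Python as a grpc.RpcError that can be caught and inspected rather than left to propagate; a minimal sketch, mirroring client.py below (the address assumes the nginx setup from this gist):

import grpc

from agent_pb2 import Sha256
import agent_pb2_grpc

channel = grpc.insecure_channel('127.0.0.1:7777')  # through nginx
stub = agent_pb2_grpc.AgentServiceStub(channel)

try:
    for file_chunk in stub.GetFile(Sha256(sha256='a' * 64)):
        pass  # consume the stream
except grpc.RpcError as err:
    # on the affected setup this reports StatusCode.INTERNAL with
    # details "Received RST_STREAM with error code 2"
    print(err.code(), err.details())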

The problem appears to be fixed if grpc_buffer_size is set to a large value such as 100M. Presumably this makes sense if the upstream gRPC server is able to produce data faster than the client can receive it; however, I expected grpc_pass to act like proxy_pass and use a temporary file if the buffer size is exceeded.

Alternatively, disabling buffering and relying on flow control seems like a good option, considering the gRPC stream data may be real-time in some use cases. With proxy_pass this is possible via proxy_buffering off; unfortunately, there is no equivalent option for grpc_pass.
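
For illustration, a minimal sketch of the buffer-size workaround applied to the server block from nginx.conf below; 100m is simply the value that happened to work here, not a tuned recommendation:

server {
    listen 7777 default_server http2;

    location / {
        # enlarging the buffer appears to avoid the reset, at the cost of
        # holding the whole response in memory; there is no grpc_pass
        # equivalent of "proxy_buffering off"
        grpc_buffer_size 100m;
        grpc_pass grpc://localhost:50051;
    }
}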

To reproduce

  • Get a VM running Ubuntu 16.04
  • Install nginx/1.13.10
  • Install python3 and pip
  • Install grpcio 1.10.0 (python3 -m pip install grpcio==1.10.0)

Run in terminal A:

# note: an absolute file path is required
nginx -c $(pwd)/nginx.conf

Run in terminal B:

python3 server.py

Run in terminal C:

python3 client.py

To prove that nginx triggers the behaviour, uncomment the address in client.py which bypasses nginx.

Error outputs

client.py:

..^CTraceback (most recent call last):
  File "client.py", line 18, in <module>
	for file_chunk in stub.GetFile(Sha256(sha256='a'*64)):
  File "/usr/local/lib/python3.5/dist-packages/grpc/_channel.py", line 347, in __next__
	return self._next()
  File "/usr/local/lib/python3.5/dist-packages/grpc/_channel.py", line 341, in _next
	raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.INTERNAL, Received RST_STREAM with error code 2)>

nginx:

2018/04/04 12:49:34 [error] 27152#27152: *305 upstream prematurely closed connection while reading response header from upstream, client: 127.0.0.1, server: , request: "POST /agent.AgentService/GetFile HTTP/2.0", upstream: "grpc://127.0.0.1:50051", host: "localhost:7777"
127.0.0.1 - - [04/Apr/2018:12:49:34 +0100] "POST /agent.AgentService/GetFile HTTP/2.0" 200 4710410 "-" "grpc-python/1.10.0 grpc-c/6.0.0 (manylinux; chttp2; glamorous)"

gRPC core (GRPC_TRACE=all GRPC_VERBOSITY=ERROR):

D0404 11:29:16.865209765   24981 connectivity_state.cc:162]  SET: 0x7fc0c4008460 server_transport: READY --> SHUTDOWN [close_transport] error=0x7fc0c400caf0 {"created":"@1522837756.865141109","description":"Delayed close due to in-progress write","file":"src/core/ext/transport/chttp2/transport/chttp2_transport.cc","file_line":594,"referenced_errors":[{"created":"@1522837756.865132502","description":"Endpoint read failed","file":"src/core/ext/transport/chttp2/transport/chttp2_transport.cc","file_line":2425,"occurred_during_write":1,"referenced_errors":[{"created":"@1522837756.865129250","description":"OS Error","errno":104,"fd":8,"file":"src/core/lib/iomgr/tcp_posix.cc","file_line":413,"grpc_status":14,"os_error":"Connection reset by peer","syscall":"recvmsg","target_address":"ipv4:127.0.0.1:36942"}]},{"created":"@1522837756.865165243","description":"OS Error","errno":32,"fd":8,"file":"src/core/lib/iomgr/tcp_posix.cc","file_line":571,"grpc_status":14,"os_error":"Broken pipe","syscall":"sendmsg","target_address":"ipv4:127.0.0.1:36942"}]}

Notes

The bug is intermittent and sometimes dependent on BLOCK_SIZE in server.py:

  • Over the internet, the bug seems to occur more readily, even with the small block size
  • Locally, the configured block size always triggers it, but the small block size does not
  • Sometimes the large block size works (at least over the internet)

The bug seems to affect Ubuntu 16.04 and macOS 10.12.6 (Sierra) differently: on macOS, the client will intermittently receive a 502 instead of RST_STREAM error 2.

syntax = "proto3";
package agent;
service AgentService {
rpc GetFile(Sha256) returns (stream FileChunk);
}
message Sha256 {
string sha256 = 1;
}
message FileChunk {
uint64 total_size = 1;
bytes data = 2;
}

agent_pb2.py:

# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: agent.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='agent.proto',
package='agent',
syntax='proto3',
serialized_pb=_b('\n\x0b\x61gent.proto\x12\x05\x61gent\"\x18\n\x06Sha256\x12\x0e\n\x06sha256\x18\x01 \x01(\t\"-\n\tFileChunk\x12\x12\n\ntotal_size\x18\x01 \x01(\x04\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x32<\n\x0c\x41gentService\x12,\n\x07GetFile\x12\r.agent.Sha256\x1a\x10.agent.FileChunk0\x01\x62\x06proto3')
)
_SHA256 = _descriptor.Descriptor(
name='Sha256',
full_name='agent.Sha256',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='sha256', full_name='agent.Sha256.sha256', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=22,
serialized_end=46,
)
_FILECHUNK = _descriptor.Descriptor(
name='FileChunk',
full_name='agent.FileChunk',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='total_size', full_name='agent.FileChunk.total_size', index=0,
number=1, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='data', full_name='agent.FileChunk.data', index=1,
number=2, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=48,
serialized_end=93,
)
DESCRIPTOR.message_types_by_name['Sha256'] = _SHA256
DESCRIPTOR.message_types_by_name['FileChunk'] = _FILECHUNK
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Sha256 = _reflection.GeneratedProtocolMessageType('Sha256', (_message.Message,), dict(
DESCRIPTOR = _SHA256,
__module__ = 'agent_pb2'
# @@protoc_insertion_point(class_scope:agent.Sha256)
))
_sym_db.RegisterMessage(Sha256)
FileChunk = _reflection.GeneratedProtocolMessageType('FileChunk', (_message.Message,), dict(
DESCRIPTOR = _FILECHUNK,
__module__ = 'agent_pb2'
# @@protoc_insertion_point(class_scope:agent.FileChunk)
))
_sym_db.RegisterMessage(FileChunk)
_AGENTSERVICE = _descriptor.ServiceDescriptor(
name='AgentService',
full_name='agent.AgentService',
file=DESCRIPTOR,
index=0,
options=None,
serialized_start=95,
serialized_end=155,
methods=[
_descriptor.MethodDescriptor(
name='GetFile',
full_name='agent.AgentService.GetFile',
index=0,
containing_service=None,
input_type=_SHA256,
output_type=_FILECHUNK,
options=None,
),
])
_sym_db.RegisterServiceDescriptor(_AGENTSERVICE)
DESCRIPTOR.services_by_name['AgentService'] = _AGENTSERVICE
# @@protoc_insertion_point(module_scope)

agent_pb2_grpc.py:

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc

import agent_pb2 as agent__pb2


class AgentServiceStub(object):
  # missing associated documentation comment in .proto file
  pass

  def __init__(self, channel):
    """Constructor.
    Args:
      channel: A grpc.Channel.
    """
    self.GetFile = channel.unary_stream(
        '/agent.AgentService/GetFile',
        request_serializer=agent__pb2.Sha256.SerializeToString,
        response_deserializer=agent__pb2.FileChunk.FromString,
        )


class AgentServiceServicer(object):
  # missing associated documentation comment in .proto file
  pass

  def GetFile(self, request, context):
    # missing associated documentation comment in .proto file
    pass
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')


def add_AgentServiceServicer_to_server(servicer, server):
  rpc_method_handlers = {
      'GetFile': grpc.unary_stream_rpc_method_handler(
          servicer.GetFile,
          request_deserializer=agent__pb2.Sha256.FromString,
          response_serializer=agent__pb2.FileChunk.SerializeToString,
      ),
  }
  generic_handler = grpc.method_handlers_generic_handler(
      'agent.AgentService', rpc_method_handlers)
  server.add_generic_rpc_handlers((generic_handler,))

client.py:

import grpc

from agent_pb2 import Sha256
import agent_pb2_grpc

# with nginx
address = '127.0.0.1:7777'
# without nginx
#address = '127.0.0.1:50051'

channel = grpc.insecure_channel(address)
stub = agent_pb2_grpc.AgentServiceStub(channel)

for file_chunk in stub.GetFile(Sha256(sha256='a'*64)):
    print('.', end='', flush=True)

nginx.conf:

error_log /dev/stdout;
events {}
pid /dev/null;
daemon off;

http {
    access_log off;
    client_max_body_size 4000M;
    grpc_read_timeout 1d;
    grpc_send_timeout 1d;

    # this seems to fix it; but see comment in README.md
    #grpc_buffer_size 100m;

    server {
        listen 7777 default_server http2;

        location / {
            grpc_pass grpc://localhost:50051;
        }
    }
}

server.py:

from concurrent import futures
import grpc
from time import sleep

from agent_pb2 import FileChunk
from agent_pb2 import Sha256
import agent_pb2_grpc

from os import stat

# remote client gets nothing
# local client gets 1 block
BLOCK_SIZE = 10**3 * 512

# local client gets all blocks
# remote client gets 72 blocks
#BLOCK_SIZE = 10**3


class AgentServiceServicer(agent_pb2_grpc.AgentServiceServicer):
    def GetFile(self, request, context):
        metadata = dict(context.invocation_metadata())
        size = 1000 * 1000 * 50
        written = 0

        while True:
            block = b'F' * BLOCK_SIZE

            if written >= size:
                break

            if not context.is_active():
                break

            written += BLOCK_SIZE
            yield FileChunk(
                total_size=size,
                data=block,
            )


def serve():
    # only overridden for tests
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=500))
    agent_pb2_grpc.add_AgentServiceServicer_to_server(
        AgentServiceServicer(),
        server)
    server.add_insecure_port('127.0.0.1:50051')
    server.start()
    try:
        while True:
            sleep(1)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()