package hello;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
public class HelloStreamClient {
private final ManagedChannel channel;
private final ClientCallStreamObserver<test.HelloRequest> clientCallStreamObserver;
public HelloStreamClient(String host, int port) {
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
clientCallStreamObserver = (ClientCallStreamObserver<test.HelloRequest>) HelloStreamGrpc
new StreamObserver<>() {
public void onNext(test.HelloResponse value) {
public void onError(Throwable t) {
public void onCompleted() {
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
public void sayHello() {
int runCount = 10000000;
for (int i = 0; i < runCount; ++i) {
public static void main(String[] args) throws InterruptedException {
HelloStreamClient client = new HelloStreamClient("", 50051);
try {
} catch (Exception e) {
package hello;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
public class HelloStreamServer {
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new HelloStreamImpl())
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
} catch (InterruptedException e) {
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
public static void main(String[] args) throws IOException, InterruptedException {
final HelloStreamServer server = new HelloStreamServer();
private class HelloStreamImpl extends HelloStreamGrpc.HelloStreamImplBase {
public StreamObserver<test.HelloRequest> sayHelloStream(StreamObserver<test.HelloResponse> responseStreamObserver) {
return new StreamObserver<>() {
public void onNext(test.HelloRequest value) {
//NOTE: here DO NOT response anything to reproduce the client side memory leak problem
public void onError(Throwable throwable) {
public void onCompleted() {
syntax = "proto3";
package hello;
option java_package = "hello";
option java_outer_classname = "test";
message HelloRequest {
optional int64 traceId = 1;
service HelloStream {
rpc SayHelloStream (stream HelloRequest) returns (stream HelloResponse) {}
message HelloResponse {
