Skip to content

Instantly share code, notes, and snippets.

@sergiitk
Created October 31, 2023 20:54
Show Gist options
  • Save sergiitk/a4b58a2fb958191638750b408b1ad421 to your computer and use it in GitHub Desktop.
Save sergiitk/a4b58a2fb958191638750b408b1ad421 to your computer and use it in GitHub Desktop.
package hello;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
public class HelloStreamClient {
private final ManagedChannel channel;
private final ClientCallStreamObserver<test.HelloRequest> clientCallStreamObserver;
public HelloStreamClient(String host, int port) {
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
clientCallStreamObserver = (ClientCallStreamObserver<test.HelloRequest>) HelloStreamGrpc
.newStub(channel)
.sayHelloStream(
new StreamObserver<>() {
@Override
public void onNext(test.HelloResponse value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public void sayHello() {
int runCount = 10000000;
for (int i = 0; i < runCount; ++i) {
clientCallStreamObserver.onNext(test.HelloRequest.newBuilder().setTraceId(i).build());
}
}
public static void main(String[] args) throws InterruptedException {
HelloStreamClient client = new HelloStreamClient("127.0.0.1", 50051);
client.sayHello();
try {
Thread.sleep(1000000);
} catch (Exception e) {
}
client.shutdown();
}
}
package hello;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class HelloStreamServer {
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new HelloStreamImpl())
.build()
.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}));
}
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final HelloStreamServer server = new HelloStreamServer();
server.start();
server.blockUntilShutdown();
}
private class HelloStreamImpl extends HelloStreamGrpc.HelloStreamImplBase {
@Override
public StreamObserver<test.HelloRequest> sayHelloStream(StreamObserver<test.HelloResponse> responseStreamObserver) {
return new StreamObserver<>() {
@Override
public void onNext(test.HelloRequest value) {
//NOTE: here DO NOT response anything to reproduce the client side memory leak problem
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
};
}
}
}
hello.proto:
syntax = "proto3";
package hello;
option java_package = "hello";
option java_outer_classname = "test";
message HelloRequest {
optional int64 traceId = 1;
}
service HelloStream {
rpc SayHelloStream (stream HelloRequest) returns (stream HelloResponse) {}
}
message HelloResponse {
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment