Skip to content

Instantly share code, notes, and snippets.

@jclab-joseph
Created November 17, 2022 02:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jclab-joseph/48db0460a8089344dc308a3e7be4dbbf to your computer and use it in GitHub Desktop.
Save jclab-joseph/48db0460a8089344dc308a3e7be4dbbf to your computer and use it in GitHub Desktop.
GRPC ServerCallStreamObserver Problem
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import kr.jclab.grpcoverwebsocket.sample.model.GreeterGrpc;
import kr.jclab.grpcoverwebsocket.sample.model.HelloReply;
import kr.jclab.grpcoverwebsocket.sample.model.HelloRequest;
import lombok.extern.slf4j.Slf4j;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
@Slf4j
public class PlainClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = NettyChannelBuilder.forTarget("localhost:10001")
.keepAliveTime(1, TimeUnit.SECONDS)
.usePlaintext()
.build();
log.info("GreeterGrpc.newStub BEFORE");
GreeterGrpc.GreeterBlockingStub greeterGrpc = GreeterGrpc.newBlockingStub(channel);
log.info("GreeterGrpc.newStub AFTER");
while (true) {
// CompletableFuture<Void> completableFuture = new CompletableFuture<>();
log.error("RE SUBSCRINBE!");
Iterator<HelloReply> iterator = greeterGrpc.muchHello(
HelloRequest.newBuilder()
.setName("Banana")
.build()
// new StreamObserver<HelloReply>() {
// @Override
// public void onNext(HelloReply value) {
// log.info("greeterGrpc.muchHello REPLY / " + value);
// }
//
// @Override
// public void onError(Throwable t) {
// completableFuture.completeExceptionally(t);
// }
//
// @Override
// public void onCompleted() {
// completableFuture.complete(null);
// }
// }
);
try {
while (iterator.hasNext()) {
HelloReply reply = iterator.next();
log.info("REPLY {}", reply);
}
log.error("DONE!");
// log.error("AWAIT: 1");
// completableFuture.get();
// log.error("AWAIT: 2");
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(1000);
if (false) {
break;
}
}
log.info("clientChannel.shutdown BEFORE");
channel.shutdown();
log.info("clientChannel.shutdown AFTER");
}
}
import io.grpc.Context;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class PlainServer {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Server server = NettyServerBuilder.forPort(10001)
.keepAliveTime(1, TimeUnit.SECONDS)
.keepAliveTimeout(1, TimeUnit.SECONDS)
.addService(new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
super.sayHello(request, responseObserver);
}
@Override
public void muchHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
UUID uuid = UUID.randomUUID();
log.warn("muchHello Invoked (name = {})", request.getName());
try {
ServerCallStreamObserver<HelloReply> serverCallStreamObserver = ((ServerCallStreamObserver<HelloReply>) responseObserver);
Context.current().addListener((context) -> {
log.error("muchHello / Context.addListener");
}, executorService);
serverCallStreamObserver.setOnCloseHandler(() -> {
log.error("muchHello / serverCallStreamObserver.setOnCloseHandler");
});
serverCallStreamObserver.setOnCancelHandler(() -> {
log.error("muchHello / serverCallStreamObserver.setOnCancelHandler");
});
int count = 0;
while (true) {
int a = count++;
log.info("SESSION[{}] WRITT DO : {} || {}", uuid.toString(), Context.current().isCancelled(), serverCallStreamObserver.isCancelled());
responseObserver.onNext(
HelloReply.newBuilder()
.setMessage("Bonjour: " + a)
.build()
);
log.info("SESSION[{}] WRITTEN", uuid.toString());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (a> 1000) {
break;
}
}
responseObserver.onCompleted();
// super.muchHello(request, responseObserver);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public StreamObserver<MessageRequest> talks(StreamObserver<MessageReply> responseObserver) {
return super.talks(responseObserver);
}
})
.build()
.start();
server.awaitTermination();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment