Skip to content

Instantly share code, notes, and snippets.

@bsideup
Last active March 16, 2024 20:06
Show Gist options
  • Save bsideup/bd62eb19e7d7cbc590fad07b140c9201 to your computer and use it in GitHub Desktop.
Save bsideup/bd62eb19e7d7cbc590fad07b140c9201 to your computer and use it in GitHub Desktop.
package io.grpc.clientchannel.example;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.clientchannel.ClientChannelService;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.protobuf.SimpleRequest;
import io.grpc.testing.protobuf.SimpleResponse;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import io.grpc.util.MutableHandlerRegistry;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class TunnelClient {
public static Metadata.Key<String> PREFIX_HEADER = Metadata.Key.of("zone", Metadata.ASCII_STRING_MARSHALLER);
public static void main(String[] args) throws Exception {
io.grpc.ManagedChannel networkChannel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
Metadata headers = new Metadata();
headers.put(PREFIX_HEADER, "Value: ");
ClientChannelService.registerServer(
networkChannel,
headers,
b -> {
MutableHandlerRegistry registry = new MutableHandlerRegistry();
registry.addService(new VMDriverService());
b.fallbackHandlerRegistry(registry);
}
);
Thread.currentThread().join();
}
static class VMDriverService extends SimpleServiceGrpc.SimpleServiceImplBase {
@Override
public void serverStreamingRpc(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
AtomicInteger counter = new AtomicInteger(5);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
() -> {
int i = counter.getAndDecrement();
if (i > 0) {
responseObserver.onNext(
SimpleResponse.newBuilder()
.setResponseMessage("Hello " + request.getRequestMessage() + " " + i)
.build()
);
} else {
responseObserver.onCompleted();
throw new RuntimeException("Completed");
}
},
0,
1,
TimeUnit.SECONDS
);
}
}
}
package io.grpc.clientchannel.example;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.clientchannel.ClientChannelService;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.protobuf.SimpleRequest;
import io.grpc.testing.protobuf.SimpleResponse;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import io.grpc.testing.protobuf.SimpleServiceGrpc.SimpleServiceStub;
import static io.grpc.clientchannel.example.TunnelClient.PREFIX_HEADER;
public class TunnelServer {
public static void main(String[] args) throws Exception {
Server server = ServerBuilder
.forPort(50051)
.addService(
new ClientChannelService() {
@Override
protected void onChannel(ManagedChannel channel, Metadata headers) {
String prefix = headers.get(PREFIX_HEADER);
SimpleServiceStub stub = SimpleServiceGrpc.newStub(channel);
stub.serverStreamingRpc(
SimpleRequest.newBuilder().setRequestMessage("foo").build(),
new StreamObserver<SimpleResponse>() {
@Override
public void onNext(SimpleResponse value) {
System.out.println(prefix + value.getResponseMessage());
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Completed");
}
}
);
}
}
)
.build()
.start();
server.awaitTermination();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment