Skip to content

Instantly share code, notes, and snippets.

@rmichela
Last active January 27, 2019 03:18
Show Gist options
  • Save rmichela/470880b2d67858700dad7dc244a81ad8 to your computer and use it in GitHub Desktop.
Save rmichela/470880b2d67858700dad7dc244a81ad8 to your computer and use it in GitHub Desktop.
gRPC backpressure
syntax = "proto3";
package servicelibs;
option java_package = "servicelibs";
option java_outer_classname = "NumberProto";
import "google/protobuf/empty.proto";
// Service used to demonstrate flow control on a client-streaming call.
service Numbers {
  // The client streams Number messages; the server answers with a single
  // google.protobuf.Empty once the request stream has ended.
  rpc RequestPressure(stream Number) returns (google.protobuf.Empty);
}
// Wrapper around one 32-bit signed integer, sent as a stream element.
message Number {
  // The integer payload.
  int32 number = 1;
}
import com.google.protobuf.Empty;
import io.grpc.Channel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
/**
 * Demonstrates request-side backpressure on a client-streaming gRPC call over the
 * in-process transport.
 *
 * <p>The client pushes the integers produced by {@code new Sequence(50)} to the server.
 * The server disables automatic inbound flow control and asks for exactly one message at
 * a time, sleeping 500 ms per message, so the client can only write as fast as the server
 * consumes.
 */
public class BackpressureTest {

    /**
     * Starts the in-process server, streams the sequence to it, and blocks until the call
     * terminates (successfully or with an error).
     *
     * @param args unused
     * @throws Exception if the server cannot be started or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        // A latch, unlike a bare wait()/notify() pair, cannot lose a signal that is raised
        // before the main thread starts waiting and is not affected by spurious wakeups.
        final CountDownLatch finished = new CountDownLatch(1);

        Server server = InProcessServerBuilder.forName("svc").addService(new NumbersImpl()).build().start();
        try {
            Channel channel = InProcessChannelBuilder.forName("svc").build();
            NumbersGrpc.NumbersStub stub = NumbersGrpc.newStub(channel);
            final Iterator<Integer> seq = new Sequence(50).iterator();

            stub.requestPressure(new ClientResponseObserver<NumberProto.Number, Empty>() {
                @Override
                public void beforeStart(final ClientCallStreamObserver<NumberProto.Number> requestStream) {
                    requestStream.setOnReadyHandler(new Runnable() {
                        // Set once the request stream has been half-closed, so a later
                        // on-ready callback can never call onCompleted() a second time.
                        private boolean halfClosed;

                        @Override
                        public void run() {
                            if (halfClosed) {
                                return;
                            }
                            // Write only while the stream reports it can accept more;
                            // this is what makes the client honor the server's pace.
                            while (requestStream.isReady() && seq.hasNext()) {
                                int i = seq.next();
                                System.out.println(i + " -->");
                                requestStream.onNext(asNum(i));
                            }
                            if (!seq.hasNext()) {
                                halfClosed = true;
                                requestStream.onCompleted();
                            }
                        }
                    });
                }

                @Override
                public void onNext(Empty value) {
                    System.out.println("Got empty");
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                    // Release main() on failure too; otherwise it would block forever.
                    finished.countDown();
                }

                @Override
                public void onCompleted() {
                    finished.countDown();
                }
            });

            finished.await();
        } finally {
            server.shutdown();
        }
    }

    /** Server side of the demo: consumes one number at a time, slowly. */
    private static class NumbersImpl extends NumbersGrpc.NumbersImplBase {

        /**
         * Handles the client-streaming call with manual inbound flow control.
         *
         * @param responseObserver observer for the single {@link Empty} response; cast to
         *     {@link ServerCallStreamObserver} to reach the flow-control methods
         * @return the observer that receives the client's numbers
         */
        @Override
        public StreamObserver<NumberProto.Number> requestPressure(final StreamObserver<Empty> responseObserver) {
            final ServerCallStreamObserver<Empty> serverCallStreamObserver =
                    (ServerCallStreamObserver<Empty>) responseObserver;
            // Take over flow control and ask for the first message explicitly.
            serverCallStreamObserver.disableAutoInboundFlowControl();
            serverCallStreamObserver.request(1);

            return new StreamObserver<NumberProto.Number>() {
                @Override
                public void onNext(NumberProto.Number value) {
                    try {
                        // Simulate slow processing of each message.
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                    System.out.println(" --> " + value.getNumber());
                    // Only now ask for the next message.
                    serverCallStreamObserver.request(1);
                }

                @Override
                public void onError(Throwable t) {
                    System.err.println("Request stream failed: " + t);
                }

                @Override
                public void onCompleted() {
                    responseObserver.onNext(Empty.getDefaultInstance());
                    responseObserver.onCompleted();
                }
            };
        }
    }

    /**
     * Wraps an int in a {@code NumberProto.Number} message.
     *
     * @param i the value to wrap
     * @return a message whose {@code number} field is {@code i}
     */
    private static NumberProto.Number asNum(int i) {
        return NumberProto.Number.newBuilder().setNumber(i).build();
    }
}
import java.util.Iterator;
/**
 * An {@link Iterable} over the integers {@code 0, 1, ..., max - 1} whose iterator pauses
 * before handing out each element, acting as a deliberately slow producer.
 */
public class Sequence implements Iterable<Integer> {
    /** Pause, in milliseconds, before each element is returned. */
    private static final long DELAY_MILLIS = 250;

    /** Exclusive upper bound of the generated values. */
    private final int max;

    /**
     * Creates a sequence of the integers from 0 (inclusive) to {@code max} (exclusive).
     *
     * @param max exclusive upper bound; a value of 0 or less yields an empty sequence
     */
    public Sequence(int max) {
        this.max = max;
    }

    /**
     * Returns a fresh iterator that starts at 0.
     *
     * @return an iterator over {@code 0 .. max - 1}; it does not support {@code remove()}
     */
    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int i = 0;

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }

            @Override
            public boolean hasNext() {
                return i < max;
            }

            /**
             * Returns the next value after pausing.
             *
             * @throws java.util.NoSuchElementException if the sequence is exhausted
             */
            @Override
            public Integer next() {
                // Honor the Iterator contract instead of returning values >= max.
                if (!hasNext()) {
                    throw new java.util.NoSuchElementException();
                }
                try {
                    Thread.sleep(DELAY_MILLIS);
                } catch (InterruptedException e) {
                    // Still return the element, but keep the interrupt status set so
                    // the caller can observe it.
                    Thread.currentThread().interrupt();
                }
                return i++;
            }
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment