Skip to content

Instantly share code, notes, and snippets.

@oillio
Created October 5, 2016 20:05
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save oillio/75ffadfd44c30497a4e8cf6476eff438 to your computer and use it in GitHub Desktop.
Save oillio/75ffadfd44c30497a4e8cf6476eff438 to your computer and use it in GitHub Desktop.
bi-directional stream gRPC with RxJava
public class MyServiceImpl extends DeviceServiceGrpc.DeviceServiceImplBase {
@Override
public StreamObserver<GetLocationRequest> getLocations(StreamObserver<GetLocationResponse> responseObserver) {
RequestBridge<GetLocationRequest> request = new RequestBridge<>();
ResponseBridge response = new ResponseBridge((ServerCallStreamObserver) responseObserver);
request.map(this::doGetLocation)
.subscribe(response);
return request;
}
}
public class RequestBridge<T> extends Flowable<T> implements StreamObserver<T> {
private final FlowableProcessor<T> processor = UnicastProcessor.create();
@Override
public void onNext(T value) {
processor.onNext(value);
}
@Override
public void onError(Throwable t) {
processor.onError(t);
}
@Override
public void onCompleted() {
processor.onComplete();
}
@Override
protected void subscribeActual(Subscriber<? super T> s) {
processor.subscribe(s);
}
}
@RequiredArgsConstructor
public class ResponseBridge<T> implements Subscriber<T> {
private final ServerCallStreamObserver<T> observer;
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
observer.setOnCancelHandler(s::cancel);
observer.setOnReadyHandler(() -> s.request(1));
}
@Override
public void onNext(T t) {
observer.onNext(t);
if(observer.isReady()) {
subscription.request(1);
}
}
@Override
public void onError(Throwable t) {
observer.onError(t);
}
@Override
public void onComplete() {
observer.onCompleted();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment