Reading cloud event with reactivex and vertx:
@Override
public void start() throws Exception {
final HttpServer server = vertx.createHttpServer();
final Flowable<HttpServerRequest> requestFlowable = server.requestStream().toFlowable();
requestFlowable.subscribe(httpServerRequest -> {
final Observable<CloudEvent> observable = httpServerRequest.
toObservable().
compose(io.vertx.reactivex.core.ObservableHelper.unmarshaller(CloudEventImpl.class));
observable.subscribe(cloudEvent -> {
System.out.println("CE type: " + cloudEvent.getEventType());
});
});
server.listen(8081);
}