Skip to content

Instantly share code, notes, and snippets.

@matzew
Created June 11, 2018 12:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matzew/e6a97efaa3a67bdcb941df6d46458170 to your computer and use it in GitHub Desktop.
Save matzew/e6a97efaa3a67bdcb941df6d46458170 to your computer and use it in GitHub Desktop.

Reading cloud event with reactivex and vertx:

    @Override
    public void start() throws Exception {

        final HttpServer server = vertx.createHttpServer();
        final Flowable<HttpServerRequest> requestFlowable = server.requestStream().toFlowable();

        requestFlowable.subscribe(httpServerRequest -> {

            final Observable<CloudEvent> observable = httpServerRequest.
                    toObservable().
                    compose(io.vertx.reactivex.core.ObservableHelper.unmarshaller(CloudEventImpl.class));

            observable.subscribe(cloudEvent -> {

                System.out.println("CE type:        " + cloudEvent.getEventType());
            });

        });

        server.listen(8081);
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment