Notes for Reactor/ Spring Boot WebFlux (Made while studying
  • Reactive Streams Spec: The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.

    • Publisher:
      void subscribe(Subscriber<? super T> s)
    • Subscriber:
      void onSubscribe(Subscription s);
      void onNext(T t);
      void onError(Throwable t);
      void onComplete();
    • Subscription:
      void request(long n);
      void cancel();
    • Processor:
      interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
  • Publisher/Subscriber flow (push/pull communication model)

              <------------ subscribe() -------------
              ------ onSubscribe (Subscription) ---->
      Pub     <------------ request(n) --------------   Sub
              ------------- onNext(data) ----------->
                               ... (n times)
              ------------- onComplete() ----------->
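
The handshake in the diagram can be reproduced with the JDK alone, since java.util.concurrent.Flow mirrors the Reactive Streams interfaces. The following is a minimal sketch, not Reactor code: the class names FlowHandshakeSketch and RecordingSubscriber are made up for illustration, and SubmissionPublisher stands in for a real publisher.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowHandshakeSketch {

    /** Hypothetical subscriber that records each signal and pulls one item at a time. */
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            events.add("onSubscribe");
            s.request(1); // pull: ask the publisher for the first item
        }

        @Override
        public void onNext(String item) {
            events.add("onNext(" + item + ")");
            subscription.request(1); // pull the next item only after handling this one
        }

        @Override
        public void onError(Throwable t) {
            events.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }
    }

    static List<String> run() throws InterruptedException {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber); // triggers onSubscribe(Subscription)
            publisher.submit("a");
            publisher.submit("b");
        } // close() signals onComplete once the buffered items are delivered
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber.events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Requesting one item per onNext is the simplest form of backpressure: the publisher never has more than one outstanding item for this subscriber.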
  • Main types

    • Flux<T>: publisher of 0..N elements
    • Mono<T>: publisher of 0..1 element
  • Notes:

    • Flux stops emitting after an error
    Flux.just("testin", "flux", "out")
            .concatWith(Flux.error(new RuntimeException("Some error")))
            .concatWith(Flux.just("Won't be published"))
            .subscribe(System.out::println, System.err::println);
  • Schedulers:

  • Operations

    • flatMap vs concatMap vs flatMapSequential
    • merge vs concat vs zip
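
The two comparisons above mostly come down to ordering and eagerness. The sketch below assumes reactor-core is on the classpath; the class name OperatorOrderingSketch, the helper methods and the delay values are invented for illustration, and the delays are chosen only so the inner publishers finish out of source order.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class OperatorOrderingSketch {

    // Inner publisher for element i: element 1 is slow and element 2 is fast,
    // so the inner streams complete out of source order.
    static Flux<String> inner(int i) {
        Duration delay = Duration.ofMillis(i == 1 ? 300 : 50);
        return Flux.just(i + "a", i + "b").delayElements(delay);
    }

    // Subscribes to inner publishers eagerly and emits values as they arrive (interleaved).
    static List<String> viaFlatMap() {
        return Flux.just(1, 2).flatMap(OperatorOrderingSketch::inner).collectList().block();
    }

    // Subscribes to one inner publisher at a time, so source order is preserved.
    static List<String> viaConcatMap() {
        return Flux.just(1, 2).concatMap(OperatorOrderingSketch::inner).collectList().block();
    }

    // Subscribes eagerly like flatMap, but buffers and replays results in source order.
    static List<String> viaFlatMapSequential() {
        return Flux.just(1, 2).flatMapSequential(OperatorOrderingSketch::inner).collectList().block();
    }

    // "A" arrives at ~200 ms and "B" at ~400 ms.
    static Flux<String> letters() {
        return Flux.just("A", "B").delayElements(Duration.ofMillis(200));
    }

    // "1" arrives at ~300 ms and "2" at ~500 ms.
    static Flux<String> digits() {
        return Flux.just("1", "2")
                .delayElements(Duration.ofMillis(200))
                .delaySubscription(Duration.ofMillis(100));
    }

    // merge: both sources run at once, values are emitted in arrival order.
    static List<String> viaMerge() {
        return Flux.merge(letters(), digits()).collectList().block();
    }

    // concat: the second source is subscribed only after the first completes.
    static List<String> viaConcat() {
        return Flux.concat(letters(), digits()).collectList().block();
    }

    // zip: waits for one value from each source and combines them pairwise.
    static List<String> viaZip() {
        return Flux.zip(letters(), digits(), (l, d) -> l + d).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap           " + viaFlatMap());           // [2a, 2b, 1a, 1b]
        System.out.println("concatMap         " + viaConcatMap());         // [1a, 1b, 2a, 2b]
        System.out.println("flatMapSequential " + viaFlatMapSequential()); // [1a, 1b, 2a, 2b]
        System.out.println("merge             " + viaMerge());             // [A, 1, B, 2]
        System.out.println("concat            " + viaConcat());            // [A, B, 1, 2]
        System.out.println("zip               " + viaZip());               // [A1, B2]
    }
}
```

The flatMap and merge results depend on timing, so the orders shown hold only for delays spaced this far apart; the other four results are deterministic.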
  • Backpressure

    • subscribe overload takes subscription handler, can be used to request n elements or cancel
    • for custom logic provide implementation of BaseSubscriber and override hookOnNext
      finiteFlux.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("Value received is : " + value);
                if (value == 4) cancel(); // assumed completion: the original snippet is cut off at this condition
            }
      });
  • Hot Publisher

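    Flux<String> stringFlux = Flux.just("A","B","C","D","E","F")
            .delayElements(Duration.ofSeconds(1)); // assumed: pace the source so a late subscriber can join mid-stream
    ConnectableFlux<String> connectableFlux = stringFlux.publish();
    connectableFlux.connect(); // assumed: without connect() nothing is emitted at all
    connectableFlux.subscribe(s -> System.out.println("Subscriber 1 : " + s));
    Thread.sleep(2500); // assumed: let A and B pass before the second subscriber joins
    connectableFlux.subscribe(s -> System.out.println("Subscriber 2 : " + s)); // does not emit the values from beginning
    Thread.sleep(4000); // assumed: keep the calling thread alive until F is emitted
    /* Output
    Subscriber 1 : A
    Subscriber 1 : B
    Subscriber 1 : C
    Subscriber 2 : C
    Subscriber 1 : D
    Subscriber 2 : D
    Subscriber 1 : E
    Subscriber 2 : E
    Subscriber 1 : F
    Subscriber 2 : F
    */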

  • Spring WebFlux Internals

    • Netty event loop
    • Reactive streams adapter
    • WebFilter
    • WebHandler (DispatcherHandler)
    • Controller/Router
  • Spring WebFlux Controller:

    • Return flux as stream:
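      @GetMapping(value = "/fluxstream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) // deprecated since Spring 5.3 in favor of APPLICATION_NDJSON_VALUE
      public Flux<Long> returnInfiniteFluxStream() {
          return Flux.interval(Duration.ofSeconds(1)); // emits 0, 1, 2, ... once per second
      }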
    • Annotate the test class with @RunWith(SpringRunner.class) and @WebFluxTest
      • @WebFluxTest doesn't scan @Component, @Repository or @Service beans, only web-layer beans such as @Controller
    • Testing approaches for static flux, all starting from webTestClient.get().uri("/flux").accept(MediaType.APPLICATION_JSON_UTF8).exchange().expectStatus().isOk():
      1. Get the flux using .returnResult(<Class>).getResponseBody() and verify it with StepVerifier
      2. Assert the size using .expectBodyList(<Class>).hasSize(<n>)
      3. Get the result using .expectBodyList(<Class>).returnResult().getResponseBody() and assert it against the expected list
      4. Consume the result using .expectBodyList(<Class>).consumeWith(response -> /* assert with expected list */)
    • Testing flux as stream:
      Flux<Long> longStreamFlux = webTestClient.get().uri("/fluxstream")
    • Testing mono: use approach 4 for static flux
    • Exception Handling:
      • @ExceptionHandler(Class<T extends Exception>) annotated method returning ResponseEntity (Not Mono<ResponseEntity>): place it in the controller class for local handling, or in a @ControllerAdvice annotated class for global handling
      • Use .expectStatus().is5xxServerError() for testing
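
The four static-flux testing approaches listed above can be sketched in one place. This is only a sketch under assumptions: DemoController and the class name FluxEndpointTestSketch are hypothetical, WebTestClient.bindToController is used instead of @WebFluxTest so the example needs no application context, MediaType.APPLICATION_JSON replaces the deprecated APPLICATION_JSON_UTF8, and spring-webflux, spring-test, reactor-test and Jackson are assumed to be on the classpath.

```java
import java.util.List;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class FluxEndpointTestSketch {

    // Hypothetical controller standing in for the one under test.
    @RestController
    public static class DemoController {
        @GetMapping("/flux")
        public Flux<Integer> numbers() {
            return Flux.just(1, 2, 3, 4);
        }
    }

    // Binds the client directly to the controller; no server is started.
    static final WebTestClient client =
            WebTestClient.bindToController(new DemoController()).build();

    // Common prefix shared by all four approaches.
    private static WebTestClient.ResponseSpec exchange() {
        return client.get().uri("/flux")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk();
    }

    // Approach 1: take the body as a Flux and verify it with StepVerifier.
    static void verifyWithStepVerifier() {
        Flux<Integer> body = exchange().returnResult(Integer.class).getResponseBody();
        StepVerifier.create(body).expectNext(1, 2, 3, 4).verifyComplete();
    }

    // Approach 2: assert only the number of elements.
    static void verifySize() {
        exchange().expectBodyList(Integer.class).hasSize(4);
    }

    // Approach 3: extract the decoded list and let the caller compare it.
    static List<Integer> fetchList() {
        return exchange().expectBodyList(Integer.class).returnResult().getResponseBody();
    }

    // Approach 4: assert inside a consumer of the exchange result.
    static void verifyWithConsumer() {
        exchange().expectBodyList(Integer.class).consumeWith(result -> {
            if (!List.of(1, 2, 3, 4).equals(result.getResponseBody())) {
                throw new AssertionError("unexpected body: " + result.getResponseBody());
            }
        });
    }
}
```

In a real test class each static method would become a @Test method, and the plain AssertionError would normally be replaced by the assertion library already in use.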
  • Spring WebFlux Router:

    • Use @SpringBootTest instead of @WebFluxTest in tests to scan services etc. for routers/handlers
      • Need to explicitly add @AutoConfigureWebTestClient too
    • Exception Handling:
      • DefaultErrorWebExceptionHandler used by default
      • Extend AbstractErrorWebExceptionHandler to customize the error response

  • Netty
    • Channels are used for communication between server and client through events
      • Stages:
        1. Created
        2. Registered with event loop (So event loop can forward events to it in future)
        3. Active (client and server connected, ready to send & receive events)
        4. Inactive (client and server are not connected)
        5. Unregistered from event loop
    • A non-blocking client requests data and Netty immediately returns a promise for the result
    • Event queue is processed by event loop
      • Inbound events: Request for data, post data
      • Outbound events: Connection open/close, response
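
The promise and event-queue idea above can be illustrated with a toy model built only on java.util.concurrent. This is not Netty's API: ToyEventLoop, its submit method and the "response-N" strings are hypothetical, and Netty's own EventLoop and ChannelPromise do considerably more.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventLoopSketch {

    /** Toy event loop: a single thread draining a FIFO queue of tasks. */
    static final class ToyEventLoop implements AutoCloseable {
        private final ExecutorService loop = Executors.newSingleThreadExecutor();

        /** Enqueues the task and returns at once with a promise of its result. */
        <T> CompletableFuture<T> submit(Callable<T> task) {
            CompletableFuture<T> promise = new CompletableFuture<>();
            loop.execute(() -> {
                try {
                    promise.complete(task.call());
                } catch (Exception e) {
                    promise.completeExceptionally(e);
                }
            });
            return promise;
        }

        @Override
        public void close() {
            loop.shutdown(); // already queued tasks still run before the thread exits
        }
    }

    static String demo() throws Exception {
        try (ToyEventLoop eventLoop = new ToyEventLoop()) {
            // Both calls return immediately; the caller thread is never blocked by the work itself.
            CompletableFuture<String> first = eventLoop.submit(() -> "response-1");
            CompletableFuture<String> second = eventLoop.submit(() -> "response-2");
            // Combine the two promises; get() is used here only to hand the result back to the demo.
            return first.thenCombine(second, (a, b) -> a + "," + b).get(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // response-1,response-2
    }
}
```

Because a single thread processes the queue, tasks never run concurrently with each other, which is why a blocking call inside an event-loop task stalls every other event waiting behind it.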

  • Streaming Endpoint (SSE: Server Sent Events)
    • Connection with the client remains open and server sends items/events as and when they come
    • MongoDB:
      • Tailable Cursors: Connection remains open even after all results are retrieved (@Tailable annotation on Flux-returning methods in the repository)
      • Capped Collections: Fixed-size collections that preserve insertion order (Can't be used for permanent storage); tailable cursors only work on capped collections
    • MediaType.APPLICATION_STREAM_JSON_VALUE & @Tailable: All entries in MongoDB get delivered to the client in real time

  • Tidbits
    • Spring Boot has the CommandLineRunner interface, to run tasks on startup
    • @DataMongoTest can be used for mongo-only tests (Does not create service & component beans)
    • @Profile("!test") can be used to create service bean for profiles other than test
    • @ResponseStatus(HttpStatus.CREATED) can be used on a controller method to change default status code when return type is T instead of ResponseEntity<T>