-
Reactive Streams Spec: The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
- Publisher:
void subscribe(Subscriber<? super T> s)
- Subscriber:
void onSubscribe(Subscription s); void onNext(T t); void onError(Throwable t); void onComplete();
- Subscription:
void request(long n); void cancel();
- Processor:
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
- Publisher:
-
Publisher/Subscriber flow (push/pull communication model)
<------------ subscribe() ------------- ------ onSubscribe (Subscription) ----> Pub <------------ request(n) -------------- Sub ------------- onNext(data) -----------> ... (n times) ------------- onComplete() ----------->
-
Main types
- Flux: (0-n items async seq): https://projectreactor.io/docs/core/release/reference/images/flux.svg
- Mono: (0-1 async result): https://projectreactor.io/docs/core/release/reference/images/mono.svg
-
Notes:
- Flux stops emitting after an error
Flux.just("testin", "flux", "out") .concatWith(Flux.error(new RuntimeException("Some error"))) .concatWith(Flux.just("Won't be published")) .subscribe(System.out::println, System.err::println);
-
Schedulers: https://projectreactor.io/docs/core/release/reference/#schedulers
-
Operations
- flatMap vs concatMap vs flatMapSequential
- merge vs concat vs zip
-
Backpressure
- subscribe overload takes subscription handler, can be used to request n elements or cancel
- for custom logic provide implementation of
BaseSubscriber
and overridehookOnNext
finiteFlux.subscribe(new BaseSubscriber<Integer>() { @Override protected void hookOnNext(Integer value) { request(1); System.out.println("Value received is : " + value); if(value == 4){ cancel(); } } });
-
Hot Publisher
Flux<String> stringFlux = Flux.just("A","B","C","D","E","F") .delayElements(Duration.ofSeconds(1)); ConnectableFlux<String> connectableFlux = stringFlux.publish(); connectableFlux.connect(); connectableFlux.subscribe(s -> System.out.println("Subscriber 1 : " + s)); Thread.sleep(3000); connectableFlux.subscribe(s -> System.out.println("Subscriber 2 : " + s)); // does not emit the values from beginning Thread.sleep(4000); /* Output Subscriber 1 : A Subscriber 1 : B Subscriber 1 : C Subscriber 2 : C Subscriber 1 : D Subscriber 2 : D Subscriber 1 : E Subscriber 2 : E Subscriber 1 : F Subscriber 2 : F */
-
Spring WebFlux Internals
- Netty event loop
- Reactive streams adapter
- WebFilter
- WebHandler (DispatcherHandler)
- Controller/Router
-
Spring WebFlux Controller:
- Return flux as stream:
@GetMapping(value = "/fluxstream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<Long> returnInfiniteFluxStream() { return Flux.interval(Duration.ofSeconds(1)); }
- Test class to be annotated with
@RunWith(SpringRunner.class)
&@WebFluxTest
- Doesn't scan
@Component
,@Repository
or@Service
, only@Controller
- Doesn't scan
- Testing approaches for static flux using
webTestClient.get().uri("/flux").accept(MediaType.APPLICATION_JSON_UTF8).exchange().expectStatus().isOk()
:- Get flux using
.returnResult(<Class>).getResponseBody()
and useStepVerifier
- Assert size using
.expectBodyList(<Class>).hasSize(<n>)
- Get result using
.expectBodyList(<Class>).returnResult().getResponseBody()
and use assert with expected list - Get result consumer using
.expectBodyList(<Class>).sonsumeWith(response -> /* assert with expected list */)
- Get flux using
- Testing flux as stream:
Flux<Long> longStreamFlux = webTestClient.get().uri("/fluxstream") .accept(MediaType.APPLICATION_STREAM_JSON) .exchange() .expectStatus().isOk() .returnResult(Long.class) .getResponseBody(); StepVerifier.create(longStreamFlux) .expect(0L) .expect(1L) .expect(2L) .expect(3L) .thenCancel() .verify();
- Testing mono: use approach 4 for static flux
- Exception Handling:
@ExceptionHandler(Class<T extends Exception>)
annotated method returningResponseEntity
(NotMono<ResponseEntity>
) in controller class for local handling or in@ControllerAdvice
annotated class for global handling- Use
.expectStatus().is5xxServerError()
for testing
- Return flux as stream:
-
Spring WebFlux Router:
- Use
@SpringBootTest
instead of@WebFluxTest
in tests to scan services etc. for routers/handlers- Need to explicitly add
@AutoConfigureWebTestClient
too
- Need to explicitly add
- Exception Handling:
DefaultErrorWebExceptionHandler
used by default- Extend
AbstractErrorWebExceptionHandler
- Add
ServerCodecConfigurer
to constructor, and set writers & readers on super - Override
getRoutingFunction
, useRouterFunctions.route(RequestPredicates.all(), this::myExceptionHandler)
- Use
getErrorAttributes
to map of error attributes like the key-values in the default error response json - Example: https://programmer.help/blogs/webflux-rest-api-global-exception-handling-error-handling.html
- Add
- Use
- Netty
- Channels used for communication b/w server and client through events
- Stages:
- Created
- Registered with event loop (So event loop can forward events to it in future)
- Active (C/S connected, ready to send & receive events)
- InAcitve (C/S are not connected)
- Unregistered from event loop
- Stages:
- Non-blocking client requests data, netty returns promise
- Event queue is processed by event loop
- Inbound events: Request for data, post data
- Outbound events: Connection open/close, response
- Channels used for communication b/w server and client through events
- Streaming Endpoint (SSE: Server Sent Events)
- Connection with the client remains open and server sends items/events as and when they come
- MongoDB:
- Tailable Cursors: Connection remains open even after all retrieved (
@Tailable
annotation onFlux
returning methods in the repository) - Capped Collections: Fixed size collections that preserve insertion order (Can't be used for permanent storage)
- Tailable Cursors: Connection remains open even after all retrieved (
MediaType.APPLICATION_STREAM_JSON_VALUE
&@Tailable
: All entries in MongoDB get delivered to client real time
- Tidbits
- Spring has
CommandLineRunner
interface, to run tasks on startup @DataMongoTest
can be used for mongo-only tests (Does not create service & component beans)@Profile("!test")
can be used to create service bean for profiles other than test@ResponseStatus(HttpStatus.CREATED)
can be used on a controller method to change default status code when return type isT
instead ofResponseEntity<T>
- Spring has