Created
December 18, 2020 11:39
-
-
Save michael-simons/de1a608c2cc3c2a746f6099702be6c65 to your computer and use it in GitHub Desktop.
A streaming controller providing tailable queries, result sets…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.demo.movies; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import java.time.Duration; | |
import java.time.LocalDateTime; | |
import java.util.concurrent.atomic.AtomicReference; | |
import org.springframework.data.domain.Sort; | |
import org.springframework.http.MediaType; | |
import org.springframework.messaging.handler.annotation.MessageMapping; | |
import org.springframework.web.bind.annotation.GetMapping; | |
import org.springframework.web.bind.annotation.RequestMapping; | |
import org.springframework.web.bind.annotation.RestController; | |
/** | |
* A streaming controller based on Project reactor and Spring Data Neo4j. | |
* Thanks to @nmervaillie and @bsideup | |
* | |
* https://github.com/spring-projects/spring-data-neo4j | |
*/ | |
@RestController | |
@RequestMapping("/api/movies") | |
public final class MoviesController { | |
private final MovieRepository movieRepository; | |
MoviesController(MovieRepository movieRepository) { | |
this.movieRepository = movieRepository; | |
} | |
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) | |
// @MessageMapping("movie.stream") This would be a RSocket annotation | |
public Flux<Movie> streamMovies() { | |
// First step: Give me everything that is currently there, sorted from old to new | |
return movieRepository.findAll(Sort.by("createdAt").ascending()) | |
// wait until this ^^ is complete and concat it with a new flux | |
.concatWith( | |
// Which must be deferred, to give an opportunity for | |
Flux.defer(() -> { | |
// A bit of state | |
var lastRequest = new AtomicReference<>(LocalDateTime.now()); | |
// We need to have one more indirection here, so that the state is actually | |
// re-requested when… | |
return Mono.fromSupplier(() -> lastRequest.getAndSet(LocalDateTime.now())) | |
.flatMapMany(movieRepository::findAllByCreatedAtAfter) | |
// we repeat the query after 3 seconds. | |
.repeatWhen(it -> it.delayElements(Duration.ofSeconds(3))); | |
}) | |
); | |
// When something subscribes to this endpoint, it first tries to satifies the requested number of items by | |
// what is already there in the database. With RSocket for example in requestn mode, the query will | |
// rerun every n seconds until the request number is delivered or the client goes away. | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment