Skip to content

Instantly share code, notes, and snippets.

@michael-simons
Created December 18, 2020 11:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michael-simons/de1a608c2cc3c2a746f6099702be6c65 to your computer and use it in GitHub Desktop.
Save michael-simons/de1a608c2cc3c2a746f6099702be6c65 to your computer and use it in GitHub Desktop.
A streaming controller providing tailable queries, result sets…
package com.example.demo.movies;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.data.domain.Sort;
import org.springframework.http.MediaType;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * A streaming controller based on Project Reactor and Spring Data Neo4j.
 * Thanks to @nmervaillie and @bsideup
 *
 * <p>Its single endpoint first replays the movies returned by a sorted {@code findAll} and then
 * keeps re-querying the repository for movies created after the previous query was issued.
 *
 * https://github.com/spring-projects/spring-data-neo4j
 */
@RestController
@RequestMapping("/api/movies")
public final class MoviesController {

    /** Delay applied before the polling query is repeated. */
    private static final Duration POLL_INTERVAL = Duration.ofSeconds(3);

    private final MovieRepository movieRepository;

    MoviesController(MovieRepository movieRepository) {
        this.movieRepository = movieRepository;
    }

    /**
     * Streams movies as server-sent events: everything currently stored (ordered by
     * {@code createdAt}, oldest first), followed by whatever the periodic polling query finds.
     *
     * <p>When something subscribes to this endpoint, it first tries to satisfy the requested
     * number of items from what is already in the database. With RSocket in request-n mode, for
     * example, the query is rerun every few seconds until the requested number has been delivered
     * or the client goes away.
     *
     * @return the initial result set concatenated with the polling stream
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    // @MessageMapping("movie.stream") This would be a RSocket annotation
    public Flux<Movie> streamMovies() {
        // Step one: everything that is there right now, sorted from old to new.
        Flux<Movie> existingMovies = movieRepository.findAll(Sort.by("createdAt").ascending());

        // Step two only starts once step one has completed.
        return existingMovies.concatWith(pollForNewMovies());
    }

    /**
     * Builds the polling tail of the stream.
     *
     * <p>The whole pipeline is wrapped in {@code Flux.defer} so that the timestamp cursor is
     * created per subscription, and only at the moment the tail is actually subscribed to.
     *
     * <p>NOTE(review): because the tail is concatenated after the initial query, the cursor starts
     * at the instant that query has completed. A movie whose {@code createdAt} lies before that
     * instant but which was not part of the initial result would presumably never be emitted;
     * confirm whether that gap matters for callers.
     *
     * @return a flux that repeats the "created after" query, waiting {@link #POLL_INTERVAL}
     *         before each repetition
     */
    private Flux<Movie> pollForNewMovies() {
        return Flux.defer(() -> {
            // Per-subscription state: the point in time the previous poll was issued.
            AtomicReference<LocalDateTime> cursor = new AtomicReference<>(LocalDateTime.now());

            // The supplier adds one more level of indirection so that every repetition reads the
            // previous timestamp and replaces it with the current time, instead of reusing a
            // value captured once.
            Mono<LocalDateTime> previousPollTime =
                    Mono.fromSupplier(() -> cursor.getAndSet(LocalDateTime.now()));

            return previousPollTime
                    .flatMapMany(since -> movieRepository.findAllByCreatedAtAfter(since))
                    // Re-subscribe (and thereby re-run the query) after the poll interval.
                    .repeatWhen(completions -> completions.delayElements(POLL_INTERVAL));
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment