Skip to content

Instantly share code, notes, and snippets.

@timjonesdev
Created May 21, 2019 20:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timjonesdev/f8b5635e68f9abbf4fe45e72082fd392 to your computer and use it in GitHub Desktop.
Save timjonesdev/f8b5635e68f9abbf4fe45e72082fd392 to your computer and use it in GitHub Desktop.
An example of how to configure a reactive changestream watcher with Spring Boot 2 and Reactive MongoDB
// uses Spring Boot 2, Spring Webflux, Reactive Mongo Repository
public class ChangeStream {
@Autowired
private ReactiveMongoTemplate reactiveMongoTemplate;
public Flux<SampleDao> watchChanges() {
// set changestream options to watch for any changes to the sample collection
ChangeStreamOptions options = ChangeStreamOptions.builder()
.filter(Aggregation.newAggregation(SampleDao.class,
Aggregation.match(
Criteria.where("operationType").is("replace")
)
)).returnFullDocumentOnUpdate().build();
// return a flux that watches the changestream and returns the full document
return reactiveMongoTemplate.changeStream("samples", options, SampleDao.class)
.map(ChangeStreamEvent::getBody)
.doOnError(throwable -> log.error("Error with the samples changestream event: " + throwable.getMessage(), throwable));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment