Skip to content

Instantly share code, notes, and snippets.

@ova2
Created July 27, 2021 19:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ova2/b600a548ea124afa7bfd38072a989e64 to your computer and use it in GitHub Desktop.
Save ova2/b600a548ea124afa7bfd38072a989e64 to your computer and use it in GitHub Desktop.
@Service
@Slf4j
@RequiredArgsConstructor
public class SspDefinitionenStore implements SspDefinitionConsumer {
// Cache keyed by SspDefinition::getId; assigned in initialize(), so it is null until that
// method has run.
private CacheMono<VersionedId, SspDefinition, SspDefinition> sspDefinitionCache;
// Sink that accept(...) emits into; assigned inside the Flux.create callback set up in
// initialize(), so it stays null until that callback has been invoked.
private FluxSink<SspDefinition> sspDefinitionSink;
/**
 * Builds the definition cache after construction of this service.
 *
 * <p>The cache is created from a {@code Flux.create} source and keyed by
 * {@code SspDefinition::getId}. The sink handed to the {@code Flux.create} callback is stored in
 * {@code sspDefinitionSink} so that {@code accept(SspDefinition)} can emit into it.
 *
 * <p>NOTE(review): the sink is only assigned when the callback runs; whether
 * {@code CacheMono.fromPublisher} triggers it immediately is not visible here - confirm.
 */
@PostConstruct
public void initialize() {
    final Flux<SspDefinition> definitionSource =
            Flux.create(emitter -> sspDefinitionSink = emitter);
    sspDefinitionCache = CacheMono.fromPublisher(definitionSource, SspDefinition::getId);
}
/**
 * Emits the given definition into the sink that was captured in {@code initialize()}.
 *
 * @param sspDefinition the definition to publish
 * @throws NullPointerException if the sink has not been assigned yet, i.e. the
 *         {@code Flux.create} callback registered in {@code initialize()} has not run
 */
@Override
public void accept(SspDefinition sspDefinition) {
    // The sink is only assigned inside the Flux.create callback; fail with a descriptive
    // message instead of an anonymous NullPointerException when that has not happened yet.
    final FluxSink<SspDefinition> sink = Objects.requireNonNull(sspDefinitionSink,
            "sspDefinitionSink has not been assigned; initialize() must run and its Flux "
                    + "must be subscribed before accept() is called");
    sink.next(sspDefinition);
}
/**
 * Looks up a definition in the cache by its key.
 *
 * <p>A successful lookup is logged at info level. Any error signalled by the lookup is logged
 * (including its stack trace) and replaced by an empty {@code Mono}, so subscribers never
 * receive the error signal.
 *
 * @param sspId key of the definition to look up
 * @return the result of the cache lookup, or an empty {@code Mono} if the lookup failed
 */
public Mono<SspDefinition> lookupSspDefinition(VersionedId sspId) {
    return sspDefinitionCache.lookup(sspId)
            .doOnNext(found -> log.info(
                    "SspDefinition was found by lookup with key {}", sspId))
            .onErrorResume(err -> {
                // Trailing throwable argument makes SLF4J log the stack trace and cause,
                // which err.getMessage() alone would drop.
                log.error("Error on lookup SspDefinition by key {}, message: {}",
                        sspId, err.getMessage(), err);
                // Deliberate best-effort: a failed lookup is reported as "not found".
                return Mono.empty();
            });
}
/**
 * Returns the value the cache reports for the given key.
 *
 * @param sspId key of the definition to fetch
 * @return the {@code Optional} returned by the cache's {@code getValue} for {@code sspId}
 */
public Optional<SspDefinition> findSspDefinition(VersionedId sspId) {
    final Optional<SspDefinition> cachedDefinition = sspDefinitionCache.getValue(sspId);
    return cachedDefinition;
}
/**
 * Returns all non-null values reported by the cache as a {@code Flux}.
 *
 * <p>The underlying stream is requested from the cache on each subscription, so the returned
 * {@code Flux} can be subscribed to more than once (e.g. with retry/repeat).
 *
 * @return a {@code Flux} over the cache's non-null values
 */
public Flux<SspDefinition> findSspDefinitions() {
    // Use the Supplier overload: a java.util.stream.Stream can be consumed only once, so a
    // Flux wrapping a single pre-built stream would fail on any second subscription.
    return Flux.fromStream(() -> sspDefinitionCache.getValues().filter(Objects::nonNull));
}
...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment