Skip to content

Instantly share code, notes, and snippets.

@Marthym
Last active October 15, 2022 21:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Marthym/a90e5dffae9779ffb09c290a14f4d314 to your computer and use it in GitHub Desktop.
Save Marthym/a90e5dffae9779ffb09c290a14f4d314 to your computer and use it in GitHub Desktop.
package fr.ght1pc9kc.baywatch.notify.domain;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.f4b6a3.ulid.UlidCreator;
import fr.ght1pc9kc.baywatch.notify.api.NotifyManager;
import fr.ght1pc9kc.baywatch.notify.api.NotifyService;
import fr.ght1pc9kc.baywatch.notify.api.model.BasicEvent;
import fr.ght1pc9kc.baywatch.notify.api.model.EventType;
import fr.ght1pc9kc.baywatch.notify.api.model.ReactiveEvent;
import fr.ght1pc9kc.baywatch.notify.api.model.ServerEvent;
import fr.ght1pc9kc.baywatch.notify.domain.model.ByUserEventPublisherCacheEntry;
import fr.ght1pc9kc.baywatch.security.api.AuthenticationFacade;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscription;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class NotifyServiceImpl implements NotifyService, NotifyManager {
private final AuthenticationFacade authFacade;
private final Sinks.Many<ServerEvent<Object>> multicast;
private final Cache<String, ByUserEventPublisherCacheEntry> cache;
public NotifyServiceImpl(AuthenticationFacade authenticationFacade) {
this.authFacade = authenticationFacade;
this.multicast = Sinks.many().multicast().directBestEffort();
this.cache = Caffeine.newBuilder()
.expireAfterAccess(Duration.ofMinutes(30))
.maximumSize(1000)
.<String, ByUserEventPublisherCacheEntry>evictionListener((key, value, cause) -> {
if (value != null) {
value.sink().tryEmitComplete();
Subscription subscription = value.subscription().getAndSet(null);
if (subscription != null) {
subscription.cancel();
}
}
})
.build();
}
@Override
public Flux<ServerEvent<Object>> subscribe() {
if (multicast.isScanAvailable() && Boolean.TRUE.equals(multicast.scan(Scannable.Attr.TERMINATED))) {
return Flux.error(() -> new IllegalStateException("Publisher was closed !"));
}
return authFacade.getConnectedUser().flatMapMany(u ->
Objects.requireNonNull(cache.get(u.id, id -> {
Sinks.Many<ServerEvent<Object>> sink = Sinks.many().multicast().directBestEffort();
AtomicReference<Subscription> subscription = new AtomicReference<>();
Flux<ServerEvent<Object>> multicastFlux = this.multicast.asFlux().doOnSubscribe(subscription::set);
Flux<ServerEvent<Object>> eventPublisher = Flux.merge(sink.asFlux(), multicastFlux)
.takeWhile(e -> cache.asMap().containsKey(id))
.map(e -> {
log.debug("Event: {}", e);
return e;
}).cache(0);
return new ByUserEventPublisherCacheEntry(subscription, sink, eventPublisher);
})).flux());
}
@Override
public Mono<Boolean> unsubscribe() {
return authFacade.getConnectedUser()
.filter(u -> cache.asMap().containsKey(u.id))
.map(u -> {
log.debug("Dispose SSE Subscription for {}", u.self.login);
cache.invalidate(u.id);
return true;
});
}
@Override
public void close() {
this.multicast.tryEmitComplete();
this.cache.invalidateAll();
this.cache.cleanUp();
}
@Override
@SuppressWarnings("unchecked")
public <T> BasicEvent<T> send(String userId, EventType type, T data) {
BasicEvent<T> event = new BasicEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data);
Optional.ofNullable(cache.getIfPresent(userId))
.map(ByUserEventPublisherCacheEntry::sink)
.ifPresent(sk -> emit(sk, (ServerEvent<Object>) event));
return event;
}
@Override
@SuppressWarnings("unchecked")
public <T> ReactiveEvent<T> send(String userId, EventType type, Mono<T> data) {
ReactiveEvent<T> event = new ReactiveEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data);
Optional.ofNullable(cache.getIfPresent(userId))
.map(ByUserEventPublisherCacheEntry::sink)
.ifPresent(sk -> emit(sk, (ServerEvent<Object>) event));
return event;
}
@Override
@SuppressWarnings("unchecked")
public <T> BasicEvent<T> broadcast(EventType type, T data) {
BasicEvent<T> event = new BasicEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data);
emit(this.multicast, (ServerEvent<Object>) event);
return event;
}
@Override
@SuppressWarnings("unchecked")
public <T> ReactiveEvent<T> broadcast(EventType type, Mono<T> data) {
ReactiveEvent<T> event = new ReactiveEvent<>(UlidCreator.getMonotonicUlid().toString(), type, data);
emit(this.multicast, (ServerEvent<Object>) event);
return event;
}
public long countCacheEntries() {
return cache.estimatedSize();
}
private void emit(@Nullable Sinks.Many<ServerEvent<Object>> sink, ServerEvent<Object> event) {
if (sink == null) {
log.debug("No subscriber listening the SSE entry point.");
return;
}
EmitResult result = sink.tryEmitNext(event);
if (result.isFailure()) {
if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) {
log.debug("No subscriber listening the SSE entry point.");
} else {
log.warn("{} on emit notification", result);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment