Skip to content

Instantly share code, notes, and snippets.

@osi
Created April 27, 2017 18:28
Show Gist options
  • Save osi/df0fb7a4cbb721baa0bd0ff9aa12b5f5 to your computer and use it in GitHub Desktop.
Save osi/df0fb7a4cbb721baa0bd0ff9aa12b5f5 to your computer and use it in GitHub Desktop.
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
public class DelayCancelPublisher<T> implements Publisher<T> {
private final Publisher<T> upstream;
private final Mono<Long> delay;
public DelayCancelPublisher(Publisher<T> upstream, Duration duration) {
this.upstream = upstream;
this.delay = Mono.delay(duration);
}
public static <T> Flux<T> delayCancel(Publisher<T> upstream, Duration duration) {
return Flux.from(new DelayCancelPublisher<>(upstream, duration));
}
@Override
public void subscribe(Subscriber<? super T> s) {
upstream.subscribe(new DelayCancelSubscriber<>(s, delay));
}
private static class DelayCancelSubscriber<T> implements Subscriber<T> {
private final Subscriber<? super T> subscriber;
private final Mono<Long> delay;
private final AtomicBoolean cancelled = new AtomicBoolean();
public DelayCancelSubscriber(Subscriber<? super T> subscriber, Mono<Long> delay) {
this.subscriber = subscriber;
this.delay = delay;
}
@Override
public void onSubscribe(Subscription upstream) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
upstream.request(n);
}
@Override
public void cancel() {
if (cancelled.compareAndSet(false, true)) {
delay.subscribe(v -> upstream.cancel());
}
}
});
}
@Override
public void onNext(T t) {
if (!cancelled.get()) {
subscriber.onNext(t);
}
}
@Override
public void onError(Throwable t) {
subscriber.onError(t);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment