Skip to content

Instantly share code, notes, and snippets.

@osi
Created April 20, 2017 20:38
Show Gist options
  • Save osi/47b25c3bd48b760e1c68419049eb7917 to your computer and use it in GitHub Desktop.
Save osi/47b25c3bd48b760e1c68419049eb7917 to your computer and use it in GitHub Desktop.
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Scannable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.FluxProcessor;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
/**
 * A {@link FluxProcessor} whose upstream {@link Subscription} can be replaced
 * while the subscribers attached to it stay in place.
 *
 * <p>All signals are relayed through an internal {@link DirectProcessor}; every
 * subscriber of this processor is in fact subscribed to that delegate. Each
 * call to {@link #onSubscribe(Subscription)} installs the given subscription as
 * the current upstream and cancels the one it replaces.
 *
 * <p>Terminal signals ({@code onError} / {@code onComplete}) from any upstream
 * are forwarded to the delegate as-is.
 *
 * @param <T> the type of element relayed from upstream to the subscribers
 */
public class HotSwapProcessor<T> extends FluxProcessor<T, T> {

    /** Delegate that holds the subscribers and relays every signal to them. */
    private final DirectProcessor<T> downstream = DirectProcessor.create();

    /** Most recently installed upstream subscription; empty until the first {@code onSubscribe}. */
    private final AtomicReference<Subscription> upstream = new AtomicReference<>();

    /**
     * Installs {@code s} as the current upstream and cancels the subscription it
     * replaces, if any.
     *
     * <p>While the delegate is still active, unbounded demand
     * ({@code Long.MAX_VALUE}) is requested from {@code s}. If the delegate has
     * already terminated, {@code s} is cancelled instead, so that a source
     * attached after termination is released rather than left producing.
     *
     * @param s the new upstream subscription
     * @throws NullPointerException if {@code s} is {@code null}
     */
    @Override
    public void onSubscribe(Subscription s) {
        Objects.requireNonNull(s, "s");
        Subscription prior = upstream.getAndSet(s);
        if (downstream.isTerminated()) {
            // Nothing can be relayed any more: release the new source instead
            // of asking it for data.
            s.cancel();
        } else {
            // NOTE(review): the new upstream is requested before the prior one
            // is cancelled, so signals from both may overlap for a moment --
            // confirm this ordering is intended.
            s.request(Long.MAX_VALUE);
        }
        if (prior != null) {
            prior.cancel();
        }
    }

    /**
     * Relays an element to the delegate.
     *
     * @param t the element to relay
     */
    @Override
    public void onNext(T t) {
        downstream.onNext(t);
    }

    /**
     * Relays an error signal to the delegate.
     *
     * @param t the error to relay
     */
    @Override
    public void onError(Throwable t) {
        downstream.onError(t);
    }

    /** Relays a completion signal to the delegate. */
    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    /**
     * Subscribes {@code s} to the delegate.
     *
     * @param s the subscriber to attach
     */
    @Override
    public void subscribe(Subscriber<? super T> s) {
        downstream.subscribe(s);
    }

    /**
     * @return the prefetch value reported by the delegate
     */
    @Override
    public long getPrefetch() {
        return downstream.getPrefetch();
    }

    /**
     * @return the inner {@link Scannable} components reported by the delegate
     */
    @Override
    public Stream<? extends Scannable> inners() {
        return downstream.inners();
    }

    /**
     * @return {@code true} if the delegate reports itself as terminated
     */
    @Override
    public boolean isTerminated() {
        return downstream.isTerminated();
    }

    /**
     * @return the downstream count reported by the delegate
     */
    @Override
    public long downstreamCount() {
        return downstream.downstreamCount();
    }

    /**
     * @return {@code true} if the delegate reports having downstreams
     */
    @Override
    public boolean hasDownstreams() {
        return downstream.hasDownstreams();
    }

    /**
     * @return the error reported by the delegate
     */
    @Override
    public Throwable getError() {
        return downstream.getError();
    }

    /**
     * Resolves a scan attribute. {@code Attr.PARENT} yields the most recently
     * installed upstream subscription ({@code null} if none has been installed
     * yet); every other key is answered by the delegate.
     *
     * @param key the attribute to look up
     * @return the attribute value
     */
    @Override
    public Object scan(Attr key) {
        if (key == Attr.PARENT) {
            return upstream.get();
        }
        return downstream.scan(key);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment