Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save artembilan/9be2da11539c88bd746e3cdd5dbbaa9a to your computer and use it in GitHub Desktop.
Save artembilan/9be2da11539c88bd746e3cdd5dbbaa9a to your computer and use it in GitHub Desktop.
private static class SubscribableChannelPublisherAdapter
implements Publisher<Message<?>>, Subscriber<Message<?>>, Subscription {
private final DirectProcessor<Message<?>> delegate = DirectProcessor.create();
private final MessageHandler subscriberAdapter = this.delegate.connectEmitter()::accept;
private final SubscribableChannel channel;
private Subscriber<? super Message<?>> actualSubscriber;
private Subscription actualSubscription;
private SubscribableChannelPublisherAdapter(SubscribableChannel channel) {
this.channel = channel;
}
@Override
public void subscribe(Subscriber<? super Message<?>> subscriber) {
this.actualSubscriber = subscriber;
this.delegate.subscribe(this);
this.channel.subscribe(this.subscriberAdapter);
}
@Override
public void onSubscribe(Subscription subscription) {
this.actualSubscription = subscription;
this.actualSubscriber.onSubscribe(this);
}
@Override
public void onNext(Message<?> message) {
this.actualSubscriber.onNext(message);
}
@Override
public void onError(Throwable t) {
this.actualSubscriber.onError(t);
}
@Override
public void onComplete() {
this.actualSubscriber.onComplete();
}
@Override
public void request(long n) {
this.actualSubscription.request(n);
}
@Override
public void cancel() {
this.channel.unsubscribe(this.subscriberAdapter);
this.actualSubscription.cancel();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment