Skip to content

Instantly share code, notes, and snippets.

@NiteshKant
Last active January 26, 2022 01:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NiteshKant/62a912129ea47b657f7739776b0c3b1f to your computer and use it in GitHub Desktop.
Save NiteshKant/62a912129ea47b657f7739776b0c3b1f to your computer and use it in GitHub Desktop.
Demonstrates a ServiceTalk multicast operator issue: when only one of the subscribers requests data, nothing is delivered.
package com.apple.acs.traffic.servicediscovery.insights.aggregator;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.PublisherSource.Processor;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.internal.TerminalNotification;
import javax.annotation.Nullable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import static io.servicetalk.concurrent.api.Processors.newPublisherProcessor;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
public class MulticastBug {
public static void main(String[] args) throws InterruptedException {
final Processor<Integer, Integer> processor = newPublisherProcessor(128);
Publisher<Integer> source = fromSource(processor)
.beforeRequest(value -> System.out.println("Requesting upstream: " + value))
.multicast(1);
final IntegerSubscriber sub1 = new IntegerSubscriber();
toSource(source).subscribe(sub1);
final IntegerSubscriber sub2 = new IntegerSubscriber();
toSource(source).subscribe(sub2);
for (int i = 0; i < 5; i++) {
processor.onNext(i);
}
sub1.awaitSubscription.await();
sub1.subscription.request(1);
System.out.println("Awaiting signal for subscriber.");
sub1.signals.take();
System.out.println("Got signal for subscriber.");
}
private static class IntegerSubscriber implements PublisherSource.Subscriber<Integer> {
private static final AtomicInteger counter = new AtomicInteger();
private final int id = counter.incrementAndGet();
private final CountDownLatch awaitSubscription = new CountDownLatch(1);
private PublisherSource.Subscription subscription;
private final LinkedBlockingQueue<Object> signals = new LinkedBlockingQueue<>();
@Override
public void onSubscribe(PublisherSource.Subscription subscription) {
this.subscription = new PublisherSource.Subscription() {
@Override
public void request(long n) {
System.out.println("Subscriber: " + id + " requesting: " + n);
subscription.request(n);
}
@Override
public void cancel() {
subscription.cancel();
}
};
awaitSubscription.countDown();
}
@Override
public void onNext(@Nullable Integer integer) {
assert integer != null;
signals.offer(integer);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
signals.offer(TerminalNotification.error(t));
}
@Override
public void onComplete() {
signals.offer(TerminalNotification.complete());
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment