@tantaman
Last active August 29, 2015 13:59
Rx Observable that doesn't start publishing until it has a subscriber. Observers that subscribe after the first subscriber only see items that are emitted after they have subscribed.
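import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action1;
import rx.observables.ConnectableObservable;

public class Temp {
    // Single timer thread that drives the source. It is never shut down,
    // so this demo keeps printing until the process is killed.
    public static final ScheduledExecutorService s = Executors.newScheduledThreadPool(1);

    public static void main(String[] args) throws InterruptedException {
        // Shared source: once connected, emits 1, 2, 3, ... at one-second intervals.
        // publish() defers the OnSubscribe body until connect() is called.
        final ConnectableObservable<Long> source = Observable.create(new OnSubscribe<Long>() {
            @Override
            public void call(final Subscriber<? super Long> subscriber) {
                final AtomicInteger n = new AtomicInteger(0);
                s.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        subscriber.onNext((long) n.incrementAndGet());
                    }
                }, 1, 1, TimeUnit.SECONDS);
            }
        }).publish();

        // Wrapper: every subscription first attaches to the shared source and then
        // connects it. The first connect() starts the timer; the source is already
        // connected when later subscribers arrive, so they join the running stream.
        Observable<Long> result = Observable.create(new OnSubscribe<Long>() {
            @Override
            public void call(Subscriber<? super Long> subscriber) {
                source.subscribe(subscriber);
                source.connect();
            }
        });

        // First subscriber: triggers the connection and sees every item from 1 onward.
        result.subscribe(new Action1<Long>() {
            @Override
            public void call(Long value) {
                System.out.println("A" + value);
            }
        });

        Thread.sleep(3000);

        // Second subscriber: joins about three seconds in and only sees later items.
        result.subscribe(new Action1<Long>() {
            @Override
            public void call(Long value) {
                System.out.println("B" + value);
            }
        });
    }
}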
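The subscription semantics described above can also be modelled without RxJava or timers, which makes the ordering easier to follow. The following is only a minimal sketch in plain Java (8 or later): `LazyHotSource`, `LazyHotSourceDemo`, `emit`, and the `onFirstSubscribe` callback are hypothetical names invented for this illustration and are not part of RxJava or of the gist. Items are pushed by hand instead of from a scheduler so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical helper, not an RxJava type: runs a start hook on the first
// subscription and delivers each item only to observers registered at that time.
final class LazyHotSource<T> {
    private final List<Consumer<? super T>> observers = new CopyOnWriteArrayList<>();
    private final Runnable onFirstSubscribe;
    private boolean started = false;

    LazyHotSource(Runnable onFirstSubscribe) {
        this.onFirstSubscribe = onFirstSubscribe;
    }

    // Register the observer, then start the source if this is the first subscriber.
    synchronized void subscribe(Consumer<? super T> observer) {
        observers.add(observer);
        if (!started) {
            started = true;
            onFirstSubscribe.run();
        }
    }

    // Deliver one item to everyone subscribed right now; nothing is replayed later.
    void emit(T item) {
        for (Consumer<? super T> observer : observers) {
            observer.accept(item);
        }
    }
}

final class LazyHotSourceDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazyHotSource<Long> source = new LazyHotSource<>(() -> log.add("started"));

        source.subscribe(v -> log.add("A" + v)); // first subscriber starts the source
        source.emit(1L);
        source.emit(2L);
        source.subscribe(v -> log.add("B" + v)); // late subscriber misses 1 and 2
        source.emit(3L);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [started, A1, A2, A3, B3]
    }
}
```

In the sketch, the start hook stands in for the `connect()` call in the gist, and the absence of any buffering is what makes a late observer miss earlier items.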