Skip to content

Instantly share code, notes, and snippets.

@xsveda
Created December 21, 2016 21:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xsveda/8c556516079fde97d04b4b7e14a18463 to your computer and use it in GitHub Desktop.
Save xsveda/8c556516079fde97d04b4b7e14a18463 to your computer and use it in GitHub Desktop.
/**
 * Relay that buffers values while no Observer is subscribed and replays them to the next
 * Observer that subscribes. Replayed values are removed from the buffer, so they are not
 * replayed to any other Observer.
 * <p>
 * This relay holds an unbounded internal buffer.
 * <p>
 * This relay allows only a single Observer at a time to be subscribed to it. An Observer
 * that subscribes while another one is still subscribed is signalled an
 * {@link IllegalStateException} through {@code onError}; the Observer that is already
 * subscribed is not affected.
 * <p>
 * If the Observer disposes while buffered values are being replayed, the replay stops and
 * the remaining values stay buffered for the next Observer.
 * <p>
 * Note: the "has observer?" check and the following emit/buffer step in {@link #accept}
 * and in subscription are not performed atomically. Callers that invoke {@code accept} and
 * {@code subscribe} from different threads should serialize those calls externally,
 * otherwise a value may stay buffered until the next subscription.
 *
 * @param <T> the value type received and emitted by this Relay subclass
 */
public final class CacheRelay<T> extends Relay<T> {
    /** Values accepted while nobody was subscribed (or while a replay was in progress). */
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();

    /** Delegate that tracks the single live Observer and delivers non-buffered values. */
    private final PublishRelay<T> relay = PublishRelay.create();

    /**
     * True while buffered values are being replayed to a freshly subscribed Observer.
     * During that window {@link #accept} appends to the buffer instead of emitting
     * directly, so a value accepted re-entrantly from {@code onNext}/{@code onSubscribe}
     * cannot overtake older buffered values.
     */
    private volatile boolean replaying;

    private CacheRelay() {
    }

    /**
     * Creates a new, empty relay.
     *
     * @param <T> the value type
     * @return a new {@code CacheRelay} with no buffered values and no Observer
     */
    public static <T> CacheRelay<T> create() {
        return new CacheRelay<>();
    }

    /**
     * Emits {@code value} to the subscribed Observer, or buffers it when there is none
     * (or when buffered values are currently being replayed).
     *
     * @param value the value to emit or buffer, must not be {@code null}
     * @throws NullPointerException if {@code value} is {@code null}
     */
    @Override
    public void accept(T value) {
        // Fail the same way on both paths instead of relying on the delegate/queue to reject null.
        if (value == null) {
            throw new NullPointerException("value == null");
        }
        if (!replaying && relay.hasObservers()) {
            relay.accept(value);
        } else {
            queue.add(value);
        }
    }

    /**
     * @return {@code true} if an Observer is currently subscribed to this relay
     */
    @Override
    public boolean hasObservers() {
        return relay.hasObservers();
    }

    /**
     * Subscribes {@code observer} if no other Observer is subscribed and replays the
     * buffered values to it; otherwise signals an {@link IllegalStateException} to it.
     *
     * @param observer the Observer that wants to subscribe
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (hasObservers()) {
            EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer);
            return;
        }

        replaying = true;
        try {
            // Subscribe to the delegate first: this delivers onSubscribe to the observer
            // before any onNext, as the Observer protocol requires. The public subscribe()
            // is used so that no protected access to another instance is needed.
            relay.subscribe(observer);

            // Replay in FIFO order, but only while the observer is still subscribed. With
            // a single allowed observer, "delegate has observers" means "this observer has
            // not disposed"; once it disposes, the rest stays buffered for the next one.
            T element;
            while (relay.hasObservers() && (element = queue.poll()) != null) {
                observer.onNext(element);
            }
        } finally {
            replaying = false;
        }
    }
}
/**
 * Tests for {@link CacheRelay}: null rejection, the single-observer rule, and the
 * buffering/replay of values accepted while no observer is subscribed.
 */
public final class CacheRelayTest {
    // Distinct marker objects; the tests only compare them by identity/equality.
    private static final Object VALUE_1 = new Object();
    private static final Object VALUE_2 = new Object();
    private static final Object VALUE_3 = new Object();
    private static final Object VALUE_4 = new Object();

    // Relay under test, created together with the test class instance.
    CacheRelay<Object> relay = CacheRelay.create();

    /** accept(null) must fail while an observer is subscribed. */
    @Test(expected = NullPointerException.class)
    public void shouldThrowNPEForNullValueOnAcceptWhenHasObserver() {
        relay.subscribe();

        relay.accept(null);
    }

    /** accept(null) must fail while nobody is subscribed (buffering path). */
    @Test(expected = NullPointerException.class)
    public void shouldThrowNPEForNullValueOnAcceptWhenHasNoObserver() {
        relay.accept(null);
    }

    /** A second observer is rejected with IllegalStateException while the first is active. */
    @Test
    public void shouldThrowISEWhenSecondObserverSubscribesAndFirstStillSubscribed() {
        relay.subscribe();

        TestObserver<Object> secondObserver = relay.test();

        secondObserver.assertError(IllegalStateException.class);
    }

    /** Values accepted before any subscription are delivered, in order, on subscribe. */
    @Test
    public void shouldCacheValuesAndEmitThemOnSubscribe() {
        relay.accept(VALUE_1);
        relay.accept(VALUE_2);

        TestObserver<Object> observer = relay.test();

        observer.assertValues(VALUE_1, VALUE_2);
    }

    /** Buffered values are consumed by the first observer and not replayed to the next one. */
    @Test
    public void shouldNotEmitCachedValuesToSecondObserverAfterFirstDisposes() {
        relay.accept(VALUE_1);
        relay.accept(VALUE_2);

        TestObserver<Object> firstObserver = relay.test();
        firstObserver.assertValues(VALUE_1, VALUE_2);
        firstObserver.dispose();

        TestObserver<Object> secondObserver = relay.test();
        secondObserver.assertNoValues();
    }

    /** Values accepted while an observer is subscribed reach that observer. */
    @Test
    public void shouldEmitValuesToSubscribedObserver() {
        TestObserver<Object> observer = relay.test();

        relay.accept(VALUE_1);
        relay.accept(VALUE_2);

        observer.assertValues(VALUE_1, VALUE_2);
    }

    /** Live values follow the replayed buffered values in acceptance order. */
    @Test
    public void shouldEmitValuesToSubscribedObserverAfterCachedValues() {
        // Buffered before anyone subscribes.
        relay.accept(VALUE_1);
        relay.accept(VALUE_2);

        TestObserver<Object> observer = relay.test();

        // Accepted after the subscription.
        relay.accept(VALUE_3);
        relay.accept(VALUE_4);

        observer.assertValues(VALUE_1, VALUE_2, VALUE_3, VALUE_4);
    }

    /** Values already delivered to a live observer are not buffered for later observers. */
    @Test
    public void shouldNotEmitStandardValuesToSecondObserverAfterFirstDisposes() {
        Disposable firstSubscription = relay.subscribe();
        relay.accept(VALUE_1);
        relay.accept(VALUE_2);
        firstSubscription.dispose();

        TestObserver<Object> secondObserver = relay.test();

        secondObserver.assertNoValues();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment