Created
December 21, 2016 21:15
-
-
Save xsveda/8c556516079fde97d04b4b7e14a18463 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Relay that buffers values when no Observer subscribed and replays them to Observer as requested. Such values are not replayed | |
* to any other Observer. | |
* <p> | |
* This relay holds an unbounded internal buffer. | |
* <p> | |
* This relay allows only a single Observer at a time to be subscribed to it. | |
* <p> | |
* If more than one Observer attempts to subscribe to this Relay at the same time, they | |
* will receive an IllegalStateException. | |
* | |
* @param <T> the value type received and emitted by this Relay subclass | |
*/ | |
public final class CacheRelay<T> extends Relay<T> { | |
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>(); | |
private final PublishRelay<T> relay = PublishRelay.create(); | |
private CacheRelay() { | |
} | |
public static <T> CacheRelay<T> create() { | |
return new CacheRelay<>(); | |
} | |
@Override | |
public void accept(T value) { | |
if (relay.hasObservers()) { | |
relay.accept(value); | |
} else { | |
queue.add(value); | |
} | |
} | |
@Override | |
public boolean hasObservers() { | |
return relay.hasObservers(); | |
} | |
@Override | |
protected void subscribeActual(Observer<? super T> observer) { | |
if (hasObservers()) { | |
EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer); | |
} else { | |
for (T element; (element = queue.poll()) != null; ) { | |
observer.onNext(element); | |
} | |
relay.subscribeActual(observer); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public final class CacheRelayTest { | |
private static final Object VALUE_1 = new Object(); | |
private static final Object VALUE_2 = new Object(); | |
private static final Object VALUE_3 = new Object(); | |
private static final Object VALUE_4 = new Object(); | |
CacheRelay<Object> relay = CacheRelay.create(); | |
@Test(expected = NullPointerException.class) | |
public void shouldThrowNPEForNullValueOnAcceptWhenHasObserver() { | |
relay.subscribe(); | |
relay.accept(null); | |
} | |
@Test(expected = NullPointerException.class) | |
public void shouldThrowNPEForNullValueOnAcceptWhenHasNoObserver() { | |
relay.accept(null); | |
} | |
@Test | |
public void shouldThrowISEWhenSecondObserverSubscribesAndFirstStillSubscribed() { | |
relay.subscribe(); | |
relay.test().assertError(IllegalStateException.class); | |
} | |
@Test | |
public void shouldCacheValuesAndEmitThemOnSubscribe() { | |
relay.accept(VALUE_1); | |
relay.accept(VALUE_2); | |
relay.test().assertValues(VALUE_1, VALUE_2); | |
} | |
@Test | |
public void shouldNotEmitCachedValuesToSecondObserverAfterFirstDisposes() { | |
relay.accept(VALUE_1); | |
relay.accept(VALUE_2); | |
relay.test().assertValues(VALUE_1, VALUE_2).dispose(); | |
relay.test().assertNoValues(); | |
} | |
@Test | |
public void shouldEmitValuesToSubscribedObserver() { | |
TestObserver<Object> observer = relay.test(); | |
relay.accept(VALUE_1); | |
relay.accept(VALUE_2); | |
observer.assertValues(VALUE_1, VALUE_2); | |
} | |
@Test | |
public void shouldEmitValuesToSubscribedObserverAfterCachedValues() { | |
relay.accept(VALUE_1); | |
relay.accept(VALUE_2); | |
TestObserver<Object> observer = relay.test(); | |
relay.accept(VALUE_3); | |
relay.accept(VALUE_4); | |
observer.assertValues(VALUE_1, VALUE_2, VALUE_3, VALUE_4); | |
} | |
@Test | |
public void shouldNotEmitStandardValuesToSecondObserverAfterFirstDisposes() { | |
Disposable disposable = relay.subscribe(); | |
relay.accept(VALUE_1); | |
relay.accept(VALUE_2); | |
disposable.dispose(); | |
relay.test().assertNoValues(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment