Skip to content

Instantly share code, notes, and snippets.

@ulyssesdotcodes
Last active August 29, 2015 14:20
Show Gist options
  • Save ulyssesdotcodes/14f913a0144814f5d42b to your computer and use it in GitHub Desktop.
Save ulyssesdotcodes/14f913a0144814f5d42b to your computer and use it in GitHub Desktop.
BufferUntil
public void testBuffer() {
PublishSubject<String> stream = PublishSubject.create();
PublishSubject<Boolean> gate = PublishSubject.create();
ReplaySubject<String> test = ReplaySubject.create();
stream
.lift(TRx.bufferUntil(gate))
.subscribe(test);
List<String> results = new ArrayList<>();
test.subscribe(results::add);
assertEquals(0, results.size());
stream.onNext("one");
assertEquals(0, results.size());
gate.onNext(true);
assertEquals(1, results.size());
stream.onNext("two");
assertEquals(2, results.size());
}
public <T, U> OperatorBufferUntil extends Observable.Operator<T, T>() {
private final Observable<? extends U> other;
public OperatorBufferUntil(final Observable<? extends U> other) {
this.other = other;
}
ReplaySubject<T> mReplaySubject = ReplaySubject.create();
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);
final Subscriber<T> buffer = new Subscriber<T>(child, false) {
@Override public void onCompleted() {
try {
serial.onCompleted();
}
finally {
serial.unsubscribe();
}
}
@Override public void onError(Throwable e) {
try {
serial.onError(e);
}
finally {
serial.unsubscribe();
}
}
@Override public void onNext(T t) {
mReplaySubject.onNext(t);
}
};
Subscriber<U> u = new Subscriber<U>() {
@Override public void onCompleted() {
unsubscribe();
}
@Override public void onError(Throwable e) {
serial.onError(e);
unsubscribe();
}
@Override public void onNext(U u) {
mReplaySubject.subscribe(serial);
unsubscribe();
}
};
serial.add(u);
serial.add(buffer);
child.add(serial);
other.unsafeSubscribe(u);
return buffer;
}
}
public <T, U> TransformBufferUntil extends Observable.Operator<T, T>() {
private final Observable<? extends U> other;
ReplaySubject<T> mReplaySubject = ReplaySubject.create();
public TransformBufferUntil(Observable<? extends U> other) {
this.other = other;
}
@Override public Observable<T> call(Observable<T> tObservable) {
tObservable.subscribe(mReplaySubject);
return other.take(1).flatMap(__ -> mReplaySubject);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment