Skip to content

Instantly share code, notes, and snippets.

@gjesse
Last active January 5, 2016 20:11
Show Gist options
  • Save gjesse/72a478e5b8248861dd2c to your computer and use it in GitHub Desktop.
Save gjesse/72a478e5b8248861dd2c to your computer and use it in GitHub Desktop.
package com.boundary.sargon.rx.operators;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.internal.operators.BufferUntilSubscriber;
import rx.subscriptions.Subscriptions;
import java.util.Objects;
/**
* Windows an observable, with boundaries determined when the
* result of the keySelector changes.
*
* windows are *eagerly* dispatched, that is
* the result of the keySelector changes, the current item is included in the current
* window, which is then immediately closed!
*
* A new window is then created when the next item is received
*
* heavily borrowed from rx.internal.operators.OperatorWindowWithSize
*
* @param <T>
* @param <U>
*/
public class OperatorWindowUntilChanged<T, U> implements Observable.Operator<Observable<T>, T> {
private final Func1<T, U> keySelector;
public OperatorWindowUntilChanged(Func1<T, U> keySelector) {
this.keySelector = keySelector;
}
@Override
public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
EagerSubscriber e = new EagerSubscriber(child);
e.init();
return e;
}
/**
* creates exact/non overlapping window bounds
* that are eagerly dispatched
* */
final class EagerSubscriber extends Subscriber<T> {
private final Subscriber<? super Observable<T>> child;
private BufferUntilSubscriber<T> window;
private boolean noWindow = true;
private U previousKey = null;
private boolean hasPrevious;
public EagerSubscriber(Subscriber<? super Observable<T>> child) {
/**
* See https://github.com/ReactiveX/RxJava/issues/1546
* We cannot compose through a Subscription because unsubscribing
* applies to the outer, not the inner.
*/
this.child = child;
/*
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
*/
}
void init() {
child.add(Subscriptions.create(() -> {
// if no window we unsubscribe up otherwise wait until window ends
if (noWindow) {
unsubscribe();
}
}));
}
@Override
public void onNext(T t) {
final U currentKey = previousKey;
final U key = keySelector.call(t);
previousKey = key;
if (window == null) {
createWindow();
}
window.onNext(t);
if (hasPrevious && !Objects.equals(key, currentKey)) {
window.onCompleted();
window = null;
}
hasPrevious = true;
}
private void createWindow() {
noWindow = false;
window = BufferUntilSubscriber.create();
child.onNext(window);
}
@Override
public void onError(Throwable e) {
if (window != null) {
window.onError(e);
}
child.onError(e);
}
@Override
public void onCompleted() {
if (window != null) {
window.onCompleted();
}
child.onCompleted();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment