Skip to content

Instantly share code, notes, and snippets.

@mikea
Last active September 13, 2015 23:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mikea/6493bdccad356297d471 to your computer and use it in GitHub Desktop.
Save mikea/6493bdccad356297d471 to your computer and use it in GitHub Desktop.
LazyGroupBy for RxJava
/**
 * Returns a transformer that splits a source observable into per-key group observables,
 * buffering each group's items until that group is actually subscribed to.
 *
 * <p>For every item the key is computed with {@code keySelector}. The first time a key is
 * seen, a {@code Pair} of the key and a new group observable is emitted downstream. Items
 * for a key that has no registered subscriber yet are stored in an in-memory buffer; when
 * the group observable is subscribed, the buffered items are replayed in arrival order and
 * the subscriber then receives subsequent items directly.
 *
 * <p>Terminal events of the source are forwarded to every registered group subscriber and
 * to the downstream subscriber. A group that is subscribed <em>after</em> the source has
 * terminated receives its buffered items followed by the same terminal event
 * ({@code onCompleted} or {@code onError}).
 *
 * <p>All callbacks and group subscriptions are serialized on the monitor of the internal
 * upstream subscriber. Only one subscriber per group is tracked: a later subscription to
 * the same group observable replaces the earlier one in the routing table.
 *
 * <p>NOTE(review): the upstream subscription is not linked to the downstream subscriber,
 * so unsubscribing downstream does not appear to stop the source — confirm whether that
 * is intended.
 *
 * @param keySelector function that extracts the grouping key from an item
 * @param <K> key type
 * @param <T> item type
 * @return a transformer emitting one (key, group observable) pair per distinct key
 */
public static <K, T> Observable.Transformer<T, Pair<K, Observable<T>>> lazyGroupBy(
    final Func1<? super T, ? extends K> keySelector) {
  return new Observable.Transformer<T, Pair<K, Observable<T>>>() {
    @Override
    public Observable<Pair<K, Observable<T>>> call(Observable<T> observable) {
      return Observable.create(s -> {
        // Subscribers currently attached to each group.
        Map<K, Subscriber<? super T>> children = new HashMap<>();
        // Items received for groups that nobody has subscribed to yet.
        Multimap<K, T> buffer = ArrayListMultimap.create();
        // Keys for which a group observable has already been emitted downstream.
        Set<K> createdObservers = new HashSet<>();

        observable.subscribe(new Subscriber<T>() {
          // Terminal state of the source; guarded by this subscriber's monitor.
          boolean completed = false;
          Throwable error = null;

          @Override
          public synchronized void onCompleted() {
            // Mark terminal state first: the monitor is reentrant, so a child callback that
            // subscribes to another group must take the "already terminated" path instead of
            // mutating `children` while it is being iterated.
            completed = true;
            for (Subscriber<? super T> child : children.values()) {
              child.onCompleted();
            }
            s.onCompleted();
          }

          @Override
          public synchronized void onError(Throwable e) {
            // Remember the failure so that groups subscribed later still get terminated.
            error = e;
            for (Subscriber<? super T> child : children.values()) {
              child.onError(e);
            }
            s.onError(e);
          }

          @Override
          public synchronized void onNext(T t) {
            K key = keySelector.call(t);
            Subscriber<? super T> subscriber = children.get(key);
            if (subscriber != null) {
              // Group already has a live subscriber: deliver directly.
              subscriber.onNext(t);
              return;
            }

            // Nobody is listening to this group yet: hold the item for later replay.
            buffer.put(key, t);

            // Set.add returns true only for a key seen for the first time.
            if (createdObservers.add(key)) {
              final Subscriber<T> lock = this;
              Observable<T> childObservable = Observable.create(child -> {
                synchronized (lock) {
                  // removeAll returns the removed values, so the replay iterates a
                  // collection that is no longer backed by the multimap.
                  buffer.removeAll(key).forEach(child::onNext);
                  if (error != null) {
                    child.onError(error);
                  } else if (completed) {
                    child.onCompleted();
                  } else {
                    children.put(key, child);
                  }
                }
              });
              s.onNext(new Pair<>(key, childObservable));
            }
          }
        });
      });
    }
  };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment