Skip to content

Instantly share code, notes, and snippets.

@cedricvidal
Last active September 5, 2016 14:51
Show Gist options
  • Save cedricvidal/410b2c0131bae742959f to your computer and use it in GitHub Desktop.
Save cedricvidal/410b2c0131bae742959f to your computer and use it in GitHub Desktop.
RxJava Observables Cartesian Product
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
/**
* Computes the cartesian product of Observables.
*
* @author <a href="mailto:cedric.vidal@quicksign.com">Cedric Vidal, Quicksign</a>
*/
public class ObservablesCartesianProduct {

    /**
     * Computes the cartesian product of a variable number of observables.
     *
     * <p>The first source is turned into a stream of one-element tuples; every
     * following source is then folded in, extending each tuple built so far with
     * each of that source's items.
     *
     * <p>The sources must tolerate repeated subscription: {@code observables} itself
     * is subscribed twice (once for the first source, once for the remaining ones),
     * and each remaining source is subscribed once per tuple accumulated so far.
     *
     * @param observables the sources to combine, in the order their items should
     *                    appear inside each tuple
     * @param <T>         the item type shared by all sources
     * @return an observable of tuples, each tuple being an observable that emits one
     *         item taken from every source
     */
    public static <T> Observable<Observable<T>> cartesianProduct(Observable<Observable<T>> observables) {
        // Seed of the fold: wrap each item of the first source in a one-element tuple.
        Observable<Observable<T>> head = observables
                .take(1)
                .flatMap(new Func1<Observable<T>, Observable<Observable<T>>>() {
                    @Override
                    public Observable<Observable<T>> call(Observable<T> first) {
                        // The helper lives in this class (the original referenced an
                        // undeclared QsObservables class here).
                        return first.map(ObservablesCartesianProduct.<T>singletonF());
                    }
                });

        Observable<Observable<T>> tail = observables.skip(1);

        // reduce(...) emits the single accumulated stream of tuples; merge unwraps it.
        return Observable.merge(tail.reduce(head,
                new Func2<Observable<Observable<T>>, Observable<T>, Observable<Observable<T>>>() {
                    @Override
                    public Observable<Observable<T>> call(Observable<Observable<T>> tuples, Observable<T> next) {
                        return doCartesianProduct(tuples, next);
                    }
                }));
    }

    /**
     * Extends every tuple of {@code i1} with every item of {@code i2}.
     *
     * @param i1 the tuples built so far
     * @param i2 the source whose items are appended to each tuple
     * @return one new tuple per (tuple, item) combination
     */
    private static <T> Observable<Observable<T>> doCartesianProduct(Observable<Observable<T>> i1, final Observable<T> i2) {
        return i1.flatMap(new Func1<Observable<T>, Observable<Observable<T>>>() {
            @Override
            public Observable<Observable<T>> call(final Observable<T> tuple) {
                return i2.map(new Func1<T, Observable<T>>() {
                    @Override
                    public Observable<T> call(T item) {
                        // concat rather than merge: the new item must come after the
                        // existing tuple elements, and concat guarantees that order.
                        return Observable.concat(tuple, Observable.just(item));
                    }
                });
            }
        });
    }

    /**
     * Computes the cartesian product of exactly two observables.
     *
     * <p>{@code i2} is subscribed once for every item emitted by {@code i1}.
     *
     * @param i1 the source providing the first element of each pair
     * @param i2 the source providing the second element of each pair
     * @return an observable of two-element tuples, one per (i1 item, i2 item) combination
     */
    public static <T> Observable<Observable<T>> cartesianProduct(Observable<T> i1, final Observable<T> i2) {
        return i1.flatMap(new Func1<T, Observable<Observable<T>>>() {
            @Override
            public Observable<Observable<T>> call(final T first) {
                return i2.map(new Func1<T, Observable<T>>() {
                    @Override
                    public Observable<T> call(T second) {
                        return Observable.just(first, second);
                    }
                });
            }
        });
    }

    /**
     * Returns a function that wraps a value in an observable emitting only that value.
     */
    private static <T> Func1<T, Observable<T>> singletonF() {
        return new Func1<T, Observable<T>>() {
            @Override
            public Observable<T> call(T item) {
                return Observable.just(item);
            }
        };
    }
}
package com.quicksign.pdfmetareader.drools.functions.util;
import com.google.common.collect.Iterables;
import org.junit.Test;
import rx.Observable;
import rx.functions.Func1;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
public class ObservablesCartesianProductTest {

    /** Two sources of two items each must yield 2 x 2 = 4 tuples. */
    @Test
    public void testCartesianProductRxJava() {
        Observable<String> left = Observable.from(asList("a1", "b1"));
        Observable<String> right = Observable.from(asList("a2", "b2"));

        Observable<Observable<String>> product = ObservablesCartesianProduct.cartesianProduct(left, right);
        int tupleCount = Iterables.size(toIterable(product));

        assertEquals(4, tupleCount);
    }

    /** Three sources of two items each must yield 2 x 2 x 2 = 8 tuples. */
    @Test
    public void testCartesianProductRxJavaN3() {
        Observable<String> first = Observable.from(asList("a1", "b1"));
        Observable<String> second = Observable.from(asList("a2", "b2"));
        Observable<String> third = Observable.from(asList("a3", "b3"));

        Observable<Observable<String>> product =
                ObservablesCartesianProduct.cartesianProduct(Observable.from(asList(first, second, third)));
        int tupleCount = Iterables.size(toIterable(product));

        assertEquals(8, tupleCount);
    }

    /**
     * Converts a stream of tuples into nested blocking iterables so the result can be
     * inspected with ordinary collection utilities.
     *
     * @param cartesian the stream of tuples to convert
     * @return a blocking iterable over the tuples, each tuple itself a blocking iterable
     */
    public static Iterable<Iterable<String>> toIterable(Observable<Observable<String>> cartesian) {
        Func1<Observable<String>, Iterable<String>> asBlockingIterable =
                new Func1<Observable<String>, Iterable<String>>() {
                    @Override
                    public Iterable<String> call(Observable<String> tuple) {
                        return tuple.toBlocking().toIterable();
                    }
                };

        Observable<Iterable<String>> tuples = cartesian.map(asBlockingIterable);
        return tuples.toBlocking().toIterable();
    }
}
@trevjonez
Copy link

I was playing with this and translated it to Kotlin:

import rx.Observable
import rx.functions.Func1
import java.util.Arrays.asList
import java.util.Collections.singleton

/**
 * Computes the cartesian product of two observables.
 *
 * [obs2] is subscribed once for every item emitted by [obs1].
 *
 * @param obs1 source providing the first element of each pair
 * @param obs2 source providing the second element of each pair
 * @return an observable of two-element tuples, one per (obs1 item, obs2 item) combination
 */
fun <T> cartesianProduct(obs1: Observable<T>, obs2: Observable<T>): Observable<Observable<T>> =
  obs1.flatMap { first ->
    obs2.map { second -> Observable.from(asList(first, second)) }
  }

/**
 * Computes the cartesian product of a variable number of observables.
 *
 * The first source is turned into one-element tuples, then each remaining source is
 * folded in through [doCartesianProduct]. [observables] is subscribed twice: once for
 * the first source and once for the remaining ones.
 *
 * @param observables the sources to combine
 * @return an observable of tuples, each emitting one item taken from every source
 */
fun <T> cartesianProduct(observables: Observable<Observable<T>>): Observable<Observable<T>> {
  val remaining = observables.skip(1)
  // Seed of the fold: each item of the first source wrapped as a one-element tuple.
  val seed = observables.take(1).flatMap { first -> first.map(singletonF<T>()) }

  // reduce emits the single accumulated stream of tuples; merge unwraps it.
  val folded = remaining.reduce(seed) { tuples, next -> doCartesianProduct(tuples, next) }
  return Observable.merge(folded)
}

/**
 * Extends every tuple of [t1] with every item of [t2].
 *
 * [t2] is subscribed once for every tuple emitted by [t1].
 *
 * @param t1 the tuples built so far
 * @param t2 the source whose items are appended to each tuple
 * @return one new tuple per (tuple, item) combination
 */
fun <T> doCartesianProduct(t1: Observable<Observable<T>>, t2: Observable<T>): Observable<Observable<T>> {
  return t1.flatMap { tuple ->
    // concat rather than merge: the new item must come after the existing tuple
    // elements, and concat guarantees that order.
    t2.map { item -> Observable.concat(tuple, Observable.just(item)) }
  }
}

/**
 * Returns a function that wraps a value in an observable built from a singleton
 * collection holding that value.
 */
fun <T> singletonF(): Func1<T, Observable<T>> {
  return Func1 { item -> Observable.from(singleton(item)) }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment