Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@pakoito
Forked from weefbellington/RxJavaTests.java
Last active November 4, 2015 20:35
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pakoito/79512353db749c08f9e9 to your computer and use it in GitHub Desktop.
Save pakoito/79512353db749c08f9e9 to your computer and use it in GitHub Desktop.
Filtering and throttling signals by id
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import rx.Observable;
import rx.functions.Func3;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
/**
 * Manual demonstrations of two ways to keep the latest value per id from a single hot stream
 * and throttle the combined result to one emission per second.
 *
 * <p>Both tests print the sampled output to standard out and make no assertions; they are meant
 * to be run and read, not to gate a build.
 *
 * <p>Created by weefbellington on 11/1/15.
 */
public class RxJavaTests {

    /**
     * Splits one hot stream into three id-filtered streams, recombines their latest values with
     * {@code combineLatest} and prints the merged line at most once per second.
     *
     * @throws Exception if the thread is interrupted while sleeping between emissions
     */
    @Test
    public void testCombineLatest() throws Exception {
        PublishSubject<Data<String>> hotSignal = PublishSubject.create();

        // Every filtered stream starts with a placeholder so the combined stream has a value
        // for each source before that source has emitted anything of its own.
        Data<String> emptyData = Data.empty("None");
        Observable<Data<String>> signal1 = hotSignal.filter(data -> data.id == 1).startWith(emptyData);
        Observable<Data<String>> signal2 = hotSignal.filter(data -> data.id == 2).startWith(emptyData);
        Observable<Data<String>> signal3 = hotSignal.filter(data -> data.id == 3).startWith(emptyData);

        Observable<String> latestData = Observable.combineLatest(
                signal1, signal2, signal3,
                (data1, data2, data3) -> mergeData(data1, data2, data3)
        );
        latestData.sample(1, TimeUnit.SECONDS).subscribe(mergedData -> System.out.println(mergedData));

        Data<String> a = new Data<>(1, "foo");
        Data<String> b = new Data<>(3, "bar");
        Data<String> c = new Data<>(2, "baz");
        Data<String> d = new Data<>(1, "qux");

        try {
            hotSignal.onNext(a);
            Thread.sleep(1500);
            hotSignal.onNext(b);
            Thread.sleep(1000);
            hotSignal.onNext(c);
            Thread.sleep(1000);
            hotSignal.onNext(d);
            Thread.sleep(1000);
        } finally {
            // Complete the subject so the chain terminates rather than staying subscribed
            // after the test method returns.
            hotSignal.onCompleted();
        }
    }

    /**
     * Accumulates the latest content per id into a single map with {@code scan} and prints the
     * map at most once per second.
     *
     * @throws Exception if the thread is interrupted while sleeping between emissions
     */
    @Test
    public void testScan() throws Exception {
        PublishSubject<Data<String>> source = PublishSubject.create();

        // The accumulator mutates and returns the same map instance on every step. The explicit
        // type arguments keep the map typed as id -> content instead of Object -> Object.
        source.scan(new ConcurrentHashMap<Integer, String>(), (output, data) -> {
            output.put(data.id, data.content);
            return output;
        }).sample(1, TimeUnit.SECONDS).subscribe(output -> System.out.println(output));

        Data<String> a = new Data<>(1, "foo");
        Data<String> aa = new Data<>(1, "oof");
        Data<String> b = new Data<>(3, "bar");
        Data<String> c = new Data<>(2, "baz");
        Data<String> d = new Data<>(1, "qux");

        try {
            source.onNext(a);
            source.onNext(aa); // same id as 'a': overwrites its entry in the map
            Thread.sleep(1500);
            source.onNext(b);
            Thread.sleep(1000);
            source.onNext(c);
            Thread.sleep(1000);
            source.onNext(d);
            Thread.sleep(1000);
        } finally {
            // Complete the subject so the chain terminates rather than staying subscribed
            // after the test method returns.
            source.onCompleted();
        }
    }

    /**
     * Formats the latest value of each of the three sources into one line.
     *
     * @param items the latest data of source 1, 2 and 3, in that order; the format string
     *     consumes three arguments, so passing fewer makes {@link String#format} throw
     * @return a string of the form {@code source1:…,source2:…,source3:…}
     */
    private String mergeData(Data<?>... items) {
        // The cast states explicitly that the array itself supplies the format arguments.
        return String.format("source1:%s,source2:%s,source3:%s", (Object[]) items);
    }

    /**
     * Immutable pairing of a source id with a piece of content.
     *
     * @param <T> type of the carried content
     */
    private static class Data<T> {
        /** Id of the source this value belongs to; {@code -1} marks a placeholder. */
        public final int id;
        /** The carried content. */
        public final T content;

        public Data(int id, T content) {
            this.id = id;
            this.content = content;
        }

        /**
         * Creates a placeholder value with id {@code -1}.
         *
         * @param defaultContent content to carry in the placeholder
         * @param <T> type of the carried content
         * @return a new placeholder instance
         */
        public static <T> Data<T> empty(T defaultContent) {
            return new Data<>(-1, defaultContent);
        }

        @Override
        public String toString() {
            return "Data{" +
                    "id=" + id +
                    ", content=" + content +
                    '}';
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment