Skip to content

Instantly share code, notes, and snippets.

@perezd
Created February 21, 2016 19:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save perezd/eaaac8b6fe7bfbf2266c to your computer and use it in GitHub Desktop.
Save perezd/eaaac8b6fe7bfbf2266c to your computer and use it in GitHub Desktop.
// Pipeline overview:
//   1. Merge the responses of the TYPES_ENDPOINT and LOCATIONS_ENDPOINT fetches.
//   2. Turn every response into a stream of CSV rows.
//   3. Cut the row stream into windows of at most 100 rows.
//   4. Inside each window, group rows by Row.id() and collect every group into a list,
//      emitting one single-entry map (id -> rows) per group.
//   5. Log "<id>-><row count>" for every emitted map.
//
// NOTE(review): observeOn(Schedulers.io()) only moves the operators *below* it onto the io
// scheduler. Whether the fetch itself runs off the calling thread depends on
// FetchBz2Observable, which is not visible here -- confirm, and add subscribeOn(...) if the
// fetch is expected to run on the io scheduler as well.
Observable.merge(
        FetchBz2Observable.create(context, TYPES_ENDPOINT),
        FetchBz2Observable.create(context, LOCATIONS_ENDPOINT))
    .observeOn(Schedulers.io())
    .flatMap(new Func1<Response, Observable<Row>>() {
      @Override
      public Observable<Row> call(Response response) {
        return CsvRowEmitterObservable.create(response.reader(), response.endpoint());
      }
    })
    .window(100)
    .flatMap(new Func1<Observable<Row>, Observable<Map<String, List<Row>>>>() {
      @Override
      public Observable<Map<String, List<Row>>> call(Observable<Row> window) {
        return window
            .groupBy(new Func1<Row, String>() {
              @Override
              public String call(Row row) {
                return row.id();
              }
            })
            .flatMap(
                new Func1<GroupedObservable<String, Row>, Observable<Map<String, List<Row>>>>() {
                  @Override
                  public Observable<Map<String, List<Row>>> call(
                      final GroupedObservable<String, Row> grp) {
                    return grp
                        // Accumulate all rows of this group into one mutable list.
                        .collect(
                            new Func0<List<Row>>() {
                              @Override
                              public List<Row> call() {
                                return new ArrayList<>();
                              }
                            },
                            new Action2<List<Row>, Row>() {
                              @Override
                              public void call(List<Row> acc, Row row) {
                                acc.add(row);
                              }
                            })
                        // Pair the collected rows with the key of the group they came from.
                        .map(new Func1<List<Row>, Map<String, List<Row>>>() {
                          @Override
                          public Map<String, List<Row>> call(List<Row> rows) {
                            return ImmutableMap.of(grp.getKey(), rows);
                          }
                        });
                  }
                });
      }
    })
    .forEach(
        new Action1<Map<String, List<Row>>>() {
          @Override
          public void call(Map<String, List<Row>> chunk) {
            // Iterate entries directly instead of keySet() + get(key).
            for (Map.Entry<String, List<Row>> entry : chunk.entrySet()) {
              Log.d(TAG, String.format("%s->%d", entry.getKey(), entry.getValue().size()));
            }
          }
        },
        // Without an explicit error handler, forEach(onNext) rethrows any upstream failure
        // as OnErrorNotImplementedException; log it instead.
        new Action1<Throwable>() {
          @Override
          public void call(Throwable error) {
            Log.e(TAG, "Failed to fetch, parse or group rows", error);
          }
        });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment