Skip to content

Instantly share code, notes, and snippets.

@mpenick
Created May 17, 2021 17:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mpenick/aec03128af80e668a4676705773cc707 to your computer and use it in GitHub Desktop.
Save mpenick/aec03128af80e668a4676705773cc707 to your computer and use it in GitHub Desktop.
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.immutables.value.Value;
public class Main {
@Value.Immutable
interface Row {
String id();
List<Integer> values();
}
@Value.Immutable
interface ResultSet {
List<Row> rows();
}
public static class Document {
public final String id;
public final List<Row> rows;
private Document(String id, List<Row> rows) {
this.id = id;
this.rows = rows;
}
}
public static class Accumulator {
public final String id;
public final List<Row> rows;
private Accumulator(String id) {
this.id = id;
this.rows = new ArrayList<>();
}
private Document toDoc() {
return new Document(id, rows);
}
}
public static void main(String args[]) {
Flowable<ResultSet> results = Flowable.just(
ImmutableResultSet.builder().rows(Arrays.asList(
ImmutableRow.builder().id("a").values(Arrays.asList(1, 2)).build(),
ImmutableRow.builder().id("a").values(Arrays.asList(7, 8)).build(),
ImmutableRow.builder().id("b").values(Arrays.asList(1, 2)).build()
)).build(),
ImmutableResultSet.builder().rows(Arrays.asList(
ImmutableRow.builder().id("a").values(Arrays.asList(3, 4)).build(),
ImmutableRow.builder().id("b").values(Arrays.asList(3, 4)).build()
)).build(),
ImmutableResultSet.builder().rows(Arrays.asList(
ImmutableRow.builder().id("a").values(Arrays.asList(5, 6)).build(),
ImmutableRow.builder().id("b").values(Arrays.asList(5, 6)).build()
)).build());
results
.collect(HashMap<String, Accumulator>::new, (map, rs) -> {
for (Row row : rs.rows()) {
map.computeIfAbsent(row.id(), Accumulator::new).rows.add(row);
}
})
.map(HashMap::values)
.flattenAsFlowable(a -> a)
.map(Accumulator::toDoc)
.subscribe(d -> {
System.out.println(d.id);
System.out.println(d.rows);
}).dispose();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment