Skip to content

Instantly share code, notes, and snippets.

@schauder
Last active March 24, 2017 10:56
Show Gist options
  • Save schauder/3f63f9f68e4dbe27c962f27ed31c4ca5 to your computer and use it in GitHub Desktop.
Save schauder/3f63f9f68e4dbe27c962f27ed31c4ca5 to your computer and use it in GitHub Desktop.
Reactor Thingies
// Do not use this: ChangeTrigger is stateful, so this can fail in all kinds of surprising ways!
// Similar to Flux::groupBy, but closes the GroupedFlux whenever a different key value appears, potentially opening another one for the same value later.
/**
 * Verifies that consecutive elements sharing the same first letter are emitted
 * together and that each run is terminated before the next run starts.
 *
 * Every inner flux is materialized so its completion signal becomes visible in
 * the flattened stream as the marker string "WINDOW CLOSED".
 */
@Test
public void groupOnSwitch() {
    Flux<String> words = Flux.just("one", "two", "twenty", "tissue", "berta", "blot", "thousand");

    // Key each word by its first character.
    Flux<GroupedFlux<String, String>> groups = groupOnSwitch(words, word -> word.substring(0, 1));

    // Turn each group's onComplete into a marker, pass element values through unchanged.
    Flux<String> flattened = groups
            .flatMap(Flux::materialize)
            .map(signal -> signal.isOnComplete() ? "WINDOW CLOSED" : signal.get());

    StepVerifier.create(flattened)
            .expectNext("one", "WINDOW CLOSED")
            .expectNext("two", "twenty", "tissue", "WINDOW CLOSED")
            .expectNext("berta", "blot", "WINDOW CLOSED")
            .expectNext("thousand", "WINDOW CLOSED")
            .verifyComplete();
}
/**
 * Verifies the keys of the emitted groups: one key per run of equal first
 * letters, with "t" appearing twice because the two runs of t-words are
 * separated by the b-words.
 */
@Test
public void groupOnSwitchKeys() {
    Flux<String> words = Flux.just("one", "two", "twenty", "tissue", "berta", "blot", "thousand");
    Flux<GroupedFlux<String, String>> groups = groupOnSwitch(words, word -> word.substring(0, 1));

    // Only the group keys are of interest here, not the group contents.
    Flux<String> keys = groups.map(GroupedFlux::key);

    StepVerifier.create(keys)
            .expectNext("o", "t", "b", "t")
            .verifyComplete();
}
/**
 * Splits {@code flux} into consecutive groups of elements that map to the same key.
 * Unlike {@code Flux.groupBy}, a group ends as soon as an element with a different
 * key arrives, so the same key may show up again in a later, separate group.
 *
 * @param flux        the source sequence to split
 * @param keyFunction computes the grouping key of an element
 * @param <T>         element type of the source
 * @param <X>         key type
 * @return a flux of grouped fluxes keyed by the result of {@code keyFunction}
 */
private static <T, X> Flux<GroupedFlux<X, T>> groupOnSwitch(Flux<T> flux, Function<T, X> keyFunction) {
    // The change detector is stateful. Creating it inside defer() gives every
    // subscription its own instance instead of sharing one across all subscribers
    // of the returned flux, which would corrupt the remembered "last key".
    return Flux.defer(() -> {
        ChangeTrigger<X> changeTrigger = new ChangeTrigger<>();

        // Start a new window whenever the key differs from the previous element's key.
        // NOTE(review): the 'true' argument is presumably cutBefore, i.e. the triggering
        // element opens the new window — confirm against the Reactor version in use.
        Flux<GroupedFlux<T, T>> fluxOfGroupedFluxes =
                flux.windowUntil(l -> changeTrigger.test(keyFunction.apply(l)), true);

        // Re-key every window from its element-typed key to the key computed by keyFunction.
        return fluxOfGroupedFluxes.flatMap(gf -> gf.groupBy(t -> keyFunction.apply(gf.key())));
    });
}
/**
 * Stateful change detector: remembers the value passed to the previous call of
 * {@link #test(Object)} and reports whether the current value differs from it.
 *
 * The remembered value starts out as {@code null}, so a first call with
 * {@code null} is reported as "no change". Instances are mutable and not
 * synchronized.
 *
 * @param <T> type of the compared values
 */
private static class ChangeTrigger<T> {

    /** Value seen by the most recent call to {@link #test(Object)}; {@code null} before the first call. */
    T last = null;

    /**
     * Records {@code value} as the latest value and reports whether it differs
     * from the previously recorded one. Also prints the value and the result to
     * standard output.
     *
     * @param value the value to compare against the previous one
     * @return {@code true} if {@code value} is not equal to the previously seen value
     */
    boolean test(T value) {
        final T previous = last;
        last = value;

        final boolean changed = !Objects.equals(previous, value);
        System.out.println(String.format("%s, %s", value, changed));
        return changed;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment