Skip to content

Instantly share code, notes, and snippets.

@abachar
Created February 14, 2019 23:05
Show Gist options
  • Save abachar/15bc25e5ff74b9d3f0f642efda116c78 to your computer and use it in GitHub Desktop.
Save abachar/15bc25e5ff74b9d3f0f642efda116c78 to your computer and use it in GitHub Desktop.
Reactive Batch
package consulting.crafters.reactive.batch.reactivebatch;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Value;
import lombok.val;
import org.assertj.core.util.Lists;
import org.junit.Test;
import reactor.core.publisher.Flux;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
public class ReactiveBatchApplicationTest {
@Test
public void should_do_something() {
val dealStream = Flux.just(
new Deal("L1", 1, "Deal L1 V1"),
new Deal("L1", 2, "Deal L1 V2"),
new Deal("L1", 3, "Deal L1 V2"),
new Deal("L1", 4, "Deal L1 V4"),
new Deal("L2", 1, "Deal L2 V1"),
new Deal("L3", 1, "Deal L3 V1"),
new Deal("L3", 2, "Deal L3 V2"),
new Deal("L3", 3, "Deal L3 V3"),
new Deal("L3", 4, "Deal L3 V4")
);
Function<Flux<Deal>, Flux<List<Deal>>> bufferUntilDealCodeChanged = f ->
f.publish(q -> q.buffer(q.distinctUntilChanged(d -> d.dealCode)));
dealStream
.compose(bufferUntilDealCodeChanged)
.map(versions -> Lists.newArrayList(
Flux.fromIterable(versions)
.distinctUntilChanged(d -> d.name)
.toIterable()
))
// Print
.subscribe(System.out::println);
}
@Getter
@AllArgsConstructor
private static class Deal {
private String dealCode;
private Integer version;
private String name;
@Override
public String toString() {
return "Deal(" + dealCode + ", " + version + ", " + name + ")";
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment