Created
February 14, 2019 23:05
-
-
Save abachar/15bc25e5ff74b9d3f0f642efda116c78 to your computer and use it in GitHub Desktop.
Reactive Batch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package consulting.crafters.reactive.batch.reactivebatch; | |
import lombok.AllArgsConstructor; | |
import lombok.Getter; | |
import lombok.Value; | |
import lombok.val; | |
import org.assertj.core.util.Lists; | |
import org.junit.Test; | |
import reactor.core.publisher.Flux; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.function.Function; | |
public class ReactiveBatchApplicationTest { | |
@Test | |
public void should_do_something() { | |
val dealStream = Flux.just( | |
new Deal("L1", 1, "Deal L1 V1"), | |
new Deal("L1", 2, "Deal L1 V2"), | |
new Deal("L1", 3, "Deal L1 V2"), | |
new Deal("L1", 4, "Deal L1 V4"), | |
new Deal("L2", 1, "Deal L2 V1"), | |
new Deal("L3", 1, "Deal L3 V1"), | |
new Deal("L3", 2, "Deal L3 V2"), | |
new Deal("L3", 3, "Deal L3 V3"), | |
new Deal("L3", 4, "Deal L3 V4") | |
); | |
Function<Flux<Deal>, Flux<List<Deal>>> bufferUntilDealCodeChanged = f -> | |
f.publish(q -> q.buffer(q.distinctUntilChanged(d -> d.dealCode))); | |
dealStream | |
.compose(bufferUntilDealCodeChanged) | |
.map(versions -> Lists.newArrayList( | |
Flux.fromIterable(versions) | |
.distinctUntilChanged(d -> d.name) | |
.toIterable() | |
)) | |
.subscribe(System.out::println); | |
} | |
@Getter | |
@AllArgsConstructor | |
private static class Deal { | |
private String dealCode; | |
private Integer version; | |
private String name; | |
@Override | |
public String toString() { | |
return "Deal(" + dealCode + ", " + version + ", " + name + ")"; | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment