Created
April 21, 2021 16:59
-
-
Save Marthym/519ccdc0e3d65a978804d7652d0b8036 to your computer and use it in GitHub Desktop.
Split Reactor Flux
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fr.ght1pc9kc.baywatch; | |
import lombok.AllArgsConstructor; | |
import lombok.Value; | |
import org.junit.jupiter.api.Test; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Sinks; | |
import reactor.test.StepVerifier; | |
import java.util.List; | |
public class SplitReactorFluxTest { | |
@Test | |
void should_split_flux_with_groupby() { | |
Flux<String> actual = Flux.fromIterable(List.of( | |
CsvLine.of("DELETE", "1", "data 1"), | |
CsvLine.of("UPDATE", "2", "data 2"), | |
CsvLine.of("UPDATE", "3", "data 3"), | |
CsvLine.of("INSERT", "4", "data 4"), | |
CsvLine.of("DELETE", "5", "data 5") | |
)).groupBy(l -> l.type).flatMap(gf -> { | |
switch (gf.key()) { | |
case "DELETE": | |
return delete(gf); | |
case "UPDATE": | |
return update(gf); | |
case "INSERT": | |
return insert(gf); | |
default: | |
return Flux.error(new IllegalArgumentException("Unknown key " + gf.key())); | |
} | |
}); | |
StepVerifier.create(actual) | |
.expectNext( | |
"Delete data 1", | |
"Update data 2", | |
"Update data 3", | |
"Insert data 4", | |
"Delete data 5" | |
).expectComplete() | |
.verify(); | |
} | |
@Test | |
void should_split_flux_with_sinks() { | |
Sinks.Many<CsvLine> toDelete = Sinks.many().unicast().onBackpressureBuffer(); | |
Sinks.Many<CsvLine> toInsert = Sinks.many().unicast().onBackpressureBuffer(); | |
Sinks.Many<CsvLine> toUpdate = Sinks.many().unicast().onBackpressureBuffer(); | |
Flux<String> deleted = delete(toDelete.asFlux()); | |
Flux<String> inserted = insert(toInsert.asFlux()); | |
Flux<String> updated = update(toUpdate.asFlux()); | |
Flux<String> actual = Flux.fromIterable(List.of( | |
CsvLine.of("DELETE", "1", "data 1"), | |
CsvLine.of("UPDATE", "2", "data 2"), | |
CsvLine.of("UPDATE", "3", "data 3"), | |
CsvLine.of("INSERT", "4", "data 4"), | |
CsvLine.of("DELETE", "5", "data 5") | |
)).doOnNext(csvLine -> { | |
switch (csvLine.type) { | |
case "DELETE": | |
toDelete.tryEmitNext(csvLine); | |
break; | |
case "UPDATE": | |
toUpdate.tryEmitNext(csvLine); | |
break; | |
case "INSERT": | |
toInsert.tryEmitNext(csvLine); | |
break; | |
default: | |
Flux.error(new IllegalArgumentException("Unknown key " + csvLine.type)); | |
} | |
}).doOnComplete(() -> { | |
toDelete.tryEmitComplete(); | |
toUpdate.tryEmitComplete(); | |
toInsert.tryEmitComplete(); | |
}).thenMany(Flux.merge(deleted, inserted, updated)); | |
StepVerifier.create(actual) | |
.expectNext( | |
"Delete data 1", | |
"Delete data 5", | |
"Insert data 4", | |
"Update data 2", | |
"Update data 3" | |
).expectComplete() | |
.verify(); | |
} | |
public Flux<String> delete(Flux<CsvLine> toBeDeleted) { | |
return toBeDeleted.map(csvLine -> "Delete " + csvLine.data); | |
} | |
public Flux<String> update(Flux<CsvLine> toBeUpdated) { | |
return toBeUpdated.map(csvLine -> "Update " + csvLine.data); | |
} | |
public Flux<String> insert(Flux<CsvLine> toBeInserted) { | |
return toBeInserted.map(csvLine -> "Insert " + csvLine.data); | |
} | |
@Value | |
@AllArgsConstructor(staticName = "of") | |
public static class CsvLine { | |
public final String type; | |
public final String id; | |
public final String data; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment