Skip to content

Instantly share code, notes, and snippets.

@parthmistry
Created November 30, 2022 09:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save parthmistry/c1e22cb638130c3f9ba81e6390b987bd to your computer and use it in GitHub Desktop.
Save parthmistry/c1e22cb638130c3f9ba81e6390b987bd to your computer and use it in GitHub Desktop.
ReactiveProcessData
import reactor.core.publisher.Flux;
/**
 * Demo entry point: reads every row of the {@code persons} table over R2DBC, maps each row to a
 * {@code PersonData}, enriches it via {@code ReactivePersonDataService.getEnrichedPersonData},
 * and prints each enriched record with the elapsed time reported by {@code ElapsedTimeMonitor}.
 */
class ReactiveProcessData {

    /** Created once the database connection has been obtained; read by the printing loop. */
    private static ElapsedTimeMonitor elapsedTimeMonitor;

    /**
     * Runs the query, enriches the rows and prints the results, blocking until the stream ends.
     *
     * @param args unused
     * @throws Exception declared for compatibility with the original signature
     */
    public static void main(String[] args) throws Exception {
        // Fetch-size hint passed to the R2DBC statement.
        int fetchSize = 100;
        // Maximum number of enrichment calls subscribed to concurrently by flatMap.
        int concurrency = 50;
        // Prefetch requested from each inner enrichment publisher by flatMap.
        int prefetchSize = 100;

        // usingWhen ties the connection's lifetime to the query: connection.close() is subscribed
        // when the row stream completes, errors or is cancelled, so the connection is not leaked.
        Flux<PersonData> persons = Flux.usingWhen(
                PrefetchDemoUtil.getR2dbcConnection(),
                connection -> {
                    // Start timing only after the connection is available.
                    elapsedTimeMonitor = new ElapsedTimeMonitor();
                    return Flux.from(connection.createStatement("select * from persons")
                                    .fetchSize(fetchSize)
                                    .execute())
                            .flatMap(result -> result.map((row, metadata) ->
                                    new PersonData(row.get("id", Integer.class), row.get("name", String.class))));
                },
                connection -> connection.close());

        var enrichedPersonDataIterable = persons
                .flatMap(ReactivePersonDataService::getEnrichedPersonData, concurrency, prefetchSize)
                .toIterable();

        // toIterable() bridges the reactive pipeline to a blocking iteration on the main thread.
        for (var enrichedPersonData : enrichedPersonDataIterable) {
            System.out.println(enrichedPersonData.id() + " - " + enrichedPersonData.detail() + " -- " + elapsedTimeMonitor.getElapsedTimeMillis());
        }
        System.out.println("total duration: " + elapsedTimeMonitor.getElapsedTimeMillis());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment