Skip to content

Instantly share code, notes, and snippets.

@loganmzz
Created March 2, 2017 15:07
Show Gist options
  • Save loganmzz/8bdc88bf8decc1bc3898d021a3678fd2 to your computer and use it in GitHub Desktop.
Save loganmzz/8bdc88bf8decc1bc3898d021a3678fd2 to your computer and use it in GitHub Desktop.
Reactor - Task dispatching example
package com.github.loganmzz.labo.reactor.samples;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.Date;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Sample showing how a stream of timed "tasks" can be processed on a small,
 * dedicated Reactor scheduler through a {@link ParallelFlux}.
 *
 * <p>An emitter produces four random wait durations, one every 200&nbsp;ms.
 * Each duration is handed to a processing stage running on a scheduler named
 * "Processor" that is created with a parallelism of 2. The stage simulates
 * work by sleeping for the given duration. Two clients subscribe to the
 * processing pipeline, and the main thread subscribes a third time via
 * {@code blockLast()} to wait for completion.
 */
public class ReactiveTaskDispatch {

    /**
     * Prints a formatted message to standard output, prefixed with the current
     * instant and the name of the calling thread.
     *
     * @param pattern a {@link String#format(String, Object...)} pattern
     * @param args    the arguments referenced by {@code pattern}
     */
    public static void log(String pattern, Object... args) {
        System.out.printf(
                "%s - %s - %s%n",
                new Date().toInstant(),
                Thread.currentThread().getName(),
                String.format(pattern, args));
    }

    /**
     * Builds the emitter and the processing pipeline, attaches two logging
     * clients, then blocks until the pipeline subscribed by the main thread
     * has delivered its last element.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // One tick every 200 ms; the tick value is replaced by a random wait
        // in [800, 1000) ms. Only the first four elements are kept.
        // NOTE(review): intervalMillis is, as far as I know, gone from later
        // Reactor releases in favour of Flux.interval(Duration) - confirm the
        // targeted Reactor version before upgrading.
        Flux<Long> emitter = Flux
                .intervalMillis(200)
                .map(tick -> ThreadLocalRandom.current().nextLong(800, 1000))
                .take(4);

        Scheduler scheduler = Schedulers.newParallel("Processor", 2);
        try {
            ParallelFlux<String> processor = emitter
                    .parallel()
                    .runOn(scheduler)
                    .map(ReactiveTaskDispatch::simulateWork);

            for (int i = 0; i < 2; i++) {
                // Lambdas can only capture effectively final locals.
                int clientId = i;
                processor.subscribe(message -> log("Client#%d received: %s", clientId, message));
            }

            // NOTE(review): this is a further subscription to the pipeline, and
            // it is the only one the main thread waits on. The two client
            // subscriptions above are not awaited, so dispose() below may run
            // while they are still processing - confirm this is intended.
            processor.sequential().blockLast();
        } finally {
            // Always release the scheduler, even when the pipeline fails;
            // otherwise the dispose call would be skipped on an exception.
            scheduler.dispose();
        }
    }

    /**
     * Simulates a task by sleeping on the current thread for the given time.
     *
     * @param waitMillis how long to sleep, in milliseconds
     * @return the current thread's name and the wait, joined by a colon
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         sleep is interrupted; the interrupt flag is restored first
     */
    private static String simulateWork(long waitMillis) {
        log("Waiting %dms", waitMillis);
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe
            // that this thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return Thread.currentThread().getName() + ":" + waitMillis;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment