Skip to content

Instantly share code, notes, and snippets.

@IgorBerman
Created April 7, 2017 15:30
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save IgorBerman/3c2a3392586b81fa7928f31ee5ec6c37 to your computer and use it in GitHub Desktop.
Save IgorBerman/3c2a3392586b81fa7928f31ee5ec6c37 to your computer and use it in GitHub Desktop.
Substreams with order and dispatcher, instead of ConsistentHashingPool with ask+timeout with blocking processing
//without backpressure:
ActorRef myActor = system.actorOf(
new akka.routing.ConsistentHashingPool(maxParallelism)
.withHashMapper(hashMapper)
.props(MyActor.props(blLogic).withDispatcher("my-dispatcher"))
, "my-ordered-processor-pool");
final Timeout timeout = Timeout.apply(1000L, TimeUnit.MILLISECONDS);
Flow.of(Event.class)
.mapAsyncUnordered(maxParallelism, (Event e) -> ask(myActor, e, timeout));
//First try
Flow.of(Event.class)
.groupBy(maxParallelism, e -> Math.abs((long)hashMapper.hashKey(e)) % maxParallelism)
.map(e -> bLogic(e))
.mergeSubstreams()
.withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
.async();
//define async and dispatcher on substream: Both parallelism and backpressure
Flow.of(Event.class)
.groupBy(maxParallelism, e -> Math.abs((long)hashMapper.hashKey(e)) % maxParallelism)
.map(e -> bLogic(e))
.withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
.async()
.mergeSubstreams();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment