Skip to content

Instantly share code, notes, and snippets.

@diyan
Created December 8, 2020 23:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save diyan/3aa3964ea6cd823d5984525482f520c1 to your computer and use it in GitHub Desktop.
Save diyan/3aa3964ea6cd823d5984525482f520c1 to your computer and use it in GitHub Desktop.
package com.example.akka.stream
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.javadsl.Balance
import akka.stream.javadsl.Flow
import akka.stream.javadsl.GraphDSL
import akka.stream.javadsl.Merge
// Most likely those classes are useless
// unless .mapAsyncUnordered(workerCount) + .async() can not distribute work across multiple hosts
object BalancerAsync {
fun <In, Out> create(worker: Flow<In, Out, NotUsed>, workerCount: Int): Flow<In, Out, NotUsed> {
return Flow.fromGraph(GraphDSL.create { builder ->
// NOTE Implicit waitForAllDownstreams=false at https://github.com/rucek/akka-streams-in-practice-kotlin
// NOTE Explicit waitForAllDownstreams=true at https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#cookbook-balance
val balance = builder.add(Balance.create<In>(workerCount, true))
// val balance = builder.add(Balance.create<In>(workerCount))
val merge = builder.add(Merge.create<Out>(workerCount))
for (i in 0.until(workerCount)) {
// NOTE Sync at https://github.com/rucek/akka-streams-in-practice-kotlin
// NOTE Async at https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#cookbook-balance
val workerStage = builder.add(worker.async())
// val workerStage = builder.add(worker)
builder.from(balance.out(i)).via<Out>(workerStage).toInlet(merge.`in`(i))
}
FlowShape.of(balance.`in`(), merge.out())
})
}
}
object BalancerSync {
fun <In, Out> create(worker: Flow<In, Out, NotUsed>, workerCount: Int): Flow<In, Out, NotUsed> {
return Flow.fromGraph(GraphDSL.create { builder ->
// NOTE Implicit waitForAllDownstreams=false at https://github.com/rucek/akka-streams-in-practice-kotlin
// NOTE Explicit waitForAllDownstreams=true at https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#cookbook-balance
// val balance = builder.add(Balance.create<In>(workerCount, true))
val balance = builder.add(Balance.create<In>(workerCount))
val merge = builder.add(Merge.create<Out>(workerCount))
for (i in 0.until(workerCount)) {
// NOTE Sync at https://github.com/rucek/akka-streams-in-practice-kotlin
// NOTE Async at https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#cookbook-balance
// val workerStage = builder.add(worker.async())
val workerStage = builder.add(worker)
builder.from(balance.out(i)).via<Out>(workerStage).toInlet(merge.`in`(i))
}
FlowShape.of(balance.`in`(), merge.out())
})
}
}
object BalancerPool {
fun <In, Out> create(workers: List<Flow<In, Out, NotUsed>>): Flow<In, Out, NotUsed> {
return Flow.fromGraph(GraphDSL.create { builder ->
// NOTE Implicit waitForAllDownstreams=false at https://github.com/rucek/akka-streams-in-practice-kotlin
// NOTE Explicit waitForAllDownstreams=true at https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#cookbook-balance
val balance = builder.add(Balance.create<In>(workers.count(), true))
// val balance = builder.add(Balance.create<In>(workerCount))
val merge = builder.add(Merge.create<Out>(workers.count()))
for ((index, worker) in workers.withIndex()) {
// NOTE Sync at https://github.com/rucek/akka-streams-in-practice-kotlin
// NOTE Async at https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#cookbook-balance
val workerStage = builder.add(worker.async())
// val workerStage = builder.add(worker)
builder.from(balance.out(index)).via<Out>(workerStage).toInlet(merge.`in`(index))
}
FlowShape.of(balance.`in`(), merge.out())
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment