Skip to content

Instantly share code, notes, and snippets.

@sirpy
Created September 26, 2016 18:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sirpy/f11808cc21db0c4d89b36afeb3aebed7 to your computer and use it in GitHub Desktop.
Save sirpy/f11808cc21db0c4d89b36afeb3aebed7 to your computer and use it in GitHub Desktop.
mapasyncunordered doesnt cap parallelism
package sknil.flow.extract
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Flow
object MapAsyncTest {
val system = ActorSystem()
val threadCount = new AtomicInteger(0)
val threadCount2 = new AtomicInteger(0)
import system.dispatcher
private implicit val materializer = ActorMaterializer.create(ActorMaterializerSettings.create(system).withInputBuffer(1, 1),system)
val src = Source.repeat("x")
val flow = src.mapAsyncUnordered(5)(el => {
Future {
val cur = threadCount.incrementAndGet()
println("Thread Count:"+cur)
Thread.sleep(3000)
el
}
})
val flow2 = Flow[String].mapAsyncUnordered(10)(x => Future {
val cur = threadCount2.incrementAndGet()
println("Thread Count2:"+cur)
Thread.sleep(3000)
x
})
val sink = Sink.foreach[String] { x => threadCount.decrementAndGet();threadCount2.decrementAndGet(); }
val graph = flow.via(flow2).to(sink)
def main(args:Array[String]) {
graph.run()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment