-
-
Save TanUkkii007/f3b5784df0f8f7c3b144c38279455a09 to your computer and use it in GitHub Desktop.
package streams | |
import akka.actor.ActorSystem | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } | |
import com.typesafe.config.ConfigFactory | |
object StreamDispatchers extends App { | |
implicit val system = ActorSystem("dispatchers", ConfigFactory.parseString( | |
""" | |
another { | |
type = Dispatcher | |
executor = "thread-pool-executor" | |
thread-pool-executor { | |
fixed-pool-size = 1 | |
} | |
} | |
yet-another { | |
type = Dispatcher | |
executor = "thread-pool-executor" | |
thread-pool-executor { | |
fixed-pool-size = 1 | |
} | |
} | |
mine { | |
type = Dispatcher | |
executor = "thread-pool-executor" | |
thread-pool-executor { | |
fixed-pool-size = 1 | |
} | |
} | |
""")) | |
implicit val materializer = ActorMaterializer() | |
val source = Source.single(1).flatMapConcat { _ => | |
Source.fromGraph(new TestStage(20)).withAttributes(ActorAttributes.dispatcher("mine")) | |
} | |
.map { n => println("expected default (map): " + Thread.currentThread().getName); n } | |
.map { n => println("expected another: " + Thread.currentThread().getName); n } | |
.addAttributes(ActorAttributes.dispatcher("another")) | |
.map { n => println("expected yet-another: " + Thread.currentThread().getName); n } | |
.addAttributes(ActorAttributes.dispatcher("yet-another")) | |
.runForeach(n => println("expected default (runForeach): " + Thread.currentThread().getName)) | |
} | |
class TestStage(n: Int) extends GraphStage[SourceShape[Int]] { | |
val out: Outlet[Int] = Outlet("TestStage") | |
override def shape: SourceShape[Int] = SourceShape(out) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
var i = 0 | |
setHandler(out, new OutHandler { | |
override def onPull(): Unit = { | |
i += 1 | |
println("expected mine: " + Thread.currentThread().getName) | |
if (i == n) | |
complete(out) | |
else | |
push(out, i) | |
} | |
}) | |
} | |
} |
package streams | |
import akka.actor.ActorSystem | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } | |
import com.typesafe.config.ConfigFactory | |
object StreamDispatchers extends App { | |
implicit val system = ActorSystem("dispatchers", ConfigFactory.parseString( | |
""" | |
another { | |
type = Dispatcher | |
executor = "thread-pool-executor" | |
thread-pool-executor { | |
fixed-pool-size = 1 | |
} | |
} | |
yet-another { | |
type = Dispatcher | |
executor = "thread-pool-executor" | |
thread-pool-executor { | |
fixed-pool-size = 1 | |
} | |
} | |
mine { | |
type = Dispatcher | |
executor = "thread-pool-executor" | |
thread-pool-executor { | |
fixed-pool-size = 1 | |
} | |
} | |
""")) | |
implicit val materializer = ActorMaterializer() | |
val source = Source.single(1).flatMapConcat { _ => | |
Source.fromGraph(new TestStage(20)).withAttributes(ActorAttributes.dispatcher("mine")).async //Just added async to above example | |
} | |
.map { n => println("expected default (map): " + Thread.currentThread().getName); n } | |
.map { n => println("expected another: " + Thread.currentThread().getName); n } | |
.addAttributes(ActorAttributes.dispatcher("another")) | |
.map { n => println("expected yet-another: " + Thread.currentThread().getName); n } | |
.addAttributes(ActorAttributes.dispatcher("yet-another")) | |
.runForeach(n => println("expected default (runForeach): " + Thread.currentThread().getName)) | |
} | |
class TestStage(n: Int) extends GraphStage[SourceShape[Int]] { | |
val out: Outlet[Int] = Outlet("TestStage") | |
override def shape: SourceShape[Int] = SourceShape(out) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
var i = 0 | |
setHandler(out, new OutHandler { | |
override def onPull(): Unit = { | |
i += 1 | |
println("expected mine: " + Thread.currentThread().getName) | |
if (i == n) | |
complete(out) | |
else | |
push(out, i) | |
} | |
}) | |
} | |
} |
package streams | |
import akka.actor.ActorSystem | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } | |
import com.typesafe.config.ConfigFactory | |
object StreamDispatchers extends App { | |
implicit val system = ActorSystem("dispatchers", ConfigFactory.parseString( | |
""" | |
another { | |
type = Dispatcher | |
executor = "thread-pool-executor" | |
thread-pool-executor { | |
fixed-pool-size = 1 | |
} | |
} | |
yet-another { | |
type = Dispatcher | |
executor = "thread-pool-executor" | |
thread-pool-executor { | |
fixed-pool-size = 1 | |
} | |
} | |
mine { | |
type = Dispatcher | |
executor = "thread-pool-executor" | |
thread-pool-executor { | |
fixed-pool-size = 1 | |
} | |
} | |
""")) | |
implicit val materializer = ActorMaterializer() | |
val source = Source.single(1).flatMapConcat { _ => | |
Source.fromGraph(new TestStage(20)) | |
}.withAttributes(ActorAttributes.dispatcher("mine")) // dispatcher attribute attatched to flatMapConcat stage | |
.map { n => println("expected default (map): " + Thread.currentThread().getName); n } | |
.map { n => println("expected another: " + Thread.currentThread().getName); n } | |
.addAttributes(ActorAttributes.dispatcher("another")) | |
.map { n => println("expected yet-another: " + Thread.currentThread().getName); n } | |
.addAttributes(ActorAttributes.dispatcher("yet-another")) | |
.runForeach(n => println("expected default (runForeach): " + Thread.currentThread().getName)) | |
} | |
class TestStage(n: Int) extends GraphStage[SourceShape[Int]] { | |
val out: Outlet[Int] = Outlet("TestStage") | |
override def shape: SourceShape[Int] = SourceShape(out) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
var i = 0 | |
setHandler(out, new OutHandler { | |
override def onPull(): Unit = { | |
i += 1 | |
println("expected mine: " + Thread.currentThread().getName) | |
if (i == n) | |
complete(out) | |
else | |
push(out, i) | |
} | |
}) | |
} | |
} |
//example output from StreamDispatchersWithFlatMapConcatAndAsyncBoundary.scala
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
// output fom StreamDispatchersWithFlatMapConcatWithDispathcer.scala
expected mine: dispatchers-mine-5
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
//example output from StreamDispatchersWithFlatMapConcat.scala
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4