Skip to content

Instantly share code, notes, and snippets.

@TanUkkii007
Last active October 17, 2016 02:57
Show Gist options
  • Save TanUkkii007/f3b5784df0f8f7c3b144c38279455a09 to your computer and use it in GitHub Desktop.
Save TanUkkii007/f3b5784df0f8f7c3b144c38279455a09 to your computer and use it in GitHub Desktop.
flatMapConcat modification of StreamDispatchers.scala from https://gist.github.com/johanandren/d55d022bff39ab65b170d8478219604a
package streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import com.typesafe.config.ConfigFactory
object StreamDispatchers extends App {
implicit val system = ActorSystem("dispatchers", ConfigFactory.parseString(
"""
another {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
}
yet-another {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
}
mine {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
}
"""))
implicit val materializer = ActorMaterializer()
val source = Source.single(1).flatMapConcat { _ =>
Source.fromGraph(new TestStage(20)).withAttributes(ActorAttributes.dispatcher("mine"))
}
.map { n => println("expected default (map): " + Thread.currentThread().getName); n }
.map { n => println("expected another: " + Thread.currentThread().getName); n }
.addAttributes(ActorAttributes.dispatcher("another"))
.map { n => println("expected yet-another: " + Thread.currentThread().getName); n }
.addAttributes(ActorAttributes.dispatcher("yet-another"))
.runForeach(n => println("expected default (runForeach): " + Thread.currentThread().getName))
}
class TestStage(n: Int) extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("TestStage")
override def shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var i = 0
setHandler(out, new OutHandler {
override def onPull(): Unit = {
i += 1
println("expected mine: " + Thread.currentThread().getName)
if (i == n)
complete(out)
else
push(out, i)
}
})
}
}
package streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import com.typesafe.config.ConfigFactory
object StreamDispatchers extends App {
implicit val system = ActorSystem("dispatchers", ConfigFactory.parseString(
"""
another {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
}
yet-another {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
}
mine {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
}
"""))
implicit val materializer = ActorMaterializer()
val source = Source.single(1).flatMapConcat { _ =>
Source.fromGraph(new TestStage(20)).withAttributes(ActorAttributes.dispatcher("mine")).async //Just added async to above example
}
.map { n => println("expected default (map): " + Thread.currentThread().getName); n }
.map { n => println("expected another: " + Thread.currentThread().getName); n }
.addAttributes(ActorAttributes.dispatcher("another"))
.map { n => println("expected yet-another: " + Thread.currentThread().getName); n }
.addAttributes(ActorAttributes.dispatcher("yet-another"))
.runForeach(n => println("expected default (runForeach): " + Thread.currentThread().getName))
}
class TestStage(n: Int) extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("TestStage")
override def shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var i = 0
setHandler(out, new OutHandler {
override def onPull(): Unit = {
i += 1
println("expected mine: " + Thread.currentThread().getName)
if (i == n)
complete(out)
else
push(out, i)
}
})
}
}
package streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import com.typesafe.config.ConfigFactory
object StreamDispatchers extends App {
implicit val system = ActorSystem("dispatchers", ConfigFactory.parseString(
"""
another {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
}
yet-another {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
}
mine {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
}
"""))
implicit val materializer = ActorMaterializer()
val source = Source.single(1).flatMapConcat { _ =>
Source.fromGraph(new TestStage(20))
}.withAttributes(ActorAttributes.dispatcher("mine")) // dispatcher attribute attatched to flatMapConcat stage
.map { n => println("expected default (map): " + Thread.currentThread().getName); n }
.map { n => println("expected another: " + Thread.currentThread().getName); n }
.addAttributes(ActorAttributes.dispatcher("another"))
.map { n => println("expected yet-another: " + Thread.currentThread().getName); n }
.addAttributes(ActorAttributes.dispatcher("yet-another"))
.runForeach(n => println("expected default (runForeach): " + Thread.currentThread().getName))
}
class TestStage(n: Int) extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("TestStage")
override def shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var i = 0
setHandler(out, new OutHandler {
override def onPull(): Unit = {
i += 1
println("expected mine: " + Thread.currentThread().getName)
if (i == n)
complete(out)
else
push(out, i)
}
})
}
}
@TanUkkii007
Copy link
Author

//example output from StreamDispatchersWithFlatMapConcat.scala

expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4

@TanUkkii007
Copy link
Author

//example output from StreamDispatchersWithFlatMapConcatAndAsyncBoundary.scala

expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4

@TanUkkii007
Copy link
Author

// output fom StreamDispatchersWithFlatMapConcatWithDispathcer.scala
expected mine: dispatchers-mine-5
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment