Skip to content

Instantly share code, notes, and snippets.

@hochgi
Last active June 13, 2016 09:07
Show Gist options
  • Save hochgi/596050e7249d2dd0118d7c0a77e1056b to your computer and use it in GitHub Desktop.
Save hochgi/596050e7249d2dd0118d7c0a77e1056b to your computer and use it in GitHub Desktop.
PartitionWith measure
name := "measure-partition-with"
scalaVersion := "2.11.8"
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/releases"
libraryDependencies ++= Seq(
"com.storm-enroute" %% "scalameter" % "0.7",
"com.typesafe.akka" %% "akka-stream" % "2.4.7"
)
cancelable in Global := true
testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework")
parallelExecution in Test := false
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import com.typesafe.config.ConfigFactory
import org.scalameter._
import scala.concurrent._ ,duration._
import scala.concurrent.ExecutionContext.Implicits.global
object PartitionWithMeter {
object PartitionWith {
def getGraph[In, Out0, Out1](f: In => Either[Out0, Out1]) = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val map0 = b.add(Flow[In].map(f andThen {_.left.get}))
val map1 = b.add(Flow[In].map(f andThen {_.right.get}))
val part = b.add(Partition[In](2,{ i =>
f(i).fold(_ => 0,_ => 1)
}))
part.out(0) ~> map0
part.out(1) ~> map1
new FanOutShape2(part.in, map0.out, map1.out)
}
def getGraphStage[In, Out0, Out1](p: In => Either[Out0, Out1]) = new GraphStage[FanOutShape2[In, Out0, Out1]] {
override val shape = new FanOutShape2[In, Out0, Out1]("partitionWith")
override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
import shape._
private var pending: Either[Out0, Out1] = null
setHandler(in, new InHandler {
override def onPush() = {
val elem = grab(in)
p(elem) match {
case Left(o) if isAvailable(out0) =>
push(out0, o)
if (isAvailable(out1))
pull(in)
case Right(o) if isAvailable(out1) =>
push(out1, o)
if (isAvailable(out0))
pull(in)
case either =>
pending = either
}
}
override def onUpstreamFinish() = {
if (pending eq null)
completeStage()
}
})
setHandler(out0, new OutHandler {
override def onPull() = if (pending ne null) pending.left.foreach { o =>
push(out0, o)
if (isClosed(in)) completeStage()
else {
pending = null
if (isAvailable(out1))
pull(in)
}
}
else if (!hasBeenPulled(in)) pull(in)
})
setHandler(out1, new OutHandler {
override def onPull() = if (pending ne null) pending.right.foreach { o =>
push(out1, o)
if (isClosed(in)) completeStage()
else {
pending = null
if (isAvailable(out0))
pull(in)
}
}
else if (!hasBeenPulled(in)) pull(in)
})
}
}
}
val counterStage = new GraphStage[BidiShape[Int, Int, Int, Int]] {
override val shape = BidiShape[Int, Int, Int, Int](Inlet[Int]("counter.source"),Outlet[Int]("counter.sink"),Inlet[Int]("counter.decrement"),Outlet[Int]("counter.complete"))
override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
import shape._
private var count: Int = 0
override def preStart() = pull(in2)
setHandler(in1, new InHandler {
override def onPush() = {
val i = grab(in1)
// println(s"got input=$i and count=$count")
push(out1,i)
count += 1
}
override def onUpstreamFinish() = {
if(count == 0)
completeStage()
}
})
setHandler(in2, new InHandler {
override def onPush() = {
val e = grab(in2)
count -= e
// println(s"decrement e=$e")
if(count == 0 && isClosed(in1)) completeStage()
else pull(in2)
}
})
setHandler(out1, new OutHandler {
override def onPull() =
if(!isClosed(in1))
pull(in1)
})
setHandler(out2, new OutHandler {
override def onPull() = {
println("should not happen!")
push(out2,0)
}
})
}
}
val interceptCompletion = new GraphStage[FanInShape2[Int, Int, Int]] {
override val shape = new FanInShape2[Int, Int, Int]("intercepter")
override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
import shape._
setHandler(in0, new InHandler {
override def onPush() =
push(out,grab(in0))
})
setHandler(in1, new InHandler {
override def onPush() = {
grab(in1)
pull(in1)
}
override def onUpstreamFinish() =
completeStage()
})
setHandler(out, new OutHandler {
override def onPull() =
pull(in0)
})
}
}
val standardConfig = config(
Key.exec.minWarmupRuns -> 10,
Key.exec.maxWarmupRuns -> 100,
Key.exec.benchRuns -> 100,
Key.verbose -> true
) withWarmer(new Warmer.Default)
def main(args: Array[String]) {
implicit val system = {
def config = ConfigFactory.parseString("akka.stream.materializer.auto-fusing=true")
.withFallback(ConfigFactory.load())
ActorSystem("default", config)
}
implicit val mat = ActorMaterializer()
val fun: Int => Either[Int,Int] = {
case 1 => Left(1)
case i if i % 2 == 0 => Right(i / 2)
case i => Right(i * 3 + 1)
}
val graphTime = standardConfig measure {
val f = RunnableGraph.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
implicit b =>
snk =>
import GraphDSL.Implicits._
val src = b.add(Source(1 to 1000))
val cnt = b.add(counterStage)
val itc = b.add(interceptCompletion)
val mrg = b.add(MergePreferred[Int](1))
val prt = b.add(PartitionWith.getGraph(fun))
val bct = b.add(Broadcast[Int](2))
src ~> cnt.in1
cnt.out1 ~> mrg ~> prt.in
cnt.out2 ~> itc.in1
prt.out1 ~> itc.in0
itc.out ~> mrg.preferred
prt.out0 ~> bct ~> snk
bct ~> cnt.in2
ClosedShape
}).run()
Await.ready(f,Duration.Inf)
}
println(s"graphTime: $graphTime ms")
val graphStageTime = standardConfig measure {
val f = RunnableGraph.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
implicit b =>
snk =>
import GraphDSL.Implicits._
val src = b.add(Source(1 to 1000))
val cnt = b.add(counterStage)
val itc = b.add(interceptCompletion)
val mrg = b.add(MergePreferred[Int](1))
val prt = b.add(PartitionWith.getGraphStage(fun))
val bct = b.add(Broadcast[Int](2))
src ~> cnt.in1
cnt.out1 ~> mrg ~> prt.in
cnt.out2 ~> itc.in1
prt.out1 ~> itc.in0
itc.out ~> mrg.preferred
prt.out0 ~> bct ~> snk
bct ~> cnt.in2
ClosedShape
}).run()
Await.ready(f,Duration.Inf)
}
println(s"graphStageTime: $graphStageTime ms")
system.shutdown()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment