-
See
MyThrottle.scala
and itsdef main
method- that tests the throttle
GraphStage
with differentmaxBurst
values elements = 8, per = 2 seconds
are fixed, which result innanosBetweenTokens = 250,000,000
- that tests the throttle
-
Then look at the
output.txt
- you see
*** nanoseconds have passed
messages indicates time lapse almost equal tonanosBetweenTokens = 250,000,000
- and see the differentce between cases with backpressure from downstream and withOUT it
- also you can see how
maxBurst
affects how the first few elements are processed
- you see
Last active
June 24, 2017 14:15
-
-
Save richardimaoka/83d49a08aacc25659cad8e0daca3461d to your computer and use it in GitHub Desktop.
Demonstrate Akka Stream's throttle
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
lazy val root = (project in file(".")). | |
settings( | |
name := "root", | |
version := "1.0", | |
scalaVersion := "2.12.2", | |
libraryDependencies ++= Seq( | |
"com.typesafe.akka" %% "akka-actor" % "2.5.3", | |
"com.typesafe.akka" %% "akka-agent" % "2.5.3", | |
"com.typesafe.akka" %% "akka-camel" % "2.5.3", | |
"com.typesafe.akka" %% "akka-cluster" % "2.5.3", | |
"com.typesafe.akka" %% "akka-cluster-metrics" % "2.5.3", | |
"com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.3", | |
"com.typesafe.akka" %% "akka-cluster-tools" % "2.5.3", | |
"com.typesafe.akka" %% "akka-distributed-data" % "2.5.3", | |
"com.typesafe.akka" %% "akka-multi-node-testkit" % "2.5.3", | |
"com.typesafe.akka" %% "akka-osgi" % "2.5.3", | |
"com.typesafe.akka" %% "akka-persistence" % "2.5.3", | |
"com.typesafe.akka" %% "akka-persistence-query" % "2.5.3", | |
"com.typesafe.akka" %% "akka-persistence-tck" % "2.5.3", | |
"com.typesafe.akka" %% "akka-remote" % "2.5.3", | |
"com.typesafe.akka" %% "akka-slf4j" % "2.5.3", | |
"com.typesafe.akka" %% "akka-stream" % "2.5.3", | |
"com.typesafe.akka" %% "akka-stream-testkit" % "2.5.3", | |
"com.typesafe.akka" %% "akka-testkit" % "2.5.3", | |
"com.typesafe.akka" %% "akka-typed" % "2.5.3", | |
"com.typesafe.akka" %% "akka-http" % "10.0.6", | |
"com.typesafe.akka" %% "akka-contrib" % "2.5.3" | |
) | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package my.stream | |
import java.text.NumberFormat | |
import java.util.Locale | |
import akka.actor.ActorSystem | |
import akka.stream.scaladsl.{Keep, Sink} | |
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} | |
import akka.stream.testkit.TestSubscriber | |
import akka.stream.testkit.scaladsl.TestSource | |
import akka.stream.{Attributes, _} | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
class MyTimer[T] extends GraphStage[FlowShape[T, T]] { | |
val in = Inlet[T]("MyTimer.in") | |
val out = Outlet[T]("MyTimer.out") | |
override val shape = FlowShape(in, out) | |
override def initialAttributes: Attributes = Attributes.name("MyTimer") | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = | |
new GraphStageLogic(shape) with InHandler with OutHandler { | |
var currentTimestamp: Long = 0 | |
var howManyElementsProcessed: Int = 0 | |
override def preStart(): Unit = { | |
currentTimestamp = System.nanoTime() | |
println("initialized!") | |
} | |
override def onPush(): Unit = { | |
val previousTimestamp = currentTimestamp | |
currentTimestamp = System.nanoTime() | |
val passed = NumberFormat.getNumberInstance(Locale.US).format(currentTimestamp - previousTimestamp) | |
val elem = grab(in) | |
push(out, elem) | |
if(howManyElementsProcessed == 0) | |
println(s"processed element = ${elem}") | |
else | |
println(s"processed element = ${elem}, ${passed} nanoseconds have passed from the previous element") | |
howManyElementsProcessed = howManyElementsProcessed + 1 | |
} | |
override def onPull(): Unit = pull(in) | |
setHandlers(in, out, this) | |
} | |
} | |
object MyThrottle { | |
implicit val system = ActorSystem() | |
implicit val materializer = ActorMaterializer() | |
/** | |
* This constructs and runs a stream where the Sink requests 10 elements and then the Source sends 10 too. | |
* (i.e.) NO backpressure from downstream, but there is backpressure from the throttle stage | |
*/ | |
def throttleTest(elements: Int, per: FiniteDuration, maxBurst: Int, costPerElement: Int)(): Unit ={ | |
println("--------------testing a throttle GraphStage in a stream with the following parameters: -------------") | |
val nanoBetweenTokens = NumberFormat.getNumberInstance(Locale.US).format(per.toNanos / elements) | |
println(s"elements = ${elements}, per = ${per}, maxBurst = ${maxBurst}, costCalc = ${costPerElement}, nanosBetweenTokens = $nanoBetweenTokens") | |
val ((sourcePublisher, fut), sinkPublisher) = TestSource.probe[Int] | |
.throttle(elements, per, maxBurst, (_: Int) => costPerElement, ThrottleMode.Shaping) | |
.via(new MyTimer) | |
.watchTermination()(Keep.both) | |
.toMat(Sink.asPublisher(false))(Keep.both) | |
.run() | |
val sinkSubscriber = TestSubscriber.manualProbe[Int]() | |
sinkPublisher.subscribe(sinkSubscriber) | |
val sinkSubscription = sinkSubscriber.expectSubscription() | |
/** | |
* Request 10 elements from downstream, where the upstream sends 10 elements | |
* (i.e.) NO backpressure from downstream | |
*/ | |
sinkSubscription.request(10) | |
for(i <- 1 to 10) | |
sourcePublisher.sendNext(i) | |
sourcePublisher.sendComplete() | |
try{ | |
val result = Await.result(fut, 10 seconds) | |
println(s"The stream finished with result = ${result}") | |
} | |
catch{ | |
case e: Exception => println(s"The stream failed with ${e}") | |
} | |
} | |
/** | |
* This constructs and runs a stream where the Sink requests only 5 elements but the Source sends 10. | |
* (i.e.) There is backpressure from downstream, as well as backpressure from the throttle stage | |
*/ | |
def throttleTestWithDownstreamBackPressure(elements: Int, per: FiniteDuration, maxBurst: Int, costPerElement: Int)(): Unit ={ | |
println("--------------testing a throttle GraphStage in a stream with the following parameters and backpressure from downstream : -------------") | |
val nanoBetweenTokens = NumberFormat.getNumberInstance(Locale.US).format(per.toNanos / elements) | |
println(s"elements = ${elements}, per = ${per}, maxBurst = ${maxBurst}, costCalc = ${costPerElement}, nanosBetweenTokens = $nanoBetweenTokens") | |
val ((sourcePublisher, fut), sinkPublisher) = TestSource.probe[Int] | |
.throttle(elements, per, maxBurst, (_: Int) => costPerElement, ThrottleMode.Shaping) | |
.via(new MyTimer) | |
.watchTermination()(Keep.both) | |
.toMat(Sink.asPublisher(false))(Keep.both) | |
.run() | |
val sinkSubscriber = TestSubscriber.manualProbe[Int]() | |
sinkPublisher.subscribe(sinkSubscriber) | |
val sinkSubscription = sinkSubscriber.expectSubscription() | |
/** | |
* Request only 5 elements from downstream, where the upstream sends 10 elements | |
* (i.e.) From the 6-th (next to 5) element, there is backpressure from downstream | |
*/ | |
sinkSubscription.request(5) //************ THIS IS THE DIFFERENCE FROM throttleTest ************* | |
for(i <- 1 to 10) | |
sourcePublisher.sendNext(i) | |
sourcePublisher.sendComplete() | |
/** | |
* This Await will fail with timeout exception, since the Future (fut) is Future[Done] | |
* which completes when the stream completes, but this stream does not complete as | |
* the last 5 elements are still pending due to backpressure from downstream | |
*/ | |
try{ | |
val result = Await.result(fut, 5 seconds) | |
println(s"The stream finished with result = ${result}") | |
} | |
catch{ | |
case e: Exception => println(s"The stream failed with ${e}") | |
} | |
} | |
def main(args: Array[String]): Unit = { | |
try { | |
throttleTest(elements = 8, per = 2 seconds, maxBurst = 1, costPerElement = 1) | |
throttleTest(elements = 8, per = 2 seconds, maxBurst = 3, costPerElement = 1) | |
throttleTest(elements = 8, per = 2 seconds, maxBurst = 5, costPerElement = 1) | |
throttleTestWithDownstreamBackPressure(elements = 8, per = 2 seconds, maxBurst = 1, costPerElement = 1) | |
throttleTestWithDownstreamBackPressure(elements = 8, per = 2 seconds, maxBurst = 3, costPerElement = 1) | |
throttleTestWithDownstreamBackPressure(elements = 8, per = 2 seconds, maxBurst = 5, costPerElement = 1) | |
} | |
finally { | |
system.terminate() | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[info] Compiling 1 Scala source to C:\Users\**********\target\scala-2.12\classes... | |
[warn] there were 8 feature warnings; re-run with -feature for details | |
[warn] one warning found | |
[info] Running my.stream.MyThrottle | |
--------------testing a throttle GraphStage in a stream with the following parameters: ------------- | |
elements = 8, per = 2 seconds, maxBurst = 1, costCalc = 1, nanosBetweenTokens = 250,000,000 | |
initialized! | |
processed element = 1 | |
processed element = 2, 262,791,512 nanoseconds have passed from the previous element | |
processed element = 3, 244,556,619 nanoseconds have passed from the previous element | |
processed element = 4, 252,227,606 nanoseconds have passed from the previous element | |
processed element = 5, 247,281,277 nanoseconds have passed from the previous element | |
processed element = 6, 251,096,038 nanoseconds have passed from the previous element | |
processed element = 7, 251,129,089 nanoseconds have passed from the previous element | |
processed element = 8, 251,470,422 nanoseconds have passed from the previous element | |
processed element = 9, 245,753,390 nanoseconds have passed from the previous element | |
processed element = 10, 252,221,596 nanoseconds have passed from the previous element | |
The stream finished with result = Done | |
--------------testing a throttle GraphStage in a stream with the following parameters: ------------- | |
elements = 8, per = 2 seconds, maxBurst = 3, costCalc = 1, nanosBetweenTokens = 250,000,000 | |
initialized! | |
processed element = 1 | |
processed element = 2, 154,441 nanoseconds have passed from the previous element | |
processed element = 3, 99,455 nanoseconds have passed from the previous element //due to maxBurst = 3, 1 to 3 are processed quickly | |
processed element = 4, 257,667,305 nanoseconds have passed from the previous element | |
processed element = 5, 255,733,784 nanoseconds have passed from the previous element | |
processed element = 6, 250,064,526 nanoseconds have passed from the previous element | |
processed element = 7, 251,597,521 nanoseconds have passed from the previous element | |
processed element = 8, 252,456,864 nanoseconds have passed from the previous element | |
processed element = 9, 245,772,920 nanoseconds have passed from the previous element | |
processed element = 10, 242,263,137 nanoseconds have passed from the previous element | |
The stream finished with result = Done | |
--------------testing a throttle GraphStage in a stream with the following parameters: ------------- | |
elements = 8, per = 2 seconds, maxBurst = 5, costCalc = 1, nanosBetweenTokens = 250,000,000 | |
initialized! | |
processed element = 1 | |
processed element = 2, 266,216 nanoseconds have passed from the previous element | |
processed element = 3, 466,028 nanoseconds have passed from the previous element | |
processed element = 4, 235,268 nanoseconds have passed from the previous element | |
processed element = 5, 209,728 nanoseconds have passed from the previous element //due to maxBurst = 5, 1 to 5 are processed quickly | |
processed element = 6, 258,053,108 nanoseconds have passed from the previous element | |
processed element = 7, 261,073,427 nanoseconds have passed from the previous element | |
processed element = 8, 239,345,878 nanoseconds have passed from the previous element | |
processed element = 9, 250,545,878 nanoseconds have passed from the previous element | |
processed element = 10, 249,929,014 nanoseconds have passed from the previous element | |
The stream finished with result = Done | |
--------------testing a throttle GraphStage in a stream with the following parameters and backpressure from downstream : ------------- | |
elements = 8, per = 2 seconds, maxBurst = 1, costCalc = 1, nanosBetweenTokens = 250,000,000 | |
initialized! | |
processed element = 1 | |
processed element = 2, 249,419,117 nanoseconds have passed from the previous element | |
processed element = 3, 259,014,911 nanoseconds have passed from the previous element | |
processed element = 4, 254,103,436 nanoseconds have passed from the previous element | |
processed element = 5, 250,239,700 nanoseconds have passed from the previous element | |
The stream failed with java.util.concurrent.TimeoutException: Futures timed out after [5 seconds] | |
--------------testing a throttle GraphStage in a stream with the following parameters and backpressure from downstream : ------------- | |
elements = 8, per = 2 seconds, maxBurst = 3, costCalc = 1, nanosBetweenTokens = 250,000,000 | |
initialized! | |
processed element = 1 | |
processed element = 2, 575,099 nanoseconds have passed from the previous element | |
processed element = 3, 543,850 nanoseconds have passed from the previous element //due to maxBurst = 3, 1 to 3 are processed quickly | |
processed element = 4, 254,740,432 nanoseconds have passed from the previous element | |
processed element = 5, 262,664,112 nanoseconds have passed from the previous element | |
The stream failed with java.util.concurrent.TimeoutException: Futures timed out after [5 seconds] | |
--------------testing a throttle GraphStage in a stream with the following parameters and backpressure from downstream : ------------- | |
elements = 8, per = 2 seconds, maxBurst = 5, costCalc = 1, nanosBetweenTokens = 250,000,000 | |
initialized! | |
processed element = 1 | |
processed element = 2, 590,122 nanoseconds have passed from the previous element | |
processed element = 3, 529,727 nanoseconds have passed from the previous element | |
processed element = 4, 548,357 nanoseconds have passed from the previous element | |
processed element = 5, 463,625 nanoseconds have passed from the previous element //due to maxBurst = 5, 1 to 5 are processed quickly | |
The stream failed with java.util.concurrent.TimeoutException: Futures timed out after [5 seconds] | |
[success] Total time: 22 s, completed Jun 24, 2017 10:55:00 PM | |
51. Waiting for source changes... (press enter to interrupt) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment