Skip to content

Instantly share code, notes, and snippets.

@richardimaoka
Last active June 24, 2017 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save richardimaoka/83d49a08aacc25659cad8e0daca3461d to your computer and use it in GitHub Desktop.
Save richardimaoka/83d49a08aacc25659cad8e0daca3461d to your computer and use it in GitHub Desktop.
Demonstrate Akka Stream's throttle
lazy val root = (project in file(".")).
settings(
name := "root",
version := "1.0",
scalaVersion := "2.12.2",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.3",
"com.typesafe.akka" %% "akka-agent" % "2.5.3",
"com.typesafe.akka" %% "akka-camel" % "2.5.3",
"com.typesafe.akka" %% "akka-cluster" % "2.5.3",
"com.typesafe.akka" %% "akka-cluster-metrics" % "2.5.3",
"com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.3",
"com.typesafe.akka" %% "akka-cluster-tools" % "2.5.3",
"com.typesafe.akka" %% "akka-distributed-data" % "2.5.3",
"com.typesafe.akka" %% "akka-multi-node-testkit" % "2.5.3",
"com.typesafe.akka" %% "akka-osgi" % "2.5.3",
"com.typesafe.akka" %% "akka-persistence" % "2.5.3",
"com.typesafe.akka" %% "akka-persistence-query" % "2.5.3",
"com.typesafe.akka" %% "akka-persistence-tck" % "2.5.3",
"com.typesafe.akka" %% "akka-remote" % "2.5.3",
"com.typesafe.akka" %% "akka-slf4j" % "2.5.3",
"com.typesafe.akka" %% "akka-stream" % "2.5.3",
"com.typesafe.akka" %% "akka-stream-testkit" % "2.5.3",
"com.typesafe.akka" %% "akka-testkit" % "2.5.3",
"com.typesafe.akka" %% "akka-typed" % "2.5.3",
"com.typesafe.akka" %% "akka-http" % "10.0.6",
"com.typesafe.akka" %% "akka-contrib" % "2.5.3"
)
)
  • See MyThrottle.scala and its def main method

    • that tests the throttle GraphStage with different maxBurst values
    • elements = 8, per = 2 seconds are fixed, which result in nanosBetweenTokens = 250,000,000
  • Then look at the output.txt

    • you see *** nanoseconds have passed messages indicates time lapse almost equal to nanosBetweenTokens = 250,000,000
    • and see the differentce between cases with backpressure from downstream and withOUT it
    • also you can see how maxBurst affects how the first few elements are processed
package my.stream
import java.text.NumberFormat
import java.util.Locale
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSource
import akka.stream.{Attributes, _}
import scala.concurrent.Await
import scala.concurrent.duration._
class MyTimer[T] extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("MyTimer.in")
val out = Outlet[T]("MyTimer.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = Attributes.name("MyTimer")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
var currentTimestamp: Long = 0
var howManyElementsProcessed: Int = 0
override def preStart(): Unit = {
currentTimestamp = System.nanoTime()
println("initialized!")
}
override def onPush(): Unit = {
val previousTimestamp = currentTimestamp
currentTimestamp = System.nanoTime()
val passed = NumberFormat.getNumberInstance(Locale.US).format(currentTimestamp - previousTimestamp)
val elem = grab(in)
push(out, elem)
if(howManyElementsProcessed == 0)
println(s"processed element = ${elem}")
else
println(s"processed element = ${elem}, ${passed} nanoseconds have passed from the previous element")
howManyElementsProcessed = howManyElementsProcessed + 1
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
object MyThrottle {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
/**
* This constructs and runs a stream where the Sink requests 10 elements and then the Source sends 10 too.
* (i.e.) NO backpressure from downstream, but there is backpressure from the throttle stage
*/
def throttleTest(elements: Int, per: FiniteDuration, maxBurst: Int, costPerElement: Int)(): Unit ={
println("--------------testing a throttle GraphStage in a stream with the following parameters: -------------")
val nanoBetweenTokens = NumberFormat.getNumberInstance(Locale.US).format(per.toNanos / elements)
println(s"elements = ${elements}, per = ${per}, maxBurst = ${maxBurst}, costCalc = ${costPerElement}, nanosBetweenTokens = $nanoBetweenTokens")
val ((sourcePublisher, fut), sinkPublisher) = TestSource.probe[Int]
.throttle(elements, per, maxBurst, (_: Int) => costPerElement, ThrottleMode.Shaping)
.via(new MyTimer)
.watchTermination()(Keep.both)
.toMat(Sink.asPublisher(false))(Keep.both)
.run()
val sinkSubscriber = TestSubscriber.manualProbe[Int]()
sinkPublisher.subscribe(sinkSubscriber)
val sinkSubscription = sinkSubscriber.expectSubscription()
/**
* Request 10 elements from downstream, where the upstream sends 10 elements
* (i.e.) NO backpressure from downstream
*/
sinkSubscription.request(10)
for(i <- 1 to 10)
sourcePublisher.sendNext(i)
sourcePublisher.sendComplete()
try{
val result = Await.result(fut, 10 seconds)
println(s"The stream finished with result = ${result}")
}
catch{
case e: Exception => println(s"The stream failed with ${e}")
}
}
/**
* This constructs and runs a stream where the Sink requests only 5 elements but the Source sends 10.
* (i.e.) There is backpressure from downstream, as well as backpressure from the throttle stage
*/
def throttleTestWithDownstreamBackPressure(elements: Int, per: FiniteDuration, maxBurst: Int, costPerElement: Int)(): Unit ={
println("--------------testing a throttle GraphStage in a stream with the following parameters and backpressure from downstream : -------------")
val nanoBetweenTokens = NumberFormat.getNumberInstance(Locale.US).format(per.toNanos / elements)
println(s"elements = ${elements}, per = ${per}, maxBurst = ${maxBurst}, costCalc = ${costPerElement}, nanosBetweenTokens = $nanoBetweenTokens")
val ((sourcePublisher, fut), sinkPublisher) = TestSource.probe[Int]
.throttle(elements, per, maxBurst, (_: Int) => costPerElement, ThrottleMode.Shaping)
.via(new MyTimer)
.watchTermination()(Keep.both)
.toMat(Sink.asPublisher(false))(Keep.both)
.run()
val sinkSubscriber = TestSubscriber.manualProbe[Int]()
sinkPublisher.subscribe(sinkSubscriber)
val sinkSubscription = sinkSubscriber.expectSubscription()
/**
* Request only 5 elements from downstream, where the upstream sends 10 elements
* (i.e.) From the 6-th (next to 5) element, there is backpressure from downstream
*/
sinkSubscription.request(5) //************ THIS IS THE DIFFERENCE FROM throttleTest *************
for(i <- 1 to 10)
sourcePublisher.sendNext(i)
sourcePublisher.sendComplete()
/**
* This Await will fail with timeout exception, since the Future (fut) is Future[Done]
* which completes when the stream completes, but this stream does not complete as
* the last 5 elements are still pending due to backpressure from downstream
*/
try{
val result = Await.result(fut, 5 seconds)
println(s"The stream finished with result = ${result}")
}
catch{
case e: Exception => println(s"The stream failed with ${e}")
}
}
def main(args: Array[String]): Unit = {
try {
throttleTest(elements = 8, per = 2 seconds, maxBurst = 1, costPerElement = 1)
throttleTest(elements = 8, per = 2 seconds, maxBurst = 3, costPerElement = 1)
throttleTest(elements = 8, per = 2 seconds, maxBurst = 5, costPerElement = 1)
throttleTestWithDownstreamBackPressure(elements = 8, per = 2 seconds, maxBurst = 1, costPerElement = 1)
throttleTestWithDownstreamBackPressure(elements = 8, per = 2 seconds, maxBurst = 3, costPerElement = 1)
throttleTestWithDownstreamBackPressure(elements = 8, per = 2 seconds, maxBurst = 5, costPerElement = 1)
}
finally {
system.terminate()
}
}
}
[info] Compiling 1 Scala source to C:\Users\**********\target\scala-2.12\classes...
[warn] there were 8 feature warnings; re-run with -feature for details
[warn] one warning found
[info] Running my.stream.MyThrottle
--------------testing a throttle GraphStage in a stream with the following parameters: -------------
elements = 8, per = 2 seconds, maxBurst = 1, costCalc = 1, nanosBetweenTokens = 250,000,000
initialized!
processed element = 1
processed element = 2, 262,791,512 nanoseconds have passed from the previous element
processed element = 3, 244,556,619 nanoseconds have passed from the previous element
processed element = 4, 252,227,606 nanoseconds have passed from the previous element
processed element = 5, 247,281,277 nanoseconds have passed from the previous element
processed element = 6, 251,096,038 nanoseconds have passed from the previous element
processed element = 7, 251,129,089 nanoseconds have passed from the previous element
processed element = 8, 251,470,422 nanoseconds have passed from the previous element
processed element = 9, 245,753,390 nanoseconds have passed from the previous element
processed element = 10, 252,221,596 nanoseconds have passed from the previous element
The stream finished with result = Done
--------------testing a throttle GraphStage in a stream with the following parameters: -------------
elements = 8, per = 2 seconds, maxBurst = 3, costCalc = 1, nanosBetweenTokens = 250,000,000
initialized!
processed element = 1
processed element = 2, 154,441 nanoseconds have passed from the previous element
processed element = 3, 99,455 nanoseconds have passed from the previous element //due to maxBurst = 3, 1 to 3 are processed quickly
processed element = 4, 257,667,305 nanoseconds have passed from the previous element
processed element = 5, 255,733,784 nanoseconds have passed from the previous element
processed element = 6, 250,064,526 nanoseconds have passed from the previous element
processed element = 7, 251,597,521 nanoseconds have passed from the previous element
processed element = 8, 252,456,864 nanoseconds have passed from the previous element
processed element = 9, 245,772,920 nanoseconds have passed from the previous element
processed element = 10, 242,263,137 nanoseconds have passed from the previous element
The stream finished with result = Done
--------------testing a throttle GraphStage in a stream with the following parameters: -------------
elements = 8, per = 2 seconds, maxBurst = 5, costCalc = 1, nanosBetweenTokens = 250,000,000
initialized!
processed element = 1
processed element = 2, 266,216 nanoseconds have passed from the previous element
processed element = 3, 466,028 nanoseconds have passed from the previous element
processed element = 4, 235,268 nanoseconds have passed from the previous element
processed element = 5, 209,728 nanoseconds have passed from the previous element //due to maxBurst = 5, 1 to 5 are processed quickly
processed element = 6, 258,053,108 nanoseconds have passed from the previous element
processed element = 7, 261,073,427 nanoseconds have passed from the previous element
processed element = 8, 239,345,878 nanoseconds have passed from the previous element
processed element = 9, 250,545,878 nanoseconds have passed from the previous element
processed element = 10, 249,929,014 nanoseconds have passed from the previous element
The stream finished with result = Done
--------------testing a throttle GraphStage in a stream with the following parameters and backpressure from downstream : -------------
elements = 8, per = 2 seconds, maxBurst = 1, costCalc = 1, nanosBetweenTokens = 250,000,000
initialized!
processed element = 1
processed element = 2, 249,419,117 nanoseconds have passed from the previous element
processed element = 3, 259,014,911 nanoseconds have passed from the previous element
processed element = 4, 254,103,436 nanoseconds have passed from the previous element
processed element = 5, 250,239,700 nanoseconds have passed from the previous element
The stream failed with java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
--------------testing a throttle GraphStage in a stream with the following parameters and backpressure from downstream : -------------
elements = 8, per = 2 seconds, maxBurst = 3, costCalc = 1, nanosBetweenTokens = 250,000,000
initialized!
processed element = 1
processed element = 2, 575,099 nanoseconds have passed from the previous element
processed element = 3, 543,850 nanoseconds have passed from the previous element //due to maxBurst = 3, 1 to 3 are processed quickly
processed element = 4, 254,740,432 nanoseconds have passed from the previous element
processed element = 5, 262,664,112 nanoseconds have passed from the previous element
The stream failed with java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
--------------testing a throttle GraphStage in a stream with the following parameters and backpressure from downstream : -------------
elements = 8, per = 2 seconds, maxBurst = 5, costCalc = 1, nanosBetweenTokens = 250,000,000
initialized!
processed element = 1
processed element = 2, 590,122 nanoseconds have passed from the previous element
processed element = 3, 529,727 nanoseconds have passed from the previous element
processed element = 4, 548,357 nanoseconds have passed from the previous element
processed element = 5, 463,625 nanoseconds have passed from the previous element //due to maxBurst = 5, 1 to 5 are processed quickly
The stream failed with java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
[success] Total time: 22 s, completed Jun 24, 2017 10:55:00 PM
51. Waiting for source changes... (press enter to interrupt)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment