Skip to content

Instantly share code, notes, and snippets.

@radium226
Last active September 6, 2017 12:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save radium226/eb55024dd7282ae42c4b9684178ff406 to your computer and use it in GitHub Desktop.
Save radium226/eb55024dd7282ae42c4b9684178ff406 to your computer and use it in GitHub Desktop.
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler}
import scala.concurrent._
import scala.concurrent.duration._
object IntSource {
trait Controller {
def update(value: Int): Unit
def stop()
}
def apply(initialValue: Int): Source[Int, Controller] = Source.fromGraph(new GraphStageWithMaterializedValue[SourceShape[Int], Controller] {
val out = Outlet[Int]("IntSource")
override def shape = SourceShape(out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val logic = new GraphStageLogic(shape) {
var currentValue = initialValue
var shouldComplete = false
val updater = new Controller {
override def update(value: Int): Unit = {
println(s" --> currentValue=${value}")
currentValue = value
}
override def stop(): Unit = {
println(" --> shouldComplete=true")
shouldComplete = true
}
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (shouldComplete) complete(out)
else push(out, currentValue)
}
})
}
(logic, logic.updater)
}
})
.throttle(5, 1 second, 5, ThrottleMode.shaping)
}
object TryIntSource extends App {
def sleep(duration: Duration): Unit = {
Thread.sleep(duration.toMillis)
}
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = materializer.executionContext
val intSource = IntSource(5)
val graph = intSource.toMat(Sink.foreach(println))(Keep.both)
println(" ==> Let's start")
val (controller, graphRunning) = graph.run()
Future {
sleep(5 seconds)
println(" ==> Let's update to 10")
controller.update(10)
sleep(5 seconds)
println(" ==> Let's stop! ")
controller.stop()
}
Await.result(graphRunning, Duration.Inf)
println(" ==> Let's quit! ")
materializer.shutdown()
system.terminate()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment