Skip to content

Instantly share code, notes, and snippets.

@eiel eiel/Build.sbt
Last active Aug 10, 2016

Embed
What would you like to do?
akka stream で delay を使って backpressure を体験してみる - http://blog.eiel.info/blog/2016/08/08/ltdd-28-akka-stream/
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
// Minimal Akka Streams example: print an endless sequence of integers.
// The actor system and materializer are implicit so that `runWith` can pick them up.
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()

// Source that emits the integers 0, 1, 2, ... in order.
val source = Source.fromIterator(() => Iterator.from(0))

// Sink that prints every element it receives.
// Declared as `val` because it is never reassigned.
val sink = Sink.foreach[Int](println)

// Connect the source to the sink, building a RunnableGraph, and run it.
source.runWith(sink)
// Scala version used to compile the examples.
scalaVersion := "2.11.7"
// The only dependency needed by the examples is akka-stream.
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.8"
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
// Same pipeline as the basic example, with a `delay` stage inserted before the sink.
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()

// Source that emits the integers 0, 1, 2, ... in order.
val source = Source.fromIterator(() => Iterator.from(0))

// Sink that prints every element it receives.
// Declared as `val` because it is never reassigned.
val sink = Sink.foreach[Int](println)

// Delay elements by one second using the default overflow strategy of `delay`.
// `1.second` replaces the postfix form `1 second`, which needs
// `scala.language.postfixOps` to compile without a feature warning.
source.delay(1.second).runWith(sink)
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer,DelayOverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
// `delay` with an explicit backpressure overflow strategy.
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()

// Source that emits the integers 0, 1, 2, ... in order.
val source = Source.fromIterator(() => Iterator.from(0))

// Sink that prints every element it receives.
// Declared as `val` because it is never reassigned.
val sink = Sink.foreach[Int](println)

// Delay each element by one second and backpressure upstream when the delay buffer is full.
// Author's observation: with the default settings the elements come out two at a time.
// `1.second` is used instead of the postfix form `1 second` (no postfixOps import needed).
source.delay(1.second, DelayOverflowStrategy.backpressure).runWith(sink)
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer,ActorMaterializerSettings,DelayOverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
// `delay` with backpressure, run on a materializer whose input buffer size is 1.
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()

/** Builds a materializer whose input buffer has a fixed size.
  *
  * @param size used as both the initial and the maximum input buffer size
  * @return a new [[ActorMaterializer]] backed by `actorSystem`
  */
def materializer(size: Int): ActorMaterializer = {
  val settings = ActorMaterializerSettings(actorSystem)
    .withInputBuffer(initialSize = size, maxSize = size)
  ActorMaterializer(settings)
}

// Source that emits the integers 0, 1, 2, ... in order.
val source = Source.fromIterator(() => Iterator.from(0))

// Sink that prints every element it receives.
// Declared as `val` because it is never reassigned.
val sink = Sink.foreach[Int](println)

// Try a buffer size of 1: the materializer is passed explicitly, overriding the implicit one.
// `1.second` is used instead of the postfix form `1 second` (no postfixOps import needed).
source.delay(1.second, DelayOverflowStrategy.backpressure).runWith(sink)(materializer(1))
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
// `delay` with its default overflow strategy, run on a materializer whose input buffer size is 8.
// `actorSystem` is defined before `materializer` so the snippet also works when it is
// evaluated statement by statement (e.g. pasted into the REPL).
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()

/** Builds a materializer whose input buffer has a fixed size.
  *
  * @param size used as both the initial and the maximum input buffer size
  * @return a new [[ActorMaterializer]] backed by `actorSystem`
  */
def materializer(size: Int): ActorMaterializer = {
  val settings = ActorMaterializerSettings(actorSystem)
    .withInputBuffer(initialSize = size, maxSize = size)
  ActorMaterializer(settings)
}

// Source that emits the integers 0, 1, 2, ... in order.
val source = Source.fromIterator(() => Iterator.from(0))

// Sink that prints every element it receives.
// Declared as `val` because it is never reassigned.
val sink = Sink.foreach[Int](println)

// Buffer size 8: the materializer is passed explicitly, overriding the implicit one.
// `1.second` is used instead of the postfix form `1 second` (no postfixOps import needed).
source.delay(1.second).runWith(sink)(materializer(8))
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
// Rate-limit the stream with `throttle` instead of `delay`.
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()

// Source that emits the integers 0, 1, 2, ... in order.
val source = Source.fromIterator(() => Iterator.from(0))

// Sink that prints every element it receives.
// Declared as `val` because it is never reassigned.
val sink = Sink.foreach[Int](println)

// Arguments: 1 element per 1 second, maximum burst of 1, in Shaping mode.
// `1.second` is used instead of the postfix form `1 second` (no postfixOps import needed).
source.throttle(1, 1.second, 1, ThrottleMode.Shaping).runWith(sink)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.