Skip to content

Instantly share code, notes, and snippets.

@eiel
Last active August 10, 2016 11:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eiel/a8edc165ad316868c75766a447d57237 to your computer and use it in GitHub Desktop.
Save eiel/a8edc165ad316868c75766a447d57237 to your computer and use it in GitHub Desktop.
akka stream で backpressureをdelayをつかって体験してみる - http://blog.eiel.info/blog/2016/08/08/ltdd-28-akka-stream/
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()
val source = Source.fromIterator(() => Iterator.from(0)) // 数値が0から順番に流れてくるSource
var sink = Sink.foreach[Int](println) // 入力を画面に出力する
source.runWith(sink) // sourceとsinkをくっつけてRunnableGraphをつくって、runする
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % "2.4.8"
)
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()
val source = Source.fromIterator(() => Iterator.from(0)) // 数値が0から順番に流れてくるSource
var sink = Sink.foreach[Int](println) // 入力を画面に出力する
source.delay(1 second).runWith(sink)
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer,DelayOverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()
val source = Source.fromIterator(() => Iterator.from(0)) // 数値が0から順番に流れてくるSource
var sink = Sink.foreach[Int](println)
source.delay(1 second, DelayOverflowStrategy.backpressure).runWith(sink) // 2つづつでてしまう。
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer,ActorMaterializerSettings,DelayOverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()
def materializer(size: Int) = {
ActorMaterializer(ActorMaterializerSettings(actorSystem).withInputBuffer(initialSize = size, maxSize = size))
}
val source = Source.fromIterator(() => Iterator.from(0)) // 数値が0から順番に流れてくるSource
var sink = Sink.foreach[Int](println)
// バッファサイズを1にしてみる
source.delay(1 second, DelayOverflowStrategy.backpressure).runWith(sink)(materializer(1))
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
def materializer(size: Int) = {
ActorMaterializer(ActorMaterializerSettings(actorSystem).withInputBuffer(initialSize = size, maxSize = size))
}
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()
val source = Source.fromIterator(() => Iterator.from(0)) // 数値が0から順番に流れてくるSource
var sink = Sink.foreach[Int](println) // 入力を画面に出力する
source.delay(1 second).runWith(sink)(materializer(8)) // バッファサイズ8
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materialize: ActorMaterializer = ActorMaterializer()
val source = Source.fromIterator(() => Iterator.from(0))
var sink = Sink.foreach[Int](println)
source.throttle(1, 1 second, 1, ThrottleMode.Shaping).runWith(sink)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment