Skip to content

Instantly share code, notes, and snippets.

@awekuit
Created September 5, 2017 02:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save awekuit/2a75bab1d800890fbd9d9b495a3f91d0 to your computer and use it in GitHub Desktop.
Save awekuit/2a75bab1d800890fbd9d9b495a3f91d0 to your computer and use it in GitHub Desktop.
Akka Streams materialization behavior: when `Instant.now` is evaluated for `via` vs. `map` stages
import java.time.Instant
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.testkit.TestKit
import akka.util.Timeout
import org.scalatest.{FreeSpecLike, Matchers}
import scala.concurrent.duration._
/**
 * Demonstrates when a timestamp argument is evaluated, depending on how a
 * stage is attached to an Akka Streams `Source`:
 *
 *  - `via(viaRecordStartedAt(Instant.now))`: `Instant.now` is a by-value
 *    argument, so it is evaluated once while the flow is being built.
 *  - `map(mapRecordStartedAt1(Instant.now))`: the curried method is partially
 *    applied; the recorded output shows the same timestamp on every element.
 *  - `map(mapRecordStartedAt2(Instant.now, _))`: the placeholder lambda
 *    re-evaluates `Instant.now` for every element; the recorded output shows
 *    a different timestamp per element.
 *
 * The actor system created for the test kit is shut down in `afterAll`.
 */
class Hoge
    extends TestKit(ActorSystem("Hoge"))
    with FreeSpecLike
    with Matchers
    with org.scalatest.BeforeAndAfterAll {

  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher
  implicit val askTimeout: Timeout = Timeout(1.hour)

  /** Metadata attached to every stream element. */
  case class MetricsContext(startedAt: Instant)

  /**
   * Builds a flow that pairs each element with a `MetricsContext`.
   *
   * @param startedAt timestamp stored in every emitted context; it is a
   *                  by-value parameter, so it is fixed when the flow is built
   * @tparam T element type
   * @return a flow emitting `(element, MetricsContext(startedAt))`
   */
  def viaRecordStartedAt[T](startedAt: Instant): Flow[T, (T, MetricsContext), NotUsed] =
    Flow[T].map(_ -> MetricsContext(startedAt))

  /**
   * Curried variant: pairs `message` with a `MetricsContext`.
   *
   * @param startedAt timestamp stored in the context
   * @param message   element to tag
   * @return `(message, MetricsContext(startedAt))`
   */
  def mapRecordStartedAt1[T](startedAt: Instant)(message: T): (T, MetricsContext) =
    message -> MetricsContext(startedAt)

  /**
   * Single-parameter-list variant: pairs `message` with a `MetricsContext`.
   *
   * @param startedAt timestamp stored in the context
   * @param message   element to tag
   * @return `(message, MetricsContext(startedAt))`
   */
  def mapRecordStartedAt2[T](startedAt: Instant, message: T): (T, MetricsContext) =
    message -> MetricsContext(startedAt)

  /** Terminates the test kit's actor system once all tests have run. */
  override protected def afterAll(): Unit =
    try TestKit.shutdownActorSystem(system)
    finally super.afterAll()

  /**
   * Runs `source` behind a kill switch, printing every element prefixed with
   * `label`, then shuts the stream down after five seconds.
   */
  private def runForFiveSeconds[A](label: String, source: Source[A, Any]): Unit = {
    val killSwitch = source
      .viaMat(KillSwitches.single[A])(Keep.right)
      .toMat(Sink.foreach[A](element => println(s"$label: $element")))(Keep.left)
      .run()
    // Deliberate pause: lets roughly five one-second ticks through for the demo.
    Thread.sleep(1000 * 5)
    killSwitch.shutdown()
  }

  "via" in {
    // NOTE(review): `Unit` is the companion object of scala.Unit, not the unit
    // value `()`, which is why elements print as `object scala.Unit`. Kept
    // as-is so the printed output stays unchanged.
    def ticks = Source.tick(0.seconds, 1.second, Unit)

    // Each pipeline is built immediately before it is run so that any eagerly
    // evaluated `Instant.now` reflects that pipeline's own start time.
    val viaStamped = ticks.via(viaRecordStartedAt(Instant.now))
    runForFiveSeconds("viaRecordStartedAt", viaStamped)

    val curriedStamped = ticks.map(mapRecordStartedAt1(Instant.now))
    runForFiveSeconds("mapRecordStartedAt1", curriedStamped)

    val lambdaStamped = ticks.map(mapRecordStartedAt2(Instant.now, _))
    runForFiveSeconds("mapRecordStartedAt2", lambdaStamped)
  }
}
viaRecordStartedAt: (object scala.Unit,MetricsContext(2017-09-05T02:44:42.166Z))
viaRecordStartedAt: (object scala.Unit,MetricsContext(2017-09-05T02:44:42.166Z))
viaRecordStartedAt: (object scala.Unit,MetricsContext(2017-09-05T02:44:42.166Z))
viaRecordStartedAt: (object scala.Unit,MetricsContext(2017-09-05T02:44:42.166Z))
viaRecordStartedAt: (object scala.Unit,MetricsContext(2017-09-05T02:44:42.166Z))
mapRecordStartedAt1: (object scala.Unit,MetricsContext(2017-09-05T02:44:47.226Z))
mapRecordStartedAt1: (object scala.Unit,MetricsContext(2017-09-05T02:44:47.226Z))
mapRecordStartedAt1: (object scala.Unit,MetricsContext(2017-09-05T02:44:47.226Z))
mapRecordStartedAt1: (object scala.Unit,MetricsContext(2017-09-05T02:44:47.226Z))
mapRecordStartedAt1: (object scala.Unit,MetricsContext(2017-09-05T02:44:47.226Z))
mapRecordStartedAt2: (object scala.Unit,MetricsContext(2017-09-05T02:44:52.239Z))
mapRecordStartedAt2: (object scala.Unit,MetricsContext(2017-09-05T02:44:53.241Z))
mapRecordStartedAt2: (object scala.Unit,MetricsContext(2017-09-05T02:44:54.248Z))
mapRecordStartedAt2: (object scala.Unit,MetricsContext(2017-09-05T02:44:55.259Z))
mapRecordStartedAt2: (object scala.Unit,MetricsContext(2017-09-05T02:44:56.248Z))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment