Skip to content

Instantly share code, notes, and snippets.

View aludwiko's full-sized avatar

Andrzej Ludwikowski aludwiko

View GitHub Profile
Source(List(1, 2, 3, 4))
.map(_ * 2)
.withAttributes(supervisionStrategy(Supervision.resumingDecider))
.map(i => if (i == 2) throw new IllegalStateException("error"))
.runWith(Sink.ignore)
Source(List(1, 2, 3, 4))
.map(_ * 2)
.withAttributes(supervisionStrategy(Supervision.resumingDecider))
.map(i => if (i == 2) throw new IllegalStateException("error"))
.runWith(Sink.ignore)
Source(infiniteTransactionsStream)
 .log(“got transaction”)
def saveHashAndReturnIt(hash: String): Future[String] = Future {
 if (Random.nextInt(5) == 1) {
  Thread.sleep(100000) //simulate database unavailability
 }
 hash
}
Source(1 to 5)
 .alsoTo(Sink.foreach{x =>
  Thread.sleep(100)
  println(s”elem:$x alsoTo”)}
 )
 .map { x =>
  println(s”elem:$x map”)
  x
 }
 .to(Sink.foreach(x => println(s”elem:$x to”)))
val roomsTemperatureReadings =
 Stream.continually(TemperatureReading(randomRoom, randomTemperature))
Source(roomsTemperatureReadings)
 .alsoTo(Sink.foreach(reading => createRoomIfNotExist(reading.roomId)))
 .to(Sink.foreach(saveReading))
 .run()
Source(List("1","2","a","4","5"))
 .map(_.toInt)
 .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
 .runForeach(println)
val nonLinearFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
 import GraphDSL.Implicits._
val dangerousFlow = Flow[Int].map {
  case 5 => throw new RuntimeException(“BOOM!”)
  case x => x
 }
 val safeFlow = Flow[Int]
Source(1 to 10)
 .via(nonLinearFlow)
 .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
 .runForeach(println)
val dangerousFlow = Flow[Int].map {
  case 5 => Failure(new RuntimeException(“BOOM!”))
  case x => Try(x)
}
val safeFlow = Flow[Int].map(Try(_))
val bcast = builder.add(Broadcast[Int](2))
val zip = builder.add(Zip[Try[Int], Try[Int]])