Skip to content

Instantly share code, notes, and snippets.

View aludwiko's full-sized avatar

Andrzej Ludwikowski aludwiko

View GitHub Profile
// Doubles every element, then runs a second map that throws IllegalStateException
// when it sees the value 2 (the first element after doubling); whatever gets through
// is discarded by Sink.ignore.
// NOTE(review): the resuming supervision attribute is attached before the throwing
// map stage — confirm that stage is actually meant to be left outside of it.
Source(List(1, 2, 3, 4))
  .map(n => n * 2)
  .withAttributes(supervisionStrategy(Supervision.resumingDecider))
  .map { doubled =>
    if (doubled == 2) throw new IllegalStateException("error")
  }
  .runWith(Sink.ignore)
// Doubles every element, then runs a second map that throws IllegalStateException
// when it sees the value 2 (the first element after doubling); whatever gets through
// is discarded by Sink.ignore.
// NOTE(review): the resuming supervision attribute is attached before the throwing
// map stage — confirm that stage is actually meant to be left outside of it.
Source(List(1, 2, 3, 4))
  .map(n => n * 2)
  .withAttributes(supervisionStrategy(Supervision.resumingDecider))
  .map { doubled =>
    if (doubled == 2) throw new IllegalStateException("error")
  }
  .runWith(Sink.ignore)
// Endless, lazily generated feed of fake transaction descriptions.
// The helper calls are wrapped in `${...}`: a bare `$randomUser()` interpolates only the
// identifier and then appends a literal "()" to the generated text.
val infiniteTransactionsStream =
  Stream.continually(s"${randomUser()} sent ${randomValue()} BTC to ${randomUser()}")

// Chains the transactions into blocks of five, derives each block hash from the block's
// transactions and the previous block hash, and hands every hash to saveHashAndReturnIt
// one at a time (mapAsync parallelism of 1).
Source(infiniteTransactionsStream)
  .grouped(5) // group transactions into 5-element blocks
  .scan(0.hashCode.toString) { (previousBlockHash, transactions) =>
    // the hash of a block is the hash of (transactions hash, previous block hash)
    (transactions.hashCode(), previousBlockHash).hashCode.toString
  }
  .mapAsync(1)(saveHashAndReturnIt)
// Attaches a log stage named "got transaction" to the transaction source.
// The stage name is written with plain ASCII double quotes; typographic quotes are not
// string delimiters in Scala and do not compile.
Source(infiniteTransactionsStream)
  .log("got transaction")
// Block-hashing pipeline with a named log stage after every step, so each element can be
// traced through grouping, hashing and saving.
// All stage names use plain ASCII double quotes; typographic quotes are not string
// delimiters in Scala and do not compile.
Source(infiniteTransactionsStream)
  .log("got transaction")
  .grouped(5) // blocks of five transactions
  .log("grouped transactions")
  .scan(0.hashCode.toString) { (previousBlockHash, transactions) =>
    // the hash of a block is the hash of (transactions hash, previous block hash)
    (transactions.hashCode(), previousBlockHash).hashCode.toString
  }
  .log("hashed block")
  .mapAsync(1)(saveHashAndReturnIt)
  .log("saved block")
/**
 * Asynchronously "persists" a block hash and echoes it back.
 *
 * When `Random.nextInt(5)` yields 1 the call sleeps for 100 seconds to simulate the
 * database being unavailable; every other call completes without sleeping.
 *
 * @param hash block hash to store
 * @return a future completed with the same `hash`
 */
def saveHashAndReturnIt(hash: String): Future[String] = Future {
  if (Random.nextInt(5) == 1) {
    // Flag the sleep as blocking so an execution context that supports it can
    // compensate instead of silently losing one of its worker threads.
    scala.concurrent.blocking {
      Thread.sleep(100000) // simulate database unavailability
    }
  }
  hash
}
// Wires the numbers 1..5 to two sinks: a side sink attached with alsoTo that sleeps
// 100 ms per element before printing, and a main sink that prints right away, with a
// printing map stage in between. No `.run()` call is visible in this snippet.
// The interpolated strings use plain ASCII double quotes; typographic quotes are not
// string delimiters in Scala and do not compile.
Source(1 to 5)
  .alsoTo(Sink.foreach { x =>
    Thread.sleep(100) // deliberately slow side sink
    println(s"elem:$x alsoTo")
  })
  .map { x =>
    println(s"elem:$x map")
    x
  }
  .to(Sink.foreach(x => println(s"elem:$x to")))
// Endless, lazily generated feed of temperature readings built from randomRoom and
// randomTemperature.
val roomsTemperatureReadings =
  Stream.continually(TemperatureReading(randomRoom, randomTemperature))

// Feeds every reading to two sinks and runs the graph: the alsoTo side sink calls
// createRoomIfNotExist with the reading's room id, the main sink calls saveReading.
// NOTE(review): nothing visible here orders the two sinks relative to each other —
// confirm saveReading does not depend on the room already having been created.
Source(roomsTemperatureReadings)
  .alsoTo(Sink.foreach { r => createRoomIfNotExist(r.roomId) })
  .to(Sink.foreach(saveReading))
  .run()
// Parses each string as an Int and prints the results. "a" is not a number, so toInt
// throws NumberFormatException for it; the resuming supervision attribute is attached
// after the parsing stage.
// NOTE(review): presumably the failing element is dropped and the rest are printed —
// confirm against the Akka supervision documentation.
Source(List("1","2","a","4","5"))
  .map(text => text.toInt)
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
  .runForeach(number => println(number))
val nonLinearFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
 import GraphDSL.Implicits._
val dangerousFlow = Flow[Int].map {
  case 5 => throw new RuntimeException(“BOOM!”)
  case x => x
 }
 val safeFlow = Flow[Int]