Skip to content

Instantly share code, notes, and snippets.

@smdmts
Created December 15, 2016 06:58
Show Gist options
  • Save smdmts/ac5e0cda9e57afd37cb3d345bc605c1b to your computer and use it in GitHub Desktop.
Save smdmts/ac5e0cda9e57afd37cb3d345bc605c1b to your computer and use it in GitHub Desktop.
example flow with future.
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, FlowShape, Supervision}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import scala.concurrent.Future
/**
* StreamFlowWithFuture.
*/
object StreamFlowWithFuture {
val requestGraph = {
val flow = GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val input = builder.add(Flow[String])
val requestFlow = builder.add(Flow[String].mapAsync(1) { request =>
toFuture(request)
})
val lastFlow = builder.add(Flow[Int].map(lastHandle))
input ~> requestFlow ~> lastFlow
FlowShape(input.in, lastFlow.out)
}
flow.withAttributes(ActorAttributes.supervisionStrategy(_ => Supervision.stop))
}
def toFuture(input:String):Future[Int] = {
import scala.concurrent.ExecutionContext.Implicits.global
Future {
input.toInt
}
}
def lastHandle(last: Int) = {
println(last)
}
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
// Main.
Source.single("1")
.via(requestGraph)
.to(Sink.ignore)
.run()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment