Created
December 15, 2016 06:58
-
-
Save smdmts/ac5e0cda9e57afd37cb3d345bc605c1b to your computer and use it in GitHub Desktop.
Example Akka Streams flow with a Future.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, FlowShape, Supervision}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}

import scala.concurrent.Future
import scala.util.{Failure, Success}
/**
 * Example of an Akka Streams flow in which one stage is backed by a
 * [[scala.concurrent.Future]].
 *
 * Each incoming `String` is converted to an `Int` inside a `Future`
 * (via `mapAsync`) and the resulting number is printed.
 */
object StreamFlowWithFuture {

  /**
   * Flow-shaped graph: `String` in, `Unit` out.
   *
   * Stages, in order:
   *  1. a pass-through `Flow[String]` used as the inlet,
   *  2. `mapAsync(1)` calling [[toFuture]] for each element,
   *  3. `map` calling [[lastHandle]] on each parsed number.
   *
   * The attached supervision strategy answers `Supervision.Stop` for every
   * exception, so a failed `Future` (e.g. non-numeric input) fails the stream.
   */
  val requestGraph = {
    val flow = GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val input = builder.add(Flow[String])
      val requestFlow = builder.add(Flow[String].mapAsync(1)(toFuture))
      val lastFlow = builder.add(Flow[Int].map(lastHandle))

      input ~> requestFlow ~> lastFlow

      // Expose only the outer ports; the wiring above stays internal.
      FlowShape(input.in, lastFlow.out)
    }
    flow.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
  }

  /**
   * Parses `input` as an `Int` asynchronously on the global execution context.
   *
   * @param input textual representation of an integer
   * @return a future holding the parsed value; `toInt` throwing inside the
   *         `Future` body (e.g. `NumberFormatException`) yields a failed future
   */
  def toFuture(input: String): Future[Int] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    Future {
      input.toInt
    }
  }

  /**
   * Final stage of the flow: prints the parsed number to standard output.
   *
   * @param last value produced by the asynchronous stage
   */
  def lastHandle(last: Int): Unit = {
    println(last)
  }

  /**
   * Runs the example: sends the single element `"1"` through [[requestGraph]],
   * then shuts the actor system down once the stream has completed.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    // Execution context for the completion callback below.
    import system.dispatcher

    // Main.
    Source.single("1")
      .via(requestGraph)
      // runWith keeps the sink's completion future, which `.to(...).run()` discards.
      .runWith(Sink.ignore)
      .onComplete { result =>
        result match {
          case Success(_) => ()
          case Failure(e) => Console.err.println(s"stream failed: ${e.getMessage}")
        }
        // Without this the actor system keeps the JVM alive after the stream is done.
        system.terminate()
      }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment