Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple sample how to connect a Flow to Source and keep going the stream using Akka Stream
package main
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import GraphDSL.Implicits._
import com.typesafe.config.{ Config, ConfigFactory }
object SrcToFlow extends App {
// implicit actor system
implicit val system = ActorSystem()
// implicit actor materializer
implicit val materializer = ActorMaterializer()
val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.ignore) { implicit b => sink =>
//Simple Source
val src = Source(List(1, 2, 3))
val fromSrc = b.add(Flow[Int].log("from-Src"))
//Emit new list per element received
val flow = b.add(Flow[Int].flatMapConcat{ element =>
Source(List(4, 5, 6))
})
//¯\_(ツ)_/¯ nothing to do, just for sample purposes
val map = b.add(Flow[Int].map{ element =>
element
})
src ~> fromSrc ~>flow ~> map ~> sink
ClosedShape
})
graph.run()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment