Skip to content

Instantly share code, notes, and snippets.

@iamhatesz
Last active March 21, 2017 23:32
Show Gist options
  • Save iamhatesz/fcc7604345b27d62ede8fdce1576cecf to your computer and use it in GitHub Desktop.
Save iamhatesz/fcc7604345b27d62ede8fdce1576cecf to your computer and use it in GitHub Desktop.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
/** Demo of routing stream elements through a sub-flow only when they satisfy a predicate.
  *
  * Running this object pushes the integers 1 to 10 through two conditional stages
  * (double the even numbers, then triple the multiples of three), prints every
  * resulting element, and finally shuts the actor system down so the JVM can exit.
  */
object ConditionalVia extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Builds a flow that applies `flow` only to elements for which `predicate` holds.
    *
    * Elements that do not satisfy `predicate` are passed through unchanged.
    * Each matching element is wrapped in its own single-element source and sent
    * through `flow`, so `flow` is materialized anew for every matching element:
    * any state it keeps is not shared between elements, and whatever it emits
    * for that one element (zero, one or several values) is forwarded downstream.
    *
    * @tparam E         element type, identical on both sides of the flow
    * @param predicate  decides whether an element is routed through `flow`
    * @param flow       transformation applied to the matching elements
    * @return a flow emitting transformed matches and untouched non-matches, in input order
    */
  def conditionalVia[E](predicate: E => Boolean, flow: Flow[E, E, NotUsed]): Flow[E, E, NotUsed] =
    Flow[E].flatMapConcat { elem =>
      val single = Source.single(elem)
      if (predicate(elem)) single.via(flow) else single
    }

  // The second stage sees the output of the first one, e.g. 6 -> 12 -> 36.
  Source(1 to 10)
    .via(conditionalVia[Int](_ % 2 == 0, Flow[Int].map(_ * 2)))
    .via(conditionalVia[Int](_ % 3 == 0, Flow[Int].map(_ * 3)))
    .runWith(Sink.foreach(println))
    // Without this the actor system's threads keep the JVM alive after the
    // stream has completed (successfully or not) and the program never exits.
    .onComplete(_ => system.terminate())(system.dispatcher)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment