Skip to content

Instantly share code, notes, and snippets.

@vasily-kirichenko
Created November 25, 2018 11:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vasily-kirichenko/68e73726a7a35ccefa62fdf9b9ba71ed to your computer and use it in GitHub Desktop.
Save vasily-kirichenko/68e73726a7a35ccefa62fdf9b9ba71ed to your computer and use it in GitHub Desktop.
def cache[In, Out](flow: Flow[In, Out, _]) = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
var cache: Map[In, Out] = Map.empty
val partition = b.add(Partition[In](2, in => if (cache.contains(in)) 0 else 1))
val merge = b.add(Merge[Out](2))
partition.out(0).map(in => cache(in)) ~> merge
val broadcast = b.add(Broadcast[In](2))
partition.out(1) ~> broadcast
val zip = b.add(Zip[In, Out]())
broadcast ~> flow ~> zip.in1
broadcast ~> zip.in0
zip.out.map { case (in, out) =>
cache += (in -> out)
out
} ~> merge
FlowShape(partition.in, merge.out)
})
Source(List(1, 2, 3, 2, 2, 4))
.via(cache(Flow[Int].map { in =>
val res = in * 100
println(s"$in => $res")
res
}))
.runForeach(out => println(s"Downstream: $out"))
1 => 100
Downstream: 100
2 => 200
Downstream: 200
Downstream: 200
3 => 300
Downstream: 200
Downstream: 300
4 => 400
Downstream: 400
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment