Skip to content

Instantly share code, notes, and snippets.

@kretes
Last active December 29, 2016 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kretes/8d5f2925de55b2a274148b69f79e55ac to your computer and use it in GitHub Desktop.
Save kretes/8d5f2925de55b2a274148b69f79e55ac to your computer and use it in GitHub Desktop.
Alternative implementation of tupledFlow in akka-streams. See http://stackoverflow.com/questions/41366030/elegant-way-of-reusing-akka-stream-flows/41380943
import akka.NotUsed
import akka.event.Logging
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip, ZipWith2}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
object TupledFlow {

  /**
   * Decorates `flow` so that every output is emitted together with the input that produced it.
   *
   * Each incoming element is pushed through its own materialization of `flow`
   * (`Source.single(in).via(flow)`), all outputs of that run are collected, and one
   * `(input, output)` pair is emitted per collected output. Consequently:
   *
   *  - an input for which `flow` emits nothing (e.g. it was filtered out) yields no pair;
   *  - an input for which `flow` emits several elements yields several pairs;
   *  - `flow` is materialized once per element, so state kept inside `flow` is not shared
   *    between elements and the per-element materialization cost is paid every time;
   *  - all outputs for one input are buffered in memory before any pair is emitted, and the
   *    inner stream must complete for the pairs of that input to be emitted.
   *
   * @param flow             the flow to decorate
   * @param parallelism      maximum number of inner streams running at the same time
   *                         (passed to `mapAsync`); defaults to 4, the previously hard-coded value
   * @param materializer     used to run the per-element inner streams
   * @param executionContext used to transform the future holding the collected outputs
   * @tparam In  element type consumed by `flow`
   * @tparam Out element type produced by `flow`
   * @return a flow emitting `(input, output)` pairs
   */
  def tupledFlow[In, Out](flow: Flow[In, Out, _], parallelism: Int = 4)(implicit materializer: Materializer, executionContext: ExecutionContext): Flow[In, (In, Out), NotUsed] = {
    // One inner stream per input element; Sink.seq already yields an immutable.Seq,
    // so the result can be handed to mapConcat without copying it into another collection.
    val pairsPerInput: Flow[In, immutable.Seq[(In, Out)], NotUsed] =
      Flow[In].mapAsync(parallelism) { in: In =>
        Source.single(in)
          .via(flow)
          .runWith(Sink.seq[Out])
          .map(outputs => outputs.map(out => (in, out)))
      }

    // Flatten the per-input batches; an empty batch simply contributes no elements.
    pairsPerInput.mapConcat[(In, Out)](pairs => pairs)
  }
}
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
/**
 * Behavioural tests for `TupledFlow.tupledFlow`: one-to-one, filtering, asynchronous and
 * one-to-many wrapped flows.
 */
class TupledFlowSpec extends FlatSpec with Matchers with MockitoSugar with org.scalatest.BeforeAndAfterAll {

  //given
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Upper bound on how long a single test stream may take to complete. */
  private val timeout: FiniteDuration = 1.second

  /** Shuts the actor system down once all tests have run so its threads do not leak. */
  override protected def afterAll(): Unit = {
    try Await.result(actorSystem.terminate(), 10.seconds)
    finally super.afterAll()
  }

  /** Runs `source` through the tupled version of `flow` and collects every emitted pair. */
  private def runTupled[In, Out](source: Source[In, NotUsed], flow: Flow[In, Out, _]): Seq[(In, Out)] =
    Await.result(
      source
        .via(TupledFlow.tupledFlow(flow)(materializer, actorSystem.dispatcher))
        .runWith(Sink.seq[(In, Out)]),
      timeout)

  behavior of "TupledFlow"

  it should "process event in a flow and return input and an output" in {
    val source: Source[Int, NotUsed] = Source(1 to 10)
    val flow = Flow[Int].map(_ * 2)

    val result: Seq[(Int, Int)] = runTupled(source, flow)

    result.size shouldBe 10
    result shouldBe (1 to 10).map(a => (a, a * 2))
  }

  it should "process events when a flow filters some values but we don't know how to make that" in {
    val source: Source[Int, NotUsed] = Source(1 to 10)
    val flow = Flow[Int].filter(_ % 3 == 1).map(_ * 2)

    val result: Seq[(Int, Int)] = runTupled(source, flow)

    // Only 1, 4, 7 and 10 pass the filter; the other inputs produce no pair at all.
    result.size shouldBe 4
    result shouldBe (1 to 10).filter(_ % 3 == 1).map(a => (a, a * 2))
  }

  it should "process events when a flow map async " in {
    val source: Source[Int, NotUsed] = Source(1 to 10)
    val flow = Flow[Int].mapAsync(2)(n => Future.apply(n * 2)(actorSystem.dispatcher))

    val result: Seq[(Int, Int)] = runTupled(source, flow)

    result.size shouldBe 10
    result shouldBe (1 to 10).map(a => (a, a * 2))
  }

  it should "process events if wrapped flow emits more results" in {
    val source: Source[Int, NotUsed] = Source(1 to 10)
    val flow = Flow[Int].mapConcat(n => Seq(n, n))

    val result: Seq[(Int, Int)] = runTupled(source, flow)

    // Every input is duplicated by the wrapped flow, so each input appears in two pairs.
    result.size shouldBe 20
    result shouldBe (1 to 10).flatMap(a => Seq((a, a), (a, a)))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment