Skip to content

Instantly share code, notes, and snippets.

@metasim
Created August 14, 2014 14:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save metasim/0ea9de220fb0ac1aef41 to your computer and use it in GitHub Desktop.
Save metasim/0ea9de220fb0ac1aef41 to your computer and use it in GitHub Desktop.
Example of merging results from `Future`s created in an *akka-streams* `Flow` into another `Flow`.
import akka.actor._
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl._
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.testkit._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSpecLike
import scala.concurrent.Future
import scala.concurrent.duration._
/**
 * Demonstrates feeding the results of `Future`s created inside one stream (the
 * "generation" flow) into a second, independently materialized stream (the "compute"
 * flow) by way of the `ValueIngester` actor publisher.
 *
 * The generation flow turns each number into a `Future`, posts every resolved value to
 * the ingester, and sends `UpstreamDone` once it has completed. The compute flow reads
 * from the ingester, folds everything into a single `Seq[Int]` and hands that to the
 * TestKit `testActor` for assertion.
 */
class FutureMergeTest extends TestKit(ActorSystem()) with FunSpecLike with ImplicitSender
  with BeforeAndAfterAll {

  val materializer = FlowMaterializer(MaterializerSettings())

  // Function that artificially inserts a delay in attempt to reorder results.
  val maybeDelay = (i: Int) ⇒ { if (i % 2 == 0) Thread.sleep(2000); i }

  implicit val ec = system.dispatcher

  /** Shuts down the actor system created for this suite so its threads do not leak. */
  override protected def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  describe("Flow producing futures") {
    it("should be able to feed futures to Flow after materialization") {
      // Wraps each incoming number in a Future; even numbers are artificially delayed.
      val futureProducingDuct = Duct[Int]
        .map { i ⇒ Future { maybeDelay(i) } }

      val valueIngester = system.actorOf(Props[ValueIngester])

      // Posts each resolved value to the ingester. The post happens inside the future
      // chain (rather than in a detached callback) so that the future awaited by
      // `mapFuture` only completes after the value has been sent. That way the
      // generation flow cannot complete - and `UpstreamDone` cannot be sent - while a
      // value is still waiting to be posted.
      val futureResolvingDuct = Duct[Future[Int]]
        .map { futureInt ⇒
          futureInt.map { i ⇒
            println("Posting Future value: " + i)
            valueIngester ! i
            i
          }
        }
        .mapFuture(identity)

      // Accumulates every value seen into a single sequence, in arrival order.
      val valueGatheringDuct = Duct[Int]
        .fold(Seq.empty[Int])(_ :+ _)

      // Forwards the accumulated sequence to the TestKit actor for assertions.
      val sendTestKitResultDuct = Duct[Seq[Int]]
        .foreach(total ⇒ {
          println("Collected values: " + total)
          testActor ! total
        })

      val numRange = 1 to 10

      val generationFlow = Flow(numRange.toIterator)
        .append(futureProducingDuct)
        .append(futureResolvingDuct)

      val computeFlow = Flow(ActorPublisher[Int](valueIngester))
        .map(i ⇒ { println("Received value: " + i); i })
        .append(valueGatheringDuct)
        .append(sendTestKitResultDuct)

      note("Materializing flows")
      computeFlow.consume(materializer)
      generationFlow.onComplete(materializer) { _ ⇒
        println("Generation complete. Sending done message.")
        valueIngester ! UpstreamDone
      }

      note("Awaiting answer")
      val results = expectMsgType[Seq[Int]](30.seconds)
      assert(results.size == numRange.size)
      assert(results.sum == numRange.sum)
      // Asserts that the ordering is not the same....
      // Assuming delay inserted in some arbitrary futures should
      // allow others to come through
      // as they are available
      assert(results != numRange)

      note("Ensuring single result")
      expectNoMsg(3.seconds)
    }
  }
}
/**
 * Message telling a `ValueIngester` that no further values will be sent, so it may
 * complete its stream once any buffered values have been delivered.
 *
 * Declared as a `case object` so it prints as `UpstreamDone` in log output and is
 * serializable like other actor messages.
 */
case object UpstreamDone
/**
 * Actor for collating inputs from multiple sources (Flow, Future, etc.) into a new Flow.
 *
 * Every `Int` message received is published to the stream subscriber. A value is emitted
 * immediately when there is outstanding demand and nothing is buffered; otherwise it is
 * appended to an internal buffer and emitted as demand arrives.
 *
 * Sending [[UpstreamDone]] switches the actor to a shutting-down state: values already
 * buffered are still delivered on demand, and the stream is completed as soon as the
 * buffer is empty. `Int` messages that arrive after `UpstreamDone` are not published;
 * they are only logged as unknown messages.
 */
class ValueIngester extends ActorPublisher[Int] with ActorLogging {
  // Partially copied from [ActorProducerSpec.scala](http://goo.gl/maSBlE)
  import akka.stream.actor.ActorPublisher._

  /** Values received but not yet emitted because there was no demand for them. */
  var buf = Vector.empty[Int]

  def receive: Receive = active

  /** Normal operation: accept values, emit or buffer them, and honour demand. */
  def active: Receive = {
    case i: Int ⇒
      if (buf.isEmpty && totalDemand > 0)
        onNext(i)
      else {
        // Append behind anything already buffered so emission order is preserved.
        buf :+= i
        deliverBuf()
      }
    case Request(_) ⇒ deliverBuf()
    case Cancel ⇒ context.stop(self)
    case UpstreamDone ⇒
      log.debug("Entering shutdown mode")
      context.become(shuttingDown)
      completeIfDrained()
    case e ⇒ log.info("Unknown message in active state: " + e)
  }

  /** After `UpstreamDone`: drain the remaining buffer on demand, then complete. */
  def shuttingDown: Receive = {
    case Request(_) ⇒
      deliverBuf()
      completeIfDrained()
    case Cancel ⇒ context.stop(self)
    case e ⇒ log.info("Unknown message in shuttingDown state: " + e)
  }

  /**
   * Terminal state entered once the stream has been completed. Late `Request` messages
   * are ignored so that `onComplete()` is never signalled a second time.
   */
  private def completed: Receive = {
    case Request(_) ⇒ // nothing left to deliver and completion was already signalled
    case Cancel ⇒ context.stop(self)
    case e ⇒ log.info("Unknown message in completed state: " + e)
  }

  /**
   * Completes the stream if the buffer is empty, then moves to the terminal state.
   * Funnelling completion through this single place guarantees `onComplete()` is
   * called at most once, as a publisher must not signal completion repeatedly.
   */
  private def completeIfDrained(): Unit = {
    if (buf.isEmpty) {
      log.debug("Entering complete state")
      onComplete()
      context.become(completed)
    }
  }

  /** Emits as many buffered values as current demand allows, keeping the remainder. */
  def deliverBuf(): Unit = {
    if (totalDemand > 0) {
      val (use, keep) = buf.splitAt(totalDemand)
      buf = keep
      use foreach onNext
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment