Created
August 14, 2014 14:38
-
-
Save metasim/0ea9de220fb0ac1aef41 to your computer and use it in GitHub Desktop.
Example of merging results from `Future`s created in an *akka-streams* `Flow` into another `Flow`.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import akka.stream.actor.ActorPublisher | |
import akka.stream.scaladsl._ | |
import akka.stream.{ FlowMaterializer, MaterializerSettings } | |
import akka.testkit._ | |
import org.scalatest.FunSpecLike | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
/**
 * Demonstrates merging values resolved from `Future`s created inside one akka-streams
 * `Flow` into a second, independently materialized `Flow`, using a `ValueIngester`
 * actor as the bridge between the two.
 */
class FutureMergeTest extends TestKit(ActorSystem()) with FunSpecLike with ImplicitSender
  with org.scalatest.BeforeAndAfterAll {

  val materializer = FlowMaterializer(MaterializerSettings())

  // Function that artificially inserts a delay in an attempt to reorder results.
  // The sleep is wrapped in `blocking` to hint to the execution context that the
  // calling thread is about to be parked.
  val maybeDelay = (i: Int) ⇒ {
    if (i % 2 == 0) scala.concurrent.blocking { Thread.sleep(2000) }
    i
  }

  implicit val ec = system.dispatcher

  // Shut the actor system down once the suite has finished so that its threads do not
  // outlive the test run.
  override protected def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  describe("Flow producing futures") {
    it("should be able to feed futures to Flow after materialization") {
      // Turns every incoming Int into a Future that yields the same Int, possibly delayed.
      val futureProducingDuct = Duct[Int]
        .map { i ⇒ Future { maybeDelay(i) } }

      val valueIngester = system.actorOf(Props[ValueIngester])

      // Forwards each resolved value to the ingester. The send happens inside the mapped
      // future, so the future handed to `mapFuture` only completes after the value has
      // been sent; the generation flow therefore cannot complete (and trigger the
      // UpstreamDone message below) while a send is still pending.
      val futureResolvingDuct = Duct[Future[Int]]
        .map { futureInt ⇒
          futureInt.map { i ⇒
            println(s"Posting Future value: $i")
            valueIngester ! i
            i
          }
        }
        .mapFuture(identity)

      // Accumulates everything received into a single Seq, emitted on completion.
      val valueGatheringDuct = Duct[Int]
        .fold(Seq.empty[Int])(_ :+ _)

      // Hands the accumulated Seq to the TestKit actor so the test can assert on it.
      val sendTestKitResultDuct = Duct[Seq[Int]]
        .foreach { total ⇒
          println(s"Collected values: $total")
          testActor ! total
        }

      val numRange = 1 to 10

      val generationFlow = Flow(numRange.toIterator)
        .append(futureProducingDuct)
        .append(futureResolvingDuct)

      val computeFlow = Flow(ActorPublisher[Int](valueIngester))
        .map { i ⇒ println(s"Received value: $i"); i }
        .append(valueGatheringDuct)
        .append(sendTestKitResultDuct)

      note("Materializing flows")
      computeFlow.consume(materializer)
      generationFlow.onComplete(materializer) { _ ⇒
        println("Generation complete. Sending done message.")
        valueIngester ! UpstreamDone
      }

      note("Awaiting answer")
      val results = expectMsgType[Seq[Int]](30.seconds)
      assert(results.size == numRange.size)
      assert(results.sum == numRange.sum)

      // Asserts that the ordering is not the same: the delay inserted in some of the
      // futures should allow the others to come through as soon as they are available.
      assert(results != numRange)

      note("Ensuring single result")
      expectNoMsg(3.seconds)
    }
  }
}
/**
 * Message sent to a `ValueIngester` to signal that no further values will be produced.
 *
 * Declared as a `case object` so that it prints as `UpstreamDone` in log output and is
 * serializable, as is conventional for actor messages.
 */
case object UpstreamDone
/**
 * Actor for collating inputs from multiple sources (Flow, Future, etc.) into a new Flow.
 *
 * While active, every `Int` message is published to the subscriber immediately when
 * there is outstanding demand and nothing is buffered; otherwise it is appended to `buf`
 * and delivered as demand arrives. On `UpstreamDone` the actor switches to `shuttingDown`,
 * where it only drains `buf` and then signals completion. `Int` messages that arrive
 * after `UpstreamDone` are not published; they fall through to the catch-all branch and
 * are logged.
 */
class ValueIngester extends ActorPublisher[Int] with ActorLogging {
  // Partially copied from [ActorProducerSpec.scala](http://goo.gl/maSBlE)
  import akka.stream.actor.ActorPublisher._

  // Values received while they could not be published right away, oldest first.
  var buf = Vector.empty[Int]

  // Set once completion has been signalled. The ActorPublisher contract does not allow
  // onComplete to be called again after completion, and a Request may still arrive in
  // `shuttingDown` after the stream was completed from the UpstreamDone handler.
  private var completed = false

  def receive: Receive = active

  /** Normal operation: publish or buffer incoming values until `UpstreamDone` arrives. */
  def active: Receive = {
    case i: Int ⇒
      if (buf.isEmpty && totalDemand > 0)
        onNext(i)
      else {
        buf :+= i
        deliverBuf()
      }
    case Request(_) ⇒ deliverBuf()
    case Cancel     ⇒ context.stop(self)
    case UpstreamDone ⇒
      log.debug("Entering shutdown mode")
      context.become(shuttingDown)
      // Nothing left to drain: complete right away instead of waiting for more demand.
      if (buf.isEmpty) completeOnce()
    case e ⇒ log.info("Unknown message in active state: {}", e)
  }

  /** Draining: deliver what is left in `buf` as demand arrives, then complete. */
  def shuttingDown: Receive = {
    case Request(_) ⇒
      deliverBuf()
      if (buf.isEmpty && !completed) {
        log.debug("Entering complete state")
        completeOnce()
      }
    case Cancel ⇒ context.stop(self)
    case e      ⇒ log.info("Unknown message in shuttingDown state: {}", e)
  }

  /** Publishes as many buffered values as the current demand allows. */
  def deliverBuf(): Unit =
    if (totalDemand > 0) {
      val (use, keep) = buf.splitAt(totalDemand)
      buf = keep
      use foreach onNext
    }

  /** Signals stream completion at most once. */
  private def completeOnce(): Unit =
    if (!completed) {
      completed = true
      onComplete()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment