Skip to content

Instantly share code, notes, and snippets.

@jeantil
Created June 30, 2015 08:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeantil/ec68e9699bc90ba62d90 to your computer and use it in GitHub Desktop.
Save jeantil/ec68e9699bc90ba62d90 to your computer and use it in GitHub Desktop.
Generates an enumerator from a sequence of futures
//"experimental, needs to be checked for cpu consumption", "2015-02-20"
object Enumerators {
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random
import scala.concurrent.duration._
import play.api.libs.iteratee._
type OUT[T] = Option[T]
type ACC[T] = Seq[Future[T]]
type STEP[T] = Future[Option[(ACC[T], OUT[T])]]
def toEnumerator[A](seed: ACC[A]): Enumerator[OUT[A]] = Enumerator.unfoldM[ACC[A], OUT[A]](seed.toList) { innerFutures =>
if (innerFutures.isEmpty) Future.successful(None)
else {
val stringFO: Option[Future[A]] = innerFutures.find(_.isCompleted)
val default: STEP[A] = Future.successful(Some((innerFutures, None)))
val a: Option[STEP[A]] = stringFO.map { completedFuture =>
val remaining: ACC[A] = innerFutures.filterNot(_ == completedFuture)
completedFuture.map(s => Some((remaining, Option(s))))
}
blocking { Thread.sleep(10); a.getOrElse(default) }
}
}
def run(max: Int) = {
val range = (1 to max).toList
val delays = range.map(i => Random.nextInt(1000))
val rangeDelays = range zip delays
val a = rangeDelays.map {
case (i, d) =>
Future { blocking { Thread.sleep(d); s"${i} -> ${d}" } }
}
val enumerator = toEnumerator(a) &> Enumeratee.filter[Option[String]](_.isDefined) &> Enumeratee.map[Option[String]](_.get) &> Enumeratee.take[String](a.size + 1)
(enumerator |>> Iteratee.foreach(println)).flatMap(_.run)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment