Skip to content

Instantly share code, notes, and snippets.

@darkfrog26
Created February 3, 2021 16:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save darkfrog26/481be7428fd1ebec1a0f5b05001467a8 to your computer and use it in GitHub Desktop.
Save darkfrog26/481be7428fd1ebec1a0f5b05001467a8 to your computer and use it in GitHub Desktop.
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.util.{Failure, Success}
trait ParallelIterator[T] extends Iterator[T] { iterator =>
private val buffer = new ConcurrentLinkedQueue[T]
private var finished = false
private var failure: Option[Throwable] = None
private lazy val maxTimeoutMillis = maxTimeout.toMillis
protected def maxTimeout: FiniteDuration
protected def createFutures(): List[Future[Seq[T]]]
protected implicit def executionContext: ExecutionContext
buildFutures()
private def buildFutures(): Unit = recurseFutures(createFutures())
private def recurseFutures(futures: List[Future[Seq[T]]]): Unit = if (futures.nonEmpty) {
val future = futures.head
future.onComplete {
case Success(results) => {
results.foreach(buffer.add)
recurseFutures(futures.tail)
}
case Failure(throwable) => {
failure = Some(throwable)
finished = true
}
}
} else {
finished = true
}
override def hasNext: Boolean = {
if (!buffer.isEmpty) {
true
} else if (!finished) {
try {
val start = System.currentTimeMillis()
while (!finished && buffer.isEmpty) {
if (System.currentTimeMillis() - start > maxTimeoutMillis) {
val exc = new TimeoutException("Max timeout expired")
failure = Some(exc)
finished = true
throw exc
}
Thread.sleep(10)
}
!buffer.isEmpty
} catch {
case t: Throwable => {
finished = true
buffer.clear()
throw t
}
}
} else {
failure match {
case Some(t) => throw t
case None => false
}
}
}
override def next(): T = buffer.poll()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment