Skip to content

Instantly share code, notes, and snippets.

@rxin
Last active December 25, 2015 01:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rxin/6896688 to your computer and use it in GitHub Desktop.
Save rxin/6896688 to your computer and use it in GitHub Desktop.
take async
/**
 * Asynchronously retrieves up to `num` elements of the RDD.
 *
 * Partitions are scanned in increasing index order by a sequence of jobs. The first job probes
 * a single partition; later jobs size themselves from how many rows earlier jobs returned.
 * Scanning stops once `num` elements are collected, every partition has been scanned, or the
 * promise reports it has been cancelled.
 *
 * @param num maximum number of elements to return; a non-positive value yields an empty result
 * @return a [[FutureAction]] completing with at most `num` elements, merged in partition order
 */
def takeAsync(num: Int): FutureAction[Seq[T]] = {
  val promise = new CancellablePromise[Seq[T]]
  promise.run {
    // Default capacity: pre-sizing to `num` would try to allocate huge arrays for large `num`
    // (e.g. Int.MaxValue) even when the RDD holds far fewer rows.
    val buf = new ArrayBuffer[T]
    val totalParts = self.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts && !promise.cancelled) {
      val remainingParts = totalParts - partsScanned
      // Number of partitions this job scans; always within [1, remainingParts], so the range
      // below never exceeds totalParts and partsScanned cannot overflow.
      val numPartsToTry: Int =
        if (partsScanned == 0) {
          // First job: probe a single partition.
          1
        } else if (buf.isEmpty) {
          // No rows found so far: scan every remaining partition.
          remainingParts
        } else {
          // Interpolate the TOTAL number of partitions needed from the rows found so far,
          // overestimate it by 50%, then subtract what has already been scanned. Done in Double
          // and capped before `toInt` so a large `num` cannot overflow Int.
          val estimatedTotal = 1.5 * num * partsScanned / buf.size
          math.min(math.max(1.0, estimatedTotal - partsScanned), remainingParts.toDouble).toInt
        }
      val p = partsScanned until partsScanned + numPartsToTry
      // Each task only needs as many rows as are still missing.
      val left = num - buf.size
      // Rows returned per task, keyed by the index passed to the result handler. They are merged
      // in key order after the job finishes, so the output follows partition order rather than
      // task completion order. NOTE(review): assumes the handler index increases with partition
      // order, which holds whether it is the position in `p` or the partition id (p is ascending).
      val collected = scala.collection.mutable.Map.empty[Int, Array[T]]
      val job = self.context.submitJob(
        self,
        (it: Iterator[T]) => it.take(left).toArray,
        p,
        (index: Int, data: Array[T]) => collected(index) = data,
        () => Unit)
      try {
        Await.result(job, Duration.Inf)
      } catch {
        case e: InterruptedException =>
          // The waiting thread was interrupted (presumably because the promise was cancelled):
          // stop the running job before propagating.
          job.cancel()
          logWarning("takeAsync interrupted; cancelled the running job", e)
          throw e
        case e: Exception =>
          logWarning("takeAsync job failed", e)
          throw e
      }
      collected.toSeq.sortBy(_._1).foreach { case (_, data) =>
        buf ++= data.take(num - buf.size)
      }
      partsScanned += numPartsToTry
    }
    buf.toSeq
  }
  promise.future
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment