Last active
December 25, 2015 01:39
-
-
Save rxin/6896688 to your computer and use it in GitHub Desktop.
take async
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Collects up to `num` elements by scanning partitions in increasing index
 * order. One job is submitted per round, and rounds continue until enough
 * elements have been gathered, every partition has been scanned, or
 * `promise.cancelled` becomes true.
 *
 * When `num <= 0` the loop body never runs, so no job is submitted and the
 * result is an empty sequence.
 *
 * @param num maximum number of elements to collect
 * @return `promise.future` for the promise whose `run` body performs the scan
 */
def takeAsync(num: Int): FutureAction[Seq[T]] = {
  val promise = new CancellablePromise[Seq[T]]
  promise.run {
    val buf = new ArrayBuffer[T](num)
    val totalParts = self.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts && !promise.cancelled) {
      // Partitions not yet scanned; strictly positive because of the loop condition.
      val remainingParts = totalParts - partsScanned

      // The number of partitions to try in this iteration.
      var numPartsToTry = 1
      if (partsScanned > 0) {
        // If we didn't find any rows after the first iteration, just try all partitions next.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
        // by 50%.
        if (buf.size == 0) {
          numPartsToTry = totalParts - 1
        } else {
          numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
        }
      }
      // Clamp to [1, remainingParts]. The interpolation above can saturate at Int.MaxValue
      // for large `num`, and adding that to partsScanned would overflow Int; the lower bound
      // guarantees that every round makes progress.
      numPartsToTry = math.min(math.max(1, numPartsToTry), remainingParts)

      // How many elements are still missing; each partition needs to yield at most this many.
      val left = num - buf.size
      val p = partsScanned until (partsScanned + numPartsToTry)
      val job = self.context.submitJob(
        self,
        (it: Iterator[T]) => it.take(left).toArray,
        p,
        // Re-check the remaining capacity on every result so that several partitions
        // together never push the buffer beyond `num` elements.
        (index: Int, data: Array[T]) => buf ++= data.take(num - buf.size),
        // NOTE(review): `Unit` here names the companion object, not the unit value `()`.
        // Left unchanged because submitJob's signature is not visible here - confirm.
        () => Unit)
      try {
        Await.result(job, Duration.Inf)
      } catch {
        case e: InterruptedException =>
          // The waiting thread was interrupted: cancel the in-flight job before rethrowing.
          job.cancel()
          logWarning("blah", e)
          throw e
        case e: Exception =>
          logWarning("blah", e)
          throw e
      }
      partsScanned += numPartsToTry
    }
    buf.toSeq
  }
  promise.future
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment