import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random
// Demonstrates how to page through a source like a Slick table, grouping matching rows and
// writing each group to a sink before fetching the next batch. Handles edge cases such as a
// group of matching entries straddling a page boundary, or a single group containing more
// rows than the standard page size.
//
// N.B. to run, launch ammonite and $exec the script. Running the script directly with
// ammonite will cause problems due to a known ammonite bug affecting scripts that use
// futures / awaits.
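// Sorted sample data: 100,000 random ints in [0, 500), so each value appears ~200 times on
// average and duplicates are contiguous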
val data = (1 to 100000).map { _ => Random.nextInt(500) }.sorted.toList
def batch(offset: Int, limit: Int): Future[List[Int]] = {
  println(s"Taking batch [$offset:${offset + limit}]")
  Future.successful(data.drop(offset).take(limit))
}
def writeBatch(b: List[Int]): Future[Unit] = Future.successful {
  println(s"Writing batch of ${b.length} entries from ${b.head} to ${b.last}...")
}
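// writeBatch is a stand-in sink: a real implementation might write to a database, file or
// queue instead of just logging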
/**
 * Fetch a batch guaranteed to contain every occurrence of each value it includes
 *
 * We trim off the trailing group of duplicates, since it may continue into the next page; if
 * the whole batch is a single group, we keep reading further pages until we either run out of
 * data or find a different value
 */
def computeBatch(offset: Int, batchSize: Int = 1000, acc: List[Int] = Nil): Future[List[Int]] = {
  batch(offset, batchSize) flatMap {
    case Nil =>
      // No more data; whatever we've accumulated is the final group
      Future.successful(acc)
    case entries =>
      val combined = acc ::: entries
      if (combined.head != combined.last) {
        // Strip off the trailing run of duplicates: it may continue into the next page, so
        // leave it for the next batch
        Future.successful(combined takeWhile { _ != combined.last })
      } else {
        // All identical; get another batch and try again
        computeBatch(offset + entries.length, batchSize, combined)
      }
  }
}
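// Illustrative trace (not from the original gist) showing how a group straddling a page
// boundary is handled, assuming batchSize = 4 and data = List(1, 1, 2, 2, 2, 3):
//
//   computeBatch(0, 4) fetches List(1, 1, 2, 2); the trailing 2s may continue into the next
//     page, so they're trimmed and List(1, 1) is returned
//   computeBatch(2, 4) fetches List(2, 2, 2, 3); the trailing 3 is trimmed and List(2, 2, 2)
//     is returned
//   computeBatch(5, 4) fetches List(3); all values are identical, so another page is fetched,
//     comes back empty, and List(3) is returned as the final group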
def writeBatches(): Future[Unit] = {
  def nextBatch(
    offset: Int = 0,
    limit: Int = 1000
  ): Future[Unit] = {
    computeBatch(offset, limit) flatMap {
      case Nil =>
        println("Found empty batch; completing")
        Future.unit
      case entries =>
        writeBatch(entries) flatMap { _ => nextBatch(offset + entries.length) }
    }
  }

  nextBatch()
}
// Block until the run completes, so the script doesn't exit with writes still in flight
Await.result(writeBatches(), Duration.Inf)
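// A minimal sketch of backing `batch` with a real Slick 3 table, assuming a hypothetical
// `records` TableQuery with an indexed `value` column and a configured `db`:
//
//   def batch(offset: Int, limit: Int): Future[List[Int]] =
//     db.run(records.sortBy(_.value).drop(offset).take(limit).map(_.value).result)
//       .map(_.toList)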