Skip to content

Instantly share code, notes, and snippets.

@samschlegel
Created August 1, 2017 21:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samschlegel/ec52a80638f4148b0d7e94b7c53bf3df to your computer and use it in GitHub Desktop.
Save samschlegel/ec52a80638f4148b0d7e94b7c53bf3df to your computer and use it in GitHub Desktop.
Combine.perKey lifting test
import com.spotify.scio._
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
 * Manual Scio job for observing how a per-key aggregation is executed.
 *
 * Each input key `n` in 1..8 is fanned out slowly into `FanOutPerKey` values
 * `(n, (n, i))`, which are then folded per key with `aggregateByKey`. The
 * `Emit` / `Seq` / `Comb` log lines show the order in which elements are
 * produced and combined.
 * NOTE(review): intended as a "Combine.perKey lifting" experiment — confirm how
 * the Scio/Beam version in use implements `aggregateByKey`.
 *
 * Expected printed output: `(n, (n, 820))` for every n in 1..8
 * (max of the first component is n; 1 + 2 + ... + 40 = 820).
 */
object SlowFanOutThenCombine {
  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  /** Number of values emitted per input key by `slowFanOut`. */
  private val FanOutPerKey: Int = 40

  /** Artificial delay before each emitted value; makes the fan-out deliberately slow. */
  private val EmitDelayMs: Long = 1000L

  /**
   * Lazily emits `(n, (n, i))` for `i` in `1..FanOutPerKey`, sleeping
   * `EmitDelayMs` milliseconds and logging before each element.
   *
   * Built on `Iterator.range(...).map(...)`, so the sleep/log work happens on
   * each `next()` call (as before). Unlike the former hand-written iterator,
   * calling `next()` after exhaustion throws `NoSuchElementException` instead
   * of silently producing extra elements.
   *
   * @param n the input key
   * @return an iterator of `FanOutPerKey` key/value pairs, all keyed by `n`
   */
  def slowFanOut(n: Int): Iterator[(Int, (Int, Int))] =
    Iterator.range(1, FanOutPerKey + 1).map { i =>
      Thread.sleep(EmitDelayMs) // intentional: simulate a slow producer
      LOG.info(s"Emit ($n, $i)")
      (n, (n, i))
    }

  /**
   * Folds one value into a per-key accumulator, logging the call.
   *
   * @param acc the running accumulator `(max of first components, sum of second components)`
   * @param x   the next value
   * @return the accumulator with `x` folded in
   */
  def seqOp(acc: (Int, Int), x: (Int, Int)): (Int, Int) = {
    LOG.info(s"Seq $acc, $x")
    (acc._1.max(x._1), acc._2 + x._2)
  }

  /**
   * Merges two partial accumulators, logging the call.
   *
   * @param x one partial accumulator
   * @param y another partial accumulator
   * @return `(max of first components, sum of second components)`
   */
  def combOp(x: (Int, Int), y: (Int, Int)): (Int, Int) = {
    LOG.info(s"Comb $x, $y")
    (x._1.max(y._1), x._2 + y._2)
  }

  /**
   * Builds and runs the pipeline, then prints every aggregated `(key, (max, sum))` pair.
   *
   * @param cmdlineArgs pipeline options, parsed by `ContextAndArgs`
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)
    // materialize so the aggregated results can be read back once the job is done
    val materialized =
      sc.parallelize(1 to 8)
        .flatMap(slowFanOut)
        .aggregateByKey((0, 0))(seqOp, combOp)
        .materialize
    // NOTE(review): with this (older) Scio API, close() presumably submits the
    // pipeline; newer Scio uses sc.run() — confirm for the version in use.
    sc.close()
    materialized.waitForResult().value.foreach(println)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment