Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
QScio
package qscio
import java.util.UUID
import com.spotify.scio.values.SCollection
import com.spotify.scio.{ScioContext, ScioMetrics, ScioResult}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.runners.direct.DirectRunner
import org.apache.beam.sdk.metrics.Counter
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.Random
/**
 * Trait which adds utilities for easily writing simple benchmarking jobs.
 *
 * Mix it into an object/class and wrap the pipeline definition in one of the
 * `bench` / `benchLocal` methods. Each of them creates a [[ScioContext]], hands it
 * to the supplied function, closes the context and returns the function's result.
 *
 * The trait keeps mutable state (`lastContext`, the list of generated output
 * paths), so an instance is meant to be driven from a single thread.
 */
trait DataflowBenchmark {

  /**
   * Application configuration resolved from the classpath.
   *
   * `ConfigFactory.load(name)` expects a classpath resource basename, not a path
   * inside the source tree, so the default loader (which picks up
   * `application.conf`) is used.
   */
  lazy val conf: Config = ConfigFactory.load()

  /** Base location under which throw-away outputs and temp/staging files are written. */
  lazy val devNull: String = "gs://subs-platform-testing/dev-null" // conf.getString("gcp.dev-null")

  /** GCP project the Dataflow jobs are submitted to. */
  lazy val gcpProject: String = "subs-platform-testing" // conf.getString("gcp.project")

  /** Worker machine type used when the caller does not specify one. */
  lazy val defaultWorker: String = "n1-standard-8"

  /** Initial number of workers used by the no-argument `bench`/`context`. */
  lazy val defaultNumWorkers: Int = 4

  /** Maximum number of workers used by the no-argument `bench`/`context`. */
  lazy val defaultMaxNumWorkers: Int = 20

  // Every path handed out by getNewDevNull; nothing in this trait deletes them.
  private lazy val pathsToCleanLater = mutable.ListBuffer.empty[String]

  // The last context that was created. Used by the dataset helpers below.
  private var lastContext: ScioContext = _

  /** Returns the most recently created context, failing loudly if there is none yet. */
  private def activeContext: ScioContext =
    Option(lastContext).getOrElse(
      throw new IllegalStateException(
        "No ScioContext has been created yet; use bench/benchLocal/context first."))

  /** Creates (or looks up) a Scio metrics counter with the given name. */
  def counter(name: String): Counter = ScioMetrics.counter(name)

  /**
   * Job name derived from the runtime class: the fully qualified class name with
   * `.` and `$` removed, lower-cased.
   *
   * `getClass.getName` is used instead of stripping the word "class" from
   * `getClass.toString`, which would also mangle class names that merely contain
   * the substring "class".
   */
  def jobName: String =
    getClass.getName
      .replace(".", "")
      .replace("$", "")
      .toLowerCase

  // TODO For local system paths, this should go to actual dev null.
  /**
   * Allocates a fresh output path under [[devNull]] and remembers it so that it can
   * be reported for clean-up later.
   *
   * The suffix is six random alphanumeric characters; `Random.nextString` is avoided
   * because it produces arbitrary Unicode characters that make poor object names.
   */
  def getNewDevNull: String = {
    val suffix = Random.alphanumeric.take(6).mkString
    val devNullPath = s"$devNull/$suffix"
    pathsToCleanLater += devNullPath
    devNullPath
  }

  /**
   * Tmp paths that were used in this run.
   * Exposes an immutable object.
   */
  def getPathsToClean: List[String] = pathsToCleanLater.toList

  /** Adds `toDevNull` to every `SCollection`. */
  implicit class QuickScioCollection[T: ClassTag](self: SCollection[T]) {

    /** Writes the collection as text to a freshly allocated throw-away path. */
    def toDevNull: Unit =
      self
        .withName("dev-null")
        .saveAsTextFile(getNewDevNull)
  }

  /**
   * Runs `fn` against a context configured with the `DirectRunner` and blocks until
   * the pipeline is done.
   *
   * @param fn pipeline definition; receives the freshly created context
   * @return whatever `fn` returned
   */
  def benchLocal[T](fn: ScioContext => T): T = {
    val sc = ScioContext()
    lastContext = sc
    sc.options.setRunner(classOf[DirectRunner])
    val result = fn(sc)
    sc.close().waitUntilDone()
    warnUndeletedFiles(waited = true)
    result
  }

  /**
   * Runs `fn` against the default Dataflow context (see `context`) and closes the
   * context without waiting for the job to finish.
   */
  def bench[T](fn: ScioContext => T): T = {
    val sc = context
    runAndClose(sc)(fn)
  }

  /**
   * Same as `bench(fn)` but with an explicit worker count, using [[defaultWorker]]
   * as the machine type.
   */
  def bench[T](numWorkers: Int, maxNumWorkers: Int)(fn: ScioContext => T): T =
    runAndClose(context(defaultWorker, numWorkers, maxNumWorkers))(fn)

  /** Same as `bench(fn)` but with an explicit machine type and worker count. */
  def bench[T](workerType: String, numWorkers: Int, maxNumWorkers: Int)(
      fn: ScioContext => T): T =
    runAndClose(context(workerType, numWorkers, maxNumWorkers))(fn)

  /** Shared tail of the `bench` overloads: build the pipeline, close, report. */
  private def runAndClose[T](sc: ScioContext)(fn: ScioContext => T): T = {
    val result = fn(sc)
    sc.close()
    warnUndeletedFiles()
    result
  }

  /**
   * Prints how the context was closed and, if any throw-away paths were handed out,
   * lists them for manual deletion.
   *
   * @param waited whether the caller blocked until the pipeline finished
   */
  private def warnUndeletedFiles(waited: Boolean = false): Unit = {
    if (waited) println("Context closed after waiting for the pipeline to finish.")
    else println("Closing Context without waiting.")
    if (pathsToCleanLater.nonEmpty) {
      println(s"Following files should be deleted manually: $getPathsToClean")
    }
  }

  /** A Dataflow context using the default machine type and worker counts. */
  def context: ScioContext = context(defaultWorker, defaultNumWorkers, defaultMaxNumWorkers)

  /**
   * Creates a context configured for the `DataflowRunner` in region `europe-west1`,
   * with temp and staging locations under [[devNull]], and records it as the last
   * created context.
   *
   * @param workerType    Dataflow worker machine type
   * @param numWorkers    initial number of workers
   * @param maxNumWorkers upper bound on the number of workers
   */
  def context(workerType: String, numWorkers: Int, maxNumWorkers: Int): ScioContext = {
    val sc = ScioContext()
    val tmpLocation = s"$devNull/tmp"

    sc.options.setJobName(jobName)
    sc.options.setTempLocation(tmpLocation)
    sc.options.setRunner(classOf[DataflowRunner])

    // One typed view of the options instead of re-deriving it for every setter.
    val dataflowOptions = sc.options.as(classOf[DataflowPipelineOptions])
    dataflowOptions.setProject(gcpProject)
    dataflowOptions.setStagingLocation(tmpLocation)
    dataflowOptions.setRegion("europe-west1")
    dataflowOptions.setWorkerMachineType(workerType)
    dataflowOptions.setNumWorkers(numWorkers)
    dataflowOptions.setMaxNumWorkers(maxNumWorkers)

    lastContext = sc
    sc
  }

  /** A random UUID rendered as a string. */
  def randuid: String = UUID.randomUUID().toString

  /** A random `Long`. */
  def randLong: Long = Random.nextLong()

  /**
   * Synthetic datasets of increasing size, built on the most recently created context.
   *
   * NOTE(review): the collections are `lazy val`s, so they stay bound to whichever
   * context was current on first access; reusing them after another `bench` call on
   * the same instance looks unsafe — confirm before running several benchmarks per
   * instance.
   */
  sealed trait datasets {
    val longs: SCollection[Long]
    lazy val users: SCollection[User] = longs.map(User(_))
    lazy val artists: SCollection[Artist] = longs.map(Artist(_))
  }

  /** The numbers 1 to 1000. */
  object thousand extends datasets {
    lazy val longs: SCollection[Long] = activeContext.parallelize((1L to 1000).toSeq)
  }

  /** 1000 values for every element of `thousand`. */
  object million extends datasets {
    lazy val longs: SCollection[Long] =
      thousand.longs.transform("million")(_.flatMap(i => (1L to 1000).toList.map(_ + i * 1000)))
  }

  /** 100 values for every element of `million`. */
  object hundredMillion extends datasets {
    lazy val longs: SCollection[Long] =
      million.longs.transform("hundredMillion")(_.flatMap(i =>
        (1L to 100).toList.map(_ + i * 1000)))
  }

  /** 1000 values for every element of `million`. */
  object billion extends datasets {
    lazy val longs: SCollection[Long] =
      million.longs.transform("billion")(_.flatMap(i => (1L to 1000).toList.map(_ + i * 1000)))
  }
}
/**
 * Synthetic user record used by the benchmark datasets.
 *
 * @param userId     string identifier
 * @param userIdLong numeric identifier
 * @param fields     filler payload
 */
case class User(userId: String, userIdLong: Long, fields: List[String])

object User {

  /**
   * Builds a user for the given numeric id, with a random UUID as `userId` and ten
   * copies of the string "one two three" as `fields`.
   */
  def apply(id: Long): User = {
    val generatedId = UUID.randomUUID().toString
    val filler = List.fill(10)("one two three")
    User(userId = generatedId, userIdLong = id, fields = filler)
  }
}
/**
 * Synthetic artist record used by the benchmark datasets.
 *
 * @param artistId     string identifier
 * @param artistIdLong numeric identifier
 * @param artistName   display name
 * @param fields       filler payload
 */
case class Artist(artistId: String, artistIdLong: Long, artistName: String, fields: List[String])

object Artist {

  /**
   * Builds an artist for the given numeric id. Both `artistId` and `artistName` are
   * independent random UUID strings; `fields` holds ten copies of "one two three".
   */
  def apply(id: Long): Artist = {
    val generatedId = UUID.randomUUID().toString
    val generatedName = UUID.randomUUID().toString
    val filler = List.fill(10)("one two three")
    Artist(generatedId, id, generatedName, filler)
  }
}
/**
 * A track, referring to its artist by id.
 *
 * @param trackId   string identifier of the track
 * @param trackName name of the track
 * @param artistId  string identifier of the artist
 */
case class Track(
    trackId: String,
    trackName: String,
    artistId: String
)
/**
 * A single play of a track by a user.
 *
 * @param trackId  string identifier of the played track
 * @param userId   string identifier of the user
 * @param playedAt when the play happened (presumably an epoch timestamp; unit is not
 *                 established by this file — TODO confirm)
 */
case class Plays(
    trackId: String,
    userId: String,
    playedAt: Long
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.