Created
June 4, 2019 15:17
-
-
Save anish749/fd12ee02f32c7df7e2b08ccc0b89ed55 to your computer and use it in GitHub Desktop.
QScio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package qscio | |
import java.util.UUID | |
import com.spotify.scio.values.SCollection | |
import com.spotify.scio.{ScioContext, ScioMetrics, ScioResult} | |
import com.typesafe.config.{Config, ConfigFactory} | |
import org.apache.beam.runners.dataflow.DataflowRunner | |
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions | |
import org.apache.beam.runners.direct.DirectRunner | |
import org.apache.beam.sdk.metrics.Counter | |
import scala.collection.mutable | |
import scala.reflect.ClassTag | |
import scala.util.Random | |
/**
 * Trait which adds utilities for easily writing simple benchmarking jobs
 * against the Beam direct runner or Google Cloud Dataflow.
 *
 * Mix this into an object, build pipelines via [[bench]] / [[benchLocal]],
 * and sink throwaway output with [[QuickScioCollection.toDevNull]].
 */
trait DataflowBenchmark {

  // NOTE(review): ConfigFactory.load expects a classpath resource *base name*
  // (e.g. "application"), not a file-system path — verify before depending on
  // this. Currently unused: the values below are hard-coded.
  lazy val conf: Config = ConfigFactory.load("src/main/resources/application.conf")

  lazy val devNull: String = "gs://subs-platform-testing/dev-null" // conf.getString("gcp.dev-null")
  lazy val gcpProject: String = "subs-platform-testing" // conf.getString("gcp.project")
  lazy val defaultWorker: String = "n1-standard-8"
  lazy val defaultNumWorkers: Int = 4
  lazy val defaultMaxNumWorkers: Int = 20

  // Paths written during this run that must be deleted manually afterwards.
  private lazy val pathsToCleanLater = mutable.ListBuffer.empty[String]

  // The last context that was created. Used by the `datasets` helpers below.
  private var lastContext: ScioContext = _

  /** Creates a Beam counter metric with the given name. */
  def counter(name: String): Counter = ScioMetrics.counter(name)

  /** Derives a Dataflow-safe job name (lowercase, no dots/spaces/dollars) from the class name. */
  def jobName: String =
    getClass.toString
      .replace("class", "")
      .replace(" ", "")
      .replace(".", "")
      .replace("$", "")
      .toLowerCase

  // TODO For local system paths, this should go to actual dev null.
  /**
   * Returns a fresh unique path under [[devNull]] and remembers it so the
   * caller can be reminded to delete it later (see [[getPathsToClean]]).
   */
  def getNewDevNull: String = {
    // BUG FIX: Random.nextString(6) produced arbitrary (possibly unprintable)
    // Unicode characters, which are not valid in GCS object names; use
    // alphanumeric characters instead.
    val devNullPath = s"$devNull/${Random.alphanumeric.take(6).mkString}"
    pathsToCleanLater += devNullPath
    devNullPath
  }

  /**
   * Tmp paths that were used in this run.
   * Exposes an immutable object.
   */
  def getPathsToClean: List[String] = pathsToCleanLater.toList

  /** Adds benchmark-oriented helpers to any SCollection. */
  implicit class QuickScioCollection[T: ClassTag](self: SCollection[T]) {
    /** Materializes the collection by writing it to a throwaway dev-null path. */
    def toDevNull: Unit =
      self
        .withName("dev-null")
        .saveAsTextFile(getNewDevNull)
  }

  /** Runs `fn` on the direct (in-process) runner and blocks until the job finishes. */
  def benchLocal[T](fn: ScioContext => T): T = {
    val sc = ScioContext()
    lastContext = sc
    sc.options.setRunner(classOf[DirectRunner])
    val t = fn(sc)
    sc.close().waitUntilDone()
    warnUndeletedFiles()
    t
  }

  /** Runs `fn` on Dataflow with the default worker configuration. */
  def bench[T](fn: ScioContext => T): T =
    submitAndWarn(context)(fn)

  /** Runs `fn` on Dataflow with the default worker type and the given worker counts. */
  def bench[T](numWorkers: Int, maxNumWorkers: Int)(fn: ScioContext => T): T =
    submitAndWarn(context(defaultWorker, numWorkers, maxNumWorkers))(fn)

  /** Runs `fn` on Dataflow with a fully custom worker configuration. */
  def bench[T](workerType: String, numWorkers: Int, maxNumWorkers: Int)(
    fn: ScioContext => T): T =
    submitAndWarn(context(workerType, numWorkers, maxNumWorkers))(fn)

  // Shared logic for the `bench` overloads: build the pipeline, submit the
  // job without waiting for completion, then warn about leftover files.
  private def submitAndWarn[T](sc: ScioContext)(fn: ScioContext => T): T = {
    val t = fn(sc)
    sc.close()
    warnUndeletedFiles()
    t
  }

  // Reminds the user about dev-null paths that will not be cleaned up.
  private def warnUndeletedFiles(): Unit = {
    println("Closing Context without waiting.")
    if (pathsToCleanLater.nonEmpty) {
      println(s"Following files should be deleted manually: $getPathsToClean")
    }
  }

  /** A Dataflow context with the default worker configuration. */
  def context: ScioContext = context(defaultWorker, defaultNumWorkers, defaultMaxNumWorkers)

  /** Builds a Dataflow ScioContext configured for the benchmark project. */
  def context(workerType: String, numWorkers: Int, maxNumWorkers: Int): ScioContext = {
    val sc = ScioContext()
    sc.options.setJobName(jobName)
    sc.options.setTempLocation(s"$devNull/tmp")
    sc.options.setRunner(classOf[DataflowRunner])
    // Cast once instead of once per option.
    val dfOptions = sc.options.as(classOf[DataflowPipelineOptions])
    dfOptions.setProject(gcpProject)
    dfOptions.setStagingLocation(s"$devNull/tmp")
    dfOptions.setRegion("europe-west1")
    dfOptions.setWorkerMachineType(workerType)
    dfOptions.setNumWorkers(numWorkers)
    dfOptions.setMaxNumWorkers(maxNumWorkers)
    lastContext = sc
    sc
  }

  /** A random UUID string. */
  def randuid: String = UUID.randomUUID().toString

  /** A random Long. */
  def randLong: Long = Random.nextLong()

  /** Synthetic datasets of various sizes, built on the most recently created context. */
  sealed trait datasets {
    val longs: SCollection[Long]
    lazy val users: SCollection[User] = longs.map(User(_))
    lazy val artists: SCollection[Artist] = longs.map(Artist(_))
  }

  /** One thousand longs. */
  object thousand extends datasets {
    lazy val longs: SCollection[Long] = lastContext.parallelize((1L to 1000).toSeq)
  }

  /** One million longs, derived from [[thousand]]. */
  object million extends datasets {
    lazy val longs: SCollection[Long] =
      thousand.longs.transform("million")(_.flatMap(i => (1L to 1000).toList.map(_ + i * 1000)))
  }

  /** One hundred million longs, derived from [[million]]. */
  object hundredMillion extends datasets {
    lazy val longs: SCollection[Long] =
      million.longs.transform("hundredMillion")(_.flatMap(i =>
        (1L to 100).toList.map(_ + i * 1000)))
  }

  /** One billion longs, derived from [[million]]. */
  object billion extends datasets {
    lazy val longs: SCollection[Long] =
      million.longs.transform("billion")(_.flatMap(i => (1L to 1000).toList.map(_ + i * 1000)))
  }
}
/**
 * A synthetic user record used as benchmark input.
 *
 * @param userId     string identifier
 * @param userIdLong numeric identifier
 * @param fields     filler payload fields
 */
case class User(
  userId: String,
  userIdLong: Long,
  fields: List[String]
)
/** Factory for synthetic [[User]] records. */
object User {
  /** Builds a user with a random UUID id and ten copies of a filler field. */
  def apply(id: Long): User = {
    val filler = List.fill(10)("one two three")
    User(UUID.randomUUID().toString, id, filler)
  }
}
/**
 * A synthetic artist record used as benchmark input.
 *
 * @param artistId     string identifier
 * @param artistIdLong numeric identifier
 * @param artistName   display name
 * @param fields       filler payload fields
 */
case class Artist(
  artistId: String,
  artistIdLong: Long,
  artistName: String,
  fields: List[String]
)
/** Factory for synthetic [[Artist]] records. */
object Artist {
  /** Builds an artist with random UUID id/name and ten copies of a filler field. */
  def apply(id: Long): Artist = {
    val filler = List.fill(10)("one two three")
    Artist(UUID.randomUUID().toString, id, UUID.randomUUID().toString, filler)
  }
}
/**
 * A synthetic track record.
 *
 * @param trackId   string identifier
 * @param trackName display name
 * @param artistId  id of the owning artist
 */
case class Track(
  trackId: String,
  trackName: String,
  artistId: String
)
/**
 * A synthetic play event.
 *
 * @param trackId  id of the played track
 * @param userId   id of the listening user
 * @param playedAt epoch timestamp of the play
 */
case class Plays(
  trackId: String,
  userId: String,
  playedAt: Long
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment