Skip to content

Instantly share code, notes, and snippets.

@JonNorman
Last active March 18, 2019 10:57
Show Gist options
  • Save JonNorman/09a583fbcec8d5491378c5bd7a212496 to your computer and use it in GitHub Desktop.
Useful script for running backfills on specific tables/data/dates
package com.gu.emr
import java.time.LocalDate
import com.amazonaws.auth.profile.ProfileCredentialsProvider
import com.amazonaws.services.ec2.model.InstanceType
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceAsyncClientBuilder
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig
import com.gu.datalake.aws.LatestAWSSpotPricing
import com.gu.emr.model.ClusterDefinition.EC2SubnetID
import com.gu.emr.model._
import com.gu.emr.model.configuration.Classification.SparkHiveSite
import com.gu.emr.model.configuration.ClusterConfiguration
import scala.collection.JavaConverters._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
/** One-off driver that backfills `internal.browser_visits` by launching EMR
  * clusters and submitting one Spark step per day of the configured range.
  */
object CustomBackfill extends App {
  import scala.concurrent.duration._
  import cats.instances.future._
  import CustomBackfillHelpers._

  // EMR run settings; cluster/step logs land in this S3 prefix.
  val runConfiguration = RunConfiguration.default("s3://ophan-temp/emr/logs")
  // Cap on steps per cluster; dates beyond this spill onto additional clusters.
  // NOTE(review): presumably chosen to stay under EMR's per-cluster step
  // limit — confirm.
  val maxStepsPerCluster = 250
  // Job artifact and Spark entry point run by every step.
  val jarPath = "s3://data-tech-temp/jon/pageview.jar"
  val className = "com.gu.datalake.etl.pageview.regularity.BrowserVisitsJob"
  // Destination table and S3 path the backfill writes to.
  val outputTable = "internal.browser_visits"
  val outputDataPath = "s3://ophan-internal-browser-visits/build-1/"

  // Fix everything but the run date, so each date maps to exactly one step.
  val generateSpecificStep: String => EmrStep = generateGenericStep(_, jarPath, className, outputTable, outputDataPath)

  // One step per date, chunked into batches of maxStepsPerCluster, with a
  // fresh cluster definition (indexed name) per batch.
  val stepsByCluster: List[ClusterWithSteps] = {
    for {
      (steps, clusterIndex) <- Data.dates.map(generateSpecificStep).grouped(maxStepsPerCluster).toList.zipWithIndex
      cluster = generateClusterWithIndex(clusterIndex)
    } yield {
      ClusterWithSteps(cluster, steps)
    }
  }

  // Preview: print every planned "cluster -> step" assignment before launch.
  for {
    clusterAndSteps <- stepsByCluster
    step <- clusterAndSteps.steps
  } yield {
    println(s"${clusterAndSteps.cluster.name} -> ${step.name}")
  }

  // Launch each cluster, then submit its steps. launchCluster/submitSteps
  // appear to return an EitherT-like value (`leftMap`/`value` below): errors
  // are printed and otherwise discarded — no retry. NOTE(review): confirm the
  // result type; the cats Future instances import above suggests EitherT over
  // Future.
  val runs = stepsByCluster
    .map { sbc =>
      manager
        .launchCluster(sbc.cluster, runConfiguration)
        .flatMap { clusterId =>
          println(s"launched cluster $clusterId")
          manager.submitSteps(clusterId, sbc.steps)
        }
        .leftMap(e => println(e))
        .map(_ => println("submitted steps"))
    }

  // Block until every launch + submission future completes.
  // NOTE(review): assumes all launches and step submissions finish within
  // 60 seconds — confirm this is long enough for many clusters.
  Await.result(
    Future.sequence {
      for {
        run <- runs
      } yield run.value
    },
    60.seconds
  )
}
/** Pairs a cluster definition with the EMR steps destined to run on it. */
case class ClusterWithSteps(cluster: ClusterDefinition, steps: List[EmrStep])
/** Shared AWS wiring and cluster/step factories for the manual backfill script. */
object CustomBackfillHelpers {

  // AWS client stack, authenticated via the local "ophan" credentials profile.
  val creds = new ProfileCredentialsProvider("ophan")
  val client = AmazonElasticMapReduceAsyncClientBuilder.standard().withCredentials(creds).build()
  val manager = new EmrClusterManager("test", client)

  // Spot fleet: a single m3.xlarge master plus ten r4.4xlarge core nodes,
  // bidding 1.1x relative to the latest observed spot price.
  val spotPricing = LatestAWSSpotPricing(creds, 1.1)
  val masterInstance = SpotInstanceSpec(InstanceType.M3Xlarge, 1, spotPricing)
  val coreInstances = SpotInstanceSpec(InstanceType.R44xlarge, 10, spotPricing)

  /** Build the definition of the `index`-th backfill cluster (indexed name,
    * auto-terminating, Spark-only, Glue-backed Hive metastore).
    */
  def generateClusterWithIndex(index: Int): ClusterDefinition =
    ClusterDefinition(
      s"jon-norman-manual-backfill-$index",
      ReleaseLabel.`emr-5.18.0`,
      autoTerminate = true,
      masterInstance,
      Some(coreInstances),
      ec2SubnetId = Some(EC2SubnetID("subnet-73f78d14")),
      configuration = ClusterConfiguration.from(SparkHiveSite.useGlue),
      applications = Set(Application.Spark)
    )

  /** Build an EMR step that spark-submits `className` from `jarPath` for the
    * single day `runDate`, writing to `outputTable` / `outputDataPath`.
    */
  def generateGenericStep(
      runDate: String,
      jarPath: String,
      className: String,
      outputTable: String,
      outputDataPath: String): EmrStep = {
    // spark-submit invocation, followed by the job's own CLI arguments.
    val launcherArgs =
      List("spark-submit", "--deploy-mode", "client", "--class", className, jarPath)
    val jobArgs =
      List(
        "--date-range", s"$runDate,$runDate", // one-day range: start == end
        "--output-table", outputTable,
        "--output-data-path", outputDataPath
      )
    val stepConfig = new HadoopJarStepConfig()
      .withJar(EmrStep.DefaultJarPath)
      .withArgs((launcherArgs ++ jobArgs).asJavaCollection)
    EmrStep(name = s"$className $runDate", stepConfig)
  }
}
/** Date range driving the backfill: one ISO-8601 date string per day. */
object Data {
  // Inclusive first day of the backfill window.
  val startDate: LocalDate = LocalDate.of(2016, 1, 1)
  // Number of consecutive days; 1096 covers 2016-01-01 through 2018-12-31
  // (2016 is a leap year: 366 + 365 + 365 = 1096).
  val dayCount: Int = 1096

  /** Generate `count` consecutive ISO-8601 date strings starting at `start`.
    * Defaults reproduce the original hard-coded backfill window.
    */
  def datesFrom(start: LocalDate = startDate, count: Int = dayCount): List[String] =
    List.tabulate(count)(offset => start.plusDays(offset.toLong).toString)

  // Preserved public value: the full backfill window as date strings.
  val dates: List[String] = datesFrom()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment