Last active
March 18, 2019 10:57
-
-
Save JonNorman/09a583fbcec8d5491378c5bd7a212496 to your computer and use it in GitHub Desktop.
Useful script for running backfills on specific tables/data/dates
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.gu.emr | |
import java.time.LocalDate | |
import com.amazonaws.auth.profile.ProfileCredentialsProvider | |
import com.amazonaws.services.ec2.model.InstanceType | |
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceAsyncClientBuilder | |
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig | |
import com.gu.datalake.aws.LatestAWSSpotPricing | |
import com.gu.emr.model.ClusterDefinition.EC2SubnetID | |
import com.gu.emr.model._ | |
import com.gu.emr.model.configuration.Classification.SparkHiveSite | |
import com.gu.emr.model.configuration.ClusterConfiguration | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.{Await, Future} | |
import scala.concurrent.ExecutionContext.Implicits.global | |
/** One-off entry point that backfills `internal.browser_visits` by launching
  * EMR clusters and submitting one Spark step per date in [[Data.dates]].
  *
  * Work is sharded across clusters because EMR limits how many steps a single
  * cluster accepts (`maxStepsPerCluster`).
  *
  * NOTE(review): `extends App` relies on DelayedInit initialisation order; a
  * plain `def main(args: Array[String]): Unit` would be safer, but is kept
  * here to preserve the existing entry-point behaviour.
  */
object CustomBackfill extends App {
  import scala.concurrent.duration._
  import cats.instances.future._
  import CustomBackfillHelpers._

  val runConfiguration = RunConfiguration.default("s3://ophan-temp/emr/logs")

  // Maximum steps submitted to any one cluster; dates beyond this spill into
  // additional clusters.
  val maxStepsPerCluster = 250

  val jarPath = "s3://data-tech-temp/jon/pageview.jar"
  val className = "com.gu.datalake.etl.pageview.regularity.BrowserVisitsJob"
  val outputTable = "internal.browser_visits"
  val outputDataPath = "s3://ophan-internal-browser-visits/build-1/"

  // Partially apply the generic builder so that only the run date varies.
  val generateSpecificStep: String => EmrStep =
    generateGenericStep(_, jarPath, className, outputTable, outputDataPath)

  // One ClusterWithSteps per batch of up to maxStepsPerCluster dates.
  val stepsByCluster: List[ClusterWithSteps] = {
    for {
      (steps, clusterIndex) <- Data.dates
        .map(generateSpecificStep)
        .grouped(maxStepsPerCluster)
        .toList
        .zipWithIndex
      cluster = generateClusterWithIndex(clusterIndex)
    } yield {
      ClusterWithSteps(cluster, steps)
    }
  }

  // Dry-run summary of the planned cluster -> step assignments.
  // Plain `for` (no `yield`): the original built and discarded a List[Unit];
  // foreach semantics are what is wanted for a side-effecting print loop.
  for {
    clusterAndSteps <- stepsByCluster
    step <- clusterAndSteps.steps
  } println(s"${clusterAndSteps.cluster.name} -> ${step.name}")

  // Launch each cluster and submit its steps. Failures are only printed
  // (`leftMap`), not propagated — deliberate best-effort for a manual backfill.
  val runs = stepsByCluster
    .map { sbc =>
      manager
        .launchCluster(sbc.cluster, runConfiguration)
        .flatMap { clusterId =>
          println(s"launched cluster $clusterId")
          manager.submitSteps(clusterId, sbc.steps)
        }
        .leftMap(e => println(e))
        .map(_ => println("submitted steps"))
    }

  // Blocking is acceptable at the very edge of the program. The 60s budget
  // covers only launch + step submission; cluster completion is handled by
  // auto-termination, not awaited here.
  Await.result(
    Future.sequence {
      for {
        run <- runs
      } yield run.value
    },
    60.seconds
  )
}
case class ClusterWithSteps(cluster: ClusterDefinition, steps: List[EmrStep]) | |
/** Shared AWS wiring and builders for the manual backfill script. */
object CustomBackfillHelpers {

  // Credentials come from the local "ophan" AWS profile.
  val creds = new ProfileCredentialsProvider("ophan")
  val client = AmazonElasticMapReduceAsyncClientBuilder.standard().withCredentials(creds).build()
  val manager = new EmrClusterManager("test", client)

  // Spot bids at 1.1x the latest observed spot price.
  val spotPricing = LatestAWSSpotPricing(creds, 1.1)
  val masterInstance = SpotInstanceSpec(InstanceType.M3Xlarge, 1, spotPricing)
  val coreInstances = SpotInstanceSpec(InstanceType.R44xlarge, 10, spotPricing)

  /** Defines an auto-terminating Spark cluster whose name embeds `index`
    * so concurrent backfill clusters can be told apart in the console.
    */
  def generateClusterWithIndex(index: Int): ClusterDefinition =
    ClusterDefinition(
      s"jon-norman-manual-backfill-$index",
      ReleaseLabel.`emr-5.18.0`,
      autoTerminate = true,
      masterInstance,
      Some(coreInstances),
      ec2SubnetId = Some(EC2SubnetID("subnet-73f78d14")),
      configuration = ClusterConfiguration.from(SparkHiveSite.useGlue),
      applications = Set(Application.Spark)
    )

  /** Builds the spark-submit EMR step for a single day's run.
    *
    * The date appears twice in `--date-range` because the job takes an
    * inclusive start,end pair and a backfill step covers exactly one day.
    */
  def generateGenericStep(
      runDate: String,
      jarPath: String,
      className: String,
      outputTable: String,
      outputDataPath: String): EmrStep = {
    // Split the invocation from the job's own flags for readability.
    val sparkInvocation =
      List("spark-submit", "--deploy-mode", "client", "--class", className, jarPath)
    val jobFlags =
      List(
        "--date-range", s"$runDate,$runDate",
        "--output-table", outputTable,
        "--output-data-path", outputDataPath
      )
    EmrStep(
      name = s"$className $runDate",
      new HadoopJarStepConfig()
        .withJar(EmrStep.DefaultJarPath)
        .withArgs((sparkInvocation ++ jobFlags).asJavaCollection)
    )
  }
}
/** The date range to backfill: every day from 2016-01-01 through 2018-12-31
  * inclusive (1096 days, covering leap year 2016), as ISO-8601 strings.
  */
object Data {
  val dates: List[String] = {
    val start = LocalDate.of(2016, 1, 1)
    (0 until 1096).toList.map(offset => start.plusDays(offset.toLong).toString)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment