Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Backfiller command line tool
package backfiller
import aws.AWS
import cats.effect.{ExitCode, IO, IOApp}
import com.amazonaws.AmazonServiceException
import com.amazonaws.retry.RetryUtils
import com.gu.emr.ClusterManager.ClusterID
import com.gu.emr.EmrClusterManager
import com.gu.emr.model.{ClusterDefinition, EmrStep, RunConfiguration}
import fs2.{Chunk, Pure, Stream}
import newworld.syntax.ClusterManagerSyntax
import org.joda.time.LocalDate
import scheduler.actions.ActionSet
import scala.concurrent.ExecutionContext
object Backfiller extends IOApp with ClusterManagerSyntax {
val MaxStepsPerCluster = 250
private val defaultRunConfiguration = RunConfiguration.default("s3://ophan-temp/emr/logs")
def run(args: List[String]): IO[ExitCode] = {
if (args.length < 2 || args.length > 3) {
System.err.println("Usage: Backfiller <Job ID> <start date> <end date>")
System.exit(1)
}
val jobId = args(0)
val startDate = LocalDate.parse(args(1))
val endDate = LocalDate.parse(args(2))
val job = ActionSet.prod.findById(jobId).getOrElse(throw new Error(s"Couldn't find action with ID: $jobId"))
println(s"Running job '${job.id}' (${job.description}) from date $startDate to $endDate")
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global
val dates: Stream[Pure, LocalDate] = Stream
.range(0, Int.MaxValue)
.map(startDate.plusDays)
.takeThrough(date => endDate.isAfter(date))
val jobSteps: Stream[Pure, Chunk[StepForDate]] =
dates
.flatMap(date => Stream.emits(job.steps(date).map(step => StepForDate(date, step))))
.chunkN(MaxStepsPerCluster)
val emr = AWS.emr
val clusterManager = new EmrClusterManager("backfiller", emr)(ec)
val program: Stream[IO, ExitCode] = for {
jobStepChunk <- jobSteps.covary[IO]
clusterIdForChunk <- Stream.eval(
getOrCreateEmrCluster(clusterManager,
job.clusterDefinition,
backfillClusterName(job.clusterDefinition.name, jobStepChunk.toList.map(_.date))))
_ = println(s"Adding ${jobStepChunk.size} steps to cluster $clusterIdForChunk")
_ <- Stream.eval(clusterManager.submitStepsIO(clusterIdForChunk, jobStepChunk.toList.map(_.step)))
_ = println("All done")
} yield ExitCode.Success
program.compile.last.map(_.getOrElse(ExitCode.Error)).guarantee(IO(emr.shutdown()))
}
def getOrCreateEmrCluster(
clusterManager: EmrClusterManager,
clusterDefinition: ClusterDefinition,
backfillClusterName: String): IO[ClusterID] = {
val renamedCluster = clusterDefinition.copy(name = backfillClusterName)
for {
maybeExistingClusterId <- clusterManager.findClusterIdIO(renamedCluster)
clusterId <- getOrElse(maybeExistingClusterId,
clusterManager.launchClusterIO(renamedCluster, defaultRunConfiguration))
} yield clusterId
}
def backfillClusterName(originalName: String, dates: List[LocalDate]): String =
s"Backfill $originalName ${dates.head} to ${dates.last}"
import scala.concurrent.duration._
def asRetriableStream[T](op: IO[T], operationDescription: String): Stream[IO, T] =
fs2.Stream
.retry(op, 10.seconds, nextDelay = identity, maxAttempts = 6 * 10, isRetriable(operationDescription))
case class StepForDate(date: LocalDate, step: EmrStep)
case object Waiting extends Exception
def isRetriable(operation: String): Throwable => Boolean = {
case Waiting =>
System.err.println(s"Retrying $operation")
true
case e: AmazonServiceException if RetryUtils.isRetryableServiceException(e) =>
System.err.println(s"Retriable AWS error (${e.getErrorCode}) while performing $operation", e)
true
case e =>
System.err.println(s"Error while performing $operation", e)
false
}
// Implement here as the old version of cats-effect that's currently on classpath doesn't have it.
def getOrElse[A](maybeA: Option[A], alt: => IO[A]): IO[A] = maybeA match {
case Some(a) => IO.pure(a)
case None => alt
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment