Skip to content

Instantly share code, notes, and snippets.

@nuria
Created December 9, 2019 16:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nuria/4250e59fd3d119fad9b85bfe806219c2 to your computer and use it in GitHub Desktop.
Save nuria/4250e59fd3d119fad9b85bfe806219c2 to your computer and use it in GitHub Desktop.
// spark2-shell --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar
/**
 * Uses RefineTarget.find to find all Refine targets for an input (Camus job)
 * within a recent time window (by default, from 48 hours ago until 2 hours ago),
 * then keeps only those for which a _REFINED_FAILED flag exists.
 */
import org.apache.hadoop.fs.Path
import org.joda.time.format.DateTimeFormatter
import com.github.nscala_time.time.Imports._
import scala.util.matching.Regex
import org.wikimedia.analytics.refinery.job.refine.{Refine, RefineTarget}
// Raw input data for non-EventLogging event streams (e.g. EventBus).
val baseInputPath = new Path("/wmf/data/raw/event")
// Base location of the refined output tables.
val baseTableLocationPath = new Path("/wmf/data/event")
// Database containing the refined output tables.
val databaseName = "event"

// Partition values are parsed out of each input path with this regex; the group
// names are assigned, in order, to the six capture groups.
val inputPathRegex = new Regex(
  ".*(eqiad|codfw)_(.+)/hourly/(\\d+)/(\\d+)/(\\d+)/(\\d+)",
  "datacenter", "table", "year", "month", "day", "hour"
)
// The datetime of each input path is parsed with this pattern,
// e.g. "hourly/2019/12/09/16".
val inputPathDateTimeFormatter = DateTimeFormat.forPattern("'hourly'/yyyy/MM/dd/HH")

// Search window, expressed in hours before "now".
val lookbackHours = 48
val lookbackUntilHours = 2
// Capture "now" once so both window bounds are computed from the same instant
// (calling DateTime.now twice would skew the bounds slightly).
val now = DateTime.now
// Earliest datetime to look for RefineTargets.
val sinceDateTime = now - lookbackHours.hours
// Latest datetime to look for RefineTargets.
// NOTE(review): presumably the most recent hours are excluded because they may still
// be in progress — confirm intent.
val untilDateTime = now - lookbackUntilHours.hours

// Find all RefineTargets whose input paths fall within the window.
val targets = RefineTarget.find(
  spark,
  baseInputPath,
  baseTableLocationPath,
  databaseName,
  inputPathDateTimeFormatter,
  inputPathRegex,
  sinceDateTime,
  untilDateTime
)

// Keep only the targets that have a _REFINED_FAILED flag.
val failedTargets = targets.filter(_.failureFlagExists)
// Report how many failed, then list them so they can be investigated or re-run.
println(failedTargets.size)
failedTargets.foreach(println)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment