// backfill_mediawiki_revision_score.scala
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// We need HiveExtensions to reorder the converted schema to match the new Hive table schema.
// It is also used to drop columns that aren't in the new Hive table schema.
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
// Get the desired field schemas from the new table
val mediawiki_revision_score_2 = spark.table("event.mediawiki_revision_score")
val scoreMapFieldSchema = mediawiki_revision_score_2.schema("scores").dataType
val errorMapFieldSchema = mediawiki_revision_score_2.schema("errors").dataType
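// Quick sanity check (a sketch, not part of the original backfill): print the
// target map types so we can see the shape the UDFs below must produce.
// From the conversion code, scores goes from an array of score structs to a
// map of model_name -> score struct, with probability itself becoming a map.
println(scoreMapFieldSchema.simpleString)
println(errorMapFieldSchema.simpleString)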
// Function to convert the scores array to a scores map
val scoreArrayRowStructToMap = (scoresArray: Seq[Row]) => {
    // Return an Option to avoid NullPointerExceptions if values are null
    if (scoresArray == null || scoresArray.isEmpty) {
        None
    }
    else {
        // Convert the array of score structs to an array of scores with probability maps
        val scoresWithMapProbability = scoresArray.map(scoreWithStructProbability => {
            val model_name = scoreWithStructProbability.getString(0)
            val model_version = scoreWithStructProbability.getString(1)
            val prediction = scoreWithStructProbability.getSeq[String](2)
            val probabilityMap = scoreWithStructProbability.getSeq[Row](3).map(p => p.getString(0) -> p.getDouble(1)).toMap
            Row(model_name, model_version, prediction, probabilityMap)
        })
        // Convert the array of score structs (now with probability maps) to
        // a map of model_name -> score struct
        Some(scoresWithMapProbability.map(r => r.getString(0) -> r).toMap)
    }
}
// Make a UDF, using the target scores map type as its return type
val scoreArrayRowStructToMapUdf = udf(scoreArrayRowStructToMap, scoreMapFieldSchema)
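// Spot check (a sketch, not in the original gist): apply the UDF to a few rows
// of the old array-based table and eyeball the resulting map values. This
// assumes the old table otto.mediawiki_revision_score_1, which is also loaded
// further below for the actual backfill.
spark.table("otto.mediawiki_revision_score_1")
    .select(scoreArrayRowStructToMapUdf(col("scores")).as("scores"))
    .show(3, false)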
// Function to convert the errors array to an errors map
val errorArrayRowStructToMap = (errorsArray: Seq[Row]) => {
    if (errorsArray == null || errorsArray.isEmpty) {
        None
    }
    else {
        Some(errorsArray.map(errorStruct => errorStruct.getString(0) -> errorStruct).toMap)
    }
}
val errorArrayRowStructToMapUdf = udf(errorArrayRowStructToMap, errorMapFieldSchema)
def convertRevisionScore1to2(revScore1Df: DataFrame) = {
    // Use the UDFs to add the new map columns, drop the old array ones,
    // and rename the map columns to the original names.
    revScore1Df
        .withColumn("scores_map", scoreArrayRowStructToMapUdf(col("scores")))
        .drop("scores")
        .withColumnRenamed("scores_map", "scores")
        .withColumn("errors_map", errorArrayRowStructToMapUdf(col("errors")))
        .drop("errors")
        .withColumnRenamed("errors_map", "errors")
        // Now all of the fields should be the same; we just need the field order to match.
        // Good thing we have HiveExtensions' convertToSchema!
        // Note: this also drops 2 unused columns: meta.schema_uri and meta.topic
        .convertToSchema(mediawiki_revision_score_2.schema)
}
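// Sanity check (a sketch, not in the original gist): after convertToSchema the
// field names, types, and order should line up with the new table. Eyeball the
// two tree strings; exact StructType equality can differ on nullability flags.
println(convertRevisionScore1to2(spark.table("otto.mediawiki_revision_score_1")).schema.treeString)
println(mediawiki_revision_score_2.schema.treeString)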
def convertAndWriteRevisionScore1to2(revScore1Df: DataFrame, outputBasePath: String) = {
    // I can't seem to insert this DataFrame directly into event.mediawiki_revision_score.
    // I get ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead.
    // I've tried modifying the UDF functions above to return Options for any map type inside (like probability),
    // but it doesn't help. Without the Options, I get NullPointerExceptions.
    // So! We write this to its own NEW location, and will switch to Hive directly
    // to insert into event.mediawiki_revision_score.
    convertRevisionScore1to2(revScore1Df)
        .write
        .partitionBy("datacenter", "year", "month", "day", "hour")
        .mode("append")
        .parquet(outputBasePath)
}
val months = Seq(
    ("2018", "12"),
    ("2019", "1"),
    ("2019", "2"),
    ("2019", "3"),
    ("2019", "4"),
    ("2019", "5"),
    ("2019", "6"),
    ("2019", "7"),
    ("2019", "8"),
    ("2019", "9")
)
val mediawiki_revision_score_1 = spark.table("otto.mediawiki_revision_score_1")
months.foreach({ case (year, month) => {
    println(s"------ BEGIN Transforming ${year} ${month} day < 15")
    convertAndWriteRevisionScore1to2(
        mediawiki_revision_score_1.where(s"year=${year} and month=${month} and day < 15"),
        "/user/otto/mediawiki_revision_score_1_backfill/backfill0"
    )
    println(s"------ DONE Transforming ${year} ${month} day < 15\n\n\n")

    println(s"------ BEGIN Transforming ${year} ${month} day >= 15")
    convertAndWriteRevisionScore1to2(
        mediawiki_revision_score_1.where(s"year=${year} and month=${month} and day >= 15"),
        "/user/otto/mediawiki_revision_score_1_backfill/backfill0"
    )
    println(s"------ DONE Transforming ${year} ${month} day >= 15\n\n\n")
}})
// --- TODO ---
// Move data dirs out of /user/otto/mediawiki_revision_score_1_backfill/backfill0 into event/mediawiki_revision_score
// MSCK REPAIR TABLE event.mediawiki_revision_score
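// A sketch of the TODO above, not part of the original gist. The target
// directory below is a HYPOTHETICAL placeholder for wherever the
// event.mediawiki_revision_score data actually lives in HDFS; check the
// table's LOCATION first (e.g. DESCRIBE FORMATTED event.mediawiki_revision_score).
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val backfillBase = new Path("/user/otto/mediawiki_revision_score_1_backfill/backfill0")
val tableBase = new Path("/path/to/event/mediawiki_revision_score") // HYPOTHETICAL: use the real table LOCATION
// Move each top-level partition dir (datacenter=...) under the table location.
// Note: rename() fails if the destination already exists, so this only works
// cleanly for partition dirs not already present in the table location.
fs.listStatus(backfillBase).foreach { status =>
    fs.rename(status.getPath, new Path(tableBase, status.getPath.getName))
}
// Register the newly added partitions with the Hive metastore.
spark.sql("MSCK REPAIR TABLE event.mediawiki_revision_score")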