Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import scala.collection.JavaConverters._
import scala.collection.immutable.ListMap
import org.wikimedia.analytics.refinery.spark.sql.JsonSchemaConverter
import org.wikimedia.analytics.refinery.core.jsonschema.EventSchemaLoader
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.DataFrame
import org.wikimedia.analytics.refinery.job.refine._
import org.wikimedia.analytics.refinery.spark.connectors.DataFrameToHive
import org.wikimedia.analytics.refinery.spark.sql.PartitionedDataFrame
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
def rawEventDataFrame(inputDf: DataFrame, schemaUri: String, rawEventColumnName: String = "rawevent"): DataFrame = {
val schemaBaseUris: Seq[String] = Seq(
"https://schema.wikimedia.org/repositories/primary/jsonschema",
"https://schema.wikimedia.org/repositories/secondary/jsonschema"
)
val schemaLoader = new EventSchemaLoader(schemaBaseUris.asJava)
val jsonSchema = schemaLoader.getLatestSchema(java.net.URI.create(schemaUri))
val sparkSchema = JsonSchemaConverter.toSparkSchema(jsonSchema)
val rawEventColumn = inputDf(rawEventColumnName)
val eventDf = inputDf.select(from_json(rawEventColumn, sparkSchema) as 'jsonstruct).select("jsonstruct.*")
eventDf
}
def backfillEventTableFromEventError(
database: String,
table: String,
location: String,
schemaTitle: String,
partition: ListMap[String, String]
): PartitionedDataFrame = {
val schemaUri = s"/$schemaTitle/latest"
val partitionSql = partition.map(p => s"${p._1}=${p._2}").mkString(" AND ")
// Read the JSON string from the rawevent field.
val rawEventDf = spark.sql(s"""
select event.rawevent from event.eventerror
where ${partitionSql} and event.rawevent LIKE '%${schemaTitle}%'
"""
)
// Parse the rawevent JSON string into a dataframe using jsonschema
val eventDf = rawEventDataFrame(rawEventDf, schemaUri).normalizeAndWiden()
val pdf = PartitionedDataFrame(
eventDf,
database,
table,
location,
partition
)
DataFrameToHive(
spark,
pdf,
() => (),
Seq((p) => event_transforms(p)),
"append"
)
}
val database ="otto"
val table = "searchsatisfaction_backfill_T249261"
val location = "/user/hive/warehouse/otto.db/searchsatisfaction_backfill_t249261"
val schemaTitle = "analytics/legacy/searchsatisfaction"
val partition = ListMap(
"year" -> "2020",
"month" -> "6",
"day" -> "18",
"hour" -> "21"
)
val database ="event"
val table = "searchsatisfaction"
val location = "/wmf/data/event/SearchSatisfaction"
val schemaTitle = "analytics/legacy/searchsatisfaction"
val wrotePdf = backfillEventTableFromEventError(
database, table, location, schemaTitle,
ListMap(
"year" -> "2020",
"month" -> "6",
"day" -> "18",
"hour" -> "20"
)
)
wrotePdf.df.count
res3: Long = 452
val wrotePdf = backfillEventTableFromEventError(
database, table, location, schemaTitle,
ListMap(
"year" -> "2020",
"month" -> "6",
"day" -> "18",
"hour" -> "21"
)
)
wrotePdf.df.count
res4: Long = 35379
val wrotePdf = backfillEventTableFromEventError(
database, table, location, schemaTitle,
ListMap(
"year" -> "2020",
"month" -> "6",
"day" -> "18",
"hour" -> "22"
)
)
wrotePdf.df.count
res5: Long = 47526
val wrotePdf = backfillEventTableFromEventError(
database, table, location, schemaTitle,
ListMap(
"year" -> "2020",
"month" -> "6",
"day" -> "18",
"hour" -> "23"
)
)
wrotePdf.df.count
res7: Long = 23039
val wrotePdf = backfillEventTableFromEventError(
database, table, location, schemaTitle,
ListMap(
"year" -> "2020",
"month" -> "6",
"day" -> "19",
"hour" -> "0"
)
)
wrotePdf.df.count
res8: Long = 524
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.