@soulmachine
Created June 18, 2016 15:13
package com.cmcm.bdp
import java.io.FileInputStream
import java.net.InetAddress
import com.databricks.spark.avro._
import com.maxmind.geoip2.DatabaseReader
import org.apache.spark._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{coalesce, col, collect_list, row_number}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable
import scala.io.Source
object UserDailySnapshot extends Logging {
  def main(args: Array[String]) {
    if (args.length != 5 && args.length != 6) {
      System.err.println("Usage: UserDailySnapshot <user_location> <user_install_status> <oversea_user_label> <cm_active_user> <output> [numPartitions]")
      return
    }
    val conf = new SparkConf().setAppName("Spark User Daily Snapshot")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import sqlContext.implicits._

    val numPartitions = if (args.length == 6) args(5).toInt else 0
    val primaryKeyName = "aid"
    val primaryKey = col(primaryKeyName)
    // read data
    val cmActiveUserDf = {
      val schema = asNullable(sqlContext.read.avro(args(3)).schema)
      val tmpDf = sqlContext.read.schema(schema).avro(args(3)).filter(primaryKey.isNotNull)
      // Why are there duplicate utctime values for the same uid? We have to use row_number instead of
      // dense_rank, which is not intuitive, so that exactly one row per aid survives the filter below.
      val windowSpec = Window.partitionBy(primaryKey).orderBy($"utctime".desc)
      tmpDf.withColumn("rank", row_number().over(windowSpec)).filter($"rank" === 1).drop($"rank")
    }.persist(StorageLevel.DISK_ONLY)
    // val userLocationDf = {
    //   val schema = asNullable(sqlContext.read.avro(args(0)).schema)
    //   val tmpDf = sqlContext.read.schema(schema).avro(args(0)).filter(primaryKey.isNotNull)
    //   aidDf.join(tmpDf, aidDf(primaryKeyName) === tmpDf(primaryKeyName)).drop(aidDf(primaryKeyName))
    // }
    val userInstallStatusDf = {
      val schema = asNullable(sqlContext.read.orc(args(1)).schema)
      sqlContext.read.schema(schema).orc(args(1)).filter(primaryKey.isNotNull)
        .groupBy(primaryKey).agg(collect_list($"pkgname") as "pkgnames",
          collect_list($"gpcategory") as "gpcategories")
    }.persist(StorageLevel.DISK_ONLY)
    val overseaUserLabelDf = {
      val schema = asNullable(sqlContext.read.parquet(args(2)).schema)
      sqlContext.read.schema(schema).parquet(args(2)).withColumnRenamed("id", "aid").filter(primaryKey.isNotNull)
    }
    val join1 = cmActiveUserDf.join(userInstallStatusDf,
      cmActiveUserDf(primaryKeyName) === userInstallStatusDf(primaryKeyName), "left_outer")
      .drop(userInstallStatusDf(primaryKeyName)).persist(StorageLevel.DISK_ONLY)
    // val join1 = uniqueJoin(cmActiveUserDf, overseaUserLabelDf, primaryKeyName)
    // val join2 = userInstallStatusDf.join(userLocationDf,
    //   userInstallStatusDf(primaryKeyName) === userLocationDf(primaryKeyName), "outer")
    //   .withColumn(primaryKeyName + "_tmp",
    //     coalesce(userInstallStatusDf(primaryKeyName), userLocationDf(primaryKeyName)))
    //   .drop(userInstallStatusDf(primaryKeyName)).drop(userLocationDf(primaryKeyName))
    //   .withColumnRenamed(primaryKeyName + "_tmp", primaryKeyName)
    val join3 = uniqueJoin(join1, overseaUserLabelDf, primaryKeyName).persist(StorageLevel.DISK_ONLY)
    // add MaxMind geographical fields
    // NOTE: this assumes GeoIP2-City.mmdb is present in each executor's working directory
    // (for example, shipped alongside the job with --files).
    val joinedRdd = join3.mapPartitions { iter =>
      val cityDatabase = new DatabaseReader.Builder(new FileInputStream("GeoIP2-City.mmdb")).build()
      iter.map { row =>
        val ip = row.get(row.fieldIndex("ip")).asInstanceOf[String]
        if (ip != null) {
          val ipAddress = InetAddress.getByName(ip)
          try {
            val response = cityDatabase.city(ipAddress)
            val city = response.getCity.getName
            val country = response.getCountry.getName
            val countryIsoCode = response.getCountry.getIsoCode
            val latitude = response.getLocation.getLatitude
            val longitude = response.getLocation.getLongitude
            val timeZone = response.getLocation.getTimeZone
            val postalCode = response.getPostal.getCode
            Row(row.toSeq ++ Seq(city, country, countryIsoCode, latitude, longitude, timeZone, postalCode): _*)
          } catch {
            case ex: com.maxmind.geoip2.exception.AddressNotFoundException =>
              Row(row.toSeq ++ Seq(null, null, null, null, null, null, null): _*)
          }
        } else {
          Row(row.toSeq ++ Seq(null, null, null, null, null, null, null): _*)
        }
      }
    }
    val schemaAfterJoin = StructType(join3.schema.fields ++ Array(
      StructField("maxmind_city", StringType),
      StructField("maxmind_country", StringType),
      StructField("maxmind_country_iso_code", StringType),
      StructField("maxmind_latitude", DoubleType),
      StructField("maxmind_longitude", DoubleType),
      StructField("maxmind_time_zone", StringType),
      StructField("maxmind_postal_code", StringType)))
    val joinedDf = sqlContext.createDataFrame(joinedRdd, schemaAfterJoin).persist(StorageLevel.DISK_ONLY)
    // flatten tags
    val idNameMap = {
      val lines = Source.fromInputStream(this.getClass.getResourceAsStream("/label_keys.txt")).mkString.split("\n")
      lines.map { line =>
        val tmp = line.split("\t")
        (tmp(0), tmp(1))
      }.toMap
    }
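    // label_keys.txt is read from the classpath; each line is expected to be "<tag id>\t<column name>".
    // A hypothetical illustration (the real ids and names are not part of this gist):
    //   1001\tmusic_lover
    //   1002\tfrequent_traveler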
    // NOTE: this relies on the sorted column names and the sorted tag ids having the same relative
    // order, so that each boolean column lines up with the position computed from its tag id.
    val schemaAfterFlatten = StructType(joinedDf.schema.fields ++
      idNameMap.values.toArray.sorted.map(fieldName => StructField(fieldName, BooleanType)))
    val idPosMap = idNameMap.keys.toArray.sorted.zipWithIndex.toMap
    val idPosMapBroadcast = sc.broadcast(idPosMap)
    val flattenRdd = joinedDf.map { row =>
      val tags = row.getAs[mutable.WrappedArray[String]]("tags")
      val idPosMap = idPosMapBroadcast.value
      val values = new Array[Boolean](idPosMap.size)
      if (tags != null) {
        tags.foreach { tag =>
          if (idPosMap.contains(tag)) values(idPosMap(tag)) = true
        }
      }
      Row.fromSeq(row.toSeq ++ values)
    }
    val flattenDf = sqlContext.createDataFrame(flattenRdd, schemaAfterFlatten)
    // NOTE: numPartitions is parsed from args above but never applied; repartitioning flattenDf
    // before this write would be the natural place to use it.
    flattenDf.write.parquet(args(4))

    sc.stop()
  }
  // Left-outer join df1 with df2 on joinKey, dropping df2's copies of any overlapping columns
  private def uniqueJoin(df1: DataFrame, df2: DataFrame, joinKey: String): DataFrame = {
    val commonColumns = df1.columns.toSet intersect df2.columns.toSet
    val selectedColumns = df1.columns.map(c => df1(c)) ++
      df2.columns.collect { case c if !commonColumns.contains(c) => df2(c) }
    df1.join(df2, df1(joinKey) === df2(joinKey), "left_outer").select(selectedColumns: _*)
  }
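  // For example (hypothetical schemas, not taken from this job): if df1 has columns (aid, ip, tags)
  // and df2 has columns (aid, tags, gender), uniqueJoin(df1, df2, "aid") yields columns
  // (aid, ip, tags, gender), with the overlapping aid and tags taken from df1 and unmatched df1 rows kept.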
  // copied from StructType.asNullable()
  private def asNullable(schema: StructType): StructType = {
    val newFields = schema.fields.map {
      case StructField(name, dataType, _, metadata) =>
        StructField(name, dataType, nullable = true, metadata)
    }
    StructType(newFields)
  }
}
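
A minimal sketch of inspecting the snapshot this job writes, assuming a Spark 1.x spark-shell with an existing SparkContext named sc; the output path below is hypothetical and stands for whatever was passed as <output>:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val snapshot = sqlContext.read.parquet("/path/to/output")
// schema: the joined user fields, the appended maxmind_* columns, and one boolean column per tag
snapshot.printSchema()
snapshot.select("aid", "maxmind_country", "maxmind_city").show(10)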