Skip to content

Instantly share code, notes, and snippets.

/clicks.scala Secret

Created February 14, 2018 09:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/90dac8efadca3a69571e619943ddb2f6 to your computer and use it in GitHub Desktop.
Save anonymous/90dac8efadca3a69571e619943ddb2f6 to your computer and use it in GitHub Desktop.
import java.util.concurrent.TimeUnit
import monix.execution.Scheduler.{global => scheduler}
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
 * Streaming job that left-joins JSON click events with a static user-id mapping (CSV)
 * and prints each click, with an `is_new` flag, to the console.
 *
 * Uses an explicit `main` rather than extending `scala.App`: Spark's documentation warns
 * that subclasses of `scala.App` may not work correctly (fields are initialised lazily
 * through `DelayedInit`).
 */
object Test {
  import scala.util.control.NonFatal

  /** Glob of the JSON click-event files read as a stream. */
  private val EventsPath = "/opt/test/events/*"

  /** Glob of the CSV snapshot mapping `did`/`aid` ids to users. */
  private val UsersPath = "/opt/test/dw/users/cid/2018/02/13/*"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").master("local[5]").getOrCreate()

    // NOTE(review): the schema comes from a type named `Test`, presumably a case class
    // defined elsewhere (this object itself is not a `Product`) — confirm.
    val stream = spark.readStream
      .format("json")
      .option("path", EventsPath)
      .schema(Encoders.product[Test].schema)
      .load()
    stream.createOrReplaceTempView("clicks_stream")

    // Build the `users` and `clicks` views once, synchronously, before the query starts.
    registerClicksView(spark)

    // NOTE(review): the streaming query below is planned from the `clicks` view as it
    // exists when `start()` is called; replacing the temp view afterwards does not change
    // the already-running query, so this refresh likely has no visible effect. If the goal
    // is to pick up new user files, the query probably needs restarting (or, on Spark 2.4+,
    // the join moved into `foreachBatch`) — confirm intent.
    // Initial delay is 1s because the views were just built above; starting at 0 only
    // raced with the main thread to rebuild the same views.
    val refresh = scheduler.scheduleWithFixedDelay(
      1, 1, TimeUnit.SECONDS,
      new Runnable {
        def run(): Unit =
          try registerClicksView(spark)
          catch {
            // Keep one failed refresh (e.g. missing CSV files) from escaping the
            // scheduled task and ending the periodic refresh.
            case NonFatal(e) => System.err.println(s"Failed to refresh users view: $e")
          }
      })

    val outStream = spark.sql("SELECT user_cid, old_cid, event_name, is_new FROM clicks")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

    // Blocks until the query stops or fails, then releases the scheduler task and Spark.
    try outStream.awaitTermination()
    finally {
      refresh.cancel()
      spark.stop()
    }
  }

  /**
   * (Re)registers the `users` temp view from the CSV snapshot and the `clicks` temp view
   * that left-joins `clicks_stream` with it.
   *
   * `old_cid` is `did` when present, otherwise `aid`. `is_new` is true when a click has no
   * matching user row.
   *
   * NOTE(review): `idSchema` is defined outside this view; presumably a `StructType`
   * containing at least `did` and `aid` columns — confirm.
   *
   * @param spark session whose temp-view catalog is updated
   */
  private def registerClicksView(spark: SparkSession): Unit = {
    val cidDf = spark.read
      .format("com.databricks.spark.csv")
      .option("delimiter", ",")
      .schema(idSchema)
      .load(UsersPath)
      .withColumn("old_cid", coalesce(col("did"), col("aid")))
      .drop("did", "aid")
    cidDf.createOrReplaceTempView("users")

    spark.sql("SELECT * FROM clicks_stream LEFT JOIN users ON clicks_stream.user_cid = users.old_cid")
      .withColumn("is_new", col("old_cid").isNull)
      .createOrReplaceTempView("clicks")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment