Created June 18, 2018 07:45
-
-
Save timvw/6819e095c0e2e52f29105d340abf4b6f to your computer and use it in GitHub Desktop.
Explore Spark SQL (Avro -> JSON)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Exploration script: read an Avro file, parse the JSON text held in its
// `value` column with an explicit schema, and flatten a handful of nested
// fields into a flat DataFrame.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Required for MapType / StringType / StructField / StructType used below.
import org.apache.spark.sql.types._

// Local session using every available core; Avro output codec set to snappy.
val spark = SparkSession
  .builder()
  .config("spark.sql.avro.compression.codec", "snappy")
  .appName("test")
  .master("local[*]")
  .getOrCreate()
// Brings the $"column" string interpolator into scope.
import spark.implicits._

val file = "/Users/timvw/src/docker/docker-streaming/data/kafka/hdfs/topics/test/year=2018/month=06/day=06/hour=20/test+0+0000000000+0000000029.avro"

// NOTE(review): `.avro` is not defined on the stock DataFrameReader; this
// presumably relies on an implicit reader extension from an Avro data-source
// package (e.g. `import com.databricks.spark.avro._`) being in scope — confirm.
val df = spark.read.avro(file)

// --- Schema of the JSON document expected in the `value` column ------------

// String-to-string map whose values may be null.
val stringMap = MapType(StringType, StringType, valueContainsNull = true)

val version = StructField("Version", StringType)

val header = StructField("Header", StructType(Seq(
  StructField("Id", StringType),
  StructField("Timestamp", StringType)
)))

val context = StructField("Context", StructType(Seq(
  StructField("Device", StructType(Seq(
    StructField("DeviceId", StringType)
  ))),
  StructField("Component", StructType(Seq(
    StructField("SourceReference", StringType)
  ))),
  StructField("Subscriber", StructType(Seq(
    StructField("CustomerId", StringType)
  )))
)))

// Shared shape for both PurchasePrice and ListPrice.
val price = StructType(Seq(
  StructField("InVat", StringType),
  StructField("ExVat", StringType)
))

val entity = StructField("Entity", StructType(Seq(
  StructField("Id", StringType),
  StructField("Timestamp", StringType),
  StructField("Entitlement", stringMap),
  StructField("Currency", StringType),
  StructField("PurchasePrice", price),
  StructField("ListPrice", price)
)))

// Payload -> Data -> Entity nesting.
val payload = StructField("Payload", StructType(Seq(
  StructField("Data", StructType(Seq(entity)))
)))

val schema = StructType(Seq(version, header, context, payload))

// --- Parse ------------------------------------------------------------------

// Parse `value` as JSON against `schema`; the resulting struct is named "Msg".
val df2 = df.select(from_json($"value", schema).as("Msg"))
df2.printSchema()
df2.show(5, truncate = false)

// --- Flatten ----------------------------------------------------------------

// Pull selected nested fields up to top-level columns. `ProductOfferRef` is
// looked up as a key in the Entitlement map; `DTVLine` is CustomerId with a
// trailing "_be" removed.
val df3 = df2.select(
  col("Msg.Context.Device.DeviceId").as("DeviceId"),
  col("Msg.Context.Component.SourceReference").as("VodFileName"),
  col("Msg.Payload.Data.Entity.Id").as("PurchaseId"),
  col("Msg.Payload.Data.Entity.Entitlement.ProductOfferRef").as("ProductOfferRef"),
  col("Msg.Payload.Data.Entity.Currency").as("Currency"),
  col("Msg.Payload.Data.Entity.PurchasePrice.ExVat").as("PurchasePrice"),
  col("Msg.Payload.Data.Entity.ListPrice.ExVat").as("ListPriceExVat"),
  col("Msg.Payload.Data.Entity.ListPrice.InVat").as("ListPriceInVat"),
  regexp_replace(col("Msg.Context.Subscriber.CustomerId"), "_be$", "").as("DTVLine")
)
df3.printSchema()
df3.show(5, truncate = false)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment