Skip to content

Instantly share code, notes, and snippets.

@timvw
Created June 18, 2018 07:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timvw/6819e095c0e2e52f29105d340abf4b6f to your computer and use it in GitHub Desktop.
Save timvw/6819e095c0e2e52f29105d340abf4b6f to your computer and use it in GitHub Desktop.
Explore Spark SQL (Avro -> JSON)
import org.apache.spark.sql.SparkSession

// Local Spark session used to explore Avro files written by a Kafka HDFS sink.
// NOTE(review): "spark.sql.avro.compression.codec" appears to matter only when *writing* Avro;
// this script only reads, so the setting is likely inert here — confirm before relying on it.
val spark = SparkSession
  .builder()
  .config("spark.sql.avro.compression.codec", "snappy")
  .appName("test")
  .master("local[*]")
  .getOrCreate()

// Brings the `$"colName"` column syntax (and Dataset encoders) into scope.
import spark.implicits._

// Input Avro file. Set AVRO_INPUT to point elsewhere; when unset, the original sample path is used.
val file = sys.env.getOrElse(
  "AVRO_INPUT",
  "/Users/timvw/src/docker/docker-streaming/data/kafka/hdfs/topics/test/year=2018/month=06/day=06/hour=20/test+0+0000000000+0000000029.avro"
)

// `spark.read.avro(...)` is an implicit extension that only exists after
// `import com.databricks.spark.avro._`, which this script never imported.
// The explicit format name needs no implicit. It resolves to the databricks package when that
// package is on the classpath. On Spark 2.4+ it is mapped to the built-in Avro source through
// spark.sql.legacy.replaceDatabricksSparkAvro.enabled.
val df = spark.read.format("com.databricks.spark.avro").load(file)
import org.apache.spark.sql.functions._
// The Spark type constructors below were previously used without this import.
import org.apache.spark.sql.types._

// Schema for the JSON message parsed out of the Avro `value` column.
// NOTE(review): every leaf (including prices and timestamps) is declared as a string; confirm
// whether typed numeric/timestamp fields are wanted downstream.

/** Builds a struct whose fields are all (nullable, by StructField default) strings, in order. */
def stringStruct(names: String*): StructType =
  StructType(names.map(name => StructField(name, StringType)))

// Free-form string -> string map; values may be null.
val stringMap = MapType(StringType, StringType, valueContainsNull = true)

val version = StructField("Version", StringType)

val header = StructField("Header", stringStruct("Id", "Timestamp"))

val context = StructField("Context", StructType(Seq(
  StructField("Device", stringStruct("DeviceId")),
  StructField("Component", stringStruct("SourceReference")),
  StructField("Subscriber", stringStruct("CustomerId"))
)))

// Shared by both PurchasePrice and ListPrice.
val price = stringStruct("InVat", "ExVat")

val entity = StructField("Entity", StructType(Seq(
  StructField("Id", StringType),
  StructField("Timestamp", StringType),
  StructField("Entitlement", stringMap),
  StructField("Currency", StringType),
  StructField("PurchasePrice", price),
  StructField("ListPrice", price)
)))

val payload = StructField("Payload", StructType(Seq(
  StructField("Data", StructType(Seq(entity)))
)))

// Top-level message: Version, Header, Context, Payload.
val schema = StructType(Seq(version, header, context, payload))
// Parse the Avro `value` column as JSON using `schema`, yielding one struct column named "Msg".
// NOTE(review): assumes `value` holds JSON text; if the sink stored raw bytes, an explicit
// `.cast("string")` may be needed — confirm against the actual Avro schema.
val df2 = df.select(from_json(col("value"), schema).alias("Msg"))

// Inspect the parsed structure and preview a few rows without truncating wide values.
df2.printSchema()
df2.show(numRows = 5, truncate = false)
// Flatten the parsed "Msg" struct into one top-level column per field of interest.
val df3 = {
  // (nested path, output column name) for fields copied through unchanged, in output order.
  // NOTE(review): "PurchasePrice" holds the ExVat value, unlike the explicit
  // "ListPriceExVat" / "ListPriceInVat" names — kept as-is to preserve the output schema.
  val copied = Seq(
    "Msg.Context.Device.DeviceId"                          -> "DeviceId",
    "Msg.Context.Component.SourceReference"                -> "VodFileName",
    "Msg.Payload.Data.Entity.Id"                           -> "PurchaseId",
    "Msg.Payload.Data.Entity.Entitlement.ProductOfferRef"  -> "ProductOfferRef",
    "Msg.Payload.Data.Entity.Currency"                     -> "Currency",
    "Msg.Payload.Data.Entity.PurchasePrice.ExVat"          -> "PurchasePrice",
    "Msg.Payload.Data.Entity.ListPrice.ExVat"              -> "ListPriceExVat",
    "Msg.Payload.Data.Entity.ListPrice.InVat"              -> "ListPriceInVat"
  ).map { case (path, outName) => col(path).alias(outName) }

  // Customer id with a trailing "_be" suffix stripped.
  val dtvLine =
    regexp_replace(col("Msg.Context.Subscriber.CustomerId"), "_be$", "").alias("DTVLine")

  df2.select(copied :+ dtvLine: _*)
}

df3.printSchema()
df3.show(numRows = 5, truncate = false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment