Skip to content

Instantly share code, notes, and snippets.

@dvannoy
Last active February 4, 2021 05:31
Show Gist options
  • Save dvannoy/744bbda12e35fcd15a600ee22684d72c to your computer and use it in GitHub Desktop.
Save dvannoy/744bbda12e35fcd15a600ee22684d72c to your computer and use it in GitHub Desktop.
val zoneDF = spark.read.option("header","true").schema(taxiZoneSchema).csv(taxiZoneSourcePath)
zoneDF.write.format("delta").mode("overwrite").save(taxiZonePath)
zoneDF.show()
val inputDF = spark.read.parquet(yellowSourcePath)
// Take your pick on how to transform, withColumn or SQL Expressions. Only one of these is needed.
// Option A
// val transformedDF = {
// inputDF
// .withColumn("yearMonth", regexp_replace(substring("tpepPickupDatetime",1,7), '-', '_'))
// .withColumn("pickupDt", to_date("tpepPickupDatetime", dateFormat))
// .withColumn("dropoffDt", to_date("tpepDropoffDatetime", dateFormat))
// .withColumn("tipPct", col("tipAmount") / col("totalAmount"))
// }
// Option B
val transformedDF = inputDF.selectExpr(
"*",
"replace(left(tpepPickupDatetime, 7),'-','_') as yearMonth",
s"to_date(tpepPickupDatetime, '$dateFormat') as pickupDt",
s"to_date(tpepDropoffDatetime, '$dateFormat') as dropoffDt",
"tipAmount/totalAmount as tipPct")
val zoneDF = spark.read.format("delta").load(taxiZonePath)
// Join to bring in Taxi Zone data
val tripDF = {
transformedDF.as("t")
.join(zoneDF.as("z"), expr("t.PULocationID == z.LocationID"), joinType="left").drop("LocationID")
.withColumnRenamed("Burough", "PickupBurrough")
.withColumnRenamed("Zone", "PickupZone")
.withColumnRenamed("ServiceZone", "PickupServiceZone")
}
tripDF.write.mode("overwrite").partitionBy("yearMonth").format("delta").save(yellowDeltaPath)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import spark.implicits._
val yellowSourcePath = "wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow/puYear=2018/puMonth=*/*.parquet"
val taxiZoneSourcePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone_lookup.csv"
val taxiZonePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone"
val taxiRatePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_rate_code"
val yellowDeltaPath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/tripdata/yellow_delta"
val dateFormat = "yyyy-MM-dd HH:mm:ss"
// Define a schema that Spark understands. This is one of several ways to do it.
val taxiZoneSchema = StructType(Seq(
StructField("LocationID", IntegerType),
StructField("Borough", StringType),
StructField("Zone", StringType),
StructField("ServiceZone", StringType)
))
val testDF = spark.read.format("delta").load(yellowDeltaPath).limit(20)
testDF.select("VendorID", "tpepPickupDatetime", "tpepDropoffDatetime", "passengerCount").show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment