Last active February 4, 2021 05:31
Gist: dvannoy/744bbda12e35fcd15a600ee22684d72c
val zoneDF = spark.read.option("header", "true").schema(taxiZoneSchema).csv(taxiZoneSourcePath)
zoneDF.write.format("delta").mode("overwrite").save(taxiZonePath)
zoneDF.show()
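If the zone lookup will also be queried with SQL, the Delta path can be registered as a table. A minimal sketch; the table name `taxi_zone` is illustrative, not from the original gist:

```scala
// Register the Delta location as an external table so it can be queried by name.
// Table name here is an assumption for illustration.
spark.sql(s"CREATE TABLE IF NOT EXISTS taxi_zone USING DELTA LOCATION '$taxiZonePath'")
spark.sql("SELECT Borough, count(*) AS zones FROM taxi_zone GROUP BY Borough").show()
```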
val inputDF = spark.read.parquet(yellowSourcePath)

// Take your pick on how to transform: withColumn or SQL expressions. Only one of these is needed.
// Option A (DataFrame API; note that substring and regexp_replace take a Column and String arguments)
// val transformedDF = {
//   inputDF
//     .withColumn("yearMonth", regexp_replace(substring(col("tpepPickupDatetime"), 1, 7), "-", "_"))
//     .withColumn("pickupDt", to_date(col("tpepPickupDatetime"), dateFormat))
//     .withColumn("dropoffDt", to_date(col("tpepDropoffDatetime"), dateFormat))
//     .withColumn("tipPct", col("tipAmount") / col("totalAmount"))
// }

// Option B (SQL expressions)
val transformedDF = inputDF.selectExpr(
  "*",
  "replace(left(tpepPickupDatetime, 7), '-', '_') as yearMonth",
  s"to_date(tpepPickupDatetime, '$dateFormat') as pickupDt",
  s"to_date(tpepDropoffDatetime, '$dateFormat') as dropoffDt",
  "tipAmount/totalAmount as tipPct")
val zoneDF = spark.read.format("delta").load(taxiZonePath)

// Join to bring in Taxi Zone data
val tripDF = {
  transformedDF.as("t")
    .join(zoneDF.as("z"), expr("t.PULocationID == z.LocationID"), joinType = "left").drop("LocationID")
    .withColumnRenamed("Borough", "PickupBorough")
    .withColumnRenamed("Zone", "PickupZone")
    .withColumnRenamed("ServiceZone", "PickupServiceZone")
}

tripDF.write.mode("overwrite").partitionBy("yearMonth").format("delta").save(yellowDeltaPath)
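Because the trip table is partitioned by yearMonth, a single month can later be rewritten without touching the rest of the table, using Delta's replaceWhere option. A sketch; the month value shown is just an example:

```scala
// Overwrite only the 2018_01 partition; all other partitions are left intact.
// Delta validates that the written rows actually satisfy the predicate.
tripDF
  .filter("yearMonth = '2018_01'")
  .write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "yearMonth = '2018_01'")
  .save(yellowDeltaPath)
```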
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import spark.implicits._

val yellowSourcePath = "wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow/puYear=2018/puMonth=*/*.parquet"
val taxiZoneSourcePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone_lookup.csv"
val taxiZonePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone"
val taxiRatePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_rate_code"
val yellowDeltaPath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/tripdata/yellow_delta"

val dateFormat = "yyyy-MM-dd HH:mm:ss"

// Define a schema that Spark understands. This is one of several ways to do it.
val taxiZoneSchema = StructType(Seq(
  StructField("LocationID", IntegerType),
  StructField("Borough", StringType),
  StructField("Zone", StringType),
  StructField("ServiceZone", StringType)
))
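As the comment notes, this is one of several ways to declare a schema. An equivalent, more compact form uses a DDL string via StructType.fromDDL (available in recent Spark versions):

```scala
// Same four columns as taxiZoneSchema, expressed as a DDL string
// instead of explicit StructField objects.
val taxiZoneSchemaDDL = StructType.fromDDL(
  "LocationID INT, Borough STRING, Zone STRING, ServiceZone STRING")
```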
val testDF = spark.read.format("delta").load(yellowDeltaPath).limit(20)
testDF.select("VendorID", "tpepPickupDatetime", "tpepDropoffDatetime", "passengerCount").show()
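Since the output is a Delta table, earlier versions of the data remain readable. A quick sketch of Delta time travel, assuming version 0 exists at yellowDeltaPath:

```scala
// Read the first committed version of the table (Delta time travel).
val firstVersionDF = spark.read
  .format("delta")
  .option("versionAsOf", 0)
  .load(yellowDeltaPath)
firstVersionDF.count()
```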