from pyspark.sql.functions import col, regexp_replace, substring, to_date
from pyspark.sql.types import StructType
# Source data: NYC yellow taxi trips for 2018 from Azure Open Datasets,
# plus lookup files and Delta output paths in ADLS Gen2.
yellow_source_path = "wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow/puYear=2018/puMonth=*/*.parquet"
taxi_zone_source_path = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone_lookup.csv"
taxi_zone_path = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone"
taxi_rate_path = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_rate_code"
yellow_delta_path = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/tripdata/yellow_delta"
date_format = "yyyy-MM-dd HH:mm:ss"
# Define a schema that Spark understands. This is one of several ways to do it.
taxi_zone_schema = (
    StructType()
    .add('LocationID', 'integer')
    .add('Borough', 'string')
    .add('Zone', 'string')
    .add('ServiceZone', 'string')
)
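
# As the comment above notes, there are other ways to declare this schema.
# A minimal sketch of one alternative is a DDL-formatted string, which
# spark.read.schema() also accepts:
# taxi_zone_schema_ddl = "LocationID INT, Borough STRING, Zone STRING, ServiceZone STRING"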
# Read the taxi zone lookup CSV with the explicit schema, then persist it as a Delta table.
zone_df = (
    spark.read
    .option("header", "true")
    .schema(taxi_zone_schema)
    .csv(taxi_zone_source_path)
)
zone_df.write.format("delta").mode("overwrite").save(taxi_zone_path)
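
# Optional follow-up (a sketch, not part of the original pipeline): the Delta
# location can be registered as a SQL table so it is queryable by name. The
# table name "taxi_zone_lookup" is an illustrative assumption.
# spark.sql(f"CREATE TABLE IF NOT EXISTS taxi_zone_lookup USING DELTA LOCATION '{taxi_zone_path}'")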
input_df = spark.read.parquet(yellow_source_path)

# Take your pick on how to transform, withColumn or SQL expressions. Only one of these is needed.
# Option A
# transformed_df = (
#     input_df
#     .withColumn("yearMonth", regexp_replace(substring("tpepPickupDatetime", 1, 7), '-', '_'))
#     .withColumn("pickupDt", to_date("tpepPickupDatetime", date_format))
#     .withColumn("dropoffDt", to_date("tpepDropoffDatetime", date_format))
#     .withColumn("tipPct", col("tipAmount") / col("totalAmount"))
# )

# Option B
transformed_df = input_df.selectExpr(
    "*",
    "replace(left(tpepPickupDatetime, 7), '-', '_') as yearMonth",
    f"to_date(tpepPickupDatetime, '{date_format}') as pickupDt",
    f"to_date(tpepDropoffDatetime, '{date_format}') as dropoffDt",
    f"tipAmount/totalAmount as tipPct")
zone_df = spark.read.format("delta").load(taxi_zone_path)

# Join to bring in taxi zone data for the pickup location
trip_df = (
    transformed_df
    .join(zone_df, transformed_df["PULocationID"] == zone_df["LocationID"], how="left")
    .drop("LocationID")
    .withColumnRenamed("Borough", "PickupBorough")
    .withColumnRenamed("Zone", "PickupZone")
    .withColumnRenamed("ServiceZone", "PickupServiceZone")
)

trip_df.write.mode("overwrite").partitionBy("yearMonth").format("delta").save(yellow_delta_path)
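
# Because the output is partitioned by yearMonth, a filter on that column
# prunes the read to matching partition directories. A sketch of such a
# check (the value '2018_06' is illustrative, not from the original):
# spark.read.format("delta").load(yellow_delta_path).where("yearMonth = '2018_06'").count()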
# Quick validation: read back a small sample from the Delta output.
test_df = spark.read.format("delta").load(yellow_delta_path).limit(20)
test_df.select("VendorID", "tpepPickupDatetime", "tpepDropoffDatetime", "passengerCount").show()