from pyspark.sql.functions import col, regexp_replace, substring, to_date
from pyspark.sql.types import StructType
# Source data: NYC yellow taxi trips (Azure Open Datasets) and a taxi zone lookup CSV
yellow_source_path = "wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow/puYear=2018/puMonth=*/*.parquet"
taxi_zone_source_path = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone_lookup.csv"

# Output locations for the Delta tables written below
taxi_zone_path = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone"
taxi_rate_path = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_rate_code"
yellow_delta_path = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/tripdata/yellow_delta"

# Timestamp format used when parsing pickup/dropoff datetimes
date_format = "yyyy-MM-dd HH:mm:ss"
# Define a schema that Spark understands. This is one of several ways to do it
# (another is sketched below).
taxi_zone_schema = (
    StructType()
    .add('LocationID', 'integer')
    .add('Borough', 'string')
    .add('Zone', 'string')
    .add('ServiceZone', 'string')
) |
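# Equivalent alternative (a sketch): spark.read.schema() also accepts a DDL-formatted
# string, so the same schema can be declared in one line:
#   taxi_zone_schema = "LocationID integer, Borough string, Zone string, ServiceZone string"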
zone_df = (
    spark.read
    .option("header", "true")
    .schema(taxi_zone_schema)
    .csv(taxi_zone_source_path)
)
zone_df.write.format("delta").mode("overwrite").save(taxi_zone_path) |
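# Note: taxi_rate_path is defined above but never populated in this snippet. Loading the
# rate-code lookup would mirror the zone lookup; the source path below is a hypothetical
# placeholder, not part of the original job.
# rate_df = (
#     spark.read
#     .option("header", "true")
#     .csv("abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_rate_code_lookup.csv")  # hypothetical path
# )
# rate_df.write.format("delta").mode("overwrite").save(taxi_rate_path)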
input_df = spark.read.parquet(yellow_source_path)

# Take your pick on how to transform: withColumn (Option A) or SQL expressions (Option B).
# Only one of these is needed.
# Option A
# transformed_df = (
#     input_df
#     .withColumn("yearMonth", regexp_replace(substring("tpepPickupDatetime", 1, 7), '-', '_'))
#     .withColumn("pickupDt", to_date("tpepPickupDatetime", date_format))
#     .withColumn("dropoffDt", to_date("tpepDropoffDatetime", date_format))
#     .withColumn("tipPct", col("tipAmount") / col("totalAmount"))
# )

# Option B
transformed_df = input_df.selectExpr(
    "*",
    "replace(left(tpepPickupDatetime, 7), '-', '_') as yearMonth",
    f"to_date(tpepPickupDatetime, '{date_format}') as pickupDt",
    f"to_date(tpepDropoffDatetime, '{date_format}') as dropoffDt",
    "tipAmount/totalAmount as tipPct")
zone_df = spark.read.format("delta").load(taxi_zone_path)

# Join to bring in Taxi Zone data
trip_df = (
    transformed_df
    .join(zone_df, transformed_df["PULocationID"] == zone_df["LocationID"], how="left").drop("LocationID")
    .withColumnRenamed("Borough", "PickupBorough")
    .withColumnRenamed("Zone", "PickupZone")
    .withColumnRenamed("ServiceZone", "PickupServiceZone")
)
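# An equivalent approach (a sketch) is to prefix the lookup columns before joining,
# which avoids the per-column renames above:
#   pickup_zone_df = zone_df.select([col(c).alias(f"Pickup{c}") for c in zone_df.columns])
#   trip_df = (
#       transformed_df
#       .join(pickup_zone_df, transformed_df["PULocationID"] == pickup_zone_df["PickupLocationID"], how="left")
#       .drop("PickupLocationID")
#   )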
trip_df.write.mode("overwrite").partitionBy("yearMonth").format("delta").save(yellow_delta_path) |
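# Optional (a sketch): register the Delta path as a metastore table so it can be queried
# by name. Assumes a metastore is attached; the table name is illustrative.
# spark.sql(f"CREATE TABLE IF NOT EXISTS yellow_tripdata USING DELTA LOCATION '{yellow_delta_path}'")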
test_df = spark.read.format("delta").load(yellow_delta_path).limit(20)
test_df.select("VendorID", "tpepPickupDatetime", "tpepDropoffDatetime", "passengerCount").show() |