Skip to content

Instantly share code, notes, and snippets.

@dvannoy
Last active February 18, 2021 05:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dvannoy/870fdb6cdde7b072384a7bc8da85984e to your computer and use it in GitHub Desktop.
Save dvannoy/870fdb6cdde7b072384a7bc8da85984e to your computer and use it in GitHub Desktop.
from pyspark.sql.functions import col, desc, regexp_replace, substring, to_date, from_json, explode, expr
from pyspark.sql.types import StructType, StringType
# Input: yellow taxi trip parquet files for 2018, every puMonth partition.
yellow_source_path = "wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow/puYear=2018/puMonth=*/*.parquet"

# Every other location used by this script sits under the same ADLS container
# folder, so the paths are assembled from shared roots.
_nyctaxi_root = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi"
_lookups_root = f"{_nyctaxi_root}/lookups"

taxi_zone_source_path = f"{_lookups_root}/taxi_zone_lookup.csv"
taxi_zone_path = f"{_lookups_root}/taxi_zone"
taxi_rate_path = f"{_lookups_root}/taxi_rate_code"
yellow_delta_path = f"{_nyctaxi_root}/tripdata/yellow_delta"

# Datetime pattern passed to to_date() when deriving the pickup/dropoff dates.
date_format = "yyyy-MM-dd HH:mm:ss"
# Explicit schema for the taxi zone lookup CSV, so Spark does not have to
# infer column names or types. Building it field by field is only one of
# several ways to declare a schema.
taxi_zone_schema = StructType()
for _field_name, _field_type in (
    ("LocationID", "integer"),
    ("Borough", "string"),
    ("Zone", "string"),
    ("ServiceZone", "string"),
):
    # StructType.add appends the field to the schema object in place.
    taxi_zone_schema.add(_field_name, _field_type)
# Load the zone lookup CSV (first row is a header) using the explicit schema.
zone_reader = spark.read.option("header", "true").schema(taxi_zone_schema)
zone_df = zone_reader.csv(taxi_zone_source_path)

# Persist the lookup as a Delta table, replacing whatever is already there.
zone_writer = zone_df.write.format("delta").mode("overwrite")
zone_writer.save(taxi_zone_path)
# Raw yellow taxi trips, read directly from the parquet source.
input_df = spark.read.parquet(yellow_source_path)

# The derived columns can be produced with withColumn (Option A) or with SQL
# expressions (Option B). Both add the same four columns; only one is needed.
# Option A
# transformed_df = (
#     input_df
#     .withColumn("yearMonth", regexp_replace(substring("tpepPickupDatetime",1,7), '-', '_'))
#     .withColumn("pickupDt", to_date("tpepPickupDatetime", date_format))
#     .withColumn("dropoffDt", to_date("tpepDropoffDatetime", date_format))
#     .withColumn("tipPct", col("tipAmount") / col("totalAmount"))
# )
# Option B
derived_column_exprs = [
    # First 7 characters of the pickup timestamp with '-' swapped for '_'.
    "replace(left(tpepPickupDatetime, 7),'-','_') as yearMonth",
    f"to_date(tpepPickupDatetime, '{date_format}') as pickupDt",
    f"to_date(tpepDropoffDatetime, '{date_format}') as dropoffDt",
    # Tip as a fraction of the total fare amount.
    "tipAmount/totalAmount as tipPct",
]
# Keep every original column ("*") and append the derived ones.
transformed_df = input_df.selectExpr("*", *derived_column_exprs)
# Re-read the zone lookup from the Delta copy written earlier in this script.
zone_df = spark.read.format("delta").load(taxi_zone_path)

# Join to bring in Taxi Zone data for the pickup location. A left join keeps
# trips whose PULocationID has no match in the lookup. The join key coming
# from the lookup side is dropped, and the remaining lookup columns get a
# "Pickup" prefix.
trip_df = (
    transformed_df
    .join(zone_df, transformed_df["PULocationID"] == zone_df["LocationID"], how="left")
    .drop("LocationID")
    # The lookup column is named "Borough" (see taxi_zone_schema). The earlier
    # misspelling "Burough" matched no column, and withColumnRenamed silently
    # does nothing in that case, so the column was never renamed.
    .withColumnRenamed("Borough", "PickupBorough")
    .withColumnRenamed("Zone", "PickupZone")
    .withColumnRenamed("ServiceZone", "PickupServiceZone")
)

# Write the enriched trips as a Delta table partitioned by yearMonth.
(
    trip_df.write
    .format("delta")
    .mode("overwrite")
    # A table written by an earlier run still has the un-renamed "Borough"
    # column; allow the overwrite to replace the table schema so the corrected
    # column name does not trip Delta's schema enforcement.
    .option("overwriteSchema", "true")
    .partitionBy("yearMonth")
    .save(yellow_delta_path)
)
# Sanity check: read the Delta table back, cap it at 20 rows, and print a
# handful of columns.
preview_columns = ["VendorID", "tpepPickupDatetime", "tpepDropoffDatetime", "passengerCount"]
test_df = spark.read.format("delta").load(yellow_delta_path).limit(20)
test_df.select(*preview_columns).show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment