Skip to content

Instantly share code, notes, and snippets.

@dvannoy
Last active January 27, 2021 19:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dvannoy/c43208fd928ec11bb6c558d311570dcb to your computer and use it in GitHub Desktop.
Save dvannoy/c43208fd928ec11bb6c558d311570dcb to your computer and use it in GitHub Desktop.
// Load the taxi zone lookup CSV (first row is a header) using the explicit schema,
// then save it in Delta format at taxiZonePath in "overwrite" mode.
var zoneCsvReader = spark.Read()
    .Schema(taxiZoneSchema)
    .Option("header", "true");
DataFrame zoneDF = zoneCsvReader.Csv(taxiZoneSourcePath);
var zoneDeltaWriter = zoneDF.Write()
    .Mode("overwrite")
    .Format("delta");
zoneDeltaWriter.Save(taxiZonePath);
// Print the loaded rows to the output for a quick visual check.
zoneDF.Show();
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;
// Source: yellow taxi Parquet files, pickup year 2018, every month partition.
string yellowSourcePath = "wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow/puYear=2018/puMonth=*/*.parquet";
// Source: taxi zone lookup CSV.
string taxiZoneSourcePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone_lookup.csv";
// Target: taxi zone lookup stored in Delta format.
string taxiZonePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone";
// Path for the taxi rate code lookup (not referenced by the other snippets shown here).
string taxiRatePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_rate_code";
// Target: enriched yellow trip data stored in Delta format.
string yellowDeltaPath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/tripdata/yellow_delta";
// Pattern handed to to_date when parsing the pickup/dropoff timestamps.
string dateFormat = "yyyy-MM-dd HH:mm:ss";
// Explicit schema for the taxi zone lookup: an integer LocationID followed by three
// string columns. Building a StructType from StructFields is one of several ways to
// describe a schema to Spark.
StructField[] taxiZoneFields =
{
    new StructField("LocationID", new IntegerType()),
    new StructField("Borough", new StringType()),
    new StructField("Zone", new StringType()),
    new StructField("ServiceZone", new StringType()),
};
var taxiZoneSchema = new StructType(taxiZoneFields);
// Read the yellow taxi trip data from the Parquet files matched by yellowSourcePath.
// No "inferSchema" option is set here: that option belongs to the CSV/JSON readers,
// whereas Parquet files carry their own schema, so the reader takes it from the files.
DataFrame inputDF = spark.Read().Parquet(yellowSourcePath);
// Add derived columns: yearMonth (first seven characters of the pickup timestamp with
// "-" replaced by "_"), pickupDt / dropoffDt (dates parsed with dateFormat) and tipPct
// (tipAmount divided by totalAmount).
// Two ways to write the transformation are shown, WithColumn or SQL expressions; only one is needed.
// Option A: column functions
// var transformedDF = inputDF
// .WithColumn("yearMonth", RegexpReplace(Substring(Col("tpepPickupDatetime"),1,7), "-", "_"))
// .WithColumn("pickupDt", ToDate(Col("tpepPickupDatetime"), dateFormat))
// .WithColumn("dropoffDt", ToDate(Col("tpepDropoffDatetime"), dateFormat))
// .WithColumn("tipPct", Col("tipAmount") / Col("totalAmount"));
// Option B: SQL expressions ("*" keeps every existing column)
string[] derivedColumnExprs =
{
    "*",
    "replace(left(tpepPickupDatetime, 7),\"-\",\"_\") as yearMonth",
    $"to_date(tpepPickupDatetime, \"{dateFormat}\") as pickupDt",
    $"to_date(tpepDropoffDatetime, \"{dateFormat}\") as dropoffDt",
    "tipAmount/totalAmount as tipPct",
};
var transformedDF = inputDF.SelectExpr(derivedColumnExprs);
// Load the taxi zone lookup from the Delta table at taxiZonePath.
DataFrame zoneDF = spark.Read().Format("delta").Load(taxiZonePath);
// Join to bring in Taxi Zone data for the pickup location. The "left" join type keeps
// trips whose PULocationID has no matching LocationID in the lookup.
// The lookup's join key is dropped afterwards and the remaining lookup columns get a
// "Pickup" prefix. WithColumnRenamed does nothing when the source column does not
// exist, so the source name must match the schema's spelling ("Borough") exactly.
var tripDF = transformedDF
    .Join(zoneDF, transformedDF["PULocationID"] == zoneDF["LocationID"], "left")
    .Drop("LocationID")
    .WithColumnRenamed("Borough", "PickupBorough")
    .WithColumnRenamed("Zone", "PickupZone")
    .WithColumnRenamed("ServiceZone", "PickupServiceZone");
// Save the enriched trips in Delta format at yellowDeltaPath, partitioned by yearMonth,
// using "overwrite" mode.
var tripDeltaWriter = tripDF.Write()
    .Format("delta")
    .PartitionBy("yearMonth")
    .Mode("overwrite");
tripDeltaWriter.Save(yellowDeltaPath);
// Read the table back, limited to 20 rows, and print a few columns as a sanity check.
var testDF = spark.Read()
    .Format("delta")
    .Load(yellowDeltaPath)
    .Limit(20);
var previewDF = testDF.Select("VendorID", "tpepPickupDatetime", "tpepDropoffDatetime", "passengerCount");
previewDF.Show();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment