Skip to content

Instantly share code, notes, and snippets.

@pdet
Created May 25, 2023 10:11
Show Gist options
  • Save pdet/e8d38734232c08e6c15aba79b4eb8368 to your computer and use it in GitHub Desktop.
Save pdet/e8d38734232c08e6c15aba79b4eb8368 to your computer and use it in GitHub Desktop.
Example of taxi fare prediction with PyArrow-DuckDB UDF
# Files used in the example:
# !wget "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2016-01.parquet"
# !wget "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2016-02.parquet"
import duckdb
from duckdb.typing import *
import torch
import torch.nn as nn
import pyarrow as pa
class LinearRegression(nn.Module):
    """Single fully connected layer mapping ``input_dim`` features to ``output_dim`` outputs."""

    def __init__(self, input_dim: int, output_dim: int) -> None:
        """Create the underlying ``nn.Linear`` layer.

        Args:
            input_dim: Number of input features per sample.
            output_dim: Number of values predicted per sample.
        """
        super().__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, distances: torch.Tensor) -> torch.Tensor:
        """Apply the linear layer to ``distances`` and return its output.

        Args:
            distances: Tensor whose last dimension has size ``input_dim``.

        Returns:
            The output of ``self.linear`` for the given input.
        """
        return self.linear(distances)
class PredictCabFare:
    """Callable that fits a linear fare model once and then predicts fares from trip distances.

    The instance is trained in ``__init__`` and is meant to be registered as an
    Arrow-typed DuckDB function: calling it with a chunked Arrow column of
    distances returns a one-column Arrow table named ``predicted_value``.
    """

    def __init__(self, data):
        """Train the linear regression model on ``data`` and keep it on the instance.

        Args:
            data: Mapping with ``'trip_distance'`` and ``'fare_amount'`` entries
                (in this file, the result of DuckDB's ``.torch()``).
        """
        self.model = self.train_model(data)

    def train_model(self, data, learning_rate=0.01, epochs=1000):
        """Fit a one-feature linear regression of fare on distance with SGD.

        Args:
            data: Mapping with ``'trip_distance'`` and ``'fare_amount'`` entries
                that support ``reshape`` (presumably torch tensors; confirm
                against the caller).
            learning_rate: Step size passed to ``torch.optim.SGD``.
            epochs: Number of full-batch optimisation steps.

        Returns:
            The trained ``LinearRegression`` model.
        """
        # Turn both columns into (n, 1) matrices: one sample per row.
        distances = data['trip_distance'].reshape(-1, 1)
        fares = data['fare_amount'].reshape(-1, 1)

        # One input feature (distance) and one output value (fare).
        input_dim = 1
        output_dim = 1
        model = LinearRegression(input_dim, output_dim)
        criterion = nn.MSELoss()
        optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

        for epoch in range(epochs):
            # Forward pass and loss on the whole training set.
            y_pred = model(distances)
            loss = criterion(y_pred, fares)

            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Report progress every 100 epochs.
            if (epoch + 1) % 100 == 0:
                print(f'Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}')

        return model

    def predict(self, input):
        """Run the trained model on ``input`` without tracking gradients.

        Args:
            input: Tensor accepted by ``self.model``.

        Returns:
            The model output converted to a NumPy array.
        """
        with torch.no_grad():
            y_pred = self.model(input)
        return y_pred.numpy()

    def _chunks_to_features(self, column):
        """Convert a chunked column into a float32 feature matrix of shape (n, 1)."""
        chunk_tensors = [
            torch.from_numpy(chunk.to_numpy()).float() for chunk in column.chunks
        ]
        if not chunk_tensors:
            # No chunks at all: an empty batch still needs the (n, 1) layout.
            return torch.empty((0, 1), dtype=torch.float32)
        # Chunks are consecutive slices of the same column, so they are joined
        # end to end (row axis). Stacking them side by side would create one
        # feature column per chunk and break the single-feature model.
        return torch.cat(chunk_tensors).reshape(-1, 1)

    def __call__(self, input):
        """Predict a fare for every value in a chunked Arrow column.

        Args:
            input: Object exposing ``chunks``, each with ``to_numpy()``
                (presumably a ``pyarrow.ChunkedArray`` supplied by DuckDB).

        Returns:
            A ``pyarrow.Table`` with a single ``predicted_value`` column holding
            one prediction per input value.
        """
        features = self._chunks_to_features(input)
        predicted = self.predict(features).flatten()

        # Wrap the predictions in a one-column Arrow table.
        schema = pa.schema([('predicted_value', pa.float32())])
        batch = pa.record_batch([predicted], names=schema.names)
        return pa.Table.from_batches([batch])
# Load 10,000 January 2016 trips as torch tensors and fit the fare model on them.
training_query = """
SELECT
trip_distance::FLOAT as trip_distance,
fare_amount::FLOAT as fare_amount
from 'yellow_tripdata_2016-01.parquet' limit 10000
"""
training_data = duckdb.execute(training_query).torch()
predict_fare = PredictCabFare(training_data)

# Expose the trained model to SQL as an Arrow-typed function on a new connection.
con = duckdb.connect()
con.create_function(
    'predict_fare',
    predict_fare,
    ['DOUBLE'],
    'FLOAT',
    type='arrow',
)
## Plot predicted cab fares next to the actual cab fares for 100 February 2016 trips
import matplotlib.pyplot as plt

comparison_query = """
SELECT
predict_fare(trip_distance) as predicted_fare,
fare_amount,
trip_distance
from 'yellow_tripdata_2016-02.parquet' LIMIT 100
"""
prediction = con.sql(comparison_query).df()

# Both series are scatter plots over trip distance with the same transparency.
scatter_options = dict(kind='scatter', x='trip_distance', alpha=0.5)

# Predicted fares in blue, then actual fares in red on the same axes.
ax = prediction.plot(y='predicted_fare', c='blue', **scatter_options)
prediction.plot(y='fare_amount', c='red', ax=ax, **scatter_options)

# Axis labels and plot title.
ax.set(
    xlabel='Trip Distance',
    ylabel='Fare',
    title='Predicted Fare vs Actual Fare by Trip Distance',
)

# Display the figure.
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment