Created
May 25, 2023 10:11
-
-
Save pdet/e8d38734232c08e6c15aba79b4eb8368 to your computer and use it in GitHub Desktop.
Example of taxi fare prediction with a PyArrow-DuckDB UDF
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Files used in the example:
# !wget "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2016-01.parquet"
# !wget "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2016-02.parquet"
import duckdb | |
from duckdb.typing import * | |
import torch | |
import torch.nn as nn | |
import pyarrow as pa | |
class LinearRegression(nn.Module):
    """Linear model made of a single ``nn.Linear`` layer."""

    def __init__(self, input_dim, output_dim):
        """Create the layer.

        Args:
            input_dim: Number of input features per sample.
            output_dim: Number of values produced per sample.
        """
        super().__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, distances):
        """Apply the linear layer to ``distances`` and return the result.

        Args:
            distances: Tensor whose last dimension has ``input_dim`` entries.
        """
        return self.linear(distances)
class PredictCabFare:
    """Callable that predicts a cab fare from a trip distance.

    A one-feature linear regression is trained at construction time. The
    instance can then be called with a column of trip distances and returns
    the predicted fares as a single-column pyarrow table.
    """

    def __init__(self, data):
        """Train the linear regression model on ``data``.

        Args:
            data: Mapping with ``'trip_distance'`` and ``'fare_amount'``
                entries; both are reshaped into column vectors for training.
        """
        self.model = self.train_model(data)

    def train_model(self, data, learning_rate=0.01, epochs=1000):
        """Fit a ``LinearRegression`` model with SGD on an MSE loss.

        Progress is printed every 100 epochs.

        Args:
            data: Mapping with ``'trip_distance'`` and ``'fare_amount'``
                tensors.
            learning_rate: Step size handed to ``torch.optim.SGD``.
            epochs: Number of full-batch optimisation steps.

        Returns:
            The trained model.
        """
        distances = data['trip_distance'].reshape(-1, 1)
        fares = data['fare_amount'].reshape(-1, 1)

        # One input feature (distance) and one output value (fare).
        model = LinearRegression(1, 1)
        criterion = nn.MSELoss()
        optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

        for epoch in range(epochs):
            # Forward pass and loss on the whole training set.
            loss = criterion(model(distances), fares)

            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (epoch + 1) % 100 == 0:
                print(f'Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}')

        return model

    def predict(self, input):
        """Run the trained model on ``input`` without tracking gradients.

        Args:
            input: Tensor accepted by ``self.model``.

        Returns:
            The model output converted to a numpy array.
        """
        with torch.no_grad():
            y_pred = self.model(input)
        return y_pred.numpy()

    @staticmethod
    def _to_feature_tensor(input):
        """Turn a chunked column into one ``(n_rows, 1)`` float32 tensor.

        The chunks are consecutive row ranges of the same column, so they are
        concatenated along the row axis. Stacking them along ``dim=1`` would
        turn every chunk into its own feature column and would fail as soon as
        the chunks differ in length.

        Args:
            input: Object exposing ``.chunks``, each chunk providing
                ``.to_numpy()`` (presumably a ``pyarrow.ChunkedArray`` when
                called through an Arrow UDF).
        """
        pieces = [torch.from_numpy(chunk.to_numpy()).float() for chunk in input.chunks]
        if not pieces:
            # torch.cat rejects an empty list; an empty column has zero rows.
            return torch.empty((0, 1), dtype=torch.float32)
        return torch.cat(pieces).reshape(-1, 1)

    def __call__(self, input):
        """Predict fares for a column of trip distances.

        Args:
            input: Chunked column of trip distances (see
                ``_to_feature_tensor`` for the attributes that are used).

        Returns:
            A ``pyarrow.Table`` with one column named ``'predicted_value'``.
        """
        features = self._to_feature_tensor(input)
        predicted = self.predict(features).flatten()

        # Wrap the predictions in a single-batch pyarrow table.
        batch = pa.record_batch([predicted], names=['predicted_value'])
        return pa.Table.from_batches([batch])
# Load the first 10000 January trips as torch tensors; both columns are cast
# to FLOAT in the query.
training_data = duckdb.execute("""
    SELECT
        trip_distance::FLOAT as trip_distance,
        fare_amount::FLOAT as fare_amount
    from 'yellow_tripdata_2016-01.parquet' limit 10000
""").torch()

con = duckdb.connect()

# Train the model once, then expose the callable to SQL as an Arrow UDF that
# takes a DOUBLE column and returns FLOAT.
predict_fare = PredictCabFare(training_data)
con.create_function('predict_fare', predict_fare, ['DOUBLE'], 'FLOAT', type='arrow')
## Visualize the comparison between the predicted cab fares and the actual cab fares
import matplotlib.pyplot as plt

# Score the first 100 February trips with the registered UDF and fetch the
# result as a pandas DataFrame.
prediction = con.sql("""
    SELECT
        predict_fare(trip_distance) as predicted_fare,
        fare_amount,
        trip_distance
    from 'yellow_tripdata_2016-02.parquet' LIMIT 100
""").df()

# Predicted fares in blue and actual fares in red, drawn on the same axes.
ax = prediction.plot(kind='scatter', x='trip_distance', y='predicted_fare', c='blue', alpha=0.5)
prediction.plot(kind='scatter', x='trip_distance', y='fare_amount', c='red', alpha=0.5, ax=ax)

# set the x-axis label
ax.set_xlabel('Trip Distance')
# set the y-axis label
ax.set_ylabel('Fare')
# set the plot title
ax.set_title('Predicted Fare vs Actual Fare by Trip Distance')

# show the plot
plt.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment