# ml — forecast AAPL closing prices with a PyTorch LSTM
import pandas as pd
import numpy as np
import math
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
import yfinance as yf
from datetime import date
# if we have a GPU then use it
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
symbol = "AAPL"
end_date = date.today().strftime("%Y-%m-%d")  # end date for data retrieval is today
start_date = '1990-01-01'  # beginning date for the historical data
df = yf.download(symbol, start=start_date, end=end_date)  # fetch OHLCV history
print(f"{symbol} downloaded")
df['Symbol'] = symbol
#df['sma30'] = df.groupby("Symbol")["Close"].transform(lambda x: x.rolling(30).mean())
df.dropna(inplace=True)
# Train-test split: use the first 80 percent of the data for training
training_data_len = math.ceil(len(df) * .8)
train_data = df[:training_data_len]
test_data = df[training_data_len:]
# Select the closing price as target and feature columns for training
dataset_train = train_data["Close"]
X_train_data = train_data[["Close"]]  # add "sma30" here to use it as a second feature
# Convert to 2D NumPy arrays of shape (n_samples, 1)
X_dataset_train = np.reshape(X_train_data.to_numpy(), (-1, 1))
dataset_train = np.reshape(dataset_train.to_numpy(), (-1, 1))
# Select the closing price as target and feature columns for testing
dataset_test = test_data["Close"]
X_dataset_test = test_data[["Close"]]  # add "sma30" here to use it as a second feature
# Convert to 2D NumPy arrays of shape (n_samples, 1)
X_dataset_test = np.reshape(X_dataset_test.to_numpy(), (-1, 1))
dataset_test = np.reshape(dataset_test.to_numpy(), (-1, 1))
# Normalize values to [0, 1]. Fit the scaler on the training data only and
# reuse it to transform the test data, so test-set statistics never leak
# into training.
scaler = MinMaxScaler(feature_range=(0, 1))
X_scaled_train = scaler.fit_transform(X_dataset_train)
scaled_train = scaler.transform(dataset_train)
X_scaled_test = scaler.transform(X_dataset_test)
scaled_test = scaler.transform(dataset_test)
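# Optional sanity check (not in the original script): training values span
# [0, 1] exactly after fit_transform, while test values may fall slightly
# outside that range if prices exceed the training range, which is expected
# behavior with transform().
print(f"train range: [{scaled_train.min():.3f}, {scaled_train.max():.3f}]")
print(f"test range:  [{scaled_test.min():.3f}, {scaled_test.max():.3f}]")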
# Create sequences and labels for the training data
sequence_length = 10  # number of time steps to look back
X_train, y_train = [], []
for i in range(len(scaled_train) - sequence_length):
    X_train.append(X_scaled_train[i:i + sequence_length])
    y_train.append(scaled_train[i + 1:i + sequence_length + 1])
X_train, y_train = np.array(X_train), np.array(y_train)
# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
# Create sequences and labels for the testing data
X_test, y_test = [], []
for i in range(len(scaled_test) - sequence_length):
    X_test.append(X_scaled_test[i:i + sequence_length])
    y_test.append(scaled_test[i + 1:i + sequence_length + 1])
X_test, y_test = np.array(X_test), np.array(y_test)
# Convert data to PyTorch tensors
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)
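# Optional sanity check: each sample is a window of sequence_length steps with
# one feature, and each label is the same window shifted forward by one step.
# Expected shapes: X -> (n_windows, 10, 1), y -> (n_windows, 10, 1)
print(f"X_train: {tuple(X_train.shape)}, y_train: {tuple(y_train.shape)}")
print(f"X_test:  {tuple(X_test.shape)},  y_test:  {tuple(y_test.shape)}")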
class LSTMModel(nn.Module):
    # input_size  : number of features in the input at each time step
    # hidden_size : number of LSTM units per layer
    # num_layers  : number of stacked LSTM layers
    def __init__(self, input_size, hidden_size, num_layers):
        super(LSTMModel, self).__init__()  # initialize the parent class nn.Module
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x):  # forward pass: one prediction per time step
        out, _ = self.lstm(x)
        out = self.linear(out)  # shape: (batch, seq_len, 1)
        return out
input_size = 1
num_layers = 2
hidden_size = 64
# Define the model, loss function, and optimizer
model = LSTMModel(input_size, hidden_size, num_layers).to(device)
loss_fn = torch.nn.MSELoss(reduction='mean')
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
print(model)
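# Optional: report the model size (trainable parameters only); purely
# informational, this does not affect training.
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Trainable parameters: {num_params:,}")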
batch_size = 16
# Create DataLoader for batch training
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Create DataLoader for test-set evaluation
test_dataset = torch.utils.data.TensorDataset(X_test, y_test)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
num_epochs = 50
train_hist = []
test_hist = []
# Training loop
for epoch in range(num_epochs):
    total_loss = 0.0

    # Training
    model.train()
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        predictions = model(batch_X)
        loss = loss_fn(predictions, batch_y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Average training loss for this epoch
    average_loss = total_loss / len(train_loader)
    train_hist.append(average_loss)

    # Validation on test data
    model.eval()
    with torch.no_grad():
        total_test_loss = 0.0
        for batch_X_test, batch_y_test in test_loader:
            batch_X_test, batch_y_test = batch_X_test.to(device), batch_y_test.to(device)
            predictions_test = model(batch_X_test)
            test_loss = loss_fn(predictions_test, batch_y_test)
            total_test_loss += test_loss.item()

        # Average test loss for this epoch
        average_test_loss = total_test_loss / len(test_loader)
        test_hist.append(average_test_loss)

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}] - Training Loss: {average_loss:.4f}, Test Loss: {average_test_loss:.4f}')
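# Optional: save the trained weights so forecasting can be rerun without
# retraining. The filename "lstm_aapl.pt" is an illustrative choice, not
# from the original script.
torch.save(model.state_dict(), "lstm_aapl.pt")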
# Define the number of future time steps to forecast
num_forecast_steps = 30

# Convert the test sequences to NumPy and drop singleton dimensions
# (np.squeeze reference: https://www.scaler.com/topics/numpy-squeeze/)
sequence_to_plot = X_test.squeeze().cpu().numpy()

# Use the last test window (sequence_length points) as the starting point
historical_data = sequence_to_plot[-1]

# Initialize a list to store the forecasted values
forecasted_values = []

# Use the trained model to forecast future values one step at a time
model.eval()
with torch.no_grad():
    for _ in range(num_forecast_steps):
        # Prepare the historical_data tensor with shape (1, sequence_length, 1)
        historical_data_tensor = torch.as_tensor(historical_data).view(1, -1, 1).float().to(device)

        # Predict the next value: take the model's output at the last time step
        predicted_value = model(historical_data_tensor).cpu().numpy()[0, -1, 0]

        # Append the predicted value to the forecasted_values list
        forecasted_values.append(predicted_value)

        # Slide the window: drop the oldest value, append the prediction
        historical_data = np.roll(historical_data, shift=-1)
        historical_data[-1] = predicted_value

# Generate future dates starting the day after the last test date
last_date = test_data.index[-1]
future_dates = pd.date_range(start=last_date + pd.DateOffset(days=1), periods=num_forecast_steps)

# Concatenate the original index with the future dates (useful for plotting)
combined_index = test_data.index.append(future_dates)

# Reverse the scaling transformation on the last observed window and the forecast
original_cases = scaler.inverse_transform(np.reshape(sequence_to_plot[-1], (-1, 1))).flatten()
forecasted_cases = scaler.inverse_transform(np.reshape(forecasted_values, (-1, 1))).flatten()
print(forecasted_cases)
print('Finished Forecasting')
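# Optional plotting sketch (assumes matplotlib is installed; not part of the
# original script). Plots the observed test-period closes followed by the
# forecasted values on the generated future dates.
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))
plt.plot(test_data.index, test_data["Close"], label="Observed Close")
plt.plot(future_dates, forecasted_cases, linestyle="--", label=f"{num_forecast_steps}-day Forecast")
plt.title(f"{symbol} closing price forecast")
plt.xlabel("Date")
plt.ylabel("Price (USD)")
plt.legend()
plt.show()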