Skip to content

Instantly share code, notes, and snippets.

@joestein
Created October 16, 2023 20:14
Show Gist options
  • Save joestein/e6ad819160242892b498a71f933930e3 to your computer and use it in GitHub Desktop.
pytorch lstm
import pandas as pd
import numpy as np
import math
#import matplotlib.pyplot as plt # Visualization
#import matplotlib.dates as mdates # Formatting dates
from sklearn.preprocessing import MinMaxScaler
import torch # Library for implementing Deep Neural Network
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
# Loading the Apple.Inc Stock Data
import yfinance as yf
from datetime import date, timedelta, datetime
class LSTMModel(nn.Module):
    """Seq-to-seq LSTM regressor.

    Maps an input of shape (batch, seq_len, input_size) to per-timestep
    predictions of shape (batch, seq_len, 1): the LSTM output at every
    timestep is passed through a shared linear head.
    """

    def __init__(self, input_size, hidden_size, num_layers):
        """
        input_size : number of features in the input at each time step
        hidden_size: number of LSTM units per layer
        num_layers : number of stacked LSTM layers
        """
        super().__init__()
        # batch_first=True -> tensors are (batch, seq, feature)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Return a prediction for every timestep of the input sequence."""
        out, _ = self.lstm(x)
        return self.linear(out)


def _make_sequences(scaled, sequence_length):
    """Build sliding windows and one-step-shifted targets from a (n, 1) array.

    Returns (X, y) float32 tensors of shape (n - sequence_length,
    sequence_length, 1): y[i] is X[i] shifted forward by one step, so the
    model learns to predict the next value at every timestep.
    """
    xs, ys = [], []
    for i in range(len(scaled) - sequence_length):
        xs.append(scaled[i:i + sequence_length])
        ys.append(scaled[i + 1:i + sequence_length + 1])
    X = torch.tensor(np.array(xs), dtype=torch.float32)
    y = torch.tensor(np.array(ys), dtype=torch.float32)
    return X, y


def main():
    """Download AAPL daily data, train an LSTM on the scaled Open price,
    and autoregressively forecast 60 future steps.

    Side effects: downloads data via yfinance, trains a model (GPU if
    available), and prints progress and the forecast to stdout.
    Returns None.
    """
    print("MAIN")
    # Retrieval window: fixed historical start through today.
    end_date = date.today().strftime("%Y-%m-%d")
    start_date = '1990-01-01'
    print("****************")
    df = yf.download('AAPL', start=start_date, end=end_date)
    print("AAPL downloaded")
    print(df.head(5))

    # ---- Train/test split: first 80% of rows train, rest test ----
    training_data_len = math.ceil(len(df) * .8)
    print(training_data_len)
    train_data = df[:training_data_len].iloc[:, :2]
    test_data = df[training_data_len:].iloc[:, :2]
    print("$$$$$$$$$$$$$$$$$$$$$$$$$$$")
    print(train_data.shape, test_data.shape)
    print(train_data.head(5))

    # The model uses the Open price only, reshaped to (n, 1) for the scaler.
    dataset_train = np.reshape(train_data["Open"], (-1, 1))
    dataset_test = np.reshape(test_data["Open"], (-1, 1))

    # Scale to [0, 1]. BUGFIX: the test set must be transformed with the
    # scaler fitted on the TRAINING set only. The original called
    # fit_transform on the test set too, which leaks test statistics and
    # makes the later inverse_transform inconsistent with training.
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_train = scaler.fit_transform(dataset_train)
    print(scaled_train[:5])
    scaled_test = scaler.transform(dataset_test)
    print(*scaled_test[:5])  # first 5 rows of scaled_test

    # Sequences + shifted-by-one targets (lengths as in the original script).
    X_train, y_train = _make_sequences(scaled_train, sequence_length=50)
    X_test, y_test = _make_sequences(scaled_test, sequence_length=30)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(device)

    input_size = 1    # one feature per timestep (scaled Open price)
    num_layers = 2
    hidden_size = 64

    model = LSTMModel(input_size, hidden_size, num_layers).to(device)
    loss_fn = torch.nn.MSELoss(reduction='mean')
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    print(model)

    batch_size = 16
    train_loader = DataLoader(torch.utils.data.TensorDataset(X_train, y_train),
                              batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(torch.utils.data.TensorDataset(X_test, y_test),
                             batch_size=batch_size, shuffle=False)

    num_epochs = 50
    train_hist = []
    test_hist = []
    for epoch in range(num_epochs):
        # ---- Training pass ----
        model.train()
        total_loss = 0.0
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            loss = loss_fn(model(batch_X), batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        average_loss = total_loss / len(train_loader)
        train_hist.append(average_loss)

        # ---- Validation pass on test data ----
        model.eval()
        total_test_loss = 0.0
        with torch.no_grad():
            for batch_X_test, batch_y_test in test_loader:
                batch_X_test = batch_X_test.to(device)
                batch_y_test = batch_y_test.to(device)
                total_test_loss += loss_fn(model(batch_X_test), batch_y_test).item()
        average_test_loss = total_test_loss / len(test_loader)
        test_hist.append(average_test_loss)

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}] - Training Loss: {average_loss:.4f}, Test Loss: {average_test_loss:.4f}')

    # ---- Autoregressive forecast: roll the last test window forward ----
    num_forecast_steps = 30
    sequence_to_plot = X_test.squeeze().cpu().numpy()  # (N, 30)
    historical_data = sequence_to_plot[-1]  # last 30-step window as seed

    forecasted_values = []
    model.eval()
    with torch.no_grad():
        for _ in range(num_forecast_steps * 2):
            window = torch.as_tensor(historical_data).view(1, -1, 1).float().to(device)
            # BUGFIX: the next-step forecast is the model output at the LAST
            # timestep ([0, -1]). The original indexed [0, 0], i.e. the
            # prediction for the first (already known) step of the window.
            predicted_value = model(window).cpu().numpy()[0, -1]
            forecasted_values.append(predicted_value[0])
            # Slide the window: drop the oldest value, append the prediction.
            historical_data = np.roll(historical_data, shift=-1)
            historical_data[-1] = predicted_value

    # Undo the train-fitted scaling to get forecasts in price units.
    forecasted_cases = scaler.inverse_transform(
        np.expand_dims(forecasted_values, axis=0)).flatten()
    print(forecasted_cases)
# Script entry point: run the full download/train/forecast pipeline.
# (Indentation restored — the suite lines must be indented under the guard.)
if __name__ == "__main__":
    main()
    print('Finished Forecasting')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment