# Created October 16, 2023 20:14
# PyTorch LSTM: forecast AAPL opening prices from Yahoo Finance history.
import math
from datetime import date, timedelta, datetime

import numpy as np
import pandas as pd
#import matplotlib.pyplot as plt # Visualization
#import matplotlib.dates as mdates # Formatting dates
from sklearn.preprocessing import MinMaxScaler
import torch # Library for implementing Deep Neural Network
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
# Loading the Apple.Inc Stock Data
import yfinance as yf
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear head applied at every time step.

    Args:
        input_size: Number of features in the input at each time step.
        hidden_size: Number of LSTM units per layer.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_layers):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map ``(batch, seq_len, input_size)`` to ``(batch, seq_len, 1)``."""
        out, _ = self.lstm(x)
        return self.linear(out)


def _open_prices(frame):
    """Return the "Open" column of *frame* as a float array of shape (n, 1)."""
    # Go through a plain ndarray: np.reshape on a pandas object is fragile.
    return np.asarray(frame["Open"], dtype=np.float64).reshape(-1, 1)


def _make_windows(series, sequence_length):
    """Slice a scaled ``(n, 1)`` series into next-step prediction pairs.

    Window ``i`` is ``series[i:i + sequence_length]``; its label is the same
    window shifted one step ahead, because the model emits one prediction per
    input time step.

    Returns:
        ``(inputs, labels)`` as float32 tensors, each of shape
        ``(n - sequence_length, sequence_length, 1)``.

    Raises:
        ValueError: If the series is not longer than ``sequence_length``.
    """
    num_windows = len(series) - sequence_length
    if num_windows <= 0:
        raise ValueError(
            f"need more than {sequence_length} rows to build windows, "
            f"got {len(series)}"
        )
    inputs = np.array(
        [series[i:i + sequence_length] for i in range(num_windows)]
    )
    labels = np.array(
        [series[i + 1:i + sequence_length + 1] for i in range(num_windows)]
    )
    return (
        torch.tensor(inputs, dtype=torch.float32),
        torch.tensor(labels, dtype=torch.float32),
    )


def _fit(model, train_loader, test_loader, loss_fn, optimizer, device, num_epochs):
    """Train *model* and evaluate it on the test loader after every epoch.

    Prints a progress line every 10 epochs.

    Returns:
        ``(train_hist, test_hist)``: per-epoch average batch losses.
    """
    train_hist = []
    test_hist = []
    for epoch in range(num_epochs):
        # Training
        model.train()
        total_loss = 0.0
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            predictions = model(batch_X)
            loss = loss_fn(predictions, batch_y)
            # Clear stale gradients, backpropagate, then update the weights;
            # without these three calls the model never learns.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        # Calculate average training loss
        average_loss = total_loss / len(train_loader)
        train_hist.append(average_loss)

        # Validation on test data
        model.eval()
        total_test_loss = 0.0
        with torch.no_grad():
            for batch_X_test, batch_y_test in test_loader:
                batch_X_test = batch_X_test.to(device)
                batch_y_test = batch_y_test.to(device)
                predictions_test = model(batch_X_test)
                test_loss = loss_fn(predictions_test, batch_y_test)
                total_test_loss += test_loss.item()
        # Calculate average test loss
        average_test_loss = total_test_loss / len(test_loader)
        test_hist.append(average_test_loss)

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}] - Training Loss: {average_loss:.4f}, Test Loss: {average_test_loss:.4f}')
    return train_hist, test_hist


def _forecast(model, seed_window, num_steps, device):
    """Roll the model forward *num_steps* steps starting from *seed_window*.

    Each predicted value is fed back in as the newest element of the window
    while the oldest element is dropped.

    Returns:
        A list of ``num_steps`` floats in scaled units.
    """
    # Work on a private 1-D copy so the caller's data is never modified.
    window = np.array(seed_window, dtype=np.float32).reshape(-1)
    forecasts = []
    model.eval()
    with torch.no_grad():
        for _ in range(num_steps):
            # Prepare the window as a (1, seq_len, 1) batch
            window_tensor = torch.as_tensor(window).view(1, -1, 1).float().to(device)
            # The model emits one prediction per input step; only the last
            # one is the forecast for the step that follows the window.
            next_value = model(window_tensor)[0, -1, 0].item()
            forecasts.append(next_value)
            # Drop the oldest value and append the prediction
            window = np.roll(window, shift=-1)
            window[-1] = next_value
    return forecasts


def main():
    """Download AAPL history, train an LSTM on opening prices and forecast.

    Steps: fetch daily data from 1990-01-01 to today, split 80/20 in time
    order, min-max scale with statistics from the training split, build
    sliding windows, train for 50 epochs, then roll the model forward to
    produce a forecast in price units. Plotting is left commented out.
    """
    end_date = datetime.today().strftime("%Y-%m-%d") #end date for our data retrieval will be current date
    start_date = '1990-01-01' # Beginning date for our historical data retrieval
    df = yf.download('AAPL', start=start_date, end=end_date)# Function used to fetch the data
    if len(df) == 0:
        # Nothing to split or train on; fail here with a clear message.
        raise RuntimeError("AAPL download returned no rows")
    print("AAPL downloaded")

    #def data_plot(df):
    #    df_plot = df.copy()
    #    ncols = 2
    #    nrows = int(round(df_plot.shape[1] / ncols, 0))
    #    fig, ax = plt.subplots(nrows=nrows, ncols=ncols,
    #                           sharex=True, figsize=(14, 7))
    #    for i, ax in enumerate(fig.axes):
    #        sns.lineplot(data=df_plot.iloc[:, i], ax=ax)
    #        ax.tick_params(axis="x", rotation=30, labelsize=10, length=0)
    #        ax.xaxis.set_major_locator(mdates.AutoDateLocator())
    #    fig.tight_layout()

    # Train-Test Split
    # Setting 80 percent data for training
    training_data_len = math.ceil(len(df) * .8)
    #Splitting the dataset
    train_data = df[:training_data_len].iloc[:, :2]
    test_data = df[training_data_len:].iloc[:, :2]
    print(train_data.shape, test_data.shape)

    # Selecting Open Price values as (n, 1) arrays
    dataset_train = _open_prices(train_data)
    dataset_test = _open_prices(test_data)

    scaler = MinMaxScaler(feature_range=(0, 1))
    # Fit the scaler on the training split only
    scaled_train = scaler.fit_transform(dataset_train)
    # Reuse the training statistics for the test split. Re-fitting here would
    # leak test information and scale the two splits inconsistently; test
    # values may therefore fall outside [0, 1].
    scaled_test = scaler.transform(dataset_test)
    print(*scaled_test[:5]) #prints the first 5 rows of scaled_test

    # Create sequences and labels for training data
    train_sequence_length = 50 # Number of time steps to look back
    X_train, y_train = _make_windows(scaled_train, train_sequence_length)

    # Create sequences and labels for testing data
    test_sequence_length = 30 # Number of time steps to look back
    X_test, y_test = _make_windows(scaled_test, test_sequence_length)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    input_size = 1
    num_layers = 2
    hidden_size = 64

    # Define the model, loss function, and optimizer
    model = LSTMModel(input_size, hidden_size, num_layers).to(device)
    loss_fn = torch.nn.MSELoss(reduction='mean')
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    batch_size = 16
    # Create DataLoader for batch training
    train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Create DataLoader for batch evaluation
    test_dataset = torch.utils.data.TensorDataset(X_test, y_test)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    num_epochs = 50
    # Training loop; the loss histories feed the commented-out plot below
    train_hist, test_hist = _fit(
        model, train_loader, test_loader, loss_fn, optimizer, device, num_epochs
    )
    #x = np.linspace(1,num_epochs,num_epochs)
    #plt.plot(x,train_hist,scalex=True, label="Training loss")
    #plt.plot(x, test_hist, label="Test loss")

    # Define the number of future time steps to forecast
    num_forecast_steps = 30
    # Use the last test window as the starting point. Index the window
    # directly: squeeze() would also drop the batch axis if only one window
    # existed.
    last_window = X_test[-1, :, 0].cpu().numpy()
    # Use the trained model to forecast future values
    forecasted_values = _forecast(model, last_window, num_forecast_steps * 2, device)

    # Generate future dates
    last_date = test_data.index[-1]
    # Generate the next 30 dates
    future_dates = pd.date_range(start=last_date + pd.DateOffset(1), periods=num_forecast_steps)
    # Concatenate the original index with the future dates
    combined_index = test_data.index.append(future_dates)

    #set the size of the plot
    #plt.rcParams['figure.figsize'] = [14, 4]
    #Test data
    #plt.plot(test_data.index[-100:-30], test_data.Open[-100:-30], label = "test_data", color = "b")

    # Reverse the scaling transformation. The scaler was fit on a single
    # feature, so values are passed as one column.
    original_cases = scaler.inverse_transform(last_window.reshape(-1, 1)).flatten()
    #the historical data used as input for forecasting
    #plt.plot(test_data.index[-30:], original_cases, label='actual values', color='green')

    #Forecasted Values
    #reverse the scaling transformation
    forecasted_cases = scaler.inverse_transform(
        np.array(forecasted_values).reshape(-1, 1)
    ).flatten()
    # plotting the forecasted values
    #plt.plot(combined_index[-60:], forecasted_cases, label='forecasted values', color='red')
    #plt.xlabel('Time Step')
    #plt.title('Time Series Forecasting')
# Run the full download / train / forecast pipeline only when executed as a
# script, not when the module is imported.
if __name__ == "__main__":
    main()
    print('Finished Forecasting')
