Created
October 16, 2023 20:14
-
-
Save joestein/e6ad819160242892b498a71f933930e3 to your computer and use it in GitHub Desktop.
pytorch lstm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import numpy as np | |
import math | |
#import matplotlib.pyplot as plt # Visualization | |
#import matplotlib.dates as mdates # Formatting dates | |
from sklearn.preprocessing import MinMaxScaler | |
import torch # Library for implementing Deep Neural Network | |
import torch.nn as nn | |
import torch.nn.functional as F | |
import torch.optim as optim | |
from torch.utils.data import Dataset, DataLoader | |
# Loading the Apple.Inc Stock Data | |
import yfinance as yf | |
from datetime import date, timedelta, datetime | |
def main(): | |
print("MAIN") | |
end_date = date.today().strftime("%Y-%m-%d") #end date for our data retrieval will be current date | |
start_date = '1990-01-01' # Beginning date for our historical data retrieval | |
print("****************") | |
df = yf.download('AAPL', start=start_date, end=end_date)# Function used to fetch the data | |
print("AAPL downloaded") | |
#def data_plot(df): | |
# df_plot = df.copy() | |
# ncols = 2 | |
# nrows = int(round(df_plot.shape[1] / ncols, 0)) | |
# fig, ax = plt.subplots(nrows=nrows, ncols=ncols, | |
# sharex=True, figsize=(14, 7)) | |
# for i, ax in enumerate(fig.axes): | |
# sns.lineplot(data=df_plot.iloc[:, i], ax=ax) | |
# ax.tick_params(axis="x", rotation=30, labelsize=10, length=0) | |
# ax.xaxis.set_major_locator(mdates.AutoDateLocator()) | |
# fig.tight_layout() | |
# plt.show() | |
#data_plot(df) | |
print(df.head(5)) | |
# Train-Test Split | |
# Setting 80 percent data for training | |
training_data_len = math.ceil(len(df) * .8) | |
print(training_data_len) | |
#Splitting the dataset | |
train_data = df[:training_data_len].iloc[:,:2] | |
test_data = df[training_data_len:].iloc[:,:2] | |
print("$$$$$$$$$$$$$$$$$$$$$$$$$$$") | |
print(train_data.shape, test_data.shape) | |
#print(train_data[0]) | |
# Selecting Open Price values | |
print(train_data.head(5)) | |
dataset_train = train_data["Open"] #.Open.values | |
# Reshaping 1D to 2D array | |
dataset_train = np.reshape(dataset_train, (-1,1)) | |
# Selecting Open Price values | |
dataset_test = test_data["Open"] | |
# Reshaping 1D to 2D array | |
dataset_test = np.reshape(dataset_test, (-1,1)) | |
from sklearn.preprocessing import MinMaxScaler | |
scaler = MinMaxScaler(feature_range=(0,1)) | |
# scaling dataset | |
scaled_train = scaler.fit_transform(dataset_train) | |
print(scaled_train[:5]) | |
# Normalizing values between 0 and 1 | |
scaled_test = scaler.fit_transform(dataset_test) | |
print(*scaled_test[:5]) #prints the first 5 rows of scaled_test | |
# Create sequences and labels for training data | |
sequence_length = 50 # Number of time steps to look back | |
X_train, y_train = [], [] | |
for i in range(len(scaled_train) - sequence_length): | |
X_train.append(scaled_train[i:i+sequence_length]) | |
y_train.append(scaled_train[i+1:i+sequence_length+1]) | |
X_train, y_train = np.array(X_train), np.array(y_train) | |
# Convert data to PyTorch tensors | |
X_train = torch.tensor(X_train, dtype=torch.float32) | |
y_train = torch.tensor(y_train, dtype=torch.float32) | |
# Create sequences and labels for testing data | |
sequence_length = 30 # Number of time steps to look back | |
X_test, y_test = [], [] | |
for i in range(len(scaled_test) - sequence_length): | |
X_test.append(scaled_test[i:i+sequence_length]) | |
y_test.append(scaled_test[i+1:i+sequence_length+1]) | |
X_test, y_test = np.array(X_test), np.array(y_test) | |
# Convert data to PyTorch tensors | |
X_test = torch.tensor(X_test, dtype=torch.float32) | |
y_test = torch.tensor(y_test, dtype=torch.float32) | |
class LSTMModel(nn.Module): | |
# input_size : number of features in input at each time step | |
# hidden_size : Number of LSTM units | |
# num_layers : number of LSTM layers | |
def __init__(self, input_size, hidden_size, num_layers): | |
super(LSTMModel, self).__init__() #initializes the parent class nn.Module | |
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) | |
self.linear = nn.Linear(hidden_size, 1) | |
def forward(self, x): # defines forward pass of the neural network | |
out, _ = self.lstm(x) | |
out = self.linear(out) | |
return out | |
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
print(device) | |
input_size = 1 | |
num_layers = 2 | |
hidden_size = 64 | |
output_size = 1 | |
# Define the model, loss function, and optimizer | |
model = LSTMModel(input_size, hidden_size, num_layers).to(device) | |
loss_fn = torch.nn.MSELoss(reduction='mean') | |
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) | |
print(model) | |
batch_size = 16 | |
# Create DataLoader for batch training | |
train_dataset = torch.utils.data.TensorDataset(X_train, y_train) | |
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True) | |
# Create DataLoader for batch training | |
test_dataset = torch.utils.data.TensorDataset(X_test, y_test) | |
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False) | |
num_epochs = 50 | |
train_hist =[] | |
test_hist =[] | |
# Training loop | |
for epoch in range(num_epochs): | |
total_loss = 0.0 | |
# Training | |
model.train() | |
for batch_X, batch_y in train_loader: | |
batch_X, batch_y = batch_X.to(device), batch_y.to(device) | |
predictions = model(batch_X) | |
loss = loss_fn(predictions, batch_y) | |
optimizer.zero_grad() | |
loss.backward() | |
optimizer.step() | |
total_loss += loss.item() | |
# Calculate average training loss and accuracy | |
average_loss = total_loss / len(train_loader) | |
train_hist.append(average_loss) | |
# Validation on test data | |
model.eval() | |
with torch.no_grad(): | |
total_test_loss = 0.0 | |
for batch_X_test, batch_y_test in test_loader: | |
batch_X_test, batch_y_test = batch_X_test.to(device), batch_y_test.to(device) | |
predictions_test = model(batch_X_test) | |
test_loss = loss_fn(predictions_test, batch_y_test) | |
total_test_loss += test_loss.item() | |
# Calculate average test loss and accuracy | |
average_test_loss = total_test_loss / len(test_loader) | |
test_hist.append(average_test_loss) | |
if (epoch+1)%10==0: | |
print(f'Epoch [{epoch+1}/{num_epochs}] - Training Loss: {average_loss:.4f}, Test Loss: {average_test_loss:.4f}') | |
#x = np.linspace(1,num_epochs,num_epochs) | |
#plt.plot(x,train_hist,scalex=True, label="Training loss") | |
#plt.plot(x, test_hist, label="Test loss") | |
#plt.legend() | |
#plt.show() | |
# Define the number of future time steps to forecast | |
num_forecast_steps = 30 | |
# Convert to NumPy and remove singleton dimensions | |
sequence_to_plot = X_test.squeeze().cpu().numpy() | |
# Use the last 30 data points as the starting point | |
historical_data = sequence_to_plot[-1] | |
#print(historical_data.shape) | |
# Initialize a list to store the forecasted values | |
forecasted_values = [] | |
# Use the trained model to forecast future values | |
with torch.no_grad(): | |
for _ in range(num_forecast_steps*2): | |
# Prepare the historical_data tensor | |
historical_data_tensor = torch.as_tensor(historical_data).view(1, -1, 1).float().to(device) | |
# Use the model to predict the next value | |
predicted_value = model(historical_data_tensor).cpu().numpy()[0, 0] | |
# Append the predicted value to the forecasted_values list | |
forecasted_values.append(predicted_value[0]) | |
# Update the historical_data sequence by removing the oldest value and adding the predicted value | |
historical_data = np.roll(historical_data, shift=-1) | |
historical_data[-1] = predicted_value | |
# Generate futute dates | |
last_date = test_data.index[-1] | |
# Generate the next 30 dates | |
future_dates = pd.date_range(start=last_date + pd.DateOffset(1), periods=30) | |
# Concatenate the original index with the future dates | |
combined_index = test_data.index.append(future_dates) | |
#set the size of the plot | |
#plt.rcParams['figure.figsize'] = [14, 4] | |
#Test data | |
#plt.plot(test_data.index[-100:-30], test_data.Open[-100:-30], label = "test_data", color = "b") | |
#reverse the scaling transformation | |
original_cases = scaler.inverse_transform(np.expand_dims(sequence_to_plot[-1], axis=0)).flatten() | |
#the historical data used as input for forecasting | |
#plt.plot(test_data.index[-30:], original_cases, label='actual values', color='green') | |
#Forecasted Values | |
#reverse the scaling transformation | |
forecasted_cases = scaler.inverse_transform(np.expand_dims(forecasted_values, axis=0)).flatten() | |
# plotting the forecasted values | |
#plt.plot(combined_index[-60:], forecasted_cases, label='forecasted values', color='red') | |
print(forecasted_cases) | |
#plt.xlabel('Time Step') | |
#plt.ylabel('Value') | |
#plt.legend() | |
#plt.title('Time Series Forecasting') | |
#plt.grid(True) | |
#plt.show() | |
if __name__ == "__main__": | |
main() | |
print('Finished Forecasting') | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment