Created
December 11, 2022 18:50
-
-
Save GrovesD2/76ba3f16abdbf1c4a548a1e597b3ba0d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
This code is a demonstration to show how you can accidentally slip in future
results to a time-series predicting neural network.
DO NOT USE THIS CODE FOR MAKING PREDICTIONS, IT'S FAULTY ON PURPOSE.
'''
from typing import Optional, Tuple

import numba as nb
import numpy as np
import pandas as pd
import yfinance as yf
# NN imports
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
# Imports for evaluating the network
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
# Global variables | |
EPOCHS = 10 | |
BATCH_SIZE = 8 | |
VALIDATION = 0.1 | |
LEARN_RATE = 1e-3 | |
TRAIN_SPLIT = 0.8 | |
FEAT_LENGTH = 25 | |
FEAT_COLS = ['Open', 'Low', 'High', 'Close', 'Volume'] | |
TICKERS = ['SPY', 'TSLA', 'AAPL', 'GOOG', 'AMZN'] | |
def time_series( | |
df: pd.DataFrame, | |
col: str, | |
name: str, | |
) -> pd.DataFrame: | |
''' | |
Form the lagged columns for this feature | |
''' | |
return df.assign(**{ | |
f'{name}_t-{lag}': col.shift(lag) | |
for lag in range(0, FEAT_LENGTH) | |
}) | |
def get_lagged_returns(df: pd.DataFrame) -> pd.DataFrame: | |
''' | |
For each of the feature cols, find the returns and then form the lagged | |
time-series as new columns | |
''' | |
for col in FEAT_COLS: | |
return_col = df[col]/df[col].shift(1)-1 | |
df = time_series(df, return_col, f'feat_{col}_ret') | |
return df | |
def get_classification(df: pd.DataFrame) -> pd.DataFrame: | |
''' | |
Get the classifications for the NN, which are as follows: | |
0 = The closing price goes down tomorrow | |
1 = The closing price goes up tomorrow | |
''' | |
df['close_tomorrow'] = df['Close'].shift(-1) | |
conditions = [ | |
df['close_tomorrow'] <= df['Close'], | |
df['close_tomorrow'] > df['Close'], | |
] | |
df['classification'] = np.select( | |
condlist = conditions, | |
choicelist = [0, 1], | |
) | |
return df | |
@nb.jit(nopython = True) | |
def explicit_heat_smooth(vals: np.array, t_end: float) -> np.array: | |
k = 0.1 # Time spacing | |
# Set up the initial condition | |
P = vals | |
t = 0 | |
while t < t_end: | |
P = k*(P[2:] + P[:-2]) + P[1:-1]*(1-2*k) | |
P = np.hstack(( | |
np.array([vals[0]]), | |
P, | |
np.array([vals[-1]]), | |
)) | |
t += k | |
return P | |
def apply_smoothening(df: pd.DataFrame) -> pd.DataFrame: | |
''' | |
Apply the time series smoothening to all feature columns | |
''' | |
for feat in FEAT_COLS: | |
df.loc[:, feat] = explicit_heat_smooth( | |
vals = df[feat].values.astype(np.float64), | |
t_end = 5, | |
) | |
return df | |
def get_nn_data() -> Tuple[np.array, np.array, np.array, np.array]: | |
''' | |
For all tickers, deduce the NN features and classifications, and then save | |
the outputs as four numpy arrays (x_train, y_train, x_test, y_test) | |
''' | |
dfs = [] | |
for ticker in TICKERS: | |
df = yf.download(ticker).reset_index() | |
df = apply_smoothening(df) | |
df = get_lagged_returns(df) | |
df = get_classification(df) | |
# We may end up with some divisions by 0 when calculating the returns | |
# so to prevent any rows with this slipping in, we replace any infs | |
# with nan values and remove all rows with nan values in them | |
dfs.append( | |
df | |
.replace([np.inf, -np.inf], np.nan) | |
.dropna() | |
[[col for col in df.columns if 'feat_' in col] + ['classification']] | |
) | |
nn_values = pd.concat(dfs).values | |
# Shuffle the values to ensure the NN does not learn an order | |
np.random.shuffle(nn_values) | |
# Split into training and test data | |
split_idx = int(TRAIN_SPLIT*nn_values.shape[0]) | |
return ( | |
nn_values[0:split_idx, :-1], # x_train | |
nn_values[0:split_idx:, -1], # y_train | |
nn_values[split_idx:, :-1], # x_test | |
nn_values[split_idx:, -1], # y_test | |
) | |
def get_model(x_train: np.array) -> Sequential: | |
''' | |
Generate the NN model that we are going to train | |
''' | |
return Sequential([ | |
Dense(128, input_shape = (x_train.shape[1], )), | |
Dense(64), | |
Dense(64), | |
Dense(1, activation = 'sigmoid'), | |
]) | |
def evaluate_training( | |
model: Sequential, | |
x_test: np.array, | |
y_test: np.array | |
): | |
''' | |
Produce confusion matrices to evaluate the training on the testing data. | |
''' | |
score = model.evaluate( | |
x_test, | |
y_test, | |
verbose = 0, | |
) | |
print("Test loss:", score[0]) | |
print("Test accuracy:", score[1]) | |
pred = model.predict(x_test) | |
pred[pred >= 0.5] = 1 | |
pred[pred < 0.5] = 0 | |
cm = confusion_matrix( | |
y_true = y_test, | |
y_pred = pred, | |
) | |
# The scaled confusion matrix gives a view where each column is scaled | |
# by the total sum of elements in that column | |
cm_scaled = cm/cm.astype(np.float).sum(axis = 0) | |
unscaled = ConfusionMatrixDisplay(confusion_matrix = cm) | |
unscaled.plot() | |
unscaled.ax_.set_title('Unscaled confusion matrix') | |
scaled = ConfusionMatrixDisplay(confusion_matrix = cm_scaled) | |
scaled.plot() | |
scaled.ax_.set_title('Scaled confusion matrix') | |
return | |
if __name__ == '__main__': | |
x_train, y_train, x_test, y_test = get_nn_data() | |
model = get_model(x_train) | |
model.compile( | |
loss = 'binary_crossentropy', | |
optimizer = Adam(learning_rate = LEARN_RATE), | |
metrics = ['accuracy'] | |
) | |
model.fit( | |
x_train, | |
y_train, | |
epochs = EPOCHS, | |
batch_size = BATCH_SIZE, | |
validation_split = VALIDATION, | |
) | |
evaluate_training(model, x_test, y_test) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment