Last active
June 6, 2023 02:53
-
-
Save vitouXY/5f3c5948a35815d22b4b0281092baec6 to your computer and use it in GitHub Desktop.
Simple Stock Prediction
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# vim: set fileencoding=utf-8 :
# https://github.com/lussierc/StockSwingPredictor
# https://github.com/the-tyler/Stock_Price_Prediction_LSTM
# https://github.com/almutwakel/swing-trader
import json
import sys
import urllib.request

import numpy as np
import pandas as pd

# Ticker symbol sent as the `nemo` query parameter of the price-history API.
INSTRUMENT = 'LTM'

print(f'* Get {INSTRUMENT.upper()} Data...')
try:
    URL = f'https://www.bolsadesantiago.com/api/RV_Instrumentos/getPointHistGAT?nemo={INSTRUMENT.upper()}'
    # The request carries a browser-like User-Agent header.
    request = urllib.request.Request(URL, headers={'User-Agent': 'Mozilla/5.0 (Linux; Android X; xDevice) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Mobile Safari/537.36'})
    with urllib.request.urlopen(request) as resp:
        response = resp.read()
    # Decode the payload once instead of re-parsing it for every row.
    rows = json.loads(response)['listaResult']
    # The last entry of the list is left out on purpose.
    # NOTE(review): presumably it is the still-open trading day -- confirm.
    #data = [row['CLOSE'] for row in rows[:-1]]
    data = [row['LOW'] for row in rows[:-1]]
except Exception as err:
    # Top-level boundary of the script: report why the download or decoding
    # failed, then stop.  Ctrl-C (KeyboardInterrupt) is no longer swallowed.
    print(f' {type(err).__name__}: {err}', file=sys.stderr)
    sys.exit(' Failed !')
""" | |
try: | |
URL = f'https://www.ccbolsa.cl/apps/script/modulos/Data/history.asp?symbol={INSTRUMENT.upper()}&resolution=D' | |
with urllib.request.urlopen(urllib.request.Request(URL, headers={'User-Agent': 'Mozilla/5.0 (Linux; Android X; xDevice) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Mobile Safari/537.36'})) as resp: | |
response = resp.read() | |
data = json.loads(response)['c'] | |
except: | |
sys.exit(' Failed !') | |
try: | |
INSTRUMENT_BTG = 'Ak0CDkEFGHkuLMkNO' | |
URL = f'https://mlback.btgpactual.cl/instruments/{INSTRUMENT_BTG}/historicalData?period=5Y' | |
with urllib.request.urlopen(urllib.request.Request(URL, headers={'User-Agent': 'Mozilla/5.0 (Linux; Android X; xDevice) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Mobile Safari/537.36'})) as resp: | |
response = resp.read() | |
data = [] | |
for i in range(len(json.loads(response)['chart'])): | |
data.append(json.loads(response)['chart'][i]['y']) | |
except: | |
sys.exit(' Failed !') | |
""" | |
""" | |
from datetime import datetime | |
from dateutil.relativedelta import relativedelta | |
start_date = (datetime.now() - relativedelta(years=1)).strftime('%Y-%m-%d') | |
end_date = datetime.now().strftime('%Y-%m-%d') | |
interval = '1d' | |
try: | |
import yfinance | |
# "5d", "1mo", "3mo", "6mo", "1y", "2y", "5y", "10y", "max", "ytd" | |
period = 'max' | |
data = yfinance.Ticker(f'{INSTRUMENT.upper()}.SN').history(period=period) | |
#data = yfinance.download(tickers=f'{INSTRUMENT.upper()}.SN', start=start_date, end=end_date, interval=interval) | |
#data = data['Close'].values | |
data = data['Low'].values | |
except: | |
sys.exit(' Failed !') | |
try: | |
from yahoo_fin import stock_info | |
data = stock_info.get_data(f'{INSTRUMENT.upper()}.SN', start_date=start_date, end_date=end_date, interval=interval) | |
##data = data.drop(['open', 'high', 'low', 'adjclose', 'ticker', 'volume'], axis=1) | |
data = data['close'].values | |
except: | |
sys.exit(' Failed !') | |
try: | |
sys.exit(' Get Data Failed !') | |
from pandas_datareader import pdata | |
data = pdata.get_data_yahoo(f'{INSTRUMENT.upper()}.SN', (datetime.now() - relativedelta(years=1)).strftime('%Y-%m-%d'),datetime.now().strftime('%Y-%m-%d')) | |
data = data['Close'].values | |
except: | |
sys.exit(' Failed !') | |
""" | |
# Sample series for offline runs (uncomment to bypass the download):
#data = [1273.1, 1254.5, 1205.9, 1244.4, 1230.1, 1230.1, 1230.1, 1239, 1228.4, 1233.1, 1230.2, 1230.2, 1269.5, 1280.7, 1280.7, 1280.7, 1276.6, 1271.9, 1276.1, 1276.1, 1276.1, 1274.7, 1290.9, 1290.9, 1279.5, 1269.8, 1255.6, 1255.3, 1251.8, 1227.3, 1230, 1256]

# One row per observation in a single 'Price' column; the positional index
# is named 'Day' and mirrored into a 'Date' column for later use.
df = pd.DataFrame(data, columns=['Price']).rename_axis('Day')
df['Date'] = df.index

# Summary statistics of the loaded series.
summary = df.describe()
print(summary)
#sys.exit()
print(' OK !')
import matplotlib
import matplotlib.pyplot as plt
#matplotlib.use('module://drawilleplot')

# Plot the training data and the predictions.
plt.style.use(style='ggplot')
#plt.figure(figsize=(15,5))
#plt.figure(figsize=(20,9))

##############
# Training data: x is the 0-based day number as a column vector, y the price.
#x_train = np.array([0, 1, 2, 3, 4]).reshape(-1, 1)
#y_train = np.array([1220.59, 1259.00, 1259.09, 1259.09, 1258.50])
len_data = len(df)
x_train = np.array(list(range(len_data))).reshape(len_data, 1)
y_train = df['Price'].values

# Days to predict: the whole history plus a few days past its end.
#x_pred = np.array([5, 6, 7]).reshape(-1, 1)
#x_pred = np.array([0, 1, 2, 3, 4, 5, 6, 7]).reshape(-1, 1)
# NOTE(review): the range stops at len_data + 8, i.e. 9 future points rather
# than 10 -- confirm whether the "- 1" is intended.
forecast_days = list(range(len_data - 1 + 10))
x_pred = np.array(forecast_days).reshape(len(forecast_days), 1)

# Draw the observed series as the reference line.
#plt.scatter(x_train, y_train, color='black', label='Original')
plt.plot(x_train, y_train, color='black', label='Original')
from sklearn.linear_model import LinearRegression
#from sklearn.linear_model import ElasticNet, Lasso, LinearRegression
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor
#from sklearn.neural_network import MLPRegressor
#from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
#from sklearn.naive_bayes import GaussianNB
#from sklearn.tree import DecisionTreeClassifier
#from xgboost import XGBClassifier

# Active regressors as (short name, estimator) pairs, fitted in this order.
models = [
    ('svr_rbf', SVR(kernel="rbf", C=1e3, degree=3, gamma="scale")),  # +1mo
    ('lr', LinearRegression()),
    ('knr', KNeighborsRegressor()),
]
# Other candidates, currently disabled:
#models.append(('svr_lin', SVR(kernel="linear", C=1e3)))
#models.append(('svr_poly', SVR(kernel="poly", C=1e3, degree=2))) # +1y
#models.append(('en', ElasticNet()))
#models.append(('lasso', Lasso()))
#models.append(('neuralnet', MLPRegressor(max_iter=90000, solver='adam', hidden_layer_sizes=(4, 4, 4))))
#models.append(('poly', SVR(kernel="poly", degree=5, C=1e-3, epsilon=0.1, gamma="scale") ))
#models.append(('rbf', SVR(kernel="rbf", gamma=1, C=100, epsilon=0.1) ))
#models.append(('lda', LinearDiscriminantAnalysis()))
#models.append(('cart', DecisionTreeClassifier()))
#models.append(('nb', GaussianNB()))
#models.append(('xgbc', XGBClassifier()))

for label, model in models:
    print(f'* Try {label.upper()} Prediction...')
    # Fit on the observed days, then evaluate over the extended day range
    # (fit() returns the estimator itself, so the calls can be chained).
    y_pred = model.fit(x_train, y_train).predict(x_pred)
    #print(y_pred)
    # Add this model's curve to the chart.
    plt.plot(x_pred, y_pred, label=f'{label.upper()} Prediction')
    print(' OK !')
##############
import pathlib
from sklearn.preprocessing import RobustScaler
from collections import deque
import tensorflow as tf
from keras.models import Sequential #, load_model
from keras.layers import Dense, LSTM, Dropout

print(f'* Try LSTM Prediction...')

# Scale the raw prices into a new column; the same scaler later converts the
# network output back into a price.
scaler = RobustScaler()
df['scaled_price'] = scaler.fit_transform(np.expand_dims(df['Price'].values, axis=1))

# Hyperparameters of the LSTM network.
# Length, in days, of the input window fed to the network.
NUMBER_of_STEPS_BACK = 30
# Horizons to forecast, in days ahead of the last observation.
# To predict the next three days use [1,2,3].
PREDICTION_STEPS = [1,2,3,4]
# Number of samples per gradient update.
BATCH_SIZE = 16
# Rate passed to each Dropout layer (25%), used for regularization.
DROPOUT = 0.25
# Number of units in each LSTM layer.
UNITS = 60
# Number of passes over the entire training set.
EPOCHS = 10
# Loss function minimized during training.
LOSS='mean_squared_error'
# Optimizer used to minimize the loss.
OPTIMIZER='adam'

# Forecast prices and the day number each one belongs to (parallel lists).
predictions = []
prediction_days = []
for step in PREDICTION_STEPS:
    # Pair every day with the scaled price `step` days later (the target).
    dataframe = df.copy()
    dataframe['future'] = dataframe['scaled_price'].shift(-step)
    # The newest `step` rows have no target and are dropped below; keep their
    # prices, because they belong to the final window used for forecasting.
    newest = dataframe[['scaled_price']].tail(step).values
    dataframe.dropna(inplace=True)

    # Slide a fixed-length window over the history.  Every full window is one
    # training sample, labelled with the target of its most recent day.
    window = deque(maxlen=NUMBER_of_STEPS_BACK)
    X, Y = [], []
    for entry, target in zip(dataframe[['scaled_price']].values, dataframe['future'].values):
        window.append(entry)
        if len(window) == NUMBER_of_STEPS_BACK:
            X.append(np.array(window))
            Y.append(target)
    if not X:
        # Fewer than NUMBER_of_STEPS_BACK usable rows: nothing to train on.
        print(f' Not enough data for a {step}-day LSTM forecast, skipped.')
        continue

    # Arrays shaped (samples, window, 1) and (samples,).
    x_train = np.array(X).astype(np.float32)
    y_train = np.array(Y)

    # Most recent window, shaped (1, window, 1) for model.predict.
    last_sequence = np.array(list(window) + list(newest)).astype(np.float32)
    last_sequence = last_sequence[-NUMBER_of_STEPS_BACK:]
    last_sequence = np.expand_dims(last_sequence, axis=0)

    # A fresh network is always trained for each horizon.  Training must not
    # depend on whether an 'LSTM.model' file exists: nothing saves or loads
    # it, so skipping training would leave `model` bound to an unrelated,
    # stale estimator.  If persistence is added later, use one file per
    # horizon, since each horizon has its own targets.
    model = Sequential()
    model.add(LSTM(UNITS, return_sequences=True, input_shape=(NUMBER_of_STEPS_BACK, 1)))
    model.add(Dropout(DROPOUT))
    model.add(LSTM(UNITS, return_sequences=False))
    model.add(Dropout(DROPOUT))
    model.add(Dense(1))  # a single output value per input window
    model.compile(loss=LOSS, optimizer=OPTIMIZER)
    model.fit(x_train, y_train, epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=1)
    model.summary()

    # Forecast from the latest window and convert back to a price.
    prediction = model.predict(last_sequence)
    predicted_price = scaler.inverse_transform(prediction)[0][0]
    predictions.append(round(float(predicted_price), 2))
    # The window ends on day len(df) - 1, so this forecast is `step` days
    # later.  This stays correct for non-consecutive horizons.
    prediction_days.append(len(df) - 1 + step)

if predictions:
    xy_prediction = pd.DataFrame({'Price': predictions, 'Date': prediction_days})
    plt.plot(xy_prediction['Date'], xy_prediction['Price'], label='LSTM Prediction')
print(' OK !')
##############
print('* Graph...')

# Axis labels and chart title, applied through the matching pyplot setters.
for apply_text, text in (
    (plt.xlabel, 'Days'),
    (plt.ylabel, 'Price'),
    (plt.title, f'BCS Prediction - {INSTRUMENT.upper()}'),
):
    apply_text(text)

# Legend entries are coloured like the lines they describe.
#plt.legend([f'Price per {INSTRUMENT.upper()} share'],labelcolor='linecolor')
#plt.legend()
plt.legend(labelcolor='linecolor')
#plt.legend(facecolor='k', labelcolor='w')

# Write the chart to '<INSTRUMENT>.png', show it, then release the figure.
save_options = {'format': "png", 'bbox_inches': 'tight'}
plt.savefig(f'{INSTRUMENT.upper()}.png', **save_options)
plt.show()
plt.clf()
plt.close()

print(' OK !')
sys.exit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment