Skip to content

Instantly share code, notes, and snippets.

@vitouXY
Last active June 6, 2023 02:53
Show Gist options
  • Save vitouXY/5f3c5948a35815d22b4b0281092baec6 to your computer and use it in GitHub Desktop.
Save vitouXY/5f3c5948a35815d22b4b0281092baec6 to your computer and use it in GitHub Desktop.
Simple Stock Prediction
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# vim: set fileencoding=utf-8 :
# https://github.com/lussierc/StockSwingPredictor
# https://github.com/the-tyler/Stock_Price_Prediction_LSTM
# https://github.com/almutwakel/swing-trader
import sys
import numpy as np
import pandas as pd
INSTRUMENT = 'LTM'
print(f'* Get {INSTRUMENT.upper()} Data...')
import urllib.request
import json

# Download the instrument's point history from the Bolsa de Santiago API and
# collect the 'LOW' field of every record (except the final one) into `data`.
try:
    URL = f'https://www.bolsadesantiago.com/api/RV_Instrumentos/getPointHistGAT?nemo={INSTRUMENT.upper()}'
    request = urllib.request.Request(URL, headers={'User-Agent': 'Mozilla/5.0 (Linux; Android X; xDevice) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Mobile Safari/537.36'})
    with urllib.request.urlopen(request) as resp:
        response = resp.read()
    # Decode the payload once instead of re-parsing it for every record.
    records = json.loads(response)['listaResult']
    # The final record is left out, exactly as the previous index loop did.
    # NOTE(review): presumably the last entry is the still-open session - confirm.
    # Swap 'LOW' for 'CLOSE' to model closing prices instead.
    data = [record['LOW'] for record in records[:-1]]
except Exception as err:
    # Top-level boundary of the script: any download/parse failure ends the run,
    # but Ctrl-C (KeyboardInterrupt) is no longer swallowed and the cause is shown.
    print(f' {type(err).__name__}: {err}', file=sys.stderr)
    sys.exit(' Failed !')

# Disabled alternative data sources. They live inside a bare string literal, so
# none of this executes: the first reads the 'c' series from ccbolsa.cl, the
# second reads the 'y' value of each 'chart' entry from the BTG Pactual API.
"""
try:
URL = f'https://www.ccbolsa.cl/apps/script/modulos/Data/history.asp?symbol={INSTRUMENT.upper()}&resolution=D'
with urllib.request.urlopen(urllib.request.Request(URL, headers={'User-Agent': 'Mozilla/5.0 (Linux; Android X; xDevice) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Mobile Safari/537.36'})) as resp:
response = resp.read()
data = json.loads(response)['c']
except:
sys.exit(' Failed !')
try:
INSTRUMENT_BTG = 'Ak0CDkEFGHkuLMkNO'
URL = f'https://mlback.btgpactual.cl/instruments/{INSTRUMENT_BTG}/historicalData?period=5Y'
with urllib.request.urlopen(urllib.request.Request(URL, headers={'User-Agent': 'Mozilla/5.0 (Linux; Android X; xDevice) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Mobile Safari/537.36'})) as resp:
response = resp.read()
data = []
for i in range(len(json.loads(response)['chart'])):
data.append(json.loads(response)['chart'][i]['y'])
except:
sys.exit(' Failed !')
"""
# Disabled Yahoo-based alternatives (yfinance, yahoo_fin, pandas_datareader),
# also parked in a bare string literal so they never run.
# NOTE(review): the pandas_datareader variant calls sys.exit() before its own
# import, so it would stop the script immediately if this text were re-enabled.
"""
from datetime import datetime
from dateutil.relativedelta import relativedelta
start_date = (datetime.now() - relativedelta(years=1)).strftime('%Y-%m-%d')
end_date = datetime.now().strftime('%Y-%m-%d')
interval = '1d'
try:
import yfinance
# "5d", "1mo", "3mo", "6mo", "1y", "2y", "5y", "10y", "max", "ytd"
period = 'max'
data = yfinance.Ticker(f'{INSTRUMENT.upper()}.SN').history(period=period)
#data = yfinance.download(tickers=f'{INSTRUMENT.upper()}.SN', start=start_date, end=end_date, interval=interval)
#data = data['Close'].values
data = data['Low'].values
except:
sys.exit(' Failed !')
try:
from yahoo_fin import stock_info
data = stock_info.get_data(f'{INSTRUMENT.upper()}.SN', start_date=start_date, end_date=end_date, interval=interval)
##data = data.drop(['open', 'high', 'low', 'adjclose', 'ticker', 'volume'], axis=1)
data = data['close'].values
except:
sys.exit(' Failed !')
try:
sys.exit(' Get Data Failed !')
from pandas_datareader import pdata
data = pdata.get_data_yahoo(f'{INSTRUMENT.upper()}.SN', (datetime.now() - relativedelta(years=1)).strftime('%Y-%m-%d'),datetime.now().strftime('%Y-%m-%d'))
data = data['Close'].values
except:
sys.exit(' Failed !')
"""
# Sample series for offline runs (uncomment to bypass the download above):
#data = [1273.1, 1254.5, 1205.9, 1244.4, 1230.1, 1230.1, 1230.1, 1239, 1228.4, 1233.1, 1230.2, 1230.2, 1269.5, 1280.7, 1280.7, 1280.7, 1276.6, 1271.9, 1276.1, 1276.1, 1276.1, 1274.7, 1290.9, 1290.9, 1279.5, 1269.8, 1255.6, 1255.3, 1251.8, 1227.3, 1230, 1256]

# One row per fetched value: 'Price' holds the series, the index is named 'Day',
# and 'Date' mirrors that index as an ordinary column.
df = pd.DataFrame(data, columns=['Price']).rename_axis('Day')
df['Date'] = df.index

# Show summary statistics of the frame before modelling.
summary = df.describe()
print(summary)
#sys.exit()
print(' OK !')
import matplotlib
import matplotlib.pyplot as plt
#matplotlib.use('module://drawilleplot')
# Chart the training data and the predictions using the ggplot theme.
plt.style.use(style='ggplot')
# Optional figure sizes:
#plt.figure(figsize=(15,5))
#plt.figure(figsize=(20,9))

##############
# Training data: the row position (0 .. n-1) is the single feature and the
# price is the target.
#x_train = np.array([0, 1, 2, 3, 4]).reshape(-1, 1)
#y_train = np.array([1220.59, 1259.00, 1259.09, 1259.09, 1258.50])
len_data = len(df)
dates2list = list(range(len_data))
x_train = np.array(dates2list).reshape(len_data, 1)
y_train = df['Price'].values

# Values to predict for: positions 0 .. len_data + 8, i.e. every training
# position followed by the next nine.
#x_pred = np.array([5, 6, 7]).reshape(-1, 1)
#x_pred = np.array([0, 1, 2, 3, 4, 5, 6, 7]).reshape(-1, 1)
dates2new = list(range(len_data - 1 + 10))
x_pred = np.array(dates2new).reshape(-1, 1)

# Draw the observed series as the black reference line.
#plt.scatter(x_train, y_train, color='black', label='Original')
plt.plot(x_train, y_train, color='black', label='Original')
from sklearn.linear_model import LinearRegression
#from sklearn.linear_model import ElasticNet, Lasso, LinearRegression
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor
#from sklearn.neural_network import MLPRegressor
#from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
#from sklearn.naive_bayes import GaussianNB
#from sklearn.tree import DecisionTreeClassifier
#from xgboost import XGBClassifier

# Regressors to fit, as (short name, estimator) pairs, in plotting order.
models = [
    ('svr_rbf', SVR(kernel="rbf", C=1e3, degree=3, gamma="scale")),  # +1mo
    ('lr', LinearRegression()),
    ('knr', KNeighborsRegressor()),
]
# Disabled candidates, kept for reference:
#   ('svr_lin', SVR(kernel="linear", C=1e3))
#   ('svr_poly', SVR(kernel="poly", C=1e3, degree=2))  # +1y
#   ('en', ElasticNet())
#   ('lasso', Lasso())
#   ('neuralnet', MLPRegressor(max_iter=90000, solver='adam', hidden_layer_sizes=(4, 4, 4)))
#   ('poly', SVR(kernel="poly", degree=5, C=1e-3, epsilon=0.1, gamma="scale"))
#   ('rbf', SVR(kernel="rbf", gamma=1, C=100, epsilon=0.1))
#   ('lda', LinearDiscriminantAnalysis())
#   ('cart', DecisionTreeClassifier())
#   ('nb', GaussianNB())
#   ('xgbc', XGBClassifier())

for name, model in models:
    label = name.upper()
    print(f'* Try {label} Prediction...')
    # Fit on the training positions, then predict over the extended range.
    model.fit(x_train, y_train)
    y_pred = model.predict(x_pred)
    #print(y_pred)
    # Add this model's curve to the shared chart.
    plt.plot(x_pred, y_pred, label=f'{label} Prediction')
    print(' OK !')
##############
import pathlib
from sklearn.preprocessing import RobustScaler
from collections import deque
import tensorflow as tf
from keras.models import Sequential #, load_model
from keras.layers import Dense, LSTM, Dropout

print('* Try LSTM Prediction...')
# Scale the raw price data into a new column
scaler = RobustScaler()
df['scaled_price'] = scaler.fit_transform(np.expand_dims(df['Price'].values, axis=1))

# Hyperparameters of the LSTM network
# Number of days back that each training window covers
NUMBER_of_STEPS_BACK = 30
# Horizons (in days ahead) to predict. To predict the next three days: [1,2,3]
PREDICTION_STEPS = [1,2,3,4]
# Number of training samples passed to the network per gradient update
BATCH_SIZE = 16
# Dropout rate applied after each LSTM layer (25%)
DROPOUT = 0.25
# Number of units in each LSTM layer
UNITS = 60
# Number of passes of the learning algorithm over the entire training set
EPOCHS = 10
# Loss used to measure the inaccuracy
LOSS='mean_squared_error'
# Optimizer used to iterate to better states
OPTIMIZER='adam'

# Model persistence is disabled (saving and loading are commented out), so a
# fresh network is trained for every horizon. The old `if not
# model_file.exists()` guard skipped training whenever this path existed and
# left `model` bound to whatever estimator had been fitted last.
model_file = pathlib.Path('LSTM.model')

# One model is trained per horizon; each contributes one forecast point.
predictions = []
xy_prediction_date = []
for step in PREDICTION_STEPS:
    # Pair every row with the scaled price `step` rows later (the target).
    dataframe = df.copy()
    dataframe['future'] = dataframe['scaled_price'].shift(-step)
    # The newest `step` rows have no target; keep them for the inference window.
    last_sequence = np.array(dataframe[['scaled_price']].tail(step))
    dataframe.dropna(inplace=True)

    # Slide a fixed-length window over the scaled prices; once the window is
    # full, each position yields one (window, target) training sample.
    sequence_data = []
    sequences = deque(maxlen=NUMBER_of_STEPS_BACK)
    for entry, target in zip(dataframe[['scaled_price']].values, dataframe['future'].values):
        sequences.append(entry)
        if len(sequences) == NUMBER_of_STEPS_BACK:
            sequence_data.append([np.array(sequences), target])
    if not sequence_data:
        # Too few rows to fill even one window: skip instead of crashing on
        # an empty training array.
        print(f' Skipped: not enough data for a {step}-day horizon !')
        continue

    # Inference input: the last full window followed by the newest rows.
    last_sequence = list(sequences) + list(last_sequence)
    last_sequence = np.array(last_sequence).astype(np.float32)

    # Build the X and Y training sets as numpy arrays
    X = np.array([window for window, _ in sequence_data])
    Y = np.array([target for _, target in sequence_data])
    x_train = X.astype(np.float32)  # (samples, NUMBER_of_STEPS_BACK, 1)
    y_train = Y

    # Five layers: LSTM, Dropout, LSTM, Dropout, Dense
    model = Sequential()
    model.add(LSTM(UNITS, return_sequences=True, input_shape=(NUMBER_of_STEPS_BACK, 1)))
    model.add(Dropout(DROPOUT))
    model.add(LSTM(UNITS, return_sequences=False))
    model.add(Dropout(DROPOUT))
    model.add(Dense(1)) # A single output value per input window
    model.compile(loss=LOSS, optimizer=OPTIMIZER)
    model.fit(x_train, y_train, epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=1)
    model.summary()
    #model.save(model_file)

    # Predict from the most recent NUMBER_of_STEPS_BACK scaled prices and map
    # the result back to the original price scale.
    last_sequence = last_sequence[-NUMBER_of_STEPS_BACK:]
    last_sequence = np.expand_dims(last_sequence, axis=0)
    prediction = model.predict(last_sequence)
    predicted_price = scaler.inverse_transform(prediction)[0][0]
    predictions.append(round(float(predicted_price), 2))
    # A `step`-day-ahead forecast sits `step` positions after the last row, so
    # the x-position stays right even for non-contiguous or skipped horizons.
    xy_prediction_date.append(len(df['Price']) - 1 + step)

if predictions:
    xy_prediction = pd.DataFrame(np.array(predictions), columns=['Price'])
    xy_prediction['Date'] = xy_prediction_date
    plt.plot(xy_prediction['Date'], xy_prediction['Price'], label='LSTM Prediction')
print(' OK !')
##############
print('* Graph...')
# Label the current axes and title the chart with the instrument name.
axes = plt.gca()
axes.set_xlabel('Days')
axes.set_ylabel('Price')
axes.set_title(f'BCS Prediction - {INSTRUMENT.upper()}')
# Legend entries take the color of the line they describe.
# Other variants tried: plt.legend(), plt.legend(facecolor='k', labelcolor='w'),
# plt.legend([f'Price per {INSTRUMENT.upper()} share'],labelcolor='linecolor')
axes.legend(labelcolor='linecolor')
# Write the chart to '<INSTRUMENT>.png', display it, then release the figure.
plt.savefig(f'{INSTRUMENT.upper()}.png', format="png", bbox_inches='tight')
plt.show()
plt.clf()
plt.close()
print(' OK !')
sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment