import os
import json
import requests
import pandas as pd
import numpy as np
import pandas_ta as ta
import yfinance as yf
from ubiops import ApiClient, CoreApi, Configuration
from ubiops.utils import download_file
from pandas_datareader import data as pdr
from skimage.restoration import denoise_wavelet
from datetime import datetime, timedelta
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adagrad


class Deployment:

    def __init__(self, context, base_directory):
        # Define the model structure the saved weights were trained with
        self.nr_features = 6
        self.sequence_length = 10
        self.nr_predictions = 1
        self.model = Sequential()
        self.model.add(LSTM(150, input_shape=(self.sequence_length, self.nr_features)))
        self.model.add(Dense(self.nr_predictions))
        self.model.compile(loss='mse', optimizer=Adagrad(learning_rate=0.01))

        # Import the trained weights from the UbiOps bucket
        UBIOPS_API_TOKEN = os.environ["UBIOPS_API_TOKEN"]
        client_prod = ApiClient(Configuration(api_key={'Authorization': UBIOPS_API_TOKEN}))
        api_instance = CoreApi(api_client=client_prod)
        print("Service status:", api_instance.service_status())

        download_file(
            client=client_prod,
            project_name=context["project"],
            bucket_name=os.environ["bucket_name"],
            file_name=os.environ["model_path"]
        )
        # Assumes the 'model_path' environment variable points at this weights file
        self.model.load_weights('S&P500_model.h5')

    def request(self, data, context):
        # Read the effective federal funds rate (EFFR) from the New York Fed API
        url = "https://markets.newyorkfed.org/api/rates/unsecured/effr/last/999.json"
        response = requests.get(url)
        data = json.loads(response.content)

        # Bring the response to dataframe format, indexed by effective date
        effr_df = pd.json_normalize(data['refRates'])
        effr_df = effr_df.set_index('effectiveDate')
        effr_df = pd.DataFrame({'EFFR': effr_df['percentRate']})

        # Override the pandas_datareader Yahoo Finance backend with yfinance
        yf.pdr_override()

        # Take a sufficiently large window so that moving averages (MACD) can be calculated
        start_date = (datetime.now() - timedelta(days=120)).strftime('%Y-%m-%d')
        end_date = datetime.now().strftime('%Y-%m-%d')

        # Get S&P 500 data (SPY ETF)
        sp500 = pdr.get_data_yahoo("SPY", start_date, end_date)
        # Set the index dtype to object (as in effr_df) in preparation for the merge
        sp500.index = sp500.index.strftime('%Y-%m-%d').astype('object')

        # Calculate technical indicators using pandas_ta
        sp500['MACD'] = sp500.ta.macd()['MACD_12_26_9']
        sp500['ATR'] = sp500.ta.atr()
        sp500['RSI'] = sp500.ta.rsi()

        # Denoise the adjusted close using the Haar wavelet
        denoised_close = denoise_wavelet(sp500['Adj Close'].values, method='VisuShrink', mode='soft', wavelet='haar')
        sp500['Denoised_Close'] = denoised_close

        sp500_fin = pd.merge(sp500, effr_df, left_index=True, right_index=True)

        # Bring the data to the format the model was trained on: the six features
        # declared in __init__ (nr_features = 6), with Denoised_Close in column 1
        data = sp500_fin[['Open', 'Denoised_Close', 'EFFR', 'MACD', 'ATR', 'RSI']].tail(11)

        # Apply a scaler transform
        scaler = StandardScaler()
        data = scaler.fit_transform(data)

        X_test, y_test = self.create_sequences(data, self.sequence_length)
        prediction = self.model.predict(X_test)

        # Pad the single predicted value into a full feature row so the scaler's
        # inverse transform can be applied. This assumes the model predicts the
        # (scaled) denoised close, which sits in column 1 of the feature order above.
        padded = np.zeros((len(prediction), self.nr_features))
        padded[:, 1] = prediction[:, 0]
        padded = scaler.inverse_transform(padded)
        prediction = float(padded[0, 1])

        return {'prediction': prediction}

    @staticmethod
    def create_sequences(data, sequence_length):
        '''Create the sliding-window time series that serve as input for the LSTM model'''
        X = []
        y = []
        for ii in range(sequence_length, len(data)):
            X.append(data[ii - sequence_length:ii, :])
            y.append(data[ii, 0])
        return np.array(X), np.array(y)
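

if __name__ == "__main__":
    # Minimal local sketch, not part of the UbiOps deployment: it illustrates the
    # shape contract of create_sequences on dummy data. With 11 rows of 6 features
    # and a sequence length of 10, exactly one sequence of shape (10, 6) is produced,
    # and y holds the value of column 0 on the row following each sequence.
    dummy = np.random.rand(11, 6)
    X, y = Deployment.create_sequences(dummy, sequence_length=10)
    print(X.shape)  # (1, 10, 6)
    print(y.shape)  # (1,)

    # The same sketch can illustrate the inverse-transform padding used in request():
    # fit a StandardScaler on the dummy data, place one scaled value in column 1 of
    # an otherwise-zero row, and invert it to recover the value on the original scale.
    scaler = StandardScaler()
    scaled = scaler.fit_transform(dummy)
    row = np.zeros((1, 6))
    row[:, 1] = scaled[-1, 1]
    print(float(scaler.inverse_transform(row)[0, 1]))  # approximately dummy[-1, 1]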