-
-
Save AlexanderNeutel/acf285b345ea516fb1257893db64b198 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
import pandas as pd | |
import numpy as np | |
import pandas_ta as ta | |
import yfinance as yf | |
from ubiops import ApiClient, CoreApi, Configuration | |
from ubiops.utils import upload_file, download_file | |
from pandas_datareader import data as pdr | |
from skimage.restoration import denoise_wavelet | |
from datetime import datetime, timedelta | |
from sklearn.preprocessing import StandardScaler | |
from keras.models import Sequential | |
from keras.layers import LSTM, Dense | |
from tensorflow.keras.optimizers import Adagrad | |
class Deployment(): | |
def __init__(self, context, base_directory): | |
#Prep model structure | |
self.nr_features = 6 | |
self.sequence_length = 10 | |
self.nr_predictions = 1 | |
self.model = Sequential() | |
self.model.add(LSTM(150, input_shape=(self.sequence_length,self.nr_features))) | |
self.model.add(Dense(self.nr_predictions)) | |
self.model.compile(loss='mse', optimizer=Adagrad(learning_rate=0.01)) | |
#Import the model from the bucket | |
UBIOPS_API_TOKEN = os.environ["UBIOPS_API_TOKEN"] | |
print(type(UBIOPS_API_TOKEN)) | |
UBIOPS_API_TOKEN = str(UBIOPS_API_TOKEN) | |
client_prod= ApiClient(Configuration(api_key={'Authorization': UBIOPS_API_TOKEN})) | |
api_instance = CoreApi(api_client=client_prod) | |
print(f"Service status", api_instance.service_status()) | |
print(os.environ["bucket_name"]) | |
print(type(os.environ["bucket_name"])) | |
download_file( | |
client = client_prod, | |
project_name = context["project"], | |
bucket_name = os.environ["bucket_name"], | |
file_name = os.environ["model_path"] | |
) | |
self.model.load_weights('S&P500_model.h5') | |
def request(self, data, context): | |
#Read data from FED | |
url = "https://markets.newyorkfed.org/api/rates/unsecured/effr/last/999.json" | |
response = requests.get(url) | |
data = json.loads(response.content) | |
#Bring to dataframe format | |
effr_df = pd.json_normalize(data['refRates']) | |
effr_df = effr_df.set_index('effectiveDate') | |
effr_df = pd.DataFrame({'EFFR': effr_df['percentRate']}) | |
#Override yahoofinance API | |
yf.pdr_override() | |
#Take a sufficiently large window so that moving averages (MACD) can be calculated | |
start_date = (datetime.now() - timedelta(days = 120)).strftime('%Y-%m-%d') | |
end_date = datetime.now().strftime('%Y-%m-%d') | |
#Get SP500 data | |
sp500 = pdr.get_data_yahoo("SPY", start_date, end_date) | |
#Set datatype of index to object (as in effr) in prep for merge | |
sp500.index = sp500.index.strftime('%Y-%m-%d').astype('object') | |
#Calculate technical indicators using pandas_ta | |
sp500['MACD'] = sp500.ta.macd()['MACD_12_26_9'] | |
sp500['ATR'] = sp500.ta.atr() | |
sp500['RSI'] = sp500.ta.rsi() | |
#Denoise 'Close' using the Haar wavelet | |
denoised_close = denoise_wavelet(sp500['Adj Close'].values, method='VisuShrink', mode='soft', wavelet='haar') | |
sp500['Denoised_Close'] = denoised_close | |
sp500_fin = pd.merge(sp500, effr_df, left_index = True, right_index = True) | |
# Bring data to format model was trained for | |
data = sp500_fin[['Open', 'Denoised_Close', 'EFFR', 'USDX', 'MACD', 'ATR', 'RSI']].tail(11) | |
#Apply a scaler transform | |
scaler = StandardScaler() | |
data = scaler.fit_transform(data) | |
X_test, y_test = self.create_sequences(data, self.sequence_length) | |
prediction = self.model.predict(X_test) | |
#Apply inverse transform | |
prediction = np.c_[prediction, np.zeros(len(prediction)), np.zeros(len(prediction)), np.zeros(len(prediction)), np.zeros(len(prediction)), np.zeros(len(prediction))] | |
prediction = scaler.inverse_transform(prediction) | |
prediction = prediction[:,1][0] | |
return {'prediction' : prediction} | |
@staticmethod | |
def create_sequences(data, sequence_length): | |
'''Create timeseries that serve as input for the LSTM model''' | |
X = [] | |
y = [] | |
for ii in range(sequence_length, len(data)): | |
X.append(data[ii-sequence_length:ii, :]) | |
y.append(data[ii,0]) | |
return np.array(X), np.array(y) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment