This demo shows how to set up a recurring forecast of day-ahead hourly electricity prices at a trading point in Texas (ERCOT North Hub).
Most electricity is sold in wholesale electricity markets. Power plant operators sell power from their generators while utilities or retail energy providers buy power to cover their load obligations. Often, these companies have dedicated trading, supply, or portfolio management teams responsible for the buying and selling of electricity.
Since the price of electricity can be highly volatile — fluctuating as much as 1,000x between different times in a single day — buying or selling electricity can drive very large gains or losses. To minimize risk and maximize profits, companies need accurate short-term forecasts of volumes (either supply or demand) as well as market prices. These short-term forecasts are then used by traders or portfolio managers to inform buying and selling decisions.
While this demo focuses explicitly on electricity price forecasting, it can be modified to forecast prices for other commodities (e.g. oil and natural gas).
Users of the app go through the following steps:
- Data collection. Data is collected for the target (historical day-ahead prices for ERCOT North) and for relevant features (such as system-wide load forecasts and solar production forecasts). These data come via a free data sample from data provider Yes Energy, available through Snowflake Marketplace. This means that no ELT is needed.
- Data processing. Next, we clean the data so that it can be used in model experimentation. This includes checking the data for anomalies and standardizing column names.
- Feature engineering. Then, features must be constructed. For example, we create a feature for yesterday's Real Time Market (RTM) System Lambda prices by applying a 24-hour shift to today's RTM System Lambda prices (see the pandas sketch after this list).
- Hyperparameter optimization. We use two open-source packages (Optuna and CMAES) to optimize the model's hyperparameters (an illustrative sketch follows this list).
- Experimentation using backtesting. We then backtest the model using walk-forward cross-validation: we train on data from early 2017 through 2020 and predict the first week of 2021, retrain on the previous training data plus that first week to predict the second week, and keep retraining weekly to produce each subsequent batch of weekly predictions (see the backtest sketch after this list).
- Evaluating performance. Finally, we visualize the model's performance using both mean squared error (MSE) and mean absolute error (MAE). We find that the model performs significantly better than a persistence forecast.
- Scheduling model inference or scoring. Lastly, we include a method to produce a new forecast (i.e. run inference or scoring) from a trained model on a schedule. We do this by using a User Defined Function (UDF) that loads the model from the app's model stage in Snowflake and performs inference on new data as it arrives in the database.
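As a concrete illustration of the lagged-price feature above, here is a minimal pandas sketch; the column name RTM_SYSTEM_LAMBDA and the synthetic values are illustrative stand-ins rather than the app's actual schema.

import numpy as np
import pandas as pd

# Hypothetical hourly RTM System Lambda prices; the real series comes from the Yes Energy share.
idx = pd.date_range("2021-01-01", periods=48, freq="H")
prices = pd.DataFrame({"RTM_SYSTEM_LAMBDA": np.random.uniform(20, 60, size=48)}, index=idx)

# Shift by 24 rows so each hour sees yesterday's price at the same hour.
prices["RTM_SYSTEM_LAMBDA_YESTERDAY"] = prices["RTM_SYSTEM_LAMBDA"].shift(24)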
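The hyperparameter-optimization step can be sketched roughly as follows; the search space, trial count, and synthetic data are illustrative assumptions, not the app's exact configuration (the app runs its Optuna study inside a stored procedure).

import optuna
import xgboost
from sklearn.datasets import make_regression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

# Stand-in data; the app reads its features and target from the ML_DATA view instead.
X, y = make_regression(n_samples=500, n_features=10, noise=10.0, random_state=0)
X_train, X_valid, y_train, y_valid = train_test_split(X, y, shuffle=False)

def objective(trial):
    # Illustrative XGBoost search space (parameter names mirror the HYPER_PARAMETER_OUTPUT table).
    params = {
        "n_estimators": trial.suggest_int("xgb_n_estimators", 100, 1000),
        "max_depth": trial.suggest_int("xgb_max_depth", 3, 10),
        "learning_rate": trial.suggest_float("xgb_learning_rate", 0.01, 0.3),
        "subsample": trial.suggest_float("xgb_subsample", 0.5, 1.0),
    }
    model = xgboost.XGBRegressor(**params).fit(X_train, y_train)
    return mean_squared_error(y_valid, model.predict(X_valid))

# The CMA-ES sampler (backed by the cmaes package) drives the search.
study = optuna.create_study(direction="minimize", sampler=optuna.samplers.CmaEsSampler())
study.optimize(objective, n_trials=50)
print(study.best_params)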
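Likewise, the weekly walk-forward backtest and its MSE/MAE comparison against persistence baselines can be sketched as below; the synthetic data, feature names, and fold sizes are illustrative, while the app itself performs these steps through the sproc_final_model stored procedure and the PREDICTIONS table.

import numpy as np
import pandas as pd
import xgboost
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Synthetic hourly dataset standing in for the app's features and RTPRICE_TOMM target.
n = 24 * 400
rng = np.random.default_rng(0)
df = pd.DataFrame({
    "LOAD_FORECAST": rng.uniform(30_000, 70_000, n),
    "SOLAR_FORECAST": rng.uniform(0, 5_000, n),
})
df["RTPRICE_TOMM"] = 0.001 * df["LOAD_FORECAST"] + rng.normal(0, 5, n)

window = 7 * 24        # one week of hourly predictions per fold
start = 24 * 365       # roughly the first year reserved for the initial training set
preds, truth = [], []
for split in range(start, len(df) - window, window):
    train, valid = df.iloc[:split], df.iloc[split:split + window]
    model = xgboost.XGBRegressor(n_estimators=100).fit(
        train.drop("RTPRICE_TOMM", axis=1), train["RTPRICE_TOMM"])
    preds.append(model.predict(valid.drop("RTPRICE_TOMM", axis=1)))
    truth.append(valid["RTPRICE_TOMM"].to_numpy())

preds, truth = np.concatenate(preds), np.concatenate(truth)
persistence = np.roll(truth, 24)   # naive "same hour yesterday" baseline
print("model MAE/MSE:", mean_absolute_error(truth, preds), mean_squared_error(truth, preds))
print("persistence MAE/MSE:", mean_absolute_error(truth[24:], persistence[24:]),
      mean_squared_error(truth[24:], persistence[24:]))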
- Since the data is also shared with the app, you can query it directly using the commands below.
SELECT * FROM <app_name>.CRM.ML_DATA;
SELECT * FROM <app_name>.CRM.HYPER_PARAMETER_OUTPUT;
- The app can also be used to view the Yes Energy data directly, and the shared data can be consumed by any existing pipelines.
- This is only one way to perform price forecasting. Several stored procedures are deployed along with the app, as shown below.
CALL <app_name>.PYTHON_FUNCTIONS.SPROC_DEPLOY_MODEL(VARCHAR, VARCHAR, VARCHAR, VARCHAR) -- used for deployment
CALL <app_name>.PYTHON_FUNCTIONS.SPROC_FINAL_MODEL(VARCHAR, NUMBER) -- used for forecasting
- Please run the command below in a worksheet to ensure deployment is possible (it grants the application usage on a warehouse).
GRANT USAGE ON WAREHOUSE {WARE_HOUSE_NAME} TO APPLICATION Energy_Price_Forecasting_App;
Energy Price Forecasting App Files
environment.yml:
name: priceforecasting
channels:
  - snowflake
dependencies:
  - python=3.8.12
  - snowflake-snowpark-python
  - matplotlib
  - seaborn
  - tqdm
  - scikit-learn
  - cachetools
  - plotly
manifest.yml:
manifest_version: 1
artifacts:
  setup_script: scripts/setup.sql
  readme: readme.md
  extension_code: true
configuration:
  trace_level: OFF
  log_level: INFO
nativeApp.py (Streamlit app):
# Import python packages | |
import streamlit as st | |
# UNCOMMENT FOR NATIVE APP STARTS ## | |
from snowflake.snowpark.context import get_active_session | |
# UNCOMMENT FOR NATIVE APP ENDS ## | |
import pandas as pd | |
import numpy as np | |
import matplotlib.pylab as plt | |
import seaborn as sns | |
from tqdm import tqdm | |
import altair as at | |
import datetime | |
from sklearn.metrics import (
    roc_auc_score,
    average_precision_score,
    precision_score,
    recall_score,
    precision_recall_curve,
    roc_curve,
    ConfusionMatrixDisplay,
    confusion_matrix,
    accuracy_score,
    classification_report
)
from datetime import date | |
st.set_page_config(layout="wide") | |
## defining functions used in the app | |
@st.cache_data(ttl=900) | |
def get_distinct_dates_predicted(_sp_session, app_db, app_sch): | |
""" Get distinct dates post the prediction | |
""" | |
sql_stmt = "select distinct concat(DATE_PART('YEAR',to_timestamp(\"Datetime\")),'-', LPAD(DATE_PART('MM',to_timestamp(\"Datetime\")),2,0),'-', LPAD(DATE_PART('DD',to_timestamp(\"Datetime\")),2,0)) as DATES from {0}.{1}.predictions order by DATES desc;".format(app_db, app_sch) | |
df = _sp_session.sql(sql_stmt).to_pandas() | |
return df['DATES'].values.tolist() | |
@st.cache_data(ttl=900) | |
def get_predictions_results(_sp_session, app_db, app_sh, table_name): | |
""" Get predictions table data into a dataframe | |
""" | |
return _sp_session.table('{0}.{1}.{2}'.format(app_db, app_sh, table_name)).to_pandas() | |
@st.cache_data(ttl=100) | |
def get_tables_list(_sp_session, db_name): | |
""" Get the tables list for auto-displaying. | |
""" | |
#_sp_session.use_database('{0}'.format(db_name)) | |
get_tables_sql_stmt = "select concat(table_schema, '.', table_name) as TABLE_NAME from information_schema.tables where table_schema not in ('INFORMATION_SCHEMA') order by table_schema, table_name;" | |
df = _sp_session.sql(get_tables_sql_stmt).to_pandas() | |
return df['TABLE_NAME'].values.tolist() | |
#@st.cache_data(ttl=900) | |
def get_models_list(_sp_session, app_db, stage_name, app_sch): | |
""" Get the models list from stage for auto-displaying. | |
""" | |
#_sp_session.use_database('{0}'.format(app_db)) | |
show_command = "list @{0}.{1}.{2} PATTERN='.*forecast_.*';".format(app_db, app_sch, stage_name) | |
_sp_session.sql(show_command).collect() | |
get_models_sql_stmt = 'SELECT split_part("name", \'/\', 3) as MODELS FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) order by "last_modified" desc;' | |
df = _sp_session.sql(get_models_sql_stmt).to_pandas() | |
return df['MODELS'].values.tolist() | |
@st.cache_data(ttl=900) | |
def get_warehouses_list(_sp_session): | |
""" Get the tables list for auto-displaying. | |
""" | |
show_command = "SHOW WAREHOUSES in account;" | |
_sp_session.sql(show_command).collect() | |
get_wh_sql_stmt = 'SELECT "name" as WH_NAME FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));' | |
df = _sp_session.sql(get_wh_sql_stmt).to_pandas() | |
return df['WH_NAME'].values.tolist() | |
@st.cache_data(ttl=900) | |
def get_column_list(_sp_session, source_table): | |
try: | |
df = _sp_session.sql("select column_name from information_schema.columns where table_schema ilike '{0}' and table_name ilike '{1}' \ | |
order by ordinal_position;".format(source_table.split('.')[1], source_table.split('.')[2])).to_pandas() | |
except Exception as e: | |
df = _sp_session.sql("select column_name from information_schema.columns where table_schema ilike '{0}' and table_name ilike '{1}' \ | |
order by ordinal_position;".format(source_table.split('.')[0], source_table.split('.')[1])).to_pandas() | |
return df['COLUMN_NAME'].values.tolist() | |
def deploy_price_model_localst(sp_session, app_db, model_name, frequency, source_table, target_table, wh_name, fun_sch): | |
""" Deploy the model that was selected as Dynamic Table | |
!!!!!!!! THIS WORKS ONLY ON A LOCALLY HOSTED APP FOR NOW - DUE TO LIMITATIONS WITH NATIVE APP !!!!!!!!
""" | |
sp_session.sql("CALL {0}.{1}.SPROC_REGISTER_UDF_WITH_MODEL('{2}', '{0}');".format(app_db, | |
fun_sch, | |
model_name)).collect() | |
col_list = get_column_list(sp_session, source_table) | |
col_str = ', '.join('"{0}"'.format(w) for w in col_list) | |
col_without_price_date = ', '.join('"{0}"'.format(w) for w in col_list if w not in ('RTPRICE_TOMM', 'DATETIME')) | |
dynamic_tbl_stmt = "CREATE OR REPLACE DYNAMIC TABLE {0} TARGET_LAG = '{1} minute' WAREHOUSE = {2} AS \ | |
SELECT {3}, {4}.{5}.UDF_PREDICT({7}) as PREDICTION FROM {6}".format(target_table, | |
frequency, | |
wh_name, | |
col_str, | |
app_db, | |
app_sch, | |
source_table, | |
col_without_price_date) | |
sp_session.sql(dynamic_tbl_stmt).collect() | |
return 'Success' | |
def deploy_price_model(sp_session, app_db, model_name, frequency, source_table, target_table, wh_name, append_mode, app_sch, fun_sch): | |
""" Deploy the model that was selected as Streams and Tasks | |
""" | |
if append_mode == 'True': | |
task_stmt = "CALL {0}.{6}.SPROC_DEPLOY_MODEL('{1}', '{2}', '{3}', '{4}', '{0}', '{5}');".format(app_db, source_table, model_name, target_table, append_mode, app_sch, fun_sch) | |
sp_session.sql(task_stmt).collect() | |
return 'Initial Load' | |
else: | |
sp_session.sql("CREATE STREAM IF NOT EXISTS ENGY_FORECASTING_SOURCE_STREAM ON VIEW {0}.{1};".format(app_db, source_table)).collect() | |
task_stmt = "CREATE TASK IF NOT EXISTS {2}.{7}.ENERGY_FORECASTING_INFERENCE_TASK \ | |
WAREHOUSE = {0} \ | |
SCHEDULE = '{1} MINUTE' \ | |
WHEN \ | |
SYSTEM$STREAM_HAS_DATA('ENGY_FORECASTING_SOURCE_STREAM') \ | |
AS \ | |
CALL {2}.{8}.SPROC_DEPLOY_MODEL('{3}', \ | |
'{4}', '{5}', '{6}', '{2}', '{7}');".format(wh_name, frequency, app_db, source_table, model_name, target_table, append_mode, app_sch, fun_sch) | |
sp_session.sql(task_stmt).collect() | |
sp_session.sql("ALTER TASK IF EXISTS ENERGY_FORECASTING_INFERENCE_TASK RESUME").collect() | |
# sp_session.sql("CALL {0}.PYTHON_FUNCTIONS.udf_deploy_model();".format(app_db)).collect() | |
# col_list = get_column_list(sp_session, source_table) | |
# col_str = ', '.join('"{0}"'.format(w) for w in col_list) | |
# col_without_price_date = ', '.join('"{0}"'.format(w) for w in col_list if w not in ('RTPRICE_TOMM', 'DATETIME')) | |
# col_without_price_date = col_without_price_date + ", '@{0}.CRM.ML_MODELS/model/{1}'".format(app_db, model_name) | |
# dynamic_tbl_stmt = "CREATE OR REPLACE DYNAMIC TABLE {0} TARGET_LAG = '{1} minute' WAREHOUSE = {2} AS \ | |
# SELECT {3}, {4}.{5}.udf_predict_price({7}) as PREDICTION FROM {6}".format(target_table, | |
# frequency, | |
# wh_name, | |
# col_str, | |
# app_db, | |
# 'CRM', | |
# source_table, | |
# col_without_price_date) | |
# sp_session.sql(dynamic_tbl_stmt).collect() | |
return 'Dynamic Table Created' | |
@st.cache_data(ttl=900) | |
def perform_hyper_parameter_tuning(_sp_session, app_db, table_name, model_name, n_trials, app_sch, fun_sch): | |
""" Perform Hyper Parameter Tuning and return the results in a dataframe | |
""" | |
sql_call_stmt = "call {4}.{3}.sproc_optuna_optimized_model('{0}','{1}',{2}, '{4}', '{5}')".format(table_name, model_name, n_trials, fun_sch, app_db, app_sch) | |
out_str = _sp_session.sql(sql_call_stmt).collect() | |
st.write(out_str[0][0]) | |
hyper_df = pd.DataFrame() | |
if out_str[0][0] == 'Success': | |
hyper_df = _sp_session.sql('select * from {0}.{1}.HYPER_PARAMETER_OUTPUT;'.format(app_db, app_sch)).to_pandas() | |
return hyper_df | |
def deploy_spike_model(sp_session, app_db, model_name, frequency, source_table, target_table, wh_name, append_mode, fun_sch, app_sch): | |
""" Deploy the model that was selected as Streams and Tasks | |
""" | |
if append_mode == 'True': | |
task_stmt = "CALL {5}.{4}.sproc_deploy_model_spike('{0}', '{1}', '{2}', '{3}', '{5}', '{6}');".format(source_table, model_name, | |
target_table, append_mode, | |
fun_sch, app_db, app_sch) | |
sp_session.sql(task_stmt).collect() | |
return 'Initial Load' | |
else: | |
sp_session.sql("CREATE STREAM IF NOT EXISTS ENGY_FORECASTING_SOURCE_STREAM_SPIKE ON VIEW {0}.{1};".format(app_db, source_table)).collect() | |
task_stmt = "CREATE TASK IF NOT EXISTS {2}.{8}.ENERGY_SPIKE_FORECASTING_INFERENCE_TASK \ | |
WAREHOUSE = {0} \ | |
SCHEDULE = '{1} MINUTE' \ | |
WHEN \ | |
SYSTEM$STREAM_HAS_DATA('ENGY_FORECASTING_SOURCE_STREAM_SPIKE') \ | |
AS \ | |
CALL {2}.{7}.sproc_deploy_model_spike('{3}', \ | |
'{4}', '{5}', '{6}', '{2}', '{8}');".format(wh_name, frequency, app_db, source_table, model_name, target_table, append_mode, fun_sch, app_sch) | |
sp_session.sql(task_stmt).collect() | |
sp_session.sql("ALTER TASK IF EXISTS ENERGY_SPIKE_FORECASTING_INFERENCE_TASK RESUME").collect() | |
return 'Task Created' | |
######################################################################################################################################################## | |
################################# SPIKE FORECASTING FUNCTIONALITY #################################################################################### | |
######################################################################################################################################################## | |
@st.cache_data(ttl=900) | |
def perform_hyper_parameter_tuning_for_spike(_sp_session, app_db, table_name, model_name, n_trials, app_sch, fun_sch): | |
""" Perform Hyper Parameter Tuning and return the results in a dataframe | |
""" | |
sql_call_stmt = "call {4}.{3}.sproc_optuna_optimized_model_spike('{0}','{1}',{2}, '{4}', '{5}')".format(table_name, model_name, n_trials, fun_sch, app_db, app_sch) | |
out_str = _sp_session.sql(sql_call_stmt).collect() | |
#st.write(out_str) | |
hyper_df = pd.DataFrame() | |
if out_str[0][0] == 'Success': | |
hyper_df = _sp_session.sql('select * from {0}.{1}.HYPER_PARAMETER_OUTPUT_SPIKE;'.format(app_db, app_sch)).to_pandas() | |
return hyper_df | |
@st.cache_data(ttl=900) | |
def perform_inferencing(_sp_session, app_db, table_name, app_sch, fun_sch): | |
""" Perform Hyper Parameter Tuning and return the results in a dataframe | |
""" | |
sql_call_stmt = "call {2}.{1}.sproc_spike_forecast('{0}', '{2}', '{3}')".format(table_name, fun_sch, app_db, app_sch) | |
out_str = _sp_session.sql(sql_call_stmt).collect() | |
#st.write(out_str) | |
hyper_df = pd.DataFrame() | |
if out_str[0][0] == 'Success': | |
hyper_df = _sp_session.sql('select * from {0}.{1}.FORECASTED_RESULT;'.format(app_db, app_sch)).to_pandas() | |
return hyper_df | |
#@st.cache_data(ttl=900) | |
def get_database_list(_sp_session, import_flag, app_flag): | |
""" Get the database list for auto-displaying. | |
""" | |
if import_flag: | |
get_command = "select database_name \ | |
from information_schema.databases where type = 'IMPORTED DATABASE' \ | |
order by created desc, database_name;" | |
elif app_flag: | |
get_command = "select database_name \ | |
from information_schema.databases where type = 'APPLICATION' \ | |
order by created desc, database_name;" | |
else: | |
get_command = "select database_name \ | |
from information_schema.databases where type = 'STANDARD' \ | |
order by created desc, database_name;" | |
df = _sp_session.sql(get_command).to_pandas() | |
return df['DATABASE_NAME'].values.tolist() | |
@st.cache_data(ttl=100) | |
def get_table_views_list_from_db(_sp_session, database): | |
""" Get the views list for auto-displaying. | |
""" | |
#_sp_session.use_database('{0}'.format(database)) | |
get_views_sql_stmt = "select CONCAT(TABLE_SCHEMA,'.',TABLE_NAME) as VIEWS from information_schema.tables \ | |
where table_schema != 'INFORMATION_SCHEMA' order by table_schema, table_name;" | |
df = _sp_session.sql(get_views_sql_stmt).to_pandas() | |
return df['VIEWS'].values.tolist() | |
# Get the current credentials -- App way | |
# UNCOMMENT FOR NATIVE APP STARTS ## | |
sp_session = get_active_session() | |
# UNCOMMENT FOR NATIVE APP ENDS ## | |
# COMMENT FOR NATIVE APP STARTS ## | |
# Get the current credentials -- Local Streamlit way | |
# import sflk_base as L | |
# # Define the project home directory, this is used for locating the config.ini file | |
# PROJECT_HOME_DIR='.' | |
# def initialize_snowpark(): | |
# if "snowpark_session" not in st.session_state: | |
# config = L.get_config(PROJECT_HOME_DIR) | |
# sp_session = L.connect_to_snowflake(PROJECT_HOME_DIR) | |
# sp_session.use_role(f'''{config['APP_DB']['role']}''') | |
# sp_session.use_schema(f'''{config['APP_DB']['database']}.{config['APP_DB']['schema']}''') | |
# sp_session.use_warehouse(f'''{config['APP_DB']['warehouse']}''') | |
# st.session_state['snowpark_session'] = sp_session | |
# else: | |
# config = L.get_config(PROJECT_HOME_DIR) | |
# sp_session = st.session_state['snowpark_session'] | |
# return (config, sp_session) | |
# COMMENT FOR NATIVE APP ENDS ## | |
# App | |
st.title("Energy Price Forecasting Native App") | |
st.write( | |
"""This Snowflake Native App allows users to forecast wholesale energy prices and the likelihood of price spikes using data from Yes Energy. The structure below is broken into two pieces: forecasting energy prices as well as forecasting the likelihood of energy price spikes. | |
""" | |
) | |
with st.container(): | |
# UNCOMMENT FOR NATIVE APP STARTS ## | |
app_db = sp_session.sql("SELECT CURRENT_DATABASE()").collect()[0][0] | |
app_sch = 'CRM' | |
fun_db = app_db | |
fun_sch = 'PYTHON_FUNCTIONS' | |
app_flag = True | |
# UNCOMMENT FOR NATIVE APP ENDS ## | |
# COMMENT FOR NATIVE APP STARTS ## | |
# config, sp_session = initialize_snowpark() | |
# app_db = config['APP_DB']['database'] | |
# app_sch = config['APP_DB']['schema'] | |
# fun_db = app_db | |
# fun_sch = app_sch | |
# app_flag = False | |
# COMMENT FOR NATIVE APP ENDS ## | |
colxx, colxxx, colyy, colx, coly, colz = st.columns(6) | |
with colz: | |
if st.button('Refresh Page', use_container_width=False, key='refresh1'): | |
st.experimental_rerun() | |
st.header("Forecasting Energy Prices") | |
hyper_parameter_output = 'hyper_parameter_output' | |
stage_name = 'ML_MODELS' | |
data_array, cols_list, features_list=[],[],[] | |
target_selected='' | |
if app_flag: | |
table_to_select = 'ML_DATA_VW' | |
else: | |
table_to_select = 'ML_DATA' | |
st.subheader(':blue[Feature Selection]') | |
with st.expander("Feature Selection"): | |
st.write("In this section, you will select the tables and features for the model training and testing. For the sake of simplicity, this demo does not show any feature engineering.") | |
db_list = get_database_list(sp_session, False, app_flag) | |
col1, col2, col33 = st.columns(3) | |
with col1: | |
database = st.selectbox( | |
'Select the database with features and target', | |
db_list) | |
with col2: | |
if database: | |
list_of_tables = get_table_views_list_from_db(sp_session, database) | |
try: | |
index = list_of_tables.index("{0}.{1}".format(app_sch, table_to_select)) | |
except Exception as e: | |
index = 0 | |
table_selected = st.selectbox( | |
'Select the source table/view with features and target', | |
list_of_tables) | |
with col33: | |
cols_list = get_column_list(sp_session, table_selected) | |
if database and table_selected: | |
cols_list = get_column_list(sp_session, table_selected) | |
if len(cols_list) > 0: | |
target_selected = st.selectbox( | |
'Select the target column', | |
cols_list, index=len(cols_list)-2) | |
coli, colj, colk = st.columns(3) | |
with coli: | |
if database and table_selected: | |
try: | |
min_date = sp_session.sql("select max(datetime)::date AS MAXDATE , min(datetime)::date as MINDATE from {0}.{1};".format(database, table_selected)).collect()[0][1] | |
start_date = st.date_input( | |
"Pick the start date", | |
min_date, min_value=datetime.date(1996, 10,1),key='3') | |
except Exception as e: | |
min_date = date.today() | |
start_date = st.date_input( | |
"Pick the start date", | |
min_date, min_value=datetime.date(1996, 10,1),key='33') | |
with colj: | |
if database and table_selected: | |
try: | |
max_date = sp_session.sql("select max(datetime)::date AS MAXDATE , min(datetime)::date as MINDATE from {0}.{1};".format(database, table_selected)).collect()[0][0] | |
end_date = st.date_input( | |
"Pick the end date", | |
max_date, max_value=date.today(), key='4') | |
except Exception as e: | |
max_date = date.today() | |
end_date = st.date_input( | |
"Pick the end date", | |
max_date, max_value=date.today(), key='4') | |
with colk: | |
if len(cols_list) > 0: | |
cols_list_features = cols_list.copy() | |
try: | |
cols_list_features.remove('RTPRICE_TOMM') | |
cols_list_features.remove('DATETIME') | |
except Exception as e: | |
print(e) | |
features_list = st.multiselect( | |
'Select the features to use in the model', | |
cols_list_features | |
) | |
if st.button('SELECT FEATURES'): | |
view_exists_flag = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name = '{1}'".format(app_sch, 'ML_DATA_VW')).collect()[0][0] | |
if view_exists_flag and app_flag: | |
sp_session.sql("CREATE OR REPLACE VIEW {1}.{2}.ML_DATA_FACTOR_VW as SELECT {0},{6}, DATETIME FROM {1}.{3} WHERE \ | |
datetime >= '{4} 00:00:00.000'::datetime \ | |
and datetime <= '{5} 00:00:00.000'::datetime;".format(('%s' % ', '.join(map(str, features_list))), app_db, app_sch, table_selected, start_date, end_date, target_selected )).collect() | |
sp_session.sql("CREATE OR REPLACE VIEW {1}.{2}.ML_DATA_FACTOR_SPIKE_VW as SELECT {0},{6}, DATETIME FROM {1}.{2}.ML_DATA_SPIKE_VW WHERE \ | |
datetime >= '{4} 00:00:00.000'::datetime \ | |
and datetime <= '{5} 00:00:00.000'::datetime;".format(('%s' % ', '.join(map(str, features_list))), app_db, app_sch, table_selected, start_date, end_date, target_selected )).collect() | |
else: | |
sp_session.sql("CREATE OR REPLACE VIEW {1}.{2}.ML_DATA_VW as SELECT {0},{6}, DATETIME FROM {1}.{3} WHERE \ | |
datetime >= '{4} 00:00:00.000'::datetime \ | |
and datetime <= '{5} 00:00:00.000'::datetime;".format(('%s' % ', '.join(map(str, features_list))), app_db, app_sch, table_selected, start_date, end_date, target_selected )).collect() | |
sp_session.sql("CREATE OR REPLACE VIEW {1}.{2}.ML_DATA_SPIKE_VW as SELECT {0},{6}, DATETIME FROM {1}.{3}_SPIKE WHERE \ | |
datetime >= '{4} 00:00:00.000'::datetime \ | |
and datetime <= '{5} 00:00:00.000'::datetime;".format(('%s' % ', '.join(map(str, features_list))), app_db, app_sch, table_selected, start_date, end_date, target_selected )).collect() | |
st.success('features selected successfully') | |
#st.info(':orange[NOTE: The implementation of the selected features are not included into the demo yet, by default all the features from the table is used and target variable of PRICE is chosen currently]') | |
st.subheader(':blue[Exploration]') | |
with st.expander("Exploration"): | |
st.write('In this section, we are plotting the different features and target.') | |
#sp_session.use_database('{0}'.format(app_db)) | |
if app_flag: | |
table_name = 'ML_DATA_FACTOR_VW' | |
else: | |
table_name = 'ML_DATA_VW' | |
try: | |
df = sp_session.sql("SELECT * FROM {2}.{1}.{0};".format(table_name, app_sch, app_db)).to_pandas() | |
except Exception as e: | |
df = pd.DataFrame() | |
if df.empty: | |
st.info("Please select features before proceeding") | |
else: | |
df = df.set_index('DATETIME') | |
st.subheader('Exploration') | |
col2, col3 = st.columns(2) | |
with col2: | |
st.dataframe(df.head(20)) | |
with col3: | |
st.line_chart(data=df, y='RTPRICE_TOMM', use_container_width=True) | |
st.subheader('Plotting Correlation') | |
col2, col3 = st.columns(2) | |
with col2: | |
correlation_df=df.copy() | |
cor_data = (correlation_df | |
.corr().stack() | |
.reset_index() # The stacking results in an index on the correlation values, we need the index as normal columns for Altair | |
.rename(columns={0: 'correlation', 'level_0': 'Features', 'level_1': 'Features2'})) | |
cor_data['correlation_label'] = cor_data['correlation'].map('{:.2f}'.format) # Round to 2 decimal | |
#st.dataframe(cor_data) | |
base = at.Chart(cor_data).encode( | |
x='Features2:O', | |
y='Features:O' | |
) | |
# Text layer with correlation labels | |
# Colors are for easier readability | |
text = base.mark_text().encode( | |
text='correlation_label', | |
color=at.condition( | |
at.datum.correlation > 0.5, | |
at.value('white'), | |
at.value('black') | |
) | |
) | |
# The correlation heatmap itself | |
cor_plot = base.mark_rect().encode( | |
color='correlation:Q' | |
) | |
cor_plot + text | |
st.subheader(':blue[Hyperparameter Optimization]') | |
with st.expander("Hyperparameter Optimization"): | |
st.subheader('Hyperparameter Optimization') | |
model_name = 'optuna_model.sav' | |
n_trials = 500 | |
hyper_df = pd.DataFrame() | |
#sp_session.use_database('{0}'.format(app_db)) | |
hyper_tuning_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name = '{1}'".format(app_sch, 'HYPER_PARAMETER_OUTPUT')).collect()[0][0] | |
st.write("We use two open source packages, Optuna and CMAES, to optimize the hyperparameters of the model. You can see the selected hyperparameters below. ") | |
if hyper_tuning_done: | |
hyper_df = sp_session.sql('SELECT * FROM {2}.{1}.{0};'.format('HYPER_PARAMETER_OUTPUT', app_sch, app_db)).to_pandas() | |
if st.button("Run hyperparameter optimization", key='111'): | |
hyper_df = perform_hyper_parameter_tuning(sp_session, app_db, table_name, model_name, n_trials, app_sch, fun_sch) | |
else: | |
if st.button("Run hyperparameter optimization", key='Z1'): | |
hyper_df = perform_hyper_parameter_tuning(sp_session, app_db, table_name, model_name, n_trials, app_sch, fun_sch) | |
st.dataframe(hyper_df.head(20), use_container_width=True) | |
st.subheader(':blue[Backtesting and Evaluation]') | |
with st.expander("Backtesting and Evaluation"): | |
st.write("In this section, you will run a backtest on a 1-year test period, using walk-forward cross-validation. In the backtest, the model will forecast prices for the next day with only data available up to that point (i.e. no data leakage). The image below allows you to show forecasts versus actuals by day. ") | |
df['DATETIME']=[str(i) for i in df.index] | |
start_threshhold=df[['2020' in i or '2021' in i or '2022' in i for i in df.DATETIME]].shape[0] | |
splits=[i for i in range(start_threshhold-7, df.shape[0], 7*24) if i+7*24< df.shape[0]] | |
scores=[] | |
col5, col6 = st.columns(2) | |
#st.write(scores) | |
with col5: | |
if st.button("Run Backtest"): | |
for i in tqdm(splits): | |
split = i | |
sql_call_stmt = "call {3}.{2}.sproc_final_model('{0}',{1}, '{3}','{4}')".format(table_name, split, fun_sch, app_db, app_sch) | |
best_trial_params = sp_session.sql(sql_call_stmt).collect() | |
scores.append(best_trial_params) | |
#sp_session.use_database('{0}'.format(app_db)) | |
prediction_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name = '{1}'".format(app_sch, 'PREDICTIONS')).collect()[0][0] | |
if prediction_done: | |
distinct_dates = get_distinct_dates_predicted(sp_session, app_db, app_sch) | |
date_selected = st.selectbox('Select the day for which to view the actuals versus predictions in the backtest.', distinct_dates) | |
df_pred=sp_session.sql("SELECT * FROM {1}.{2}.PREDICTIONS where concat(DATE_PART('YEAR',to_timestamp(\"Datetime\")), \ | |
'-', LPAD(DATE_PART('MM',to_timestamp(\"Datetime\")),2,0),'-', LPAD(DATE_PART('DD', \ | |
to_timestamp(\"Datetime\")),2,0)) = '{0}'".format(date_selected, app_db, app_sch)).to_pandas() | |
df_pred = df_pred.set_index('Datetime') | |
st.line_chart(data=df_pred, y=['Ground_Truth','Predictions'], use_container_width=True) | |
with st.expander("View Backtest Results and Performance"): | |
try: | |
if prediction_done: | |
#Call predictions table and get results from March 2021 to end of 2021 | |
results = get_predictions_results(sp_session, app_db, app_sch, 'PREDICTIONS') | |
results1=results[["2020" not in i and "2021-01" not in i and "2021-02" not in i for i in results.Datetime]] | |
results['Persistence24HOUR']=results.Ground_Truth.shift(24) | |
results['Persistence48HOUR']=results.Ground_Truth.shift(48) | |
#Define results and metrics of model | |
#Get final TLDR metric performances comparing Myst models with Snowflake models | |
preds=np.array(results1.Predictions) | |
truth=np.array(results1.Ground_Truth) | |
mae=sum(np.abs(np.subtract(preds,truth)))/len(preds) | |
mse=sum(np.subtract(preds,truth)**2)/len(preds) | |
mae24=sum(np.abs(np.subtract(results['Persistence24HOUR'][24:],truth[24:])))/len(results['Persistence24HOUR'][24:]) | |
mse24=sum(np.subtract(results['Persistence24HOUR'][24:],truth[24:])**2)/len(results['Persistence24HOUR'][24:]) | |
mae48=sum(np.abs(np.subtract(results['Persistence48HOUR'][48:],truth[48:])))/len(results['Persistence48HOUR'][48:]) | |
mse48=sum(np.subtract(results['Persistence48HOUR'][48:],truth[48:])**2)/len(results['Persistence48HOUR'][48:]) | |
#add prior results | |
results_df=pd.DataFrame({'MSE': [mse24, mse48, mse],'MAE': [mae24,mae48, mae]}) | |
results_df.index=['Lambda - 24H Persistence', | |
'Lambda - 48H Persistence','Snowflake Price Forecast'] | |
st.dataframe(results_df) | |
st.line_chart(results1, x='Datetime', y=['Predictions', 'Ground_Truth']) | |
st.line_chart(results1, x='Datetime', y='MAE') | |
st.line_chart(results1, x='Datetime', y='MSE') | |
except Exception as e: | |
st.info("Perform Forecasting to view the results") | |
with st.container(): | |
st.header("Model Deployment") | |
coli, colj, colk = st.columns(3) | |
with coli: | |
tables = get_tables_list(sp_session, app_db) | |
if len(tables) > 0: | |
source_table_name = st.selectbox( | |
'Please select the source table', | |
tables, key='price1') | |
else: | |
source_table_name = st.text_input( | |
"Please select the source table","ENGY_FORECASTING_SOURCE", | |
key="sourcetable", | |
) | |
with colj: | |
target_table_name = st.text_input( | |
"Please provide a name for the target table","ENGY_FORECASTING_TARGET", | |
key="targettable", | |
) | |
with colk: | |
frequency = st.selectbox( | |
'Please select how often to generate a new forecast (in minutes)', | |
range(15, 300, 15), key='price2') | |
coll, colm, coln = st.columns(3) | |
with coll: | |
models = get_models_list(sp_session, app_db, stage_name, app_sch) | |
if len(models) > 0: | |
Model_Name = st.selectbox( | |
'Please select the model', | |
models, key='price3') | |
else: | |
Model_Name = 'forecast_2020-12-31 16:00:00.sav' | |
st.info('Please run the forecasting to create Model files in stage') | |
with colm: | |
warehouses = get_warehouses_list(sp_session) | |
if len(warehouses) > 0: | |
warehouse_name = st.selectbox('Please select the warehouse ', | |
warehouses, key='price4') | |
else: | |
warehouse_name = st.text_input( | |
"Please select the warehouse","COMPUTE_WH", | |
key="warehouse", | |
) | |
with coln: | |
append_mode = st.selectbox( | |
'Run inference once or Schedule the inference', | |
('True', 'False')) | |
if st.button('Deploy Model', key='price6', use_container_width=True): | |
out_str = deploy_price_model(sp_session, app_db, Model_Name, frequency, source_table_name, target_table_name, warehouse_name, append_mode, app_sch, fun_sch) | |
if out_str == 'Dynamic Table Created': | |
st.success('Dynamic Table created with the parameters chosen!', icon="✅") | |
df_pred = sp_session.sql("SELECT * FROM {0}.{1}.{2} order by DATETIME DESC limit 30".format(app_db, app_sch, target_table_name)).to_pandas() | |
elif out_str == 'Initial Load': | |
st.success('Forecasting Performed for the Source Table : {0}'.format(source_table_name), icon="✅") | |
df_pred = sp_session.sql("SELECT * FROM {0}.{1}.{2} order by DATETIME DESC limit 30".format(app_db, app_sch, target_table_name)).to_pandas() | |
st.dataframe(df_pred) | |
st.subheader(':blue[View Predicted Results]') | |
pred_sel_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name in ('{1}')".format(app_sch, target_table_name)).collect()[0][0] | |
if pred_sel_done: | |
st.dataframe(sp_session.sql("SELECT * FROM {0}.{1}.{2} order by DATETIME DESC limit 30".format(app_db, app_sch, target_table_name)).to_pandas()) | |
######################################################################################################################################################## | |
################################# SPIKE FORECASTING FUNCTIONALITY #################################################################################### | |
######################################################################################################################################################## | |
with st.container(): | |
if app_flag: | |
factor_sel_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name in ('{1}')".format(app_sch, 'ML_DATA_FACTOR_SPIKE_VW')).collect()[0][0] | |
else: | |
factor_sel_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name in ('{1}')".format(app_sch, 'ML_DATA_SPIKE_VW')).collect()[0][0] | |
if factor_sel_done and app_flag: | |
table_name_spike = 'ML_DATA_FACTOR_SPIKE_VW' | |
elif not factor_sel_done and app_flag: | |
table_name_spike = 'ML_DATA_SPIKE_VW' | |
elif factor_sel_done and not app_flag: | |
table_name_spike = 'ML_DATA_SPIKE_VW' | |
else: | |
table_name_spike = 'ML_DATA_SPIKE' | |
stage_name = 'ML_SPIKE_MODELS' | |
data_array = [] | |
st.title("Perform Energy Spike Forecasting") | |
spike_table_present = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name = '{1}'".format(app_sch, table_name_spike)).collect()[0][0] | |
if spike_table_present: | |
df_spike = sp_session.sql("SELECT * FROM {2}.{1}.{0};".format(table_name_spike, app_sch, app_db)).to_pandas() | |
df_spike = df_spike.set_index('DATETIME') | |
#app_wh = sp_session.sql("SELECT CURRENT_WAREHOUSE()").collect()[0][0] | |
st.subheader(':blue[Exploration]') | |
with st.expander("Exploration"): | |
st.subheader('Exploration') | |
st.write("In this section, we are plotting the different features and target.") | |
col2, col3 = st.columns(2) | |
with col2: | |
st.dataframe(df_spike.head(20)) | |
with col3: | |
st.line_chart(data=df_spike, y='RTPRICE_TOMM', use_container_width=True) | |
st.subheader('Plotting Correlation') | |
col222, col333 = st.columns(2) | |
with col222: | |
correlation_df=df_spike.copy() | |
cor_data = (correlation_df | |
.corr().stack() | |
.reset_index() # The stacking results in an index on the correlation values, we need the index as normal columns for Altair | |
.rename(columns={0: 'correlation', 'level_0': 'Features', 'level_1': 'Features2'})) | |
cor_data['correlation_label'] = cor_data['correlation'].map('{:.2f}'.format) # Round to 2 decimal | |
#st.dataframe(cor_data) | |
base = at.Chart(cor_data).encode( | |
x='Features2:O', | |
y='Features:O' | |
) | |
# Text layer with correlation labels | |
# Colors are for easier readability | |
text = base.mark_text().encode( | |
text='correlation_label', | |
color=at.condition( | |
at.datum.correlation > 0.5, | |
at.value('white'), | |
at.value('black') | |
) | |
) | |
# The correlation heatmap itself | |
cor_plot = base.mark_rect().encode( | |
color='correlation:Q' | |
) | |
cor_plot + text | |
else: | |
st.info("Perform the Fetch from Yes Energy to create views to your local SF account") | |
st.subheader(':blue[Hyperparameter Optimization]') | |
with st.expander("Hyperparameter Optimization"): | |
model_name = 'optuna_model_s.sav' | |
n_trials = 500 | |
st.subheader('Hyperparameter Optimization') | |
st.write("We use two open source packages, Optuna and CMAES, to optimize the hyperparameters of the model. You can see the selected hyperparameters below. ") | |
hyper_df = pd.DataFrame() | |
hyper_tuning_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name = '{1}'".format(app_sch, 'HYPER_PARAMETER_OUTPUT_SPIKE')).collect()[0][0] | |
if hyper_tuning_done: | |
hyper_df = sp_session.sql('SELECT * FROM {1}.{0};'.format('HYPER_PARAMETER_OUTPUT_SPIKE', app_sch)).to_pandas() | |
else: | |
if st.button("Run hyperparameter optimization", key='22'): | |
hyper_df = perform_hyper_parameter_tuning_for_spike(sp_session, app_db, table_name_spike, model_name, n_trials, app_sch, fun_sch) | |
st.dataframe(hyper_df.head(20), use_container_width=True) | |
st.subheader(':blue[Perform threshold analysis]') | |
with st.expander("Perform threshold analysis"): | |
col6, col7, col8 = st.columns(3) | |
with col6: | |
st.subheader('Precision Recall Curve') | |
hyper_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name = '{1}'".format(app_sch, 'FORECASTED_RESULT')).collect()[0][0] | |
if hyper_done: | |
scored_sdf = sp_session.sql("SELECT * FROM {0}.{2}.{1}".format(app_db, 'FORECASTED_RESULT', app_sch)).to_pandas() | |
scored_sdf = scored_sdf[['PREDICTION', 'RTPRICE_TOMM']] | |
y_pred = scored_sdf['PREDICTION'] | |
y_true = scored_sdf['RTPRICE_TOMM'] | |
fpr, tpr, t = roc_curve(y_true, y_pred) | |
auc = roc_auc_score(y_true, y_pred) | |
p, r, c = precision_recall_curve(y_true, y_pred) | |
ave_precision = average_precision_score(y_true, y_pred) | |
f1 = 2*p*r/(p+r) | |
t = c[f1.argmax()] | |
fig, axis = plt.subplots() | |
plt.plot(r, p) | |
plt.xlabel('recall') | |
plt.ylabel('precision') | |
plt.title(f'Precision Recall Curve\nAP Distance Numeric={ave_precision:.3f} ') | |
st.plotly_chart(fig, use_container_width=True) | |
else: | |
if st.button('Run Threshold Analysis'): | |
scored_sdf = perform_inferencing(sp_session, app_db, table_name_spike, app_sch, fun_sch) | |
scored_sdf = scored_sdf[['PREDICTION', 'RTPRICE_TOMM']] | |
y_pred = scored_sdf['PREDICTION'] | |
y_true = scored_sdf['RTPRICE_TOMM'] | |
fpr, tpr, t = roc_curve(y_true, y_pred) | |
auc = roc_auc_score(y_true, y_pred) | |
p, r, c = precision_recall_curve(y_true, y_pred) | |
ave_precision = average_precision_score(y_true, y_pred) | |
f1 = 2*p*r/(p+r) | |
t = c[f1.argmax()] | |
fig, axis = plt.subplots() | |
plt.plot(r, p) | |
plt.xlabel('recall') | |
plt.ylabel('precision') | |
plt.title(f'Precision Recall Curve\nAP Distance Numeric={ave_precision:.3f} ') | |
st.plotly_chart(fig, use_container_width=True) | |
with col7: | |
st.subheader('Plot ROC Curve') | |
hyper_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name = '{1}'".format(app_sch, 'FORECASTED_RESULT')).collect()[0][0] | |
if hyper_done: | |
fig, axis = plt.subplots() | |
plt.plot(fpr, tpr) | |
plt.xlabel('fpr') | |
plt.ylabel('tpr') | |
plt.title(f'ROC Curve\nROC AUC={auc:.3f}') | |
st.plotly_chart(fig, use_container_width=True) | |
with col8: | |
st.subheader('Threshhold with maximum F1 Score') | |
hyper_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name = '{1}'".format(app_sch, 'FORECASTED_RESULT')).collect()[0][0] | |
if hyper_done: | |
#st.write("#### Threshhold that maximizes F1 Score:") | |
pcr=pd.DataFrame({'Threshhold':c, 'Precision':p[:-1],'Recall':r[:-1], 'AUC':auc}) | |
pcr[pcr.Threshhold==t] | |
st.subheader(':blue[Backtesting and Evaluation]') | |
with st.expander("Backtesting and Evaluation"): | |
st.write("In this section, you will run a backtest on a 1-year test period, using walk-forward cross-validation. In the backtest, the model will forecast prices for the next day with only data available up to that point (i.e. no data leakage). The image below allows you to show forecasts versus actuals by day. ") | |
if spike_table_present: | |
df_spike['DATETIME']=[str(i) for i in df_spike.index] | |
start_threshhold=df_spike[['2020' in i or '2021' in i or '2022' in i for i in df_spike.DATETIME]].shape[0] | |
#start_threshhold=df_spike[['2017' in i or '2018' in i or '2019' in i or '2020' in i for i in df_spike.DATETIME]].shape[0] | |
splits=[i for i in range(start_threshhold-7, df_spike.shape[0], 7*24) if i+7*24< df_spike.shape[0]] | |
scores=[] | |
col5, col6, col7, col8 = st.columns(4) | |
with col5: | |
if st.button("Run Backtest", key='333'): | |
for i in splits: | |
split = i | |
sql_call_stmt = "call {3}.{2}.sproc_final_model_spike('{0}',{1}, '{3}', '{4}')".format(table_name_spike, split, fun_sch, app_db, app_sch) | |
best_trial_params = sp_session.sql(sql_call_stmt).collect() | |
scores.append(best_trial_params) | |
st.info("Forecasting is completed") | |
prediction_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name = '{1}'".format(app_sch, 'PREDICTIONS1')).collect()[0][0] | |
#scored_sdf = test.with_column('PREDICTION', udf_score_optuna_model_vec_cached(*feature_cols)) | |
st.subheader(':blue[View Backtest Results and Performance]') | |
with st.expander("View Backtest Results and Performance"): | |
if prediction_done: | |
results = get_predictions_results(sp_session, app_db, app_sch, 'PREDICTIONS1') | |
results1=results[["2023" in i for i in results.Datetime]] | |
results1['Persistence24HOUR']=results1.Ground_Truth.shift(24) | |
results1['Persistence48HOUR']=results1.Ground_Truth.shift(48) | |
#Define results and metrics of model | |
preds=results1.Predictions.values | |
truth=results1.Ground_Truth | |
auc24 = roc_auc_score(truth[24:], results1['Persistence24HOUR'][24:].astype(bool)) | |
ap24 = average_precision_score(truth[24:], results1['Persistence24HOUR'][24:].astype(bool)) | |
r24=recall_score(truth[24:], results1['Persistence24HOUR'][24:].astype(bool)) | |
p24=precision_score(truth[24:], results1['Persistence24HOUR'][24:].astype(bool)) | |
auc48 = roc_auc_score(truth[48:], results1['Persistence48HOUR'][48:].astype(bool)) | |
ap48 = average_precision_score(truth[48:], results1['Persistence48HOUR'][48:].astype(bool)) | |
r48=recall_score(truth[48:], results1['Persistence48HOUR'][48:].astype(bool)) | |
p48=precision_score(truth[48:], results1['Persistence48HOUR'][48:].astype(bool)) | |
auc = roc_auc_score(truth, preds) | |
ap = average_precision_score(truth, preds) | |
r=recall_score(truth, preds) | |
p=precision_score(truth, preds) | |
#results_df=pd.DataFrame({'MSE': [mse],'MAE': [mae]}) | |
#results_df.index=['Snowflake Price Forecast'] | |
st.dataframe(results1) | |
st.line_chart(results1, x='Datetime', y=['Predictions', 'Ground_Truth']) | |
st.line_chart(results1, x='Datetime', y='AP') | |
st.line_chart(results1, x='Datetime', y='Precision') | |
st.line_chart(results1, x='Datetime', y='Recall') | |
st.line_chart(results1, x='Datetime', y='AUC') | |
resultso = get_predictions_results(sp_session, app_db, app_sch, 'PREDICTIONS') | |
#narrow results to March 2021 to end of 2021 | |
resultso1=resultso[["2023" in i for i in resultso.Datetime]] | |
# resultso1=resultso[["2020" not in i and "2021-01" not in i and "2021-02" not in i and "2021-02" not in i for i in resultso.Datetime]] | |
preds1=resultso1.Predictions.values>50 | |
truth1=resultso1.Ground_Truth>50 | |
nauc = roc_auc_score(truth1, preds1) | |
nap = average_precision_score(truth1, preds1) | |
n_r=recall_score(truth1, preds1) | |
n_p=precision_score(truth1, preds1) | |
results_df=pd.DataFrame({'Precision':[p24,p48,p,n_p], 'Recall': [r24,r48,r,n_r],'AUC': [auc24,auc48,auc,nauc]}) | |
results_df=pd.concat([results_df, pcr[pcr.Threshhold==t][['Precision', 'Recall', 'AUC']]]) | |
results_df.index=['Lambda - 24H Persistence','Lambda - 48H Persistence', | |
'Snowflake Spike Forecast', 'Snowflake Price Forecast','Snowflake One Time Train'] | |
results_df.style.format(precision=2).highlight_max(color="lightgreen", axis=0) | |
st.subheader('Our Backtesting Results') | |
st.dataframe(results_df) | |
with st.container(): | |
st.header("Spike Model Deployment") | |
coli, colj, colk = st.columns(3) | |
with coli: | |
tables = get_tables_list(sp_session, app_db) | |
if len(tables) > 0: | |
source_table_name = st.selectbox( | |
'Please select the source table', | |
tables, key='pricespike1') | |
else: | |
source_table_name = st.text_input( | |
"Please select the source table","ENGY_SPIKE_FORECASTING_SOURCE", | |
key="sourcetable2", | |
) | |
with colj: | |
target_table_name = st.text_input( | |
"Please provide a name for the target table","ENGY_SPIKE_FORECASTING_TARGET", | |
key="targettable2", | |
) | |
with colk: | |
frequency = st.selectbox( | |
'Please select how often to generate a new forecast (in minutes)', | |
range(15, 300, 15), key='pricespike2') | |
coll, colm, coln = st.columns(3) | |
with coll: | |
models = get_models_list(sp_session, app_db, stage_name, app_sch) | |
if len(models) > 0: | |
Model_Name = st.selectbox( | |
'Please select the model', | |
models, key='pricespike3') | |
else: | |
Model_Name = 'forecast_2020-12-31 16:00:00.sav' | |
st.info('Please run the forecasting to create Model files in stage') | |
with colm: | |
warehouses = get_warehouses_list(sp_session) | |
if len(warehouses) > 0: | |
warehouse_name = st.selectbox('Please select the warehouse', | |
warehouses, key='pricespike4') | |
else: | |
warehouse_name = st.text_input( | |
"Please select the warehouse","COMPUTE_WH", | |
key="warehousespike", | |
) | |
with coln: | |
append_mode = st.selectbox( | |
'Run inference once or Schedule the inference', | |
('True', 'False'), key='spike') | |
if st.button('Deploy Spike Model', key='pricespike6', use_container_width=True): | |
out_str = deploy_spike_model(sp_session, app_db, Model_Name, frequency, source_table_name, target_table_name, warehouse_name, append_mode, fun_sch, app_sch) | |
if out_str == 'Task Created': | |
st.success('ENERGY_FORECASTING_INFERENCE_TASK --> Task Created', icon="✅") | |
df_pred = sp_session.sql("SELECT * FROM {0}.{1}.PREDICTIONS1 limit 30".format(app_db, app_sch, target_table_name)).to_pandas() | |
elif out_str == 'Initial Load': | |
st.success('Forecasting Performed for the Source Table : {0}'.format(source_table_name), icon="✅") | |
df_pred = sp_session.sql("SELECT * FROM {0}.{1}.PREDICTIONS1 limit 30".format(app_db, app_sch, target_table_name)).to_pandas() | |
st.subheader(':blue[View Predicted Results]') | |
pred_sel_done = sp_session.sql("select to_boolean(count(1)) from information_schema.tables where table_schema = '{0}' and table_name in ('{1}')".format(app_sch, 'PREDICTIONS1')).collect()[0][0] | |
if pred_sel_done: | |
st.dataframe(sp_session.sql("SELECT * FROM {0}.{1}.{2} limit 30".format(app_db, app_sch, 'PREDICTIONS1')).to_pandas()) |
scripts/setup.sql:
-- Set up script for the Energy Price Forecasting application.
CREATE APPLICATION ROLE IF NOT EXISTS APP_PUBLIC; | |
CREATE SCHEMA IF NOT EXISTS CORE; | |
GRANT USAGE ON SCHEMA CORE TO APPLICATION ROLE APP_PUBLIC; | |
CREATE SCHEMA IF NOT EXISTS TASKS; | |
GRANT USAGE ON SCHEMA TASKS TO APPLICATION ROLE APP_PUBLIC; | |
-- 2nd Part | |
CREATE OR ALTER VERSIONED SCHEMA CRM; | |
GRANT USAGE ON SCHEMA CRM TO APPLICATION ROLE APP_PUBLIC; | |
CREATE OR ALTER VERSIONED SCHEMA PYTHON_FUNCTIONS; | |
GRANT USAGE ON SCHEMA PYTHON_FUNCTIONS TO APPLICATION ROLE APP_PUBLIC; | |
CREATE STREAMLIT CORE.PRICE_FORECASTING_APP_ST | |
FROM '/streamlit' | |
MAIN_FILE = '/nativeApp.py' | |
; | |
GRANT USAGE ON STREAMLIT CORE.PRICE_FORECASTING_APP_ST TO APPLICATION ROLE APP_PUBLIC; | |
create or replace stage CRM.ML_MODELS | |
directory = ( enable = true ) | |
comment = 'used for holding ML Models.'; | |
GRANT READ ON STAGE CRM.ML_MODELS TO APPLICATION ROLE APP_PUBLIC; | |
GRANT WRITE ON STAGE CRM.ML_MODELS TO APPLICATION ROLE APP_PUBLIC; | |
create or replace stage CRM.ML_SPIKE_MODELS | |
directory = ( enable = true ) | |
comment = 'used for holding ML Spike Models.'; | |
GRANT READ ON STAGE CRM.ML_SPIKE_MODELS TO APPLICATION ROLE APP_PUBLIC; | |
GRANT WRITE ON STAGE CRM.ML_SPIKE_MODELS TO APPLICATION ROLE APP_PUBLIC; | |
create or replace stage CORE.UDF | |
directory = ( enable = true ) | |
comment = 'used for holding UDFs.'; | |
GRANT READ ON STAGE CORE.UDF TO APPLICATION ROLE APP_PUBLIC; | |
GRANT WRITE ON STAGE CORE.UDF TO APPLICATION ROLE APP_PUBLIC; | |
create or replace procedure PYTHON_FUNCTIONS.sproc_final_model(training_table varchar, split int, app_db varchar, app_sch varchar) | |
returns varchar | |
language python | |
runtime_version = '3.8' | |
packages = ('snowflake-snowpark-python', | |
'scikit-learn', | |
'xgboost', | |
'joblib', | |
'sqlalchemy', | |
'tqdm', | |
'colorlog', | |
'numpy', | |
'pandas') | |
imports = ('/python/sproc_final_model.py') | |
handler = 'sproc_final_model.sproc_final_model' | |
; | |
GRANT USAGE ON PROCEDURE PYTHON_FUNCTIONS.sproc_final_model(varchar, int, varchar, varchar) TO APPLICATION ROLE APP_PUBLIC; | |
create or replace procedure PYTHON_FUNCTIONS.sproc_deploy_model(training_table varchar, file_path varchar, | |
target_table varchar, append_mode varchar, app_db varchar, app_sch varchar) | |
returns varchar | |
language python | |
runtime_version = '3.8' | |
packages = ('snowflake-snowpark-python','xgboost', 'scikit-learn','pandas','joblib','cachetools') | |
imports = ('/python/sproc_deploy_model.py') | |
handler = 'sproc_deploy_model.main' | |
; | |
GRANT USAGE ON PROCEDURE PYTHON_FUNCTIONS.sproc_deploy_model(varchar, varchar, varchar, varchar, varchar, varchar) TO APPLICATION ROLE APP_PUBLIC; | |
-- Experiment | |
-- create or replace procedure PYTHON_FUNCTIONS.udf_deploy_model() | |
-- returns varchar | |
-- language python | |
-- runtime_version = '3.8' | |
-- packages = ('snowflake-snowpark-python','xgboost', 'scikit-learn','pandas','joblib','cachetools') | |
-- imports = ('/python/udf_deploy_model.py') | |
-- handler = 'udf_deploy_model.main' | |
-- ; | |
-- GRANT USAGE ON PROCEDURE PYTHON_FUNCTIONS.udf_deploy_model() TO APPLICATION ROLE APP_PUBLIC; | |
create or replace procedure PYTHON_FUNCTIONS.sproc_optuna_optimized_model( | |
table_name varchar, | |
model_name varchar, | |
n_trials int, app_db varchar, app_sch varchar) | |
returns varchar | |
language python | |
volatile | |
runtime_version = '3.8' | |
imports = ('/python/sproc_optuna_optimized_model.py') | |
packages=('snowflake-snowpark-python', | |
'scikit-learn', | |
'xgboost', | |
'joblib', | |
'sqlalchemy', | |
'tqdm', | |
'colorlog', | |
'numpy', 'optuna','cmaes', | |
'pandas') | |
handler = 'sproc_optuna_optimized_model.sproc_optuna_optimized_model' | |
; | |
GRANT USAGE ON PROCEDURE PYTHON_FUNCTIONS.sproc_optuna_optimized_model(varchar, varchar, int, varchar, varchar) TO APPLICATION ROLE APP_PUBLIC; | |
-- SPIKE FUNCTIONS | |
create or replace procedure PYTHON_FUNCTIONS.sproc_spike_forecast(training_table varchar, app_db varchar, app_sch varchar) | |
returns varchar | |
language python | |
runtime_version = '3.8' | |
imports=('/python/sproc_spike_forecast.py'), | |
packages = ('snowflake-snowpark-python','xgboost', 'scikit-learn','pandas','joblib','cachetools') | |
handler = 'sproc_spike_forecast.main' | |
; | |
GRANT USAGE ON PROCEDURE PYTHON_FUNCTIONS.sproc_spike_forecast(varchar, varchar, varchar) TO APPLICATION ROLE APP_PUBLIC;
-- Create a stored procedure for backtesting | |
create or replace procedure PYTHON_FUNCTIONS.sproc_optuna_optimized_model_spike( | |
table_name varchar, | |
model_name varchar, | |
n_trials int, app_db varchar, app_sch varchar) | |
returns varchar | |
language python | |
volatile | |
runtime_version = '3.8' | |
imports = ('/python/sproc_optuna_optimized_model_spike.py') | |
packages=('snowflake-snowpark-python', | |
'scikit-learn', | |
'xgboost', | |
'joblib', | |
'sqlalchemy', | |
'tqdm', | |
'colorlog', | |
'numpy','optuna','cmaes', | |
'pandas') | |
handler = 'sproc_optuna_optimized_model_spike.sproc_optuna_optimized_model' | |
; | |
GRANT USAGE ON PROCEDURE PYTHON_FUNCTIONS.sproc_optuna_optimized_model_spike(varchar, varchar, int, varchar, varchar) TO APPLICATION ROLE APP_PUBLIC; | |
create or replace procedure PYTHON_FUNCTIONS.sproc_final_model_spike(training_table varchar, split int, app_db varchar, app_sch varchar) | |
returns varchar | |
language python | |
runtime_version = '3.8' | |
packages = ('snowflake-snowpark-python', | |
'scikit-learn', | |
'xgboost', | |
'sqlalchemy', | |
'tqdm', | |
'numpy', | |
'pandas', | |
'joblib') | |
imports = ('/python/sproc_final_model_spike.py') | |
handler = 'sproc_final_model_spike.sproc_final_model1' | |
; | |
GRANT USAGE ON PROCEDURE PYTHON_FUNCTIONS.sproc_final_model_spike(varchar, int, varchar, varchar) TO APPLICATION ROLE APP_PUBLIC; | |
create or replace procedure PYTHON_FUNCTIONS.sproc_deploy_model_spike(training_table varchar, file_path varchar, target_table varchar, append_mode varchar, app_db varchar, app_sch varchar) | |
returns varchar | |
language python | |
runtime_version = '3.8' | |
packages = ('snowflake-snowpark-python','xgboost', 'scikit-learn','pandas','joblib','cachetools') | |
imports = ('/python/sproc_deploy_model_spike.py') | |
handler = 'sproc_deploy_model_spike.main' | |
; | |
GRANT USAGE ON PROCEDURE PYTHON_FUNCTIONS.sproc_deploy_model_spike(varchar, varchar, varchar, varchar, varchar, varchar) TO APPLICATION ROLE APP_PUBLIC; | |
sproc_deploy_model.py:
from cachetools import cached | |
import pandas as pd | |
import snowflake.snowpark | |
import snowflake.snowpark.types as T | |
from snowflake.snowpark.files import SnowflakeFile | |
@cached(cache={}) | |
def load_model(file_path: str) -> object: | |
from joblib import load | |
with SnowflakeFile.open(file_path, 'rb') as f: | |
model = load(f) | |
return model | |
# This local Python-function will be registered as a Stored Procedure and runs in Snowflake | |
def main(sp_session: snowflake.snowpark.Session, | |
training_table: str, file_path: str, target_table: str, append_mode: str, app_db: str, app_sch: str) -> str: | |
data = sp_session.table("{0}.{1}".format(app_db, training_table)).to_pandas() | |
predict_data=data.drop(['DATETIME','RTPRICE_TOMM'],axis=1) | |
scoped_url = sp_session.sql("SELECT BUILD_SCOPED_FILE_URL(@{0}.{2}.ML_MODELS,'/model/{1}');".format(app_db, file_path, app_sch)).collect()[0][0] | |
model = load_model(scoped_url) | |
scored_data = pd.Series(model.predict(predict_data)) | |
data['PREDICTION'] = scored_data.values | |
if append_mode == 'True' or append_mode == 'TRUE': | |
sp_session.write_pandas(data, target_table,database=app_db, schema=app_sch, auto_create_table=True, overwrite=True) | |
else: | |
sp_session.write_pandas(data, target_table,database=app_db, schema=app_sch, auto_create_table=False, overwrite=False) | |
return 'Success' |
sproc_deploy_model_spike.py:
# Scoring handler for the price-spike classifier: loads the saved model from the
# ML_SPIKE_MODELS stage, scores the training table with spike probabilities, and writes the prediction table.
from cachetools import cached
import pandas as pd
import snowflake.snowpark
import snowflake.snowpark.types as T
from snowflake.snowpark.files import SnowflakeFile

@cached(cache={})
def load_model(file_path: str) -> object:
    # Cache the deserialized model so repeated calls in the same session skip the stage read
    from joblib import load
    with SnowflakeFile.open(file_path, 'rb') as f:
        model = load(f)
    return model

# This local Python function will be registered as a Stored Procedure and runs in Snowflake
def main(sp_session: snowflake.snowpark.Session,
         training_table: str, file_path: str, target_table: str, append_mode: str,
         app_db: str, app_sch: str) -> str:
    data = sp_session.table("{0}.{1}".format(app_db, training_table)).to_pandas()
    predict_data = data.drop(['DATETIME', 'RTPRICE_TOMM'], axis=1)
    scoped_url = sp_session.sql(
        "SELECT BUILD_SCOPED_FILE_URL(@{0}.{2}.ML_SPIKE_MODELS,'/spikemodel/{1}');".format(app_db, file_path, app_sch)
    ).collect()[0][0]
    model = load_model(scoped_url)
    # Score with the positive-class (spike) probability
    scored_data = pd.Series(model.predict_proba(predict_data)[:, 1])
    data['PREDICTION'] = scored_data.values
    if append_mode in ('True', 'TRUE'):
        sp_session.write_pandas(data, target_table, database=app_db, schema=app_sch,
                                auto_create_table=True, overwrite=True)
    else:
        sp_session.write_pandas(data, target_table, database=app_db, schema=app_sch,
                                auto_create_table=False, overwrite=False)
    return 'Success'
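
Once the spike scoring procedure is granted, a recurring forecast can be produced by wrapping the CALL in a Snowflake task. The sketch below reuses the session from the earlier example and is an assumption for illustration: the task database, name, warehouse, cron schedule, model file, and target table are all placeholders, while the CALL arguments follow the sproc_deploy_model_spike signature defined above.

# Minimal sketch: scheduling recurring spike scoring with a Snowflake task.
# Task location, warehouse, schedule, model file, and target table are placeholders.
session.sql("""
    CREATE OR REPLACE TASK MY_DB.MY_SCHEMA.DAILY_SPIKE_SCORING_TASK
      WAREHOUSE = MY_WH
      SCHEDULE = 'USING CRON 0 6 * * * America/Chicago'
    AS
      CALL <app_name>.CRM.PYTHON_FUNCTIONS.SPROC_DEPLOY_MODEL_SPIKE(
        'ML_DATA', 'forecast_latest.sav', 'SPIKE_FORECAST', 'True', '<app_name>', 'CRM')
""").collect()
# Tasks are created suspended; resume to start the schedule
session.sql("ALTER TASK MY_DB.MY_SCHEMA.DAILY_SPIKE_SCORING_TASK RESUME").collect()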
# Weekly backtest step for the price-forecast regressor: retrain on everything before the split
# and predict the following week, logging MSE/MAE to the PREDICTIONS table.
import snowflake.snowpark

def sproc_final_model(sp_session: snowflake.snowpark.Session,
                      training_table: str,
                      split: int,
                      app_db: str, app_sch: str) -> str:
    import xgboost
    import numpy as np
    import pandas as pd
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    # Load the training data into a pandas dataframe
    data = sp_session.table("{0}.{1}.{2}".format(app_db, app_sch, training_table)).to_pandas()
    # Keep the timestamps for the one-week validation window that follows the split point
    datetime = data.DATETIME[split:split + 7 * 24].reset_index(drop=True)
    data = data.drop('DATETIME', axis=1)
    # Rebuild the regressor from the tuned hyperparameters stored in HYPER_PARAMETER_OUTPUT
    hyper_df = sp_session.sql('select * from {0}.{1}.HYPER_PARAMETER_OUTPUT;'.format(app_db, app_sch)).to_pandas()
    regressor_obj = xgboost.XGBRegressor(
        n_estimators=int(hyper_df.loc[hyper_df['Parameters'] == 'xgb_n_estimators', 'Value'].iloc[0]),
        max_depth=int(hyper_df.loc[hyper_df['Parameters'] == 'xgb_max_depth', 'Value'].iloc[0]),
        subsample=float(hyper_df.loc[hyper_df['Parameters'] == 'xgb_subsample', 'Value'].iloc[0]),
        colsample_bytree=float(hyper_df.loc[hyper_df['Parameters'] == 'xgb_colsample_bytree', 'Value'].iloc[0]),
        min_child_weight=float(hyper_df.loc[hyper_df['Parameters'] == 'xgb_min_child_weight', 'Value'].iloc[0]),
        learning_rate=float(hyper_df.loc[hyper_df['Parameters'] == 'xgb_learning_rate', 'Value'].iloc[0])
    )
    score = []
    # Training and scoring for this step of the rolling cross-validation
    train, valid = data.iloc[:split], data.iloc[split:split + 7 * 24]
    regressor_obj.fit(train.drop('RTPRICE_TOMM', axis=1), train['RTPRICE_TOMM'])
    # Make predictions; negative prices are clipped to zero
    predictions = regressor_obj.predict(valid.drop('RTPRICE_TOMM', axis=1))
    predictions = [0 if i < 0 else i for i in predictions]
    # MSE and MAE for the validation week
    mse = mean_squared_error(valid.RTPRICE_TOMM, predictions)
    mae = mean_absolute_error(valid.RTPRICE_TOMM, predictions)
    # Organize predictions, ground truth, and metrics into a dataframe
    preds = pd.DataFrame({'Predictions': list(predictions), 'Ground_Truth': list(valid.RTPRICE_TOMM),
                          'Datetime': list(datetime), 'MAE': list(np.repeat(mae, 7 * 24)),
                          'MSE': list(np.repeat(mse, 7 * 24))})
    try:
        # Append to the PREDICTIONS table if it already exists, dropping duplicate timestamps
        pred_table = sp_session.table('{0}.{1}.PREDICTIONS'.format(app_db, app_sch)).to_pandas()
        save_it = pd.concat([pred_table, preds]).drop_duplicates(['Datetime'])
        sp_session.write_pandas(save_it, 'PREDICTIONS', database=app_db, schema=app_sch,
                                auto_create_table=True, overwrite=True)
    except Exception:
        sp_session.write_pandas(preds, 'PREDICTIONS', database=app_db, schema=app_sch,
                                auto_create_table=True, overwrite=True)
    # Add metrics to scores
    score.append(mse)
    score.append(mae)
    # Save the model from this retraining and upload it to the ML_MODELS stage
    from joblib import dump
    dump(regressor_obj, f'/tmp/forecast_{datetime[0]}.sav')
    sp_session.file.put(f'/tmp/forecast_{datetime[0]}.sav',
                        '@{0}.{1}.ML_MODELS/model/'.format(app_db, app_sch),
                        auto_compress=False, overwrite=True)
    return 'Success'
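
The split argument is what advances the weekly backtest: each call trains on all hours before split and predicts the following 7*24 hours. A minimal driver sketch, assuming the handler above can be called directly with the Snowpark session from the earlier example; the offsets and week count are illustrative, not values from the source.

# Minimal sketch of the weekly rolling backtest loop; offsets and week count are illustrative.
start_split = 24 * 7 * 156          # hypothetical boundary after roughly three years of hourly rows
weeks_to_backtest = 8               # hypothetical number of weekly retrain/predict steps
for week in range(weeks_to_backtest):
    split = start_split + week * 24 * 7    # move the train/validation boundary forward one week
    sproc_final_model(session, 'ML_DATA', split, '<app_name>', 'CRM')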
# Weekly backtest step for the price-spike classifier: retrain on everything before the split,
# predict the following week, and log precision/recall/AUC/AP to the PREDICTIONS1 table.
import snowflake.snowpark

def sproc_final_model1(sp_session: snowflake.snowpark.Session,
                       training_table: str,
                       split: int,
                       app_db: str, app_sch: str) -> str:
    import xgboost
    import numpy as np
    import pandas as pd
    from sklearn.metrics import (
        roc_auc_score,
        average_precision_score,
        precision_score,
        recall_score,
        precision_recall_curve,
        roc_curve
    )
    # Load the training data into a pandas dataframe
    data = sp_session.table("{0}.{1}.{2}".format(app_db, app_sch, training_table)).to_pandas()
    datetime = data.DATETIME[split:split + 7 * 24].reset_index(drop=True)
    data = data.drop('DATETIME', axis=1)
    # Rebuild the classifier from the tuned hyperparameters stored in HYPER_PARAMETER_OUTPUT_SPIKE
    hyper_df = sp_session.sql('select * from {0}.{1}.HYPER_PARAMETER_OUTPUT_SPIKE;'.format(app_db, app_sch)).to_pandas()
    classifier_obj = xgboost.XGBClassifier(
        n_estimators=int(hyper_df.loc[hyper_df['Parameters'] == 'xgb_n_estimators', 'Value'].iloc[0]),
        max_depth=int(hyper_df.loc[hyper_df['Parameters'] == 'xgb_max_depth', 'Value'].iloc[0]),
        subsample=float(hyper_df.loc[hyper_df['Parameters'] == 'xgb_subsample', 'Value'].iloc[0]),
        colsample_bytree=float(hyper_df.loc[hyper_df['Parameters'] == 'xgb_colsample_bytree', 'Value'].iloc[0]),
        min_child_weight=float(hyper_df.loc[hyper_df['Parameters'] == 'xgb_min_child_weight', 'Value'].iloc[0]),
        learning_rate=float(hyper_df.loc[hyper_df['Parameters'] == 'xgb_learning_rate', 'Value'].iloc[0])
    )
    # Threshold analysis: pick the probability cutoff that maximizes F1 on previously scored data
    y = 'RTPRICE_TOMM'
    scored_sdf = sp_session.sql("SELECT * FROM {0}.{1}.FORECASTED_RESULT;".format(app_db, app_sch)).to_pandas()
    scored_sdf = scored_sdf[['PREDICTION', y]]
    y_pred = scored_sdf['PREDICTION']
    y_true = scored_sdf[y]
    fpr, tpr, _ = roc_curve(y_true, y_pred)
    auc = roc_auc_score(y_true, y_pred)
    p, r, c = precision_recall_curve(y_true, y_pred)
    # precision/recall have one more entry than the thresholds array, so drop the last point
    f1 = 2 * p[:-1] * r[:-1] / (p[:-1] + r[:-1])
    t = c[f1.argmax()]
    # Prediction
    score = []
    train, valid = data.iloc[:split], data.iloc[split:split + 7 * 24]
    classifier_obj.fit(train.drop('RTPRICE_TOMM', axis=1), train['RTPRICE_TOMM'])
    predictions = classifier_obj.predict_proba(valid.drop('RTPRICE_TOMM', axis=1))[:, 1]
    predictions = predictions > t
    # try/except to define the AUC score as -1 if it is undefined (e.g. only one class in the validation week)
    try:
        auc = roc_auc_score(valid.RTPRICE_TOMM, predictions)
    except Exception:
        auc = -1
    # Performance metrics for the validation week
    ap = average_precision_score(valid.RTPRICE_TOMM, predictions)
    r = recall_score(valid.RTPRICE_TOMM, predictions)
    p = precision_score(valid.RTPRICE_TOMM, predictions)
    preds = pd.DataFrame({'Predictions': list(predictions), 'Ground_Truth': list(valid.RTPRICE_TOMM),
                          'Datetime': list(datetime),
                          'Precision': list(np.repeat(p, 7 * 24)), 'Recall': list(np.repeat(r, 7 * 24)),
                          'AUC': list(np.repeat(auc, 7 * 24)), 'AP': list(np.repeat(ap, 7 * 24))})
    try:
        # Append to the PREDICTIONS1 table if it already exists, dropping duplicate timestamps
        pred_table = sp_session.table('{0}.{1}.PREDICTIONS1'.format(app_db, app_sch)).to_pandas()
        save_it = pd.concat([pred_table, preds]).drop_duplicates(['Datetime'])
        sp_session.write_pandas(save_it, 'PREDICTIONS1', database=app_db, schema=app_sch,
                                auto_create_table=True, overwrite=True)
    except Exception:
        sp_session.write_pandas(preds, 'PREDICTIONS1', database=app_db, schema=app_sch,
                                auto_create_table=True, overwrite=True)
    score.append(ap)
    score.append(p)
    score.append(r)
    score.append(auc)
    # Save the model from this retraining and upload it to the ML_SPIKE_MODELS stage
    from joblib import dump
    dump(classifier_obj, f'/tmp/forecast_{datetime[0]}.sav')
    sp_session.file.put(f'/tmp/forecast_{datetime[0]}.sav',
                        '@{0}.{1}.ML_SPIKE_MODELS/spikemodel/'.format(app_db, app_sch),
                        auto_compress=False, overwrite=True)
    return 'Success'
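
The spike handler converts probabilities to spike / no-spike labels using the probability cutoff that maximizes F1 on previously scored data. A standalone sketch of that threshold selection with synthetic labels and scores (note that precision_recall_curve returns one more precision/recall point than thresholds):

# Standalone sketch of F1-optimal threshold selection on synthetic data.
import numpy as np
from sklearn.metrics import precision_recall_curve

y_true = np.array([0, 0, 1, 0, 1, 1, 0, 1])
y_score = np.array([0.10, 0.30, 0.35, 0.40, 0.60, 0.65, 0.70, 0.90])

p, r, thresholds = precision_recall_curve(y_true, y_score)
# Drop the final precision/recall point, which has no corresponding threshold
f1 = 2 * p[:-1] * r[:-1] / (p[:-1] + r[:-1] + 1e-12)
best_threshold = thresholds[f1.argmax()]
spike_flags = y_score > best_threshold   # boolean spike / no-spike labels, as in the handler above
print(best_threshold, spike_flags)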
# Hyperparameter optimization for the price-forecast regressor: Optuna search over XGBoost
# parameters, scored with time-series cross-validation on MSE.
import snowflake.snowpark

def sproc_optuna_optimized_model(sp_session: snowflake.snowpark.Session,
                                 table_name: str,
                                 model_name: str,
                                 n_trials: int,
                                 app_db: str, app_sch: str) -> str:
    import optuna
    import xgboost
    import numpy as np
    import pandas as pd
    from sklearn.metrics import mean_squared_error
    from sklearn.model_selection import TimeSeriesSplit
    # Load the training data into a pandas dataframe, holding out 2023 rows
    train_val = sp_session.table("{0}.{1}.{2}".format(app_db, app_sch, table_name)).to_pandas()
    train_val = train_val[['2023' not in i for i in train_val.DATETIME]]
    tscv = TimeSeriesSplit()
    X = train_val.drop(['RTPRICE_TOMM', 'DATETIME'], axis=1)
    y = train_val['RTPRICE_TOMM']

    def objective(trial):
        # Sample XGBoost hyperparameters for this trial
        xgb_n_estimators = trial.suggest_int("xgb_n_estimators", 5, 100, log=True)
        xgb_max_depth = trial.suggest_int("xgb_max_depth", 2, 32, log=True)
        xgb_subsample = trial.suggest_float("xgb_subsample", 0.01, 1, log=True)
        xgb_colsample_bytree = trial.suggest_float("xgb_colsample_bytree", 0.01, 1, log=True)
        xgb_min_child_weight = trial.suggest_float("xgb_min_child_weight", 1, 75, log=True)
        xgb_learning_rate = trial.suggest_float("xgb_learning_rate", 0.01, 0.7, log=True)
        regressor_obj = xgboost.XGBRegressor(
            n_estimators=xgb_n_estimators, max_depth=xgb_max_depth, subsample=xgb_subsample,
            colsample_bytree=xgb_colsample_bytree, min_child_weight=xgb_min_child_weight,
            learning_rate=xgb_learning_rate
        )
        # Score the trial with expanding-window (time series) cross-validation on MSE
        score = []
        for train_index, valid_index in tscv.split(train_val):
            train, valid = train_val.iloc[train_index], train_val.iloc[valid_index]
            regressor_obj.fit(train.drop(['RTPRICE_TOMM', 'DATETIME'], axis=1), train['RTPRICE_TOMM'])
            predictions = regressor_obj.predict(valid.drop(['RTPRICE_TOMM', 'DATETIME'], axis=1))
            score.append(mean_squared_error(valid.RTPRICE_TOMM, predictions))
        score = sum(score) / len(score)
        trial.set_user_attr(key="best_booster", value=regressor_obj)
        return score

    # Callback to keep the model from the best trial
    def callback(study, trial):
        if study.best_trial.number == trial.number:
            study.set_user_attr(key="best_booster", value=trial.user_attrs["best_booster"])

    # Start optimizing (minimize MSE)
    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=n_trials, callbacks=[callback])
    # Fit the best model on the full data
    best_model = study.user_attrs["best_booster"]
    best_model.fit(X.values, y.values)
    # Save the model as a file and upload it to the ML_MODELS stage
    from joblib import dump
    dump(best_model, '/tmp/' + model_name)
    sp_session.file.put('/tmp/' + model_name, '@{0}.{1}.ML_MODELS'.format(app_db, app_sch),
                        auto_compress=False, overwrite=True)
    # Persist the best hyperparameters for later retraining
    output = study.best_trial.params
    df_out = pd.DataFrame(output.items(), columns=['Parameters', 'Value'])
    sp_session.write_pandas(df_out, 'HYPER_PARAMETER_OUTPUT', database=app_db, schema=app_sch,
                            auto_create_table=True, overwrite=True)
    return 'Success'
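
The objective above scores each trial with sklearn's TimeSeriesSplit, which produces expanding-window folds so that every validation window comes strictly after its training rows. A standalone sketch on a synthetic hourly index:

# Standalone sketch of TimeSeriesSplit's expanding-window folds on a synthetic hourly index.
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

hours = np.arange(24 * 30)   # 30 days of hourly rows, purely illustrative
tscv = TimeSeriesSplit()     # default: 5 folds
for fold, (train_idx, valid_idx) in enumerate(tscv.split(hours)):
    # Each fold trains on all rows before its validation window
    print(fold, "train:", train_idx.min(), "-", train_idx.max(),
          "valid:", valid_idx.min(), "-", valid_idx.max())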
# Hyperparameter optimization for the price-spike classifier: Optuna search over XGBoost
# parameters, scored with time-series cross-validation on average precision.
import snowflake.snowpark

def sproc_optuna_optimized_model(sp_session: snowflake.snowpark.Session,
                                 table_name: str,
                                 model_name: str,
                                 n_trials: int, app_db: str, app_sch: str) -> str:
    import optuna
    import xgboost
    import numpy as np
    import pandas as pd
    from sklearn.metrics import average_precision_score
    from sklearn.model_selection import TimeSeriesSplit
    # Load the training data into a pandas dataframe, holding out 2023 rows
    train_val = sp_session.table("{0}.{1}.{2}".format(app_db, app_sch, table_name)).to_pandas()
    train_val = train_val[['2023' not in i for i in train_val.DATETIME]]
    tscv = TimeSeriesSplit()
    X = train_val.drop(['RTPRICE_TOMM', 'DATETIME'], axis=1)
    y = train_val['RTPRICE_TOMM']

    def objective(trial):
        # Sample XGBoost hyperparameters for this trial
        xgb_n_estimators = trial.suggest_int("xgb_n_estimators", 5, 100, log=True)
        xgb_max_depth = trial.suggest_int("xgb_max_depth", 2, 32, log=True)
        xgb_subsample = trial.suggest_float("xgb_subsample", 0.01, 1, log=True)
        xgb_colsample_bytree = trial.suggest_float("xgb_colsample_bytree", 0.01, 1, log=True)
        xgb_min_child_weight = trial.suggest_float("xgb_min_child_weight", 1, 75, log=True)
        xgb_learning_rate = trial.suggest_float("xgb_learning_rate", 0.01, 0.7, log=True)
        classifier_obj = xgboost.XGBClassifier(
            n_estimators=xgb_n_estimators, max_depth=xgb_max_depth, subsample=xgb_subsample,
            colsample_bytree=xgb_colsample_bytree, min_child_weight=xgb_min_child_weight,
            learning_rate=xgb_learning_rate
        )
        # Score the trial with expanding-window (time series) cross-validation on average precision
        score = []
        for train_index, valid_index in tscv.split(train_val):
            train, valid = train_val.iloc[train_index], train_val.iloc[valid_index]
            classifier_obj.fit(train.drop(['RTPRICE_TOMM', 'DATETIME'], axis=1), train['RTPRICE_TOMM'])
            predictions = classifier_obj.predict(valid.drop(['RTPRICE_TOMM', 'DATETIME'], axis=1))
            # average_precision_score expects (y_true, y_score)
            score.append(average_precision_score(valid.RTPRICE_TOMM, predictions))
        score = sum(score) / len(score)
        trial.set_user_attr(key="best_booster", value=classifier_obj)
        return score

    # Callback to keep the model from the best trial
    def callback(study, trial):
        if study.best_trial.number == trial.number:
            study.set_user_attr(key="best_booster", value=trial.user_attrs["best_booster"])

    # Start optimizing (maximize average precision)
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials, callbacks=[callback])
    # Fit the best model on the full data
    best_model = study.user_attrs["best_booster"]
    best_model.fit(X.values, y.values)
    # Save the model as a file and upload it to the ML_SPIKE_MODELS stage
    from joblib import dump
    dump(best_model, '/tmp/' + model_name)
    sp_session.file.put('/tmp/' + model_name, '@{0}.{1}.ML_SPIKE_MODELS'.format(app_db, app_sch),
                        auto_compress=False, overwrite=True)
    # Persist the best hyperparameters for later retraining
    output = study.best_trial.params
    df_out = pd.DataFrame(output.items(), columns=['Parameters', 'Value'])
    sp_session.write_pandas(df_out, 'HYPER_PARAMETER_OUTPUT_SPIKE', database=app_db, schema=app_sch,
                            auto_create_table=True, overwrite=True)
    return 'Success'
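
Both optimization procedures create their Optuna study with the default sampler. If CMA-ES sampling is preferred instead, Optuna ships a CmaEsSampler that can be passed when the study is created, provided the cmaes package is available to the procedure; a minimal sketch under that assumption, with everything else unchanged:

# Minimal sketch: swapping in Optuna's CMA-ES sampler (requires the cmaes package).
import optuna

sampler = optuna.samplers.CmaEsSampler(seed=42)          # seed is illustrative
study = optuna.create_study(direction="maximize", sampler=sampler)
# study.optimize(objective, n_trials=n_trials, callbacks=[callback]) proceeds exactly as above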
# Scoring function that builds the FORECASTED_RESULT table used by the spike threshold analysis:
# loads the tuned spike model, scores the 2021 rows, and persists the probabilities.
from cachetools import cached
import pandas as pd
import snowflake.snowpark
import snowflake.snowpark.types as T
from snowflake.snowpark.files import SnowflakeFile

def main(sp_session: snowflake.snowpark.Session,
         training_table: str, app_db: str, app_sch: str) -> str:
    @cached(cache={})
    def load_model(file_path: str) -> object:
        from joblib import load
        with SnowflakeFile.open(file_path, 'rb') as f:
            model = load(f)
        return model

    import xgboost
    # Load the tuned spike model from the ML_SPIKE_MODELS stage
    model_name = 'optuna_model_s.sav'
    scoped_url = sp_session.sql(
        "SELECT BUILD_SCOPED_FILE_URL(@{1}.{2}.ML_SPIKE_MODELS,'{0}');".format(model_name, app_db, app_sch)
    ).collect()[0][0]
    model = load_model(scoped_url)
    # Keep only 2021 rows for scoring and materialize them in the TEST table
    test = sp_session.sql("SELECT * FROM {1}.{2}.{0}".format(training_table, app_db, app_sch)).to_pandas()
    test = test[['2021' in i for i in test.DATETIME]]
    sp_session.write_pandas(test, 'TEST', database=app_db, schema=app_sch, auto_create_table=True, overwrite=True)
    test = sp_session.sql("SELECT * FROM {0}.{1}.{2}".format(app_db, app_sch, 'TEST')).to_pandas()
    predict_data = sp_session.table("{0}.{1}.{2}".format(app_db, app_sch, 'TEST')).drop(['RTPRICE_TOMM', 'DATETIME'])
    predict_data = predict_data.to_pandas()
    # Score with the positive-class (spike) probability and persist the result
    scored_data = pd.Series(model.predict_proba(predict_data)[:, 1])
    test['PREDICTION'] = scored_data.values
    sp_session.write_pandas(test, 'FORECASTED_RESULT', database=app_db, schema=app_sch,
                            auto_create_table=True, overwrite=True)
    return 'Success'
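
After the scoring procedure runs, the scored rows can be inspected directly. A minimal sketch, reusing the Snowpark session from the earlier example; the application name is a placeholder.

# Minimal sketch: inspecting the scored output written by the handler above.
forecast_df = session.table("<app_name>.CRM.FORECASTED_RESULT").to_pandas()
print(forecast_df[["DATETIME", "RTPRICE_TOMM", "PREDICTION"]].head())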