Oddity — an Azure Function that flags daily cost spikes by comparing a CNN forecast of the next day's cost against the actual cost.
# HTTP trigger entry point (Azure Functions, Python).
import azure.functions as func
import json

from .cost_spike_detector import detect_cost_spikes


def main(req: func.HttpRequest) -> func.HttpResponse:
    req_body = req.get_json()
    date_costs = req_body.get('dateCosts')

    cost_spikes = detect_cost_spikes(date_costs)

    response = json.dumps({"spikes": cost_spikes})
    return func.HttpResponse(response)
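For reference, a hedged local invocation sketch. The route name "oddity" and the local host/port are assumptions, and the payload values are made up; only the "dateCosts", "date", and "cost" field names come from the code above:

import requests

payload = {
    "dateCosts": [
        {"date": "2020-12-01", "cost": 12.34},
        {"date": "2020-12-02", "cost": 11.87},
    ]
}
resp = requests.post("http://localhost:7071/api/oddity", json=payload)
print(resp.json())  # {"spikes": [...]}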
# cost_spike_detector.py
import os

import numpy as np
import pandas as pd
import tensorflow as tf

script_path = os.path.abspath(__file__)
script_dir = os.path.dirname(script_path)
MODEL_FOLDER_PATH = os.path.join(script_dir, 'model/')

# Lazily loaded Keras model; loading is deferred to the first request.
MODEL = None
MODEL_INPUT_SIZE = 84     # days of history fed to the model
STD_SPAN = 31             # span of the exponentially weighted std of the errors
STD_MIN_PERIODS = 1
STD_MIN_VALUE = 0.5       # floor on the std so tiny errors can't trigger spikes
STD_TOLERANCE_FACTOR = 3  # flag errors larger than 3 adjusted stds


def _initialize():
    global MODEL
    if MODEL is None:
        MODEL = tf.keras.models.load_model(MODEL_FOLDER_PATH)
def detect_cost_spikes(date_costs):
    _initialize()

    date_costs = np.array([[date_cost["date"], date_cost["cost"]] for date_cost in date_costs])
    dates = np.array(date_costs[:, 0])
    costs = np.array(date_costs[:, 1]).astype(float)  # np.float was removed in NumPy 1.24

    predictions, actual_dates, actual_costs = generate_predictions(dates, costs)
    cost_spikes_mask, errors, stds = compute_cost_spikes_mask(predictions, actual_costs)
    cost_spikes = create_cost_spikes(cost_spikes_mask, actual_dates, actual_costs, errors, stds)

    return cost_spikes
def generate_predictions(dates, costs):
    normalized_costs = normalize(costs)
    # The last subsequence has no actual cost to compare against, so drop it.
    subsequences = make_subsequences(normalized_costs, MODEL_INPUT_SIZE)[:-1, :]

    # The model predicts 4 days; we only care about the first, i.e. column 0,
    # which lines up with actual_costs below.
    predictions = MODEL.predict(subsequences, batch_size=len(subsequences))[:, 0]
    predictions = denormalize(predictions)

    actual_dates = dates[MODEL_INPUT_SIZE:]
    actual_costs = costs[MODEL_INPUT_SIZE:]

    return predictions, actual_dates, actual_costs
def normalize(costs):
    return np.log(costs + 1)


def denormalize(costs):
    return np.exp(costs) - 1
def make_subsequences(data, subsequence_size):
    """
    Create all contiguous subsequences of length subsequence_size from the array.

    Example
    -------
    >>> make_subsequences(np.array([1, 2, 3, 4]), 2)
    array([
        [1, 2],
        [2, 3],
        [3, 4],
    ])
    """
    number_of_subsequences = data.shape[0] - subsequence_size + 1
    return np.array([data[index:subsequence_size + index] for index in range(number_of_subsequences)])
def compute_cost_spikes_mask(predictions, actual_costs):
    errors = abs(predictions - actual_costs)

    # Exponentially weighted moving std of the prediction errors.
    stds = pd.DataFrame(errors)\
        .transform(lambda x: x.ewm(span=STD_SPAN, min_periods=STD_MIN_PERIODS).std())\
        .values\
        .flatten()
    stds = np.nan_to_num(stds)

    # Floor the stds so a streak of near-perfect predictions can't turn every
    # small error into a spike.
    adjusted_stds = np.where(stds < STD_MIN_VALUE, STD_MIN_VALUE, stds)
    cost_spikes_mask = errors > STD_TOLERANCE_FACTOR * adjusted_stds

    return cost_spikes_mask, errors, adjusted_stds
def create_cost_spikes(cost_spikes_mask, dates, costs, errors, stds):
    cost_spikes_dates = dates[cost_spikes_mask]
    cost_spikes_costs = costs[cost_spikes_mask]
    cost_spikes_errors = errors[cost_spikes_mask]
    cost_spikes_stds = stds[cost_spikes_mask]

    cost_spikes_data = zip(cost_spikes_dates, cost_spikes_costs, cost_spikes_errors, cost_spikes_stds)
    cost_spikes = [{"date": date, "cost": cost, "error": error, "std": std} for date, cost, error, std in cost_spikes_data]

    return cost_spikes
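A minimal usage sketch, assuming a trained model has already been saved under model/ next to the module. The synthetic series below is invented and only exercises the shape requirement of at least MODEL_INPUT_SIZE + 1 points:

import numpy as np
import pandas as pd

from cost_spike_detector import detect_cost_spikes

# 120 days of flat-ish synthetic costs with one injected spike.
dates = pd.date_range("2020-09-01", periods=120).strftime("%Y-%m-%d")
costs = np.random.default_rng(0).normal(10.0, 0.5, size=120)
costs[100] *= 5

date_costs = [{"date": date, "cost": cost} for date, cost in zip(dates, costs)]
print(detect_cost_spikes(date_costs))  # should include the day at index 100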
# Training hyperparameters.
params = {
    'model_name': 'cnn_oddity',
    'minimum_seq_length_buffer': 30,
    'batch_size': 10000,
    'epochs': 50,
    'input_window': 84,
    'output_window': 4,
    'learning_rate': 0.00005,
    'loss': 'mse',
    'optimizer': 'adam',
}
from tensorflow.keras.layers import BatchNormalization, Conv1D, Dense, Flatten, Input, ReLU, Reshape
from tensorflow.keras.models import Model


def cnn_model(input_window, output_window):
    inputs = Input(shape=(input_window,), name='input-layer')
    x = Reshape((input_window, 1))(inputs)

    x = Conv1D(128, kernel_size=2)(x)
    x = ReLU()(x)
    x = BatchNormalization()(x)

    x = Conv1D(64, kernel_size=3)(x)
    x = ReLU()(x)
    x = BatchNormalization()(x)

    x = Conv1D(32, kernel_size=3)(x)
    x = ReLU()(x)
    x = BatchNormalization()(x)

    x = Conv1D(32, kernel_size=3)(x)
    x = ReLU()(x)
    x = BatchNormalization()(x)

    x = Flatten()(x)
    x = Dense(128)(x)
    x = ReLU()(x)

    outputs = Dense(output_window, activation='relu', name='output-layer')(x)

    return Model(inputs=inputs, outputs=outputs)
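A hedged training sketch showing how params and cnn_model could fit together. x_train and y_train are assumed to be precomputed windows of shape (n, input_window) and (n, output_window), e.g. built from the log-normalized costs with make_subsequences; nothing below beyond the parameter names is confirmed by the gist:

from tensorflow.keras.optimizers import Adam

model = cnn_model(params['input_window'], params['output_window'])
model.compile(optimizer=Adam(learning_rate=params['learning_rate']), loss=params['loss'])
model.fit(x_train, y_train, batch_size=params['batch_size'], epochs=params['epochs'])
model.save('model/')  # the folder tf.keras.models.load_model reads back in the function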
Deploy with a remote build; $(AzureFunctionName) is a placeholder for the function app name (e.g. a CI pipeline variable):

func azure functionapp publish $(AzureFunctionName) --build remote