@ryanjdillon
Last active November 4, 2019 07:51
Load test AzureML Workspace handling of MLflow logging calls -- results in rate limit errors
"""
Load test MLFlow logging to AzureML Workspace (as a tracking backend)
Warning: Azure charges apply!
The following environment variables must be set:
* SUBSCRIPTION_ID - The Azure Subscription ID the Workspace is created under
* RESOURCE_GROUP - The Azure ResourceGroup ID the Workspace is created under
* WORKSPACE_NAME - The name of the AzureML Worskpace to log to
"""
from typing import Callable, List
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core import Workspace
import concurrent.futures
import logging
import mlflow
from mlflow.entities import Metric, Param
import os
import pendulum
import random
import time
import uuid

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel("DEBUG")

def get_tracking_client(
    auth: InteractiveLoginAuthentication,
) -> mlflow.tracking.MlflowClient:
    """
    Set remote tracking URI for mlflow to AzureML workspace

    Returns
    -------
    client: mlflow.tracking.MlflowClient
        Client with tracking uri set to AzureML
    """
    logger.info("Creating AzureML Workspace object")
    subscription_id = os.getenv("SUBSCRIPTION_ID")
    resource_group = os.getenv("RESOURCE_GROUP")
    workspace_name = os.getenv("WORKSPACE_NAME")
    ws = Workspace(subscription_id, resource_group, workspace_name, auth=auth)

    logger.info("Creating MLflow tracking client with AzureML Workspace tracking URI")
    return mlflow.tracking.MlflowClient(ws.get_mlflow_tracking_uri())
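
# Note: this load test drives the low-level MlflowClient API returned above.
# For reference, the same Workspace tracking URI can also be set process-wide
# for the fluent mlflow API (a hypothetical alternative, not exercised here):
#
#   mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())
#   mlflow.set_experiment("some-experiment-name")
#   with mlflow.start_run():
#       mlflow.log_metric("r_2", 0.9)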

def new_experiment(client: mlflow.tracking.MlflowClient) -> str:
    """
    Create a new experiment with a random name on the remote backend

    Parameters
    ----------
    client: mlflow.tracking.MlflowClient
        Client with tracking uri set to AzureML.

    Returns
    -------
    experiment_id: str
        Experiment ID of new experiment.
    """
    return client.create_experiment(f"test_{uuid.uuid4()}")

def delete_experiments(client: mlflow.tracking.MlflowClient, prefix: str = "test_"):
    """
    Delete experiments beginning with provided prefix

    Parameters
    ----------
    client: mlflow.tracking.MlflowClient
        Client with tracking uri set to AzureML.
    prefix: str
        Prefix of experiment names to delete.
    """
    experiments = client.list_experiments()
    for exp in experiments:
        if exp.name.startswith(prefix):
            client.delete_experiment(exp.experiment_id)

def log_single_build(client: mlflow.tracking.MlflowClient, run_id: str):
    """
    Make a series of log calls representing a single model build

    Parameters
    ----------
    client: mlflow.tracking.MlflowClient
        Client with tracking uri set to AzureML.
    run_id: str
        The unique ID of the run to log to.
    """

    def random_float():
        return random.random()

    def random_int():
        return round(random.random() * 10)

    # Random wait between 0-5s
    time.sleep(random.random() * 5)

    name = f"rowfilter-drop-column-ff{random_int()}"
    model_id = "".join([str(random_int()) for _ in range(15)])
    start_date = "2017-{}-{}T00:00:00+00:00".format(random_int(), random_int())
    end_date = "2018-{}-{}T00:00:00+00:00".format(random_int(), random_int())

    model_config = f"""
    gordo_components.model.anomaly.diff.DiffBasedAnomalyDetector:
        base_estimator:
            sklearn.compose.TransformedTargetRegressor:
                transformer: sklearn.preprocessing.data.MinMaxScaler
                regressor:
                    sklearn.pipeline.Pipeline:
                        steps:
                        - sklearn.compose.ColumnTransformer:
                            transformers:
                            - - dropper            # Name of this transformer
                              - drop               # Action to perform
                              - TRA-35TT8567.PV    # Column to apply this action to.
                            remainder: passthrough # What to do with the rest
                        - sklearn.preprocessing.data.MinMaxScaler
                        - gordo_components.model.models.KerasAutoEncoder:
                            kind: feedforward_hourglass
    """
    dataset_config = f"""
    tags:
    - TRA-35TT856{random_int()}.PV
    - TRA-35TT856{random_int()}.PV
    - TRA-35TT856{random_int()}.PV
    - TRA-35TT856{random_int()}.PV
    target_tag_list:
    - TRA-35TT8566.PV
    - TRA-35TT8568.PV
    - TRA-35TT8569.PV
    train_start_date: '{start_date}'
    train_end_date: '{end_date}'
    row_filter: "(`TRA-35TT8567.PV` > 30) & (`TRA-35TT8567.PV` < 40)"
    type: TimeSeriesDataset
    """
    metadata = """
    information: 'Use row filtering and dropping the column inside the pipeline'
    """
    evaluation_config = """
    cv_mode: full_build
    """

    params = [
        ("name", name),
        ("model_id", model_id),
        ("start_date", start_date),
        ("end_date", end_date),
        ("model_config", model_config),
        ("dataset_config", dataset_config),
        ("metadata", metadata),
        ("evaluation_config", evaluation_config),
    ]
    metrics = [
        ("dataset_duration", random_float()),
        ("training_duration", random_float()),
        ("mean_squared_errors", random_float()),
        ("explained_variance", random_float()),
        ("r_2", random_float()),
        ("variable_threshold", random_float()),
    ]

    # Create MLflow logging instances for batch log
    # Metric timestamp must be in milliseconds since Unix epoch
    now = round(pendulum.now("utc").float_timestamp) * 1000
    metrics = [Metric(k, v, timestamp=now, step=0) for k, v in metrics]
    params = [Param(k, v) for k, v in params]

    client.log_batch(run_id, metrics=metrics, params=params)
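
# For comparison, logging each raw (key, value) pair individually instead of
# batching would issue one request per item and trip the rate limit sooner
# than the single log_batch call above. A sketch of that alternative:
#
#   for key, value in raw_params:
#       client.log_param(run_id, key, value)
#   for key, value in raw_metrics:
#       client.log_metric(run_id, key, value)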

def log_project_builds(
    auth: InteractiveLoginAuthentication, n_deployments: int = 1
) -> float:
    """
    Perform one or more runs against an AzureML Workspace for a given machine/project

    A new client is created before each series of runs so that the mlflow
    client is handled in a thread-safe manner. Multiple runs correspond to
    runs from subsequent deployments for the same machine/project.

    If a rate-limit warning is received, this thread's logging pauses for 60s,
    as suggested by the Azure request response.

    Parameters
    ----------
    auth: azureml.core.authentication.InteractiveLoginAuthentication
        Auth object for generating new clients.
    n_deployments: int
        Number of runs with associated logs to generate.

    Returns
    -------
    duration: float
        Duration of the mlflow "run" in seconds.
    """
    t0 = pendulum.now("utc")
    client = get_tracking_client(auth)
    experiment_id = new_experiment(client)

    for _ in range(n_deployments):
        run_id = client.create_run(
            experiment_id, tags={"model_key": str(uuid.uuid4())}
        ).info.run_id

        log_submitted = False
        while not log_submitted:
            try:
                log_single_build(client, run_id)
            except Warning:
                time.sleep(60)
            else:
                log_submitted = True

        # Manually set run as finished
        client.set_terminated(run_id)

    del client
    return (pendulum.now("utc") - t0).total_seconds()

def marauder(
    n_threads: int, n_calls: int, func: Callable, *args, **kwargs
) -> List[float]:
    """
    Perform threaded execution of a function

    Parameters
    ----------
    n_threads: int
        Number of threads used to launch all of the calls.
    n_calls: int
        Number of calls to 'func'.
    func: Callable
        The function called by each marauder.
    *args:
        Positional arguments to func.
    **kwargs: dict
        Keyword arguments to func.

    Returns
    -------
    durations_sec: List[float]
        Duration in seconds of each successful call.
    """
    logger.info(f"{pendulum.now('utc')} ; Maraud initiated")

    durations_sec = list()
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
        future_to_call = {
            executor.submit(func, *args, **kwargs): call for call in range(n_calls)
        }
        for future in concurrent.futures.as_completed(future_to_call):
            try:
                call = future_to_call[future]
                durations_sec.append(future.result())
                logger.info(f"{pendulum.now('utc')} ; Marauder {call} succeeded!!")
            except Exception as exc:
                logger.exception(exc)
    return durations_sec

if __name__ == "__main__":
    # Authentication is handled here and passed to all threads, so that login
    # happens only once for the session.
    auth = InteractiveLoginAuthentication(force=True)

    t0 = pendulum.now("utc")
    n_threads = 100
    n_builds = 1000
    n_deployments = 2

    durations = marauder(
        n_threads, n_builds, log_project_builds, auth, n_deployments=n_deployments
    )

    logger.info(
        f"Total duration of {(pendulum.now('utc') - t0).total_seconds()}s for "
        f"{n_deployments} deployments of {n_builds} builds, processed on {n_threads} "
        "threads"
    )