Created
July 25, 2022 22:59
-
-
Save jaeyow/f6b595f1ebdb8f7d364d3478c9a50d17 to your computer and use it in GitHub Desktop.
Comet ML and Metaflow - how to handle parallel writes to single experiment
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from metaflow import FlowSpec, step, current | |
from comet_ml import API, Experiment | |
import os | |
import random | |
# python-dotenv is an optional dependency: when it is installed, environment
# variables are loaded from a local `.env` file; otherwise the process
# environment is used as-is.
try:
    from dotenv import load_dotenv
except ImportError:
    # Only a missing package is tolerated here; any other failure should surface.
    print("No dotenv package")
else:
    load_dotenv(verbose=True, dotenv_path='.env')
class MyPipeline(FlowSpec):
    """
    Metaflow pipeline that fans out into four parallel training steps.

    Each training step constructs its own Comet `Experiment` object and logs
    a single (random, placeholder) training-time metric to it before the
    branches are joined again.
    """

    @step
    def start(self):
        """
        Initialization, place everything init related here, check that everything is
        in order like environment variables, connection strings, etc, and if there are
        any issues, fail fast here, now.
        """
        # Explicit check instead of `assert`, which is stripped when Python
        # runs with -O and would then let a missing key slip through.
        if not os.environ.get('COMET_API_KEY'):
            raise RuntimeError("COMET_API_KEY environment variable must be set")
        self.next(
            self.train_model1,
            self.train_model2,
            self.train_model3,
            self.train_model4)

    def _log_train_time(self, metric_name):
        """
        Create a new Comet experiment and log one random training time to it.

        :param metric_name: name under which the metric is logged.
        """
        comet_exp = Experiment(
            api_key=os.environ['COMET_API_KEY'],
            project_name="my-project",
            workspace="jaeyow",
        )
        try:
            comet_exp.log_metric(metric_name, random.randint(1, 99))
        finally:
            # Flush and close the experiment explicitly rather than relying on
            # interpreter shutdown of the step's process.
            comet_exp.end()

    @step
    def train_model1(self):
        """
        Train Models in Parallel
        """
        self.step_name = current.step_name
        self._log_train_time('Alg1_train_time')
        self.next(self.test_model_join)

    @step
    def train_model2(self):
        """
        Train Models in Parallel
        """
        self._log_train_time('Alg2_train_time')
        self.next(self.test_model_join)

    @step
    def train_model3(self):
        """
        Train Models in Parallel
        """
        self._log_train_time('Alg3_train_time')
        self.next(self.test_model_join)

    @step
    def train_model4(self):
        """
        Train Models in Parallel
        """
        self._log_train_time('Alg4_train_time')
        self.next(self.test_model_join)

    @step
    def test_model_join(self, join_inputs):
        """
        Join our parallel model training branches
        """
        self.next(self.deploy_winning_model)

    @step
    def deploy_winning_model(self):
        """
        Deploy winning model to public API
        """
        self.next(self.end)

    @step
    def end(self):
        """
        This is the 'end' step. All flows must have an 'end' step, which is the
        last step in the flow.
        """
# Run the flow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    MyPipeline()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from metaflow import FlowSpec, step, current | |
from comet_ml import API, Experiment | |
import os | |
import random | |
# python-dotenv is an optional dependency: when it is installed, environment
# variables are loaded from a local `.env` file; otherwise the process
# environment is used as-is.
try:
    from dotenv import load_dotenv
except ImportError:
    # Only a missing package is tolerated here; any other failure should surface.
    print("No dotenv package")
else:
    load_dotenv(verbose=True, dotenv_path='.env')
class MyPipeline(FlowSpec):
    """
    Metaflow pipeline whose parallel training steps write to one Comet experiment.

    The `start` step creates the experiment and stores only its key on the
    flow; every training branch then looks the experiment up by that key
    through the Comet `API` and logs its metric to it.
    """

    @step
    def start(self):
        """
        Initialization, place everything init related here, check that everything is
        in order like environment variables, connection strings, etc, and if there are
        any issues, fail fast here, now.
        """
        # Explicit check instead of `assert`, which is stripped when Python
        # runs with -O and would then let a missing key slip through.
        api_key = os.environ.get('COMET_API_KEY')
        if not api_key:
            raise RuntimeError("COMET_API_KEY environment variable must be set")
        comet_exp = Experiment(
            api_key=api_key,
            project_name="my-project",
            workspace="jaeyow",
        )
        try:
            self.comet_experiment_key = comet_exp.get_key()  # this is a string so it's safe to pickle
        finally:
            # This step only needs the key; close the live experiment so the
            # branches are the only writers from here on.
            comet_exp.end()
        self.next(
            self.train_model1,
            self.train_model2,
            self.train_model3,
            self.train_model4)

    def _log_train_time(self, metric_name):
        """
        Log one random training time to the experiment created in `start`.

        :param metric_name: name under which the metric is logged.
        """
        comet_exp = API().get_experiment_by_key(self.comet_experiment_key)
        comet_exp.log_metric(metric_name, random.randint(1, 99))

    @step
    def train_model1(self):
        """
        Train Models in Parallel
        """
        self.step_name = current.step_name
        self._log_train_time('Alg1_train_time')
        self.next(self.test_model_join)

    @step
    def train_model2(self):
        """
        Train Models in Parallel
        """
        self._log_train_time('Alg2_train_time')
        self.next(self.test_model_join)

    @step
    def train_model3(self):
        """
        Train Models in Parallel
        """
        self._log_train_time('Alg3_train_time')
        self.next(self.test_model_join)

    @step
    def train_model4(self):
        """
        Train Models in Parallel
        """
        self._log_train_time('Alg4_train_time')
        self.next(self.test_model_join)

    @step
    def test_model_join(self, join_inputs):
        """
        Join our parallel model training branches
        """
        self.next(self.deploy_winning_model)

    @step
    def deploy_winning_model(self):
        """
        Deploy winning model to public API
        """
        self.next(self.end)

    @step
    def end(self):
        """
        This is the 'end' step. All flows must have an 'end' step, which is the
        last step in the flow.
        """
# Run the flow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    MyPipeline()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment