Skip to content

Instantly share code, notes, and snippets.

@lauralorenz
Created May 5, 2021 16:20
Show Gist options
  • Save lauralorenz/ecedeea5e9c7f58edbb3bba7c8d00ce0 to your computer and use it in GitHub Desktop.
Save lauralorenz/ecedeea5e9c7f58edbb3bba7c8d00ce0 to your computer and use it in GitHub Desktop.
Prefect on Data Science flows
import tempfile
import prefect
from prefect.storage import S3
from prefect import task, Flow, Parameter
from prefect.engine.serializers import Serializer
from prefect.engine.results import S3Result
from yellowbrick.datasets import load_mushroom
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from yellowbrick.classifier import ClassificationReport
from sklearn.metrics import f1_score
from sklearn.pipeline import Pipeline
from sklearn.svm import LinearSVC, NuSVC, SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.linear_model import LogisticRegressionCV, LogisticRegression, SGDClassifier
from sklearn.ensemble import BaggingClassifier, ExtraTreesClassifier, RandomForestClassifier
# Markdown template for the per-estimator report artifact. Placeholders:
# {estimator_name}, {image} (key inside the images bucket), {model_location}.
MARKDOWN = (
    "# {estimator_name}\n"
    '<img src="https://visualizer-demo-images.s3.amazonaws.com/{image}">\n'
    "## Model Location: {model_location}\n"
)
class NoOpSerializer(Serializer):
    """Pass-through `Serializer`: values are handed over unchanged in both directions."""

    def serialize(self, value):
        """Return ``value`` exactly as given (no encoding is applied)."""
        return value

    def deserialize(self, value):
        """Return ``value`` exactly as given (no decoding is applied)."""
        return value
@task(result=S3Result(bucket="visualizer-demo-models", location="{flow_run_name}/{task_run_id}"))
def visualize_model(estimator, **kwargs):
    """
    Score one estimator on the mushroom dataset and publish its report.

    The estimator is placed behind a one-hot encoder in a ``Pipeline``, wrapped
    in a yellowbrick ``ClassificationReport``, fit and scored on the full
    dataset, and the rendered PNG is written to the ``visualizer-demo-images``
    bucket. A markdown artifact referencing the image and the model location
    is then created.

    Args:
        estimator: classifier used as the final pipeline step.
        **kwargs: forwarded to ``ClassificationReport``.

    Returns:
        The ``Pipeline`` wrapping ``estimator``. The task's result is
        configured as an ``S3Result`` on the ``visualizer-demo-models`` bucket.
    """
    X, y = load_mushroom()
    y = LabelEncoder().fit_transform(y)

    model = Pipeline([
        ('one_hot_encoder', OneHotEncoder()),
        ('estimator', estimator),
    ])

    # Instantiate the classification model and visualizer
    visualizer = ClassificationReport(
        model, classes=['edible', 'poisonous'],
        cmap="YlGn", size=(600, 360), **kwargs
    )
    visualizer.fit(X, y)
    visualizer.score(X, y)

    # Render into a temporary file and read the bytes back. The context
    # manager guarantees the handle is closed even if rendering raises.
    with tempfile.NamedTemporaryFile(suffix=".png") as tmp:
        visualizer.show(outpath=tmp, clear_figure=True)
        tmp.seek(0)  # rewind so read() returns what was just written
        image_bytes = tmp.read()

    estimator_name = estimator.__class__.__name__

    # Save the visualization using the Result API; the no-op serializer keeps
    # the PNG bytes untouched.
    visualizer_result = S3Result(
        bucket="visualizer-demo-images",
        location="{flow_run_name}/" + estimator_name + ".png",
        serializer=NoOpSerializer(),
    )
    result = visualizer_result.write(image_bytes, **prefect.context)

    # Save the report for this task run with the Artifacts API
    prefect.artifacts.create_markdown(MARKDOWN.format(
        estimator_name=estimator_name,
        image=result.location,
        model_location="{flow_run_name}/{task_run_id}".format(**prefect.context),
    ))
    return model
@task
def map_model_parameters(**kwargs):
    """
    Build the list of candidate estimators from the flow parameters.

    Required keyword arguments: ``svc_gamma``, ``nusvc_gamma`` and
    ``sgdclassifier_maxiter``; a missing key raises ``KeyError``.

    Returns:
        A list of ten unfitted estimators.
    """
    svc = SVC(gamma=kwargs['svc_gamma'])
    nu_svc = NuSVC(gamma=kwargs['nusvc_gamma'])
    sgd = SGDClassifier(max_iter=kwargs['sgdclassifier_maxiter'], tol=1e-3)

    return [
        svc,
        nu_svc,
        LinearSVC(),
        sgd,
        KNeighborsClassifier(),
        LogisticRegression(solver='lbfgs'),
        LogisticRegressionCV(cv=3),
        BaggingClassifier(),
        ExtraTreesClassifier(n_estimators=300),
        RandomForestClassifier(n_estimators=300),
    ]
# Flow stored as a script in S3; the script source is taken from ./flow.py.
flow = Flow(
    "compute-estimator-flow",
    storage=S3(
        bucket="visualizer-demo-flows",
        stored_as_script=True,
        local_script_path="./flow.py",
    ),
)

with flow:
    # Parameters
    # NOTE(review): 'sgdclassifier max_iter' contains a space, unlike the
    # other parameter names -- confirm this is intended.
    svc_gamma = Parameter('svc_gamma', default='auto')
    nusvc_gamma = Parameter('nusvc_gamma', default='auto')
    sgdclassifier_maxiter = Parameter('sgdclassifier max_iter', default=100)

    # Build the estimator list, then fan out one visualize_model run per estimator.
    model_choices = map_model_parameters(
        svc_gamma=svc_gamma,
        nusvc_gamma=nusvc_gamma,
        sgdclassifier_maxiter=sgdclassifier_maxiter,
    )
    visualizers = visualize_model.map(model_choices)
from prefect.run_configs import ECSRun

# Run configuration: ECSRun on the "prefect-demo-cluster" cluster using the
# public demo image. The account id in task_role_arn is a placeholder.
flow.run_config = ECSRun(
    run_task_kwargs={
        "cluster": "prefect-demo-cluster",
        "networkConfiguration": {
            'awsvpcConfiguration': {
                'assignPublicIp': 'ENABLED',
                'subnets': ['subnet-7410175c'],
                'securityGroups': [],
            },
        },
    },
    task_role_arn="arn:aws:iam::{{your project here}}:role/prefect-demo-fargate-task-role",
    image='public.ecr.aws/i0y3l0j3/visualizer-demo-images:latest',
)

# Register only when executed as a script, not when the module is imported.
if __name__ == "__main__":
    flow.register("data-science-flows")
import tempfile
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.serializers import Serializer
from prefect.engine.results import S3Result
from yellowbrick.datasets import load_mushroom
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from yellowbrick.classifier import ClassificationReport
from sklearn.metrics import f1_score
from sklearn.pipeline import Pipeline
from sklearn.svm import LinearSVC, NuSVC, SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.linear_model import LogisticRegressionCV, LogisticRegression, SGDClassifier
from sklearn.ensemble import BaggingClassifier, ExtraTreesClassifier, RandomForestClassifier
# Markdown template for the per-estimator report artifact. Placeholders:
# {estimator_name} and {image} (key inside the images bucket).
MARKDOWN = (
    "# {estimator_name}\n"
    '<img src="https://visualizer-demo-images.s3.amazonaws.com/{image}">\n'
)
class NoOpSerializer(Serializer):
    """Pass-through `Serializer`: values are handed over unchanged in both directions."""

    def serialize(self, value):
        """Return ``value`` exactly as given (no encoding is applied)."""
        return value

    def deserialize(self, value):
        """Return ``value`` exactly as given (no decoding is applied)."""
        return value
@task
def visualize_model(estimator, **kwargs):
    """
    Score one estimator on the mushroom dataset and publish its report.

    The estimator is placed behind a one-hot encoder in a ``Pipeline``, wrapped
    in a yellowbrick ``ClassificationReport``, fit and scored on the full
    dataset, and the rendered PNG is written to the ``visualizer-demo-images``
    bucket. A markdown artifact referencing the image is then created.

    Args:
        estimator: classifier used as the final pipeline step.
        **kwargs: forwarded to ``ClassificationReport``.

    Returns:
        None; the task only produces the image and the artifact.
    """
    X, y = load_mushroom()
    y = LabelEncoder().fit_transform(y)

    model = Pipeline([
        ('one_hot_encoder', OneHotEncoder()),
        ('estimator', estimator),
    ])

    # Instantiate the classification model and visualizer
    visualizer = ClassificationReport(
        model, classes=['edible', 'poisonous'],
        cmap="YlGn", size=(600, 360), **kwargs
    )
    visualizer.fit(X, y)
    visualizer.score(X, y)

    # Render into a temporary file and read the bytes back. The context
    # manager guarantees the handle is closed even if rendering raises.
    with tempfile.NamedTemporaryFile(suffix=".png") as tmp:
        visualizer.show(outpath=tmp, clear_figure=True)
        tmp.seek(0)  # rewind so read() returns what was just written
        image_bytes = tmp.read()

    estimator_name = estimator.__class__.__name__

    # Save the visualization using the Result API; the no-op serializer keeps
    # the PNG bytes untouched.
    visualizer_result = S3Result(
        bucket="visualizer-demo-images",
        location="{flow_run_name}/" + estimator_name + ".png",
        serializer=NoOpSerializer(),
    )
    result = visualizer_result.write(image_bytes, **prefect.context)

    # Save the report for this task run with the Artifacts API
    prefect.artifacts.create_markdown(MARKDOWN.format(
        estimator_name=estimator_name,
        image=result.location,
    ))
with Flow("estimator-flow") as f:
    # Fixed set of estimators to compare; one mapped task run per entry.
    model_choices = [
        SVC(gamma='auto'),
        NuSVC(gamma='auto'),
        LinearSVC(),
        SGDClassifier(max_iter=100, tol=1e-3),
        KNeighborsClassifier(),
        LogisticRegression(solver='lbfgs'),
        LogisticRegressionCV(cv=3),
        BaggingClassifier(),
        ExtraTreesClassifier(n_estimators=300),
        RandomForestClassifier(n_estimators=300),
    ]
    visualizers = visualize_model.map(model_choices)

# Register the flow under the "data-science-flows" project.
f.register("data-science-flows")
import tempfile
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.serializers import Serializer
from prefect.engine.results import S3Result
from yellowbrick.datasets import load_mushroom
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.metrics import f1_score
from sklearn.pipeline import Pipeline
from yellowbrick.classifier import ClassificationReport
from sklearn.svm import LinearSVC, NuSVC, SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.linear_model import LogisticRegressionCV, LogisticRegression, SGDClassifier
from sklearn.ensemble import BaggingClassifier, ExtraTreesClassifier, RandomForestClassifier
# Markdown template for the per-estimator report artifact. Placeholders:
# {estimator_name} and {image} (key inside the images bucket).
MARKDOWN = (
    "# {estimator_name}\n"
    '<img src="https://visualizer-demo-images.s3.amazonaws.com/{image}">\n'
)
class NoOpSerializer(Serializer):
    """Pass-through `Serializer`: values are handed over unchanged in both directions."""

    def serialize(self, value):
        """Return ``value`` exactly as given (no encoding is applied)."""
        return value

    def deserialize(self, value):
        """Return ``value`` exactly as given (no decoding is applied)."""
        return value
@task
def visualize_model(estimator, **kwargs):
    """
    Score one estimator on the mushroom dataset and publish its report.

    The estimator is placed behind a one-hot encoder in a ``Pipeline``, wrapped
    in a yellowbrick ``ClassificationReport``, fit and scored on the full
    dataset, and the rendered PNG is written to the ``visualizer-demo-images``
    bucket. A markdown artifact referencing the image is then created.

    Args:
        estimator: classifier used as the final pipeline step.
        **kwargs: forwarded to ``ClassificationReport``.

    Returns:
        The ``Pipeline`` wrapping ``estimator``.
    """
    X, y = load_mushroom()
    y = LabelEncoder().fit_transform(y)

    model = Pipeline([
        ('one_hot_encoder', OneHotEncoder()),
        ('estimator', estimator),
    ])

    # Instantiate the classification model and visualizer
    visualizer = ClassificationReport(
        model, classes=['edible', 'poisonous'],
        cmap="YlGn", size=(600, 360), **kwargs
    )
    visualizer.fit(X, y)
    visualizer.score(X, y)

    # Render into a temporary file and read the bytes back. The context
    # manager guarantees the handle is closed even if rendering raises.
    with tempfile.NamedTemporaryFile(suffix=".png") as tmp:
        visualizer.show(outpath=tmp, clear_figure=True)
        tmp.seek(0)  # rewind so read() returns what was just written
        image_bytes = tmp.read()

    estimator_name = estimator.__class__.__name__

    # Save the visualization using the Result API; the no-op serializer keeps
    # the PNG bytes untouched.
    visualizer_result = S3Result(
        bucket="visualizer-demo-images",
        location="{flow_run_name}/" + estimator_name + ".png",
        serializer=NoOpSerializer(),
    )
    result = visualizer_result.write(image_bytes, **prefect.context)

    # Save the report for this task run with the Artifacts API
    prefect.artifacts.create_markdown(MARKDOWN.format(
        estimator_name=estimator_name,
        image=result.location,
    ))
    return model
@task
def map_model_parameters(**kwargs):
    """
    Build the list of candidate estimators from the flow parameters.

    Required keyword arguments: ``svc_gamma``, ``nusvc_gamma`` and
    ``sgdclassifier_maxiter``; a missing key raises ``KeyError``.

    Returns:
        A list of ten unfitted estimators.
    """
    svc = SVC(gamma=kwargs['svc_gamma'])
    nu_svc = NuSVC(gamma=kwargs['nusvc_gamma'])
    sgd = SGDClassifier(max_iter=kwargs['sgdclassifier_maxiter'], tol=1e-3)

    return [
        svc,
        nu_svc,
        LinearSVC(),
        sgd,
        KNeighborsClassifier(),
        LogisticRegression(solver='lbfgs'),
        LogisticRegressionCV(cv=3),
        BaggingClassifier(),
        ExtraTreesClassifier(n_estimators=300),
        RandomForestClassifier(n_estimators=300),
    ]
with Flow("estimator-flow-parameterized") as f:
    # Parameters
    # NOTE(review): 'sgdclassifier max_iter' contains a space, unlike the
    # other parameter names -- confirm this is intended.
    svc_gamma = Parameter('svc_gamma', default='auto')
    nusvc_gamma = Parameter('nusvc_gamma', default='auto')
    sgdclassifier_maxiter = Parameter('sgdclassifier max_iter', default=100)

    # Build the estimator list, then fan out one visualize_model run per estimator.
    model_choices = map_model_parameters(
        svc_gamma=svc_gamma,
        nusvc_gamma=nusvc_gamma,
        sgdclassifier_maxiter=sgdclassifier_maxiter,
    )
    visualizers = visualize_model.map(model_choices)

# Register the flow under the "data-science-flows" project.
f.register("data-science-flows")
import tempfile
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.serializers import Serializer
from prefect.engine.results import S3Result
from yellowbrick.datasets import load_mushroom
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from yellowbrick.classifier import ClassificationReport
from sklearn.metrics import f1_score
from sklearn.pipeline import Pipeline
from sklearn.svm import LinearSVC, NuSVC, SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.linear_model import LogisticRegressionCV, LogisticRegression, SGDClassifier
from sklearn.ensemble import BaggingClassifier, ExtraTreesClassifier, RandomForestClassifier
# Markdown template for the per-estimator report artifact. Placeholders:
# {estimator_name}, {image} (key inside the images bucket), {model_location}.
MARKDOWN = (
    "# {estimator_name}\n"
    '<img src="https://visualizer-demo-images.s3.amazonaws.com/{image}">\n'
    "## Model Location: {model_location}\n"
)
class NoOpSerializer(Serializer):
    """Pass-through `Serializer`: values are handed over unchanged in both directions."""

    def serialize(self, value):
        """Return ``value`` exactly as given (no encoding is applied)."""
        return value

    def deserialize(self, value):
        """Return ``value`` exactly as given (no decoding is applied)."""
        return value
@task(result=S3Result(bucket="visualizer-demo-models", location="{flow_run_name}/{task_run_id}"))
def visualize_model(estimator, **kwargs):
    """
    Score one estimator on the mushroom dataset and publish its report.

    The estimator is placed behind a one-hot encoder in a ``Pipeline``, wrapped
    in a yellowbrick ``ClassificationReport``, fit and scored on the full
    dataset, and the rendered PNG is written to the ``visualizer-demo-images``
    bucket. A markdown artifact referencing the image and the model location
    is then created.

    Args:
        estimator: classifier used as the final pipeline step.
        **kwargs: forwarded to ``ClassificationReport``.

    Returns:
        The ``Pipeline`` wrapping ``estimator``. The task's result is
        configured as an ``S3Result`` on the ``visualizer-demo-models`` bucket.
    """
    X, y = load_mushroom()
    y = LabelEncoder().fit_transform(y)

    model = Pipeline([
        ('one_hot_encoder', OneHotEncoder()),
        ('estimator', estimator),
    ])

    # Instantiate the classification model and visualizer
    visualizer = ClassificationReport(
        model, classes=['edible', 'poisonous'],
        cmap="YlGn", size=(600, 360), **kwargs
    )
    visualizer.fit(X, y)
    visualizer.score(X, y)

    # Render into a temporary file and read the bytes back. The context
    # manager guarantees the handle is closed even if rendering raises.
    with tempfile.NamedTemporaryFile(suffix=".png") as tmp:
        visualizer.show(outpath=tmp, clear_figure=True)
        tmp.seek(0)  # rewind so read() returns what was just written
        image_bytes = tmp.read()

    estimator_name = estimator.__class__.__name__

    # Save the visualization using the Result API; the no-op serializer keeps
    # the PNG bytes untouched.
    visualizer_result = S3Result(
        bucket="visualizer-demo-images",
        location="{flow_run_name}/" + estimator_name + ".png",
        serializer=NoOpSerializer(),
    )
    result = visualizer_result.write(image_bytes, **prefect.context)

    # Save the report for this task run with the Artifacts API
    prefect.artifacts.create_markdown(MARKDOWN.format(
        estimator_name=estimator_name,
        image=result.location,
        model_location="{flow_run_name}/{task_run_id}".format(**prefect.context),
    ))
    return model
@task
def map_model_parameters(**kwargs):
    """
    Build the list of candidate estimators from the flow parameters.

    Required keyword arguments: ``svc_gamma``, ``nusvc_gamma`` and
    ``sgdclassifier_maxiter``; a missing key raises ``KeyError``.

    Returns:
        A list of ten unfitted estimators.
    """
    svc = SVC(gamma=kwargs['svc_gamma'])
    nu_svc = NuSVC(gamma=kwargs['nusvc_gamma'])
    sgd = SGDClassifier(max_iter=kwargs['sgdclassifier_maxiter'], tol=1e-3)

    return [
        svc,
        nu_svc,
        LinearSVC(),
        sgd,
        KNeighborsClassifier(),
        LogisticRegression(solver='lbfgs'),
        LogisticRegressionCV(cv=3),
        BaggingClassifier(),
        ExtraTreesClassifier(n_estimators=300),
        RandomForestClassifier(n_estimators=300),
    ]
with Flow("estimator-flow-model-storage") as f:
    # Parameters
    # NOTE(review): 'sgdclassifier max_iter' contains a space, unlike the
    # other parameter names -- confirm this is intended.
    svc_gamma = Parameter('svc_gamma', default='auto')
    nusvc_gamma = Parameter('nusvc_gamma', default='auto')
    sgdclassifier_maxiter = Parameter('sgdclassifier max_iter', default=100)

    # Build the estimator list, then fan out one visualize_model run per estimator.
    model_choices = map_model_parameters(
        svc_gamma=svc_gamma,
        nusvc_gamma=nusvc_gamma,
        sgdclassifier_maxiter=sgdclassifier_maxiter,
    )
    visualizers = visualize_model.map(model_choices)

# Register the flow under the "data-science-flows" project.
f.register("data-science-flows")
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.results import PrefectResult, S3Result
import pandas as pd
from yellowbrick.datasets import load_mushroom
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
# Markdown template for the prediction artifact. Placeholders:
# {prediction_h} (decoded class label), {shape}, {surface}, {color}.
MARKDOWN = (
    "# {prediction_h}\n"
    "Shape: {shape}\n"
    "Surface: {surface}\n"
    "Color: {color}\n"
)
@task(result=PrefectResult())
def predict(model_location, shape, surface, color):
    """
    Classify a single mushroom with a previously stored model.

    The model is read from the ``visualizer-demo-models`` bucket at
    ``model_location``, applied to a one-row DataFrame built from the three
    feature values, and the predicted class is decoded back to its label. A
    markdown artifact summarising the inputs and the prediction is created.

    Args:
        model_location: key of the stored model inside the models bucket.
        shape: value for the ``shape`` column.
        surface: value for the ``surface`` column.
        color: value for the ``color`` column.

    Returns:
        The decoded label of the predicted class.
    """
    # TODO: don't rebuild encoders
    # Only the targets are needed here, to rebuild the label encoding. The
    # unused OneHotEncoder fit on X was dropped.
    # NOTE(review): presumably the stored model already contains its own
    # feature encoder -- confirm against the flow that wrote it.
    _, y = load_mushroom()
    label_encoder = LabelEncoder().fit(y)

    result = S3Result(bucket="visualizer-demo-models")
    model = result.read(location=model_location).value

    test_df = pd.DataFrame(
        data=[[shape, surface, color]],
        columns=['shape', 'surface', 'color'],
    )
    prediction = model.predict(test_df)
    # Map the encoded class back to its original label.
    prediction_h = label_encoder.inverse_transform(prediction)[0]

    prefect.artifacts.create_markdown(MARKDOWN.format(
        prediction_h=prediction_h,
        shape=shape,
        surface=surface,
        color=color,
    ))
    return prediction_h
with Flow("predict-flow") as f:
    # Location of the stored model plus the three mushroom features to classify.
    model_location = Parameter(
        'model_location',
        default='mysterious-skink/07fbf311-e0fa-40b8-a221-2e4d4342d30e',
    )
    shape = Parameter('shape', default='convex')
    surface = Parameter('surface', default='smooth')
    color = Parameter('color', default='yellow')

    predict(model_location, shape, surface, color)

# Example inputs:
#   poisonous: convex, scaly, yellow
#   edible: convex, smooth, yellow
f.register("data-science-flows")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment