# Example Aqueduct Workflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aqueduct as aq | |
import pandas as pd | |
from sklearn.linear_model import LinearRegression | |
# Connect to the Aqueduct server.
# NOTE(review): "API_KEY" and "SERVER_ADDRESS" look like placeholders --
# substitute real values before running.
client = aq.Client("API_KEY", "SERVER_ADDRESS")

# Extract data from warehouse.
# These 2 lines replace the entire `extract_stage` task in the Airflow file.
warehouse = client.integration(name="prod_database")
input_data = warehouse.sql(query="SELECT * FROM customers WHERE location = 'US';")

# Assume the model is trained with some sample data.
# The first CSV column is used as the single feature and the second as the
# target; both are reshaped into (n, 1) column vectors before fitting.
data = pd.read_csv('data.csv')
X = data.iloc[:, 0].values.reshape(-1, 1)
Y = data.iloc[:, 1].values.reshape(-1, 1)
linear_model = LinearRegression()
linear_model.fit(X, Y)
@aq.op
def predict(df: pd.DataFrame) -> pd.DataFrame:
    """Score every row of ``df`` with the module-level ``linear_model``.

    The ``@aq.op`` decorator indicates that this Python code is part of
    the workflow.

    Args:
        df: Rows to score. NOTE(review): assumes ``df`` contains exactly the
            feature column(s) ``linear_model`` was fitted on -- confirm
            against the query that produces the input.

    Returns:
        The same ``df`` object, with the predictions stored in a ``score``
        column.
    """
    # LinearRegression is a regressor: it provides predict(), not
    # predict_proba(), so the previous call raised AttributeError.
    # ravel() flattens the (n, 1) output of a model fitted on a 2-D target
    # into one value per row. Assigning the array directly (instead of
    # wrapping it in a DataFrame) avoids index alignment, which would yield
    # NaN scores whenever df does not have a default 0..n-1 index.
    df["score"] = linear_model.predict(df).ravel()
    return df
# Run the `predict` operator on the extracted data.
predictions = predict(input_data)

# Save predictions to warehouse.
# This 1 line replaces the entire `save_stage` task in the Airflow file.
warehouse.save(predictions, table_name="predictions")

# Schedule this workflow to run daily on your existing Airflow cluster.
# This assumes that you have connected your Airflow cluster as an integration
# with the name `airflow_cluster`.
# This 1 line replaces the entire `DAG(...)` clause in the Airflow file.
flow = client.publish_flow(
    name="Customer Churn",
    artifacts=[predictions],
    schedule=aq.daily(),
    # The engine is configured by registering an Airflow integration
    # with Aqueduct. When registering it, you provide the necessary credentials
    # for accessing the cluster.
    engine="airflow_cluster",
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment