Last active January 12, 2023 20:28
Example Aqueduct Workflow
import aqueduct as aq
import pandas as pd
from sklearn.linear_model import LinearRegression
client = aq.Client("API_KEY", "SERVER_ADDRESS")
# Extract data from warehouse
# These 2 lines replace the entire `extract_stage` task in the Airflow file.
warehouse = client.integration(name="prod_database")
input_data = warehouse.sql(query="SELECT * FROM customers WHERE location = 'US';")
# Assume the model is trained with some sample data
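# (Illustrative aside, not part of the original gist.) If you don't have a
# data.csv handy, a hypothetical two-column stand-in can be generated like
# this; the column names and the linear relationship are assumptions made
# purely so the training code below runs end to end.
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
x = rng.uniform(0.0, 10.0, size=100)
# Target is roughly 3x the feature, plus a little noise.
y = 3.0 * x + rng.normal(0.0, 1.0, size=100)
pd.DataFrame({"feature": x, "target": y}).to_csv("data.csv", index=False)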
data = pd.read_csv('data.csv')
X = data.iloc[:, 0].values.reshape(-1, 1)
Y = data.iloc[:, 1].values.reshape(-1, 1)
linear_model = LinearRegression(), Y)
def predict(df: pd.DataFrame) -> pd.DataFrame:
    """
    This is the exact same predict function as defined above.
    The only difference is the decorator @aq.op, which indicates
    that this Python code is part of your workflow.
    """
    # LinearRegression has no predict_proba; use predict to score each row.
    df['score'] = linear_model.predict(df)[:, 0]
    return df
predictions = predict(input_data)
# Save predictions to warehouse
# This 1 line replaces the entire `save_stage` task in the Airflow file., table_name="predictions")
# Schedule this workflow to run daily on your existing Airflow cluster
# This assumes that you have connected your Airflow cluster as an integration
# with the name `airflow_cluster`
# This 1 line replaces the entire `DAG(...)` clause in the Airflow file.
flow = client.publish_flow(
    name="Customer Churn",
    artifacts=[predictions],
    # The engine is configured by registering an Airflow integration
    # with Aqueduct. When registering it, you provide the necessary credentials
    # for accessing the cluster.
    engine="airflow_cluster",
    schedule=aq.daily(),
)