@saurav-c
Last active January 12, 2023
Example Aqueduct Workflow
import aqueduct as aq
import pandas as pd
from sklearn.linear_model import LinearRegression
client = aq.Client("API_KEY", "SERVER_ADDRESS")
# Extract data from warehouse
# These 2 lines replace the entire `extract_stage` task in the Airflow file.
warehouse = client.integration(name="prod_database")
input_data = warehouse.sql(query="SELECT * FROM customers WHERE location = 'US';")
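# For reference, a local sketch of what the extract step above does
# conceptually, using an in-memory SQLite database in place of the
# `prod_database` warehouse (the table contents here are illustrative):
import sqlite3
_conn = sqlite3.connect(":memory:")
pd.DataFrame(
    {"name": ["Ada", "Bo"], "location": ["US", "EU"]}
).to_sql("customers", _conn, index=False)
_us_customers = pd.read_sql("SELECT * FROM customers WHERE location = 'US';", _conn)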
# Assume the model is trained with some sample data
data = pd.read_csv('data.csv')
X = data.iloc[:, 0].values.reshape(-1, 1)
Y = data.iloc[:, 1].values.reshape(-1, 1)
linear_model = LinearRegression()
linear_model.fit(X, Y)
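# A quick, self-contained sanity check with synthetic numbers: because the
# training target Y above is 2-D, LinearRegression.predict returns an
# (n, 1) array rather than a flat vector, so predictions need a .ravel()
# (or reshape) before they can be stored as a DataFrame column.
import numpy as np
_X_demo = np.arange(4, dtype=float).reshape(-1, 1)
_Y_demo = (3.0 * _X_demo).reshape(-1, 1)            # perfectly linear: y = 3x
_demo_model = LinearRegression().fit(_X_demo, _Y_demo)
_demo_scores = _demo_model.predict(_X_demo).ravel()  # shape (4,) after ravel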
@aq.op
def predict(df: pd.DataFrame) -> pd.DataFrame:
    '''
    This is the exact same predict function as defined above.
    The only difference is the decorator @aq.op, which indicates
    that this Python code is part of your workflow.
    '''
    # LinearRegression has no predict_proba; use predict and flatten the
    # (n, 1) output so it can be assigned as a column. This assumes df's
    # columns match the features the model was trained on.
    df['score'] = linear_model.predict(df).ravel()
    return df
predictions = predict(input_data)
# Save predictions to warehouse
# This 1 line replaces the entire `save_stage` task in the Airflow file.
warehouse.save(predictions, table_name="predictions")
# Schedule this workflow to run daily on your existing Airflow cluster
# This assumes that you have connected your Airflow cluster as an integration
# with the name `airflow_cluster`
# This 1 line replaces the entire `DAG(...)` clause in the Airflow file.
flow = client.publish_flow(
    name="Customer Churn",
    artifacts=[predictions],
    schedule=aq.daily(),
    # The engine is configured by registering an Airflow integration
    # with Aqueduct. When registering it, you provide the necessary credentials
    # for accessing the cluster.
    engine="airflow_cluster",
)