Skip to content

Instantly share code, notes, and snippets.

@kvnkho
Created July 31, 2022 05:26
Show Gist options
  • Save kvnkho/4d35c8eb96d672e99a64ff7c6cf0b19a to your computer and use it in GitHub Desktop.
Save kvnkho/4d35c8eb96d672e99a64ff7c6cf0b19a to your computer and use it in GitHub Desktop.
from fugue import FugueWorkflow
import pandas as pd
# + tags=["parameters"]
# declare a list of tasks whose products you want to use as inputs
upstream = ["extract"]
# Placeholders: the workflow below indexes `upstream` and `product` by key and
# passes `engine` to FugueWorkflow, so these are presumably replaced with real
# values by the pipeline runner before execution — TODO confirm.
product = None
engine = None
# -
def add_cols(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with ``col3`` set to ``col1 + col2``.

    Args:
        df: Frame containing ``col1`` and ``col2`` columns.

    Returns:
        A new DataFrame holding every column of ``df`` plus ``col3``
        (replaced if it already exists). The input frame is not modified.

    Raises:
        KeyError: If ``col1`` or ``col2`` is missing from ``df``.
    """
    # assign() builds a new frame, so the caller's DataFrame is left untouched.
    return df.assign(col3=df["col1"] + df["col2"])
# Declare the workflow: load the upstream "extract" product, derive col3 with
# add_cols, and write the result to this task's "data" product.
with FugueWorkflow(engine) as dag:
    df = dag.load(upstream["extract"]["data"])
    # The schema hint describes the transformer output: all input columns
    # ("*") plus an integer column named col3.
    df = df.transform(add_cols, schema="*, col3:int")
    # NOTE(review): single=True presumably requests a single output file
    # rather than a partitioned directory — confirm against the Fugue docs.
    df.save(product["data"], mode="overwrite", single=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment