Skip to content

Instantly share code, notes, and snippets.

@kvnkho
Last active May 30, 2021 23:29
Show Gist options
  • Save kvnkho/2ac420ea37b47569ac1be0f060204e53 to your computer and use it in GitHub Desktop.
Save kvnkho/2ac420ea37b47569ac1be0f060204e53 to your computer and use it in GitHub Desktop.
import pandas as pd
import pandera as pa
from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine
from pandera import Column, Check, DataFrameSchema
def _price_schema(low, high):
    """Build a schema requiring a float ``Price`` column in the range low..high.

    Bounds are passed straight to ``Check.in_range``, whose pandera defaults
    make both ends inclusive.
    """
    price_rule = Check.in_range(min_value=low, max_value=high)
    return pa.DataFrameSchema({"Price": Column(pa.Float, price_rule)})


# Acceptable price band for each state.
price_check_FL = _price_schema(7, 13)
price_check_CA = _price_schema(15, 21)

# State code -> schema used to validate that state's rows.
price_checks = {"CA": price_check_CA, "FL": price_check_FL}
# schema: *
def price_validation(df: pd.DataFrame) -> pd.DataFrame:
    """Validate one State partition against that state's price schema.

    Meant to run as a Fugue transformer after ``partition(by=["State"])``, so
    all rows in ``df`` are expected to share one ``State`` value; only the
    first row's ``State`` is used to choose the schema.

    Args:
        df: Partition with at least ``State`` and ``Price`` columns.

    Returns:
        ``df`` itself, unchanged. The ``# schema: *`` hint above tells Fugue
        the output schema equals the input schema.

    Raises:
        KeyError: if the partition's ``State`` has no entry in ``price_checks``.
        Whatever ``check.validate`` raises when the partition violates its
        schema (presumably pandera's ``SchemaError`` -- confirm for the
        pandera version in use).
    """
    # An empty partition has nothing to validate, and iloc[0] would raise
    # IndexError on it.
    if df.empty:
        return df

    location = df["State"].iloc[0]
    try:
        check = price_checks[location]
    except KeyError:
        raise KeyError(f"No price schema defined for State {location!r}") from None

    # validate() raises on failure; its return value is not needed because the
    # partition is passed through as-is.
    check.validate(df)
    return df
# NOTE(review): ``df`` is not defined in the visible part of this file; it is
# assumed to be a pandas DataFrame with "State" and "Price" columns created
# earlier -- confirm.
with FugueWorkflow(SparkExecutionEngine) as dag:
    # Partition by State so each price_validation call sees a single state's rows.
    # Bind to a new name instead of rebinding (and shadowing) the input ``df``.
    validated = dag.df(df).partition(by=["State"]).transform(price_validation)
    validated.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment