Kevin Kho kvnkho
import pandas as pd
from pyspark.sql import SparkSession
from fugue import transform
from statsforecast.distributed.utils import forecast
from statsforecast.distributed.fugue import FugueBackend
from statsforecast.models import AutoARIMA
from statsforecast.core import StatsForecast

spark = SparkSession.builder.getOrCreate()
# Fugue backend that distributes StatsForecast over Spark (pandas UDFs enabled)
backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf": True})

# forecast one group of series: StatsForecast expects unique_id as the index
def forecast_series(df: pd.DataFrame, models) -> pd.DataFrame:
    tdf = df.set_index("unique_id")
    model = StatsForecast(df=tdf, models=models, freq='D', n_jobs=1)
    return model.forecast(7).reset_index()

# `series` comes from generate_series in the next snippet; the original call
# was cut off after params=..., so the schema, partition, and engine
# arguments below are assumptions based on Fugue's transform API
transform(series.reset_index(),
          forecast_series,
          params=dict(models=[AutoARIMA()]),
          schema="unique_id:long, ds:date, AutoARIMA:double",
          partition={"by": "unique_id"},
          engine=spark)
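The `forecast` helper imported at the top of this snippet is never used in what survives of the listing. For completeness, a minimal sketch of how statsforecast's distributed utility is typically driven through the FugueBackend defined above — the argument names (`h`, `parallel`) are assumptions from statsforecast 1.x-era examples and may differ across versions:

# hypothetical: distributed AutoARIMA via the backend defined above;
# h= and parallel= are assumed argument names, not copied from the gist
forecasts = forecast(series.reset_index(), [AutoARIMA()],
                     freq="D", h=7, parallel=backend)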
from time import time
import pandas as pd
from statsforecast.utils import generate_series
from statsforecast.models import AutoARIMA
from statsforecast.core import StatsForecast

# one million synthetic daily series for benchmarking
series = generate_series(n_series=1000000, seed=1)

# the listing is cut off mid-call; models, freq, and n_jobs below are
# assumptions mirroring the forecast_series helper above
model = StatsForecast(df=series, models=[AutoARIMA()], freq='D', n_jobs=-1)
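`time` is imported but unused in what remains of the snippet, so the benchmark presumably ends by timing a forecast call. A minimal sketch, assuming the same 7-day horizon as `forecast_series` above:

init = time()
forecasts = model.forecast(7)  # 7-day horizon (assumed)
print(f"Minutes taken: {(time() - init) / 60:.1f}")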
import pandas as pd

# + tags=["parameters"]
# declare a list of tasks whose products you want to use as inputs
upstream = None
product = None
# -

# toy extract step; Ploomber injects the `product` dict at runtime
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4]})
df.to_parquet(product["data"])
@kvnkho
kvnkho / fugue_transform.py
Last active August 2, 2022 19:25
Example of Fugue Transform
import pandas as pd
from fugue import transform
from sklearn.preprocessing import minmax_scale

df = pd.DataFrame({"col1": ["A", "A", "A", "B", "B", "B"], "col2": [1, 2, 3, 4, 5, 6]})

# scale col2 to [0, 1] within each partition handed to this function
def normalize(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(scaled=minmax_scale(df["col2"]))

# run on Pandas; the gist is cut off here, so the schema and partition
# arguments are reconstructed from Fugue's transform API, not copied verbatim
result = transform(df, normalize, schema="*, scaled:double", partition={"by": "col1"})
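The gist is titled "Example of Fugue Transform", and the point of `transform` is engine portability, so it very likely ended by re-running the same function on Spark. A sketch under that assumption:

# same call on Spark: only the engine argument changes
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark_result = transform(df, normalize, schema="*, scaled:double",
                         partition={"by": "col1"}, engine=spark)
spark_result.show()  # with a Spark engine, transform returns a Spark DataFrame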
from fugue import FugueWorkflow
import pandas as pd

# + tags=["parameters"]
# declare a list of tasks whose products you want to use as inputs
upstream = ["extract"]
product = None
engine = None
# -
@kvnkho
kvnkho / transform_with_spark.py
Last active August 11, 2022 02:25
Ploomber with Fugue on Spark
import pandas as pd
from fugue import transform
from sklearn.preprocessing import minmax_scale

# %% tags=["parameters"]
# declare a list of tasks whose products you want to use as inputs
upstream = ["extract"]
product = None
engine = None
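The listing stops after the parameters cell. Given the imports and the env.yaml/pipeline gists below, the body plausibly reads the upstream parquet, scales each group through Fugue, and writes this task's product; the column names and the `data` product key are assumptions carried over from the other gists:

# %%
# hypothetical body (the gist is truncated here)
df = pd.read_parquet(upstream["extract"]["data"])

def normalize(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(scaled=minmax_scale(df["col2"]))

# `engine` arrives via params: None runs on Pandas, "spark" distributes the
# same function on Spark with no code changes
result = transform(df, normalize, schema="*, scaled:double",
                   partition={"by": "col1"}, engine=engine)

# assumes a local (Pandas) result; a distributed engine would hand back a
# Spark DataFrame and need its own save path
result.to_parquet(product["data"])

Threading `engine` through params is what lets one script run locally during development and on a cluster in production.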
@kvnkho
kvnkho / env.yaml
Last active August 1, 2022 17:30
Ploomber env yaml
engine: spark
raw_filename: full_raw.parquet
transformed_filename: full_transformed.parquet
@kvnkho
kvnkho / pipeline_with_parameters.yaml
Created July 31, 2022 04:40
Ploomber pipeline with parameters
# Content of pipeline.yaml
tasks:
  - source: extract.py
    product:
      # scripts generate executed notebooks as outputs
      nb: output/extract.html
      # you can define as many outputs as you want
      data: 'data/{{raw_filename}}'
    params:
      engine: '{{engine}}'
@kvnkho
kvnkho / pipeline.yaml
Last active August 2, 2022 19:07
Ploomber Pipeline
# Content of pipeline.yaml
tasks:
  - source: extract.py
    product:
      # scripts generate executed notebooks as outputs
      nb: output/extract.html
      # you can define as many outputs as you want
      data: data/raw.parquet
  - source: transform.py