Logging utility that compares pandas dataframes

Standardized logging for data pipelines that transform pandas dataframes: a decorator for transformation functions logs the number of rows before and after each step.

TL;DR

# logging functions
def logging_transform_df(df_orig: pd.DataFrame, df_edit: pd.DataFrame, step_name: str = 'compare df') -> None:
    "Logging utility to show the row difference between two dataframes"
    N_orig, N_edit = len(df_orig), len(df_edit)
    N_diff = N_orig - N_edit
    pct_diff = N_diff / N_orig * 100 if N_orig else 0.0

    # sign for removed (-) or added (+) rows
    if N_diff > 0:
        sign = '-'
    elif N_diff < 0:
        sign = '+'
    else:
        sign = ''

    logging.info(f'{step_name} --> {sign}{abs(N_diff)} records ({sign}{abs(pct_diff):.2f}%, {N_orig}-{N_edit})')


def wrap_logging_transform_df(func):
    """Wrapper to compare df before and after transformation"""
    @wraps(func)
    def with_logging(*args, **kwargs):
        df_edit = func(*args, **kwargs)

        try:
            # both the input (args[0]) and the output should be dataframes
            df_orig = args[0]
            if not isinstance(df_orig, pd.DataFrame):
                raise TypeError(f'{func.__name__}: args[0] is not a dataframe, but type: {type(df_orig)}. Skip logging.')

            if not isinstance(df_edit, pd.DataFrame):
                raise TypeError(f'{func.__name__} did not return a dataframe, but type: {type(df_edit)}. Skip logging.')

            logging_transform_df(df_orig, df_edit, func.__name__)

        except Exception as e:
            # never let a logging problem break the pipeline
            logging.error(f'{func.__name__} - wrap_logging_transform_df(): {e}')

        return df_edit

    return with_logging


# decorate functions
@wrap_logging_transform_df
def select_top_items(df: pd.DataFrame, top_n_items: int = 50) -> pd.DataFrame:
    "Top n items by revenue"
    top_items = (df
        .groupby('stockcode').agg({'revenue': 'sum'})
        .sort_values('revenue', ascending=False)
        .head(top_n_items)
        .index
        .to_list()
    )

    return (df
        .loc[df.stockcode.isin(top_items)]
        .reset_index(drop=True)
    )

...

# example preprocessing pipeline
@wrap_logging_transform_df
def preprocessing(df: pd.DataFrame, dtype_mapping: dict, exclude_cols: list, exclude_items: list, top_n_items: int) -> pd.DataFrame:
    """
    Preprocessing pipeline: assign dtypes, drop columns, missings, duplicates,
    non-products, keep top items, aggregate daily by item, create ds, 
    y cols for prophet
    """
    return (df
        .drop(columns=exclude_cols)
        .astype(dtype_mapping)
        .assign(description=lambda x: x['description'].str.strip(), 
            revenue=lambda x: x['quantity'] * x['unitprice'])
        .pipe(drop_non_products, exclude_items=exclude_items)
        .pipe(select_top_items, top_n_items=top_n_items)
        .pipe(aggregate_daily)
        .pipe(create_prophet_df)
        .pipe(drop_missings_duplicates)
    )


if __name__ == '__main__':
    df_raw = get_raw_data(filepath)
    df = preprocessing(df_raw, **config_preprocessing)

Inspect logs:

2021-12-29 10:26:24,115 - root - INFO - Reading file: data/raw/online_retail_small.xlsx
2021-12-29 10:27:33,988 - root - INFO - select_top_items --> -510208 records (-94.15%, 541909-31701)
2021-12-29 10:27:34,021 - root - INFO - aggregate_daily --> -31327 records (-98.82%, 31701-374)
2021-12-29 10:27:34,028 - root - INFO - drop_non_products --> -87 records (-23.26%, 374-287)
2021-12-29 10:27:34,036 - root - INFO - drop_missings_duplicates --> 0 records (0.00%, 287-287)
2021-12-29 10:27:34,037 - root - INFO - preprocessing --> -541622 records (-99.95%, 541909-287)

Problem

Data pipelines take a dataframe as input and transform it through several functions, finally returning a transformed dataframe:

Data pipeline: df --> (function_1) --> ... --> (function_n) --> df_transformed

Such a pipeline benefits from standardized logging: errors or unexpected behavior in individual transformations can be spotted at a glance.
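
To make this concrete, here is a minimal sketch of that shape with hypothetical step names; each .pipe call is a natural place to hook in row-count logging:

import pandas as pd

# df --> (drop_invalid_rows) --> (keep_positive_values) --> df_transformed
def drop_invalid_rows(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna()

def keep_positive_values(df: pd.DataFrame) -> pd.DataFrame:
    return df.loc[df['value'] > 0].reset_index(drop=True)

df = pd.DataFrame({'value': [1.0, -2.0, None, 3.0]})
df_transformed = (df
    .pipe(drop_invalid_rows)      # 4 rows --> 3 rows
    .pipe(keep_positive_values)   # 3 rows --> 2 rows
)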

Standardized logging

2021-12-29 10:27:33,988 - root - INFO - select_top_items --> -510208 records (-94.15%, 541909-31701)
2021-12-29 10:27:34,021 - root - INFO - aggregate_daily --> -31327 records (-98.82%, 31701-374)
2021-12-29 10:27:34,028 - root - INFO - drop_non_products --> -87 records (-23.26%, 374-287)
2021-12-29 10:27:34,036 - root - INFO - drop_missings_duplicates --> 0 records (0.00%, 287-287)
2021-12-29 10:27:34,037 - root - INFO - preprocessing --> -541622 records (-99.95%, 541909-287)
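
Each line follows the pattern step_name --> {sign}{rows_changed} records ({sign}{pct_changed}%, {rows_before}-{rows_after}), so an unexpected drop, like select_top_items removing 94.15% of all rows, points straight to the responsible step.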

Solution

def logging_transform_df(df_orig: pd.DataFrame, df_edit: pd.DataFrame, step_name: str = 'compare df') -> None:
    "Logging utility to show the row difference between two dataframes"
    N_orig, N_edit = len(df_orig), len(df_edit)
    N_diff = N_orig - N_edit
    pct_diff = N_diff / N_orig * 100 if N_orig else 0.0

    # sign for removed (-) or added (+) rows
    if N_diff > 0:
        sign = '-'
    elif N_diff < 0:
        sign = '+'
    else:
        sign = ''

    logging.info(f'{step_name} --> {sign}{abs(N_diff)} records ({sign}{abs(pct_diff):.2f}%, {N_orig}-{N_edit})')
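
A quick sanity check on toy dataframes (assuming logging.basicConfig(level=logging.INFO) with its default format):

import logging
import pandas as pd

logging.basicConfig(level=logging.INFO)

df_before = pd.DataFrame({'a': range(10)})
df_after = df_before.head(4)

logging_transform_df(df_before, df_after, 'toy_step')
# INFO:root:toy_step --> -6 records (-60.00%, 10-4)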


def wrap_logging_transform_df(func):
    """Wrapper to compare df before and after transformation"""
    @wraps(func)
    def with_logging(*args, **kwargs):
        df_edit = func(*args, **kwargs)

        try:
            # both the input (args[0]) and the output should be dataframes
            df_orig = args[0]
            if not isinstance(df_orig, pd.DataFrame):
                raise TypeError(f'{func.__name__}: args[0] is not a dataframe, but type: {type(df_orig)}. Skip logging.')

            if not isinstance(df_edit, pd.DataFrame):
                raise TypeError(f'{func.__name__} did not return a dataframe, but type: {type(df_edit)}. Skip logging.')

            logging_transform_df(df_orig, df_edit, func.__name__)

        except Exception as e:
            # never let a logging problem break the pipeline
            logging.error(f'{func.__name__} - wrap_logging_transform_df(): {e}')

        return df_edit

    return with_logging
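
Two design details worth noting: the wrapper computes the result first and only then attempts the comparison, so a failure inside the logging itself never breaks the pipeline; and @wraps preserves func.__name__, which is what appears as the step name in the logs.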

Usage

Apply the decorator @wrap_logging_transform_df to functions that transform a dataframe as part of a data pipeline, such as aggregate_daily.

@wrap_logging_transform_df
def aggregate_daily(df: pd.DataFrame) -> pd.DataFrame:
    "Sums quantity and revenue per day (keeping the first value of the other columns)"
    return (df
        .resample('D', on='invoicedate')
        .agg({
            'description': 'first', 'revenue': 'sum', 'quantity': 'sum',
            'unitprice': 'first', 'country': 'first', 'stockcode': 'first',
            'invoiceno': 'first',
        })
        .reset_index()
    )

...

2021-12-29 10:27:34,021 - root - INFO - aggregate_daily --> -31327 records (-98.82%, 31701-374)
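
One caveat follows from the wrapper reading args[0]: the dataframe must be passed as the first positional argument, which .pipe does automatically. Passing it by keyword still returns the correct result, but the comparison is skipped with an error entry:

df_daily = aggregate_daily(df)     # dataframe passed positionally: row counts get logged
df_daily = aggregate_daily(df=df)  # args is empty, so the wrapper logs an error
                                   # and skips the row comparison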

Appendix

Example dataset

Online retail data: http://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx

| invoiceno | stockcode | description                         | quantity | invoicedate         | unitprice | customerid | country        |
| --------- | --------- | ----------------------------------- | -------- | ------------------- | --------- | ---------- | -------------- |
| 536365    | 85123A    | WHITE HANGING HEART T-LIGHT HOLDER  | 6        | 2010-12-01 08:26:00 | 2.55      | 17850      | United Kingdom |
| 536365    | 84029G    | KNITTED UNION FLAG HOT WATER BOTTLE | 6        | 2010-12-01 08:26:00 | 3.39      | 17850      | United Kingdom |
| ...       | ...       | ...                                 | ...      | ...                 | ...       | ...        | ...            |
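
To experiment with the full dataset, the sheet can also be read straight from that URL; a sketch, assuming openpyxl is installed for .xlsx support:

import pandas as pd

url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx'
df = pd.read_excel(url)  # full dataset: 541909 rows, 8 columns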

Example preprocessing pipeline

Load and transform Online retail data stored in data/raw/online_retail_small.xlsx:

  1. drop unnecessary columns
  2. assign dtypes for efficiency and compute revenue
  3. drop non-products
  4. select the top_n items by revenue
  5. aggregate transactions per day
  6. create the ds and y columns for facebook prophet
  7. drop missings and duplicates

import pandas as pd
import logging
from functools import wraps

logging.basicConfig(
    level=logging.INFO, 
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# configuration for local development
filepath = 'data/raw/online_retail_small.xlsx'
exclude_cols = ['customerid']
dtype_mapping = {
    'invoiceno': 'object',
    'quantity': 'int32',  # quantities can be negative (returns), so an unsigned dtype would overflow
    'country': 'category',
}

exclude_items = ['AMAZON FEE',
    'AMAZON',
    'Manual',
    'POSTAGE',
    'DOTCOM POSTAGE',
    'CRUK Commission',
    'Bank Charges',
    'Discount',
    'SAMPLES', 
    'Adjust bad debt'
]

top_n_items = 50

config_preprocessing = {
    'dtype_mapping': dtype_mapping,
    'exclude_cols': exclude_cols,
    'exclude_items': exclude_items,
    'top_n_items': 50
}


def columns_to_lowercase(df: pd.DataFrame) -> pd.DataFrame:
    "Lowercase column names and strip surrounding whitespace"
    df.columns = (df.columns
        .str.lower()
        .str.strip()
    )
    return df


def wrap_logging_transform_df(func):
    """Wrapper to compare df before and after transformation"""
    @wraps(func)
    def with_logging(*args, **kwargs):
        df_edit = func(*args, **kwargs)

        try:
            # both the input (args[0]) and the output should be dataframes
            df_orig = args[0]
            if not isinstance(df_orig, pd.DataFrame):
                raise TypeError(f'{func.__name__}: args[0] is not a dataframe, but type: {type(df_orig)}. Skip logging.')

            if not isinstance(df_edit, pd.DataFrame):
                raise TypeError(f'{func.__name__} did not return a dataframe, but type: {type(df_edit)}. Skip logging.')

            logging_transform_df(df_orig, df_edit, func.__name__)

        except Exception as e:
            # never let a logging problem break the pipeline
            logging.error(f'{func.__name__} - wrap_logging_transform_df(): {e}')

        return df_edit

    return with_logging


def logging_transform_df(df_orig: pd.DataFrame, df_edit: pd.DataFrame, step_name: str = 'compare df') -> None:
    "Logging utility to show the row difference between two dataframes"
    N_orig, N_edit = len(df_orig), len(df_edit)
    N_diff = N_orig - N_edit
    pct_diff = N_diff / N_orig * 100 if N_orig else 0.0

    # sign for removed (-) or added (+) rows
    if N_diff > 0:
        sign = '-'
    elif N_diff < 0:
        sign = '+'
    else:
        sign = ''

    logging.info(f'{step_name} --> {sign}{abs(N_diff)} records ({sign}{abs(pct_diff):.2f}%, {N_orig}-{N_edit})')


@wrap_logging_transform_df
def drop_missings_duplicates(df: pd.DataFrame) -> pd.DataFrame:
    "Return df without missings and duplicates"
    df_nomiss = df.dropna()
    df_nodup = df_nomiss.drop_duplicates()

    return df_nodup.reset_index(drop=True)


@wrap_logging_transform_df
def drop_non_products(df: pd.DataFrame, exclude_items: list) -> pd.DataFrame:
    "Drop administrative items and keep only rows with positive revenue"
    return (df
        .query('description not in @exclude_items')
        .query('revenue > 0')
    )


@wrap_logging_transform_df
def select_top_items(df: pd.DataFrame, top_n_items: int = 50) -> pd.DataFrame:
    "Top n items by revenue"
    top_items = (df
        .groupby('stockcode').agg({'revenue': 'sum'})
        .sort_values('revenue', ascending=False)
        .head(top_n_items)
        .index
        .to_list()
    )

    return (df
        .loc[df.stockcode.isin(top_items)]
        .reset_index(drop=True)
    )


@wrap_logging_transform_df
def aggregate_daily(df: pd.DataFrame) -> pd.DataFrame:
    "Sums quantity and revenue per day (keeping the first value of the other columns)"
    return (df
        .resample('D', on='invoicedate')
        .agg({
            'description': 'first', 'revenue': 'sum', 'quantity': 'sum',
            'unitprice': 'first', 'country': 'first', 'stockcode': 'first',
            'invoiceno': 'first',
        })
        .reset_index()
    )



def create_prophet_df(df: pd.DataFrame, y: str = 'revenue', ds: str = 'invoicedate') -> pd.DataFrame:
    "Create the ds and y columns that prophet expects as input"
    return df.assign(y=lambda x: x[y], ds=lambda x: x[ds])


def get_raw_data(filepath) -> pd.DataFrame:
    "Return data without preprocessing"
    logging.info(f'Reading file: {filepath}')
    return (pd.read_excel(filepath)
        .pipe(columns_to_lowercase)
    )


@wrap_logging_transform_df
def preprocessing(df: pd.DataFrame, dtype_mapping: dict, exclude_cols: list, exclude_items: list, top_n_items: int) -> pd.DataFrame:
    """
    Preprocessing pipeline: assign dtypes, drop columns, missings, duplicates,
    non-products, keep top items, aggregate daily by item, create ds, 
    y cols for prophet
    """
    return (df
        .drop(columns=exclude_cols)
        .astype(dtype_mapping)
        .assign(description=lambda x: x['description'].str.strip(), 
            revenue=lambda x: x['quantity'] * x['unitprice'])
        .pipe(drop_non_products, exclude_items=exclude_items)
        .pipe(select_top_items, top_n_items=top_n_items)
        .pipe(aggregate_daily)
        .pipe(create_prophet_df)
        .pipe(drop_missings_duplicates)
    )


if __name__ == '__main__':
    df_raw = get_raw_data(filepath)
    df = preprocessing(df_raw, **config_preprocessing)

Logs from a full run:

2021-12-29 10:26:24,115 - root - INFO - Reading file: data/raw/online_retail_small.xlsx
2021-12-29 10:27:33,988 - root - INFO - select_top_items --> -510208 records (-94.15%, 541909-31701)
2021-12-29 10:27:34,021 - root - INFO - aggregate_daily --> -31327 records (-98.82%, 31701-374)
2021-12-29 10:27:34,028 - root - INFO - drop_non_products --> -87 records (-23.26%, 374-287)
2021-12-29 10:27:34,036 - root - INFO - drop_missings_duplicates --> 0 records (0.00%, 287-287)
2021-12-29 10:27:34,037 - root - INFO - preprocessing --> -541622 records (-99.95%, 541909-287)