# Pandas for Data Engineers
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2021 Ronnie Joshua
# Email:
# Linkedin:
# Visit:
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
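# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.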
# See the License for the specific language governing permissions and
# limitations under the License.
# Built in Libraries
import functools
import multiprocessing as mp
import os
import time
from typing import Callable, List, Optional
# External Libraries
import numpy as np
import pandas as pd
import psutil
def memory_footprint():
    """Return the resident set size of the current Python process in MB.

    The RSS reported by psutil for this process's PID is divided by 1024**2.
    """
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / 1024 ** 2
def memory_footprint_calc(func: Callable):
    """Calculate the memory footprint of the Python process around a call.

    Use it as a function decorator. The process RSS is sampled before and
    after ``func`` runs, and both values plus their difference are printed.
    Memory calculations are in MBs (1024**2 bytes).

    Returns:
        The wrapper function. It passes ``func``'s return value through
        unchanged and keeps ``func``'s ``__name__`` and ``__doc__``.
    """
    # functools.wraps keeps the decorated function's metadata (name, docstring,
    # __wrapped__) instead of exposing the inner "wrapper".
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        mem_before = psutil.Process(os.getpid()).memory_info().rss/(1024**2)
        print(f"Python Process Memory Consumption (Before): {mem_before}")
        value = func(*args, **kwargs)
        mem_after = psutil.Process(os.getpid()).memory_info().rss/(1024**2)
        print(f"Python Process Memory Consumption (After): {mem_after}")
        print(f"Difference: {(mem_after - mem_before)}")
        return value
    return wrapper
# Utility Functions
# Using a decorator to time the function execution time
def timer(func: Callable):
    """Print the runtime of the decorated function.

    Wall-clock time is measured with ``time.perf_counter`` around the call and
    printed with 4 decimal places. The wrapped function's return value is
    passed through unchanged.
    """
    # Preserve __name__/__doc__ so decorated functions stay introspectable.
    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        run_time = time.perf_counter() - start_time
        print(f"Finished {func.__name__!r} in {run_time:.4f} secs")
        return value
    return wrapper_timer
# Managing Data with Generators
# Using Generator for Lazy Evaluation of Data
def generate_filenames():
    """Lazily yield the CSV file names ``data_1.csv`` through ``data_4.csv``."""
    for idx in range(1, 5):
        yield f"data_{idx}.csv"
def read_df_chunks(file, chunksize: int = 1000):
    """Lazily yield DataFrame chunks of up to ``chunksize`` rows from a CSV.

    Args:
        file: path or buffer accepted by ``pd.read_csv``.
        chunksize: rows per chunk (default 1000).

    The reader is used as a context manager. It is therefore closed when
    the generator is exhausted and also when it is closed early
    (``gen.close()``), so an abandoned iteration does not leave the file
    handle open.
    """
    with pd.read_csv(file, chunksize=chunksize) as reader:
        yield from reader
def generate_chunks():
    """Return a generator with one chunk iterator per file name.

    The file-name generator is created immediately. Each ``read_df_chunks``
    iterator is only created as the returned generator is advanced.
    """
    filenames = generate_filenames()
    return (read_df_chunks(name) for name in filenames)
def consume_generator(dfs):
    """Transform every chunk from ``dfs`` and concatenate the results.

    Args:
        dfs: an iterable of iterables of DataFrames (e.g. ``generate_chunks()``).
            Each chunk must contain ``lead_id``, ``registration_date`` and
            ``country`` columns.

    Returns:
        pd.DataFrame indexed by ``lead_id`` with columns ``registration_date``,
        ``country`` and ``row_total``, where ``row_total`` is the row-wise sum
        of the numeric columns. An empty DataFrame with those columns is
        returned when no chunks are supplied (``pd.concat([])`` would raise).
    """
    keep = ['registration_date', 'country', 'row_total']
    processed = []
    for chunk_iter in dfs:
        for chunk in chunk_iter:
            # Perform the operation on each chunk. set_index without
            # inplace returns a new frame, so the new column is not
            # written onto a view.
            chunk = chunk.set_index('lead_id')
            chunk['row_total'] = chunk.sum(numeric_only=True, axis=1)
            processed.append(chunk[keep])
    if not processed:
        return pd.DataFrame(columns=keep)
    return pd.concat(processed)
def total_system_resources() -> pd.DataFrame:
    """Return a DataFrame showing the system resource stats.

    Returns:
        pd.DataFrame: a single ``usage`` column indexed by stat name.
        Memory figures from ``psutil.virtual_memory()`` are formatted as
        strings in MB (1000**2 bytes). The ``percent``, ``cpu_count_cores``
        (physical) and ``logical_cpu_count`` rows hold raw numbers.
    """
    svmem = psutil.virtual_memory()
    # NOTE(review): only the 'total' entry of this mapping survived in the
    # source; the remaining fields are reconstructed from psutil's
    # virtual_memory() result. Confirm which stats were intended.
    sys_resource = {
        'total': svmem.total,
        'available': svmem.available,
        'used': svmem.used,
        'free': svmem.free,
    }
    # astype(object) keeps the column able to hold the numeric rows added
    # below, even on pandas versions that infer a dedicated string dtype.
    df = (pd.Series(sys_resource, name="usage")
          .map(lambda x: f'{x/1000**2:.2f} MB')
          .astype(object)
          .to_frame())
    df.loc['percent'] = svmem.percent
    df.loc['cpu_count_cores'] = psutil.cpu_count(logical=False)
    # Logical CPUs can exceed physical cores; attributed to hyperthreading.
    df.loc['logical_cpu_count'] = psutil.cpu_count()
    return df
def inspect_dataframe(df: pd.DataFrame) -> dict:
    """Print a diagnostic summary of ``df`` and return the collected checks.

    First prints a header and ``df.info(memory_usage='deep')``. Then, for each
    check, prints its label followed by its value. The checks are: emptiness,
    dtype counts, shape, missing values (total and per column), size, column
    names, and memory usage per column and in total (MB = 2**20 bytes).

    Returns:
        dict: mapping from each printed label to its computed value.
    """
    print("\nInformation about the dataframe")
    print('*'*31)
    df.info(memory_usage='deep')
    criterias = {
        "\nCheck if Dataframe is Empty" : df.empty,
        "\nData Types and thier Value Counts" : df.dtypes.value_counts(),
        "\nThe Dimensions of the Dataframe" : df.shape,
        "\nDataframe's total missing values" : df.isnull().sum().sum(),
        "\nDataframe's missing values column-wise" : df.isnull().sum(),
        "\nDataframe's Size" : df.size,
        "\nDataframe's Columns" : df.columns,
        "\nDataframe's Col Memory Usage (MB's)" : df.memory_usage(deep=True)/2**20,
        "\nDataframe's Memory Usage" : f'{df.memory_usage(deep=True).sum()/2**20:.5f} MB'
    }
    # NOTE(review): the loop body was reconstructed (label, then value);
    # confirm the intended output format.
    for criteria, value in criterias.items():
        print(criteria)
        print(value)
    return criterias
# I have used the code from the link referenced above; however, I have refactored
# the following functions: optimize_objects and df_optimize_pipe.
# I have also used pandas' method chaining via DataFrame.pipe.
def optimize_floats(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast every float64 column of ``df`` to the smallest float dtype.

    ``df`` is modified in place and also returned, so the function can be
    chained with ``DataFrame.pipe``.
    """
    float64_cols = df.select_dtypes(include=['float64']).columns.tolist()
    downcast_values = df[float64_cols].apply(pd.to_numeric, downcast='float')
    df[float64_cols] = downcast_values
    return df
def optimize_ints(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast every int64 column of ``df`` to the smallest signed int dtype.

    ``df`` is modified in place and also returned, so the function can be
    chained with ``DataFrame.pipe``.
    """
    int64_cols = df.select_dtypes(include=['int64']).columns.tolist()
    downcast_values = df[int64_cols].apply(pd.to_numeric, downcast='integer')
    df[int64_cols] = downcast_values
    return df
def optimize_objects(df: pd.DataFrame,
                     datetime_features: Optional[List[str]] = None) -> pd.DataFrame:
    """Convert object columns to datetime or ``category`` dtype, in place.

    Args:
        df: frame to optimize. It is modified in place.
        datetime_features: object columns to parse with ``pd.to_datetime``.
            Other object columns are converted to ``category`` when fewer than
            half of their values are distinct. Empty columns are left alone.

    Returns:
        The same (mutated) DataFrame, so the function can be used with
        ``DataFrame.pipe``.
    """
    datetime_cols = set(datetime_features or ())
    # select_dtypes returns a new frame, so assigning into df while
    # iterating over its column labels is safe.
    for col in df.select_dtypes(include=['object']).columns:
        if col in datetime_cols:
            df[col] = pd.to_datetime(df[col])
            continue
        num_total_values = len(df[col])
        if num_total_values == 0:
            # No rows to measure; avoids dividing by zero.
            continue
        num_unique_values = len(df[col].unique())
        if num_unique_values / num_total_values < 0.5:
            df[col] = df[col].astype('category')
    return df
def df_optimize_pipe(df: pd.DataFrame) -> pd.DataFrame:
    """Shrink ``df`` by downcasting floats/ints and compressing object columns.

    Prints the deep memory usage before and after (MB = 2**20 bytes), plus the
    percentage of memory saved (positive when the frame got smaller). The
    optimize_* helpers mutate ``df`` in place. The optimized frame is also
    returned.
    """
    df_size_before = df.memory_usage(deep=True).sum()/2**20
    print(f'Total Size of Dataframe in MBs Before Optimization: {df_size_before:.5f} MB')
    # NOTE(review): only the final .pipe stage survived in the source; the
    # float/int stages are reconstructed from the helpers defined above.
    df = (df.pipe(optimize_floats)
            .pipe(optimize_ints)
            .pipe(optimize_objects, datetime_features=None))
    df_size_after = df.memory_usage(deep=True).sum()/2**20
    print(f'Total Size of Dataframe in MBs After Optimization: {df_size_after:.5f} MB')
    # "Saved" is the reduction relative to the starting size, so it is
    # (1 - after/before), not (after/before - 1).
    saved_pct = (1 - df_size_after/df_size_before)*100 if df_size_before else 0.0
    print(f'Percent of Resources Saved: {saved_pct:.3f} %')
    return df
def pandas_df_chunksize(file_path, check_nrows=100000, frac_mem_to_use=0.25,
                        size_each_df_mb=250, num_dfs_iterator=None):
    """Estimate a ``chunksize`` (rows) for ``pd.read_csv`` targeting a chunk size in MB.

    Reads up to ``check_nrows`` rows, measures their deep memory usage, and
    scales the per-row cost to ``size_each_df_mb``.

    Args:
        file_path: path or buffer accepted by ``pd.read_csv``.
        check_nrows: maximum number of rows sampled to estimate per-row cost.
        frac_mem_to_use: fraction of currently available RAM to budget when
            ``size_each_df_mb`` is None.
        size_each_df_mb: target chunk size in MB (2**20 bytes). If None, it
            is derived as available RAM * ``frac_mem_to_use`` /
            ``num_dfs_iterator``.
        num_dfs_iterator: number of DataFrames the memory budget is divided
            among. It must be an int when ``size_each_df_mb`` is None.

    Returns:
        int: rows per chunk, at least 1. For a file with no data rows this is
        ``check_nrows``.
    """
    # Determine the size of each df
    if size_each_df_mb is None:
        assert num_dfs_iterator is not None and isinstance(num_dfs_iterator, int)
        available_mb = psutil.virtual_memory().available/2**20
        size_each_df_mb = int((available_mb*frac_mem_to_use) / num_dfs_iterator)
    sample = pd.read_csv(file_path, low_memory=False, nrows=check_nrows)
    # The file may hold fewer than check_nrows rows; scale by the rows
    # actually read, or the per-row cost is underestimated.
    n_sampled = len(sample)
    if n_sampled == 0:
        return max(1, int(check_nrows))
    cost_mb_per_row = sample.memory_usage(deep=True).sum() / 2**20 / n_sampled
    # read_csv rejects chunksize=0, so never return less than one row.
    return max(1, int(size_each_df_mb / cost_mb_per_row))
def parallelize_dataframe(df: pd.DataFrame, transform_func: Callable,
                          n_workers: Optional[int] = None) -> pd.DataFrame:
    """Apply ``transform_func`` to row-partitions of ``df`` in a process pool.

    Args:
        df: frame to split into contiguous row blocks.
        transform_func: picklable callable taking and returning a DataFrame.
        n_workers: number of partitions and pool processes. It defaults to
            ``os.cpu_count()``, falling back to 1 when that is unknown.

    Returns:
        The per-partition results concatenated in the original row order.
    """
    n_parts = n_workers or os.cpu_count() or 1
    # Split by position instead of calling np.array_split on the DataFrame
    # itself, which relies on DataFrame.swapaxes (deprecated in recent pandas).
    positions = np.array_split(np.arange(len(df)), n_parts)
    parts = [df.iloc[pos] for pos in positions]
    with mp.Pool(n_parts) as pool:
        result = pd.concat(pool.map(transform_func, parts))
    return result
def transform_func(df):
    """Index a chunk by ``lead_id`` and add a ``row_total`` column.

    ``row_total`` is the row-wise sum of the numeric columns remaining after
    ``lead_id`` moves to the index.

    Returns:
        A new DataFrame with columns ``registration_date``, ``country`` and
        ``row_total``. The input frame is not modified.
    """
    # Perform the operation on each chunk. Without inplace, the caller's
    # frame (possibly a slice of a larger frame) is left untouched.
    out = df.set_index('lead_id')
    out['row_total'] = out.sum(numeric_only=True, axis=1)
    return out[['registration_date', 'country', 'row_total']]
# The guard previously compared against "__main___" (three underscores), so
# this block never ran.
if __name__ == "__main__":
    DATA_PATH = './datasets/flights.csv'
    # Checking the system resources
    print(total_system_resources())
    # Loading the data
    df = pd.read_csv(DATA_PATH, low_memory=False)
    pd.set_option('display.max_columns', df.shape[1])
    # Inspecting the loaded dataframe
    inspect_dataframe(df)
    # Optimizing the loaded dataframe (the optimize_* helpers mutate df in place)
    df_optimize_pipe(df)
    # Reading a dataframe in chunks
    # NOTE(review): the original read_csv arguments were lost; the chunk size
    # is now estimated with pandas_df_chunksize. Confirm the intended value.
    chunksize = pandas_df_chunksize(DATA_PATH)
    with pd.read_csv(DATA_PATH, low_memory=False, chunksize=chunksize) as iter_csv:
        for chunk in iter_csv:
            print(f"Size: {chunk.memory_usage(index=True).sum()/2**20} Shape: {chunk.shape}")
