Pandas for Data Engineers
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2021 Ronnie Joshua
# Email: ron.juden@gmail.com
# Linkedin: https://www.linkedin.com/in/ronnie-joshua/
# Visit: http://www.webanalyticsinfo.com/
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Built-in Libraries
import os
from typing import List, Callable
import functools
import time
import multiprocessing as mp

# External Libraries
import pandas as pd
import numpy as np
import psutil


def memory_footprint():
    """Returns memory (in MB) being used by the Python process."""
    process_object = psutil.Process(os.getpid())
    mem = process_object.memory_info().rss
    return mem / (1024 ** 2)
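

# Hedged usage sketch: memory_footprint() can bracket an allocation to show
# the process-level cost. The array size below is illustrative only.
def demo_memory_footprint():
    before = memory_footprint()
    block = np.ones((1000, 1000))  # ~8 MB of float64
    after = memory_footprint()
    print(f"Approximate cost of the allocation: {after - before:.2f} MB")
    del block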


def memory_footprint_calc(func: Callable):
    """Calculates the memory footprint of the Python process.

    Use it as a function decorator.
    Memory calculations are in MB.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        mem_before = memory_footprint()
        print(f"Python Process Memory Consumption (Before): {mem_before}")
        value = func(*args, **kwargs)
        mem_after = memory_footprint()
        print(f"Python Process Memory Consumption (After): {mem_after}")
        print(f"Difference: {mem_after - mem_before}")
        return value
    return wrapper


# Utility Functions
# Using a decorator to time the function execution time
def timer(func: Callable):
    """Print the runtime of the decorated function."""
    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        end_time = time.perf_counter()
        run_time = end_time - start_time
        print(f"Finished {func.__name__!r} in {run_time:.4f} secs")
        return value
    return wrapper_timer
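

# Hedged example: the two decorators stack, so a single call reports both the
# runtime and the memory delta. The workload below is an illustrative
# stand-in, not part of the original pipeline.
@timer
@memory_footprint_calc
def demo_decorated_workload(n: int = 1_000_000):
    return pd.Series(range(n)).sum()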


# Managing Data with Generators
# Using generators for lazy evaluation of data
def generate_filenames():
    return (f"data_{k}.csv" for k in range(1, 5))


def read_df_chunks(file):
    return (chunk for chunk in pd.read_csv(file, chunksize=1000))


def generate_chunks():
    chunks = (read_df_chunks(file) for file in generate_filenames())
    return chunks


@timer
@memory_footprint_calc
def consume_generator(dfs):
    df_chunks_list = list()
    for df in dfs:
        for df_chunk in df:
            # Perform an operation on each chunk
            df_chunk.set_index('lead_id', inplace=True)
            df_chunk.loc[:, 'row_total'] = df_chunk.sum(numeric_only=True, axis=1)
            df_chunk = df_chunk[['registration_date', 'country', 'row_total']]
            df_chunks_list.append(df_chunk)
    return pd.concat(df_chunks_list)
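

# Hedged end-to-end wiring of the lazy pipeline above. It assumes the files
# data_1.csv ... data_4.csv exist in the working directory and contain
# 'lead_id', 'registration_date' and 'country' columns, as consume_generator
# expects.
def demo_generator_pipeline():
    return consume_generator(generate_chunks())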


def total_system_resources() -> pd.DataFrame:
    """
    Returns a DataFrame showing the system resource stats.

    Returns:
        pd.DataFrame: DataFrame with information about the system resources
    """
    svmem = psutil.virtual_memory()
    # 'active', 'inactive' and 'wired' are platform-specific ('wired' exists
    # only on macOS/BSD), so collect only the fields psutil exposes here
    fields = ('total', 'available', 'used', 'free', 'active', 'inactive', 'wired')
    sys_resource = {f: getattr(svmem, f) for f in fields if hasattr(svmem, f)}
    df = pd.Series(sys_resource, name="usage").map(lambda x: f'{x / 1000 ** 2:.2f} MB').to_frame()
    df.loc['percent'] = svmem.percent
    df.loc['cpu_count_cores'] = psutil.cpu_count(logical=False)
    # Logical count exceeds the physical core count when hyperthreading is enabled
    df.loc['logical_cpu_count'] = psutil.cpu_count()
    return df


def inspect_dataframe(df: pd.DataFrame):
    print("\nInformation about the dataframe")
    print('*' * 31)
    df.info(memory_usage='deep')
    criteria = {
        "\nCheck if Dataframe is Empty": df.empty,
        "\nData Types and their Value Counts": df.dtypes.value_counts(),
        "\nThe Dimensions of the Dataframe": df.shape,
        "\nDataframe's total missing values": df.isnull().sum().sum(),
        "\nDataframe's missing values column-wise": df.isnull().sum(),
        "\nDataframe's Size": df.size,
        "\nDataframe's Columns": df.columns,
        "\nDataframe's Col Memory Usage (MBs)": df.memory_usage(deep=True) / 2 ** 20,
        "\nDataframe's Memory Usage": f'{df.memory_usage(deep=True).sum() / 2 ** 20:.5f} MB',
    }
    for label, value in criteria.items():
        print(label)
        print('*' * len(label))
        print(value)


# https://medium.com/bigdatarepublic/advanced-pandas-optimize-speed-and-memory-a654b53be6c2
# I have used the code from the link above; however, I have refactored the
# following functions: optimize_objects and df_optimize_pipe.
# I have also tried to implement the pandas pipe operator.
def optimize_floats(df: pd.DataFrame) -> pd.DataFrame:
    floats = df.select_dtypes(include=['float64']).columns.tolist()
    df[floats] = df[floats].apply(pd.to_numeric, downcast='float')
    return df


def optimize_ints(df: pd.DataFrame) -> pd.DataFrame:
    ints = df.select_dtypes(include=['int64']).columns.tolist()
    df[ints] = df[ints].apply(pd.to_numeric, downcast='integer')
    return df


def optimize_objects(df: pd.DataFrame, datetime_features: List[str] = None) -> pd.DataFrame:
    for col in df.select_dtypes(include=['object']):
        if not datetime_features or col not in datetime_features:
            # Convert low-cardinality object columns to the category dtype
            num_unique_values = len(df[col].unique())
            num_total_values = len(df[col])
            if float(num_unique_values) / num_total_values < 0.5:
                df[col] = df[col].astype('category')
        else:
            df[col] = pd.to_datetime(df[col])
    return df


def df_optimize_pipe(df: pd.DataFrame):
    df_size_before = df.memory_usage(deep=True).sum() / 2 ** 20
    print(f'Total Size of Dataframe in MBs Before Optimization: {df_size_before:.5f} MB')
    # The optimize_* functions assign columns in place, so the pipe's return
    # value can be discarded and df itself ends up optimized
    (df.pipe(optimize_floats)
       .pipe(optimize_ints)
       .pipe(optimize_objects, datetime_features=None))
    df_size_after = df.memory_usage(deep=True).sum() / 2 ** 20
    print(f'Total Size of Dataframe in MBs After Optimization: {df_size_after:.5f} MB')
    print(f'Percent of Resources Saved: {(1 - df_size_after / df_size_before) * 100:.3f} %')
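

# A minimal sketch of the optimization pipeline on a synthetic frame; the
# column names, dtypes and cardinalities here are illustrative assumptions.
def demo_optimize_pipe(n: int = 100_000):
    toy = pd.DataFrame({
        'amount': np.random.rand(n),                          # float64 -> float32
        'clicks': np.random.randint(0, 100, n),               # int64  -> int8
        'country': np.random.choice(['IL', 'US', 'IN'], n),   # object -> category
    })
    df_optimize_pipe(toy)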


def pandas_df_chunksize(file_path, check_nrows=100000, frac_mem_to_use=0.25,
                        size_each_df_mb=250, num_dfs_iterator=None):
    # Determine the target size (in MB) of each chunk; if no explicit size is
    # given, split a fraction of the available memory across the iterators
    if size_each_df_mb is None:
        assert num_dfs_iterator is not None and isinstance(num_dfs_iterator, int)
        size_each_df_mb = int(((psutil.virtual_memory().available / 2 ** 20) * frac_mem_to_use) / num_dfs_iterator)
    # Measure what check_nrows rows cost in memory, then scale up to the target
    cost_n_rows_mb = (pd.read_csv(file_path, low_memory=False, nrows=check_nrows)
                      .memory_usage(index=True)
                      .sum() / 2 ** 20)
    return int((size_each_df_mb / cost_n_rows_mb) * check_nrows)
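

# Worked example of the chunksize arithmetic (the numbers are illustrative):
# if the first 100,000 rows cost ~50 MB, a 250 MB target per chunk gives
# chunksize = int((250 / 50) * 100000) = 500,000 rows.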


def parallelize_dataframe(df: pd.DataFrame, transform_func: Callable) -> pd.DataFrame:
    # Split the frame into one piece per CPU and transform the pieces in parallel
    df_split = np.array_split(df, os.cpu_count())
    with mp.Pool(os.cpu_count()) as p:
        result = pd.concat(p.map(transform_func, df_split))
    return result


def transform_func(df):
    # Perform an operation on each chunk
    df.set_index('lead_id', inplace=True)
    df.loc[:, 'row_total'] = df.sum(numeric_only=True, axis=1)
    df = df[['registration_date', 'country', 'row_total']]
    return df
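

# Hedged usage note: multiprocessing requires transform_func to live at module
# top level (as it does here) and the call to be made under the __main__ guard,
# otherwise spawned workers re-import and re-execute the module. Example, which
# assumes a frame with 'lead_id', 'registration_date' and 'country' columns:
#
#     parallel_df = parallelize_dataframe(df, transform_func)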


if __name__ == "__main__":
    # Checking the system resources
    print(total_system_resources())

    # Loading the data
    df = pd.read_csv('./datasets/flights.csv', low_memory=False)
    pd.set_option('display.max_columns', df.shape[1])

    # Inspecting the loaded dataframe
    inspect_dataframe(df)

    # Optimizing the loaded dataframe
    df_optimize_pipe(df)

    # Reading a dataframe in chunks
    iter_csv = pd.read_csv(
        './datasets/flights.csv',
        low_memory=False,
        iterator=True,
        chunksize=pandas_df_chunksize('./datasets/flights.csv'))
    for chunk in iter_csv:
        print(f"Size: {chunk.memory_usage(index=True).sum() / 2 ** 20:.2f} MB Shape: {chunk.shape}")