Parallelized pd.read_csv with joblib

Reads a large CSV file in parallel via pandas and joblib. Performance probably degrades due to the pd.concat usage. Tests, better function parameter definitions, and documentation are still pending.

An admittedly informal test on a 5GB CSV file (shape=()), run in a Jupyter notebook and repeated twice, ended with a "Kernel died" message when using pd.read_csv directly. In contrast, read_csv_joblib with the following settings returned after 3h 4min; concatenating the row chunks took the longest:

df = read_csv_joblib(file_path, row_chunksize=17, column_chunksize=None, n_rows=None, n_columns=None, sep="\s+", n_jobs=70)
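
For orientation, row_chunksize and column_chunksize are the number of rows/columns per chunk; the chunk boundaries are derived with np.arange exactly as in the function below. A minimal illustration (the numbers here are made up):

import numpy as np

row_steps = np.append(np.arange(0, 100, 17), 100)  # e.g. n_rows=100, row_chunksize=17
# -> array([  0,  17,  34,  51,  68,  85, 100]); each consecutive pair (start, stop)
#    becomes one pd.read_csv call executed by a joblib worker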

Here are the timing results:

* Reading dimensions
  * duration: 177.37sec
* Deriving intervals
  * row steps: 97, column steps: 1
  * duration: 0.00sec
* Loading dataframes
  * loaded dataframes: 97
  * duration: 1203.23sec
* Concatenating dataframe columns
  * duration: 0.00sec
* Concatenating dataframe rows
  * duration: 9691.57sec

CPU times: user 2h 42min 46s, sys: 3min 26s, total: 2h 46min 12s
Wall time: 3h 4min 52s
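
The final row concatenation accounts for roughly 9700 of the ~11100 seconds of wall time. One possible (untested) way to reduce it would be to stack the underlying numpy blocks instead of calling pd.concat, applied to the rows list built inside read_csv_joblib (shown in full below). This sketch assumes all row chunks share the same columns and a homogeneous numeric dtype; concat_rows_via_numpy is a hypothetical helper name, and whether it actually beats pd.concat here has not been measured:

import numpy as np
import pandas as pd

def concat_rows_via_numpy(rows):
    # `rows` is the list of row-chunk DataFrames produced by the
    # column-wise concatenation step in read_csv_joblib
    data = np.vstack([chunk.to_numpy() for chunk in rows])
    return pd.DataFrame(data, columns=rows[0].columns)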
import time

import numpy as np
import pandas as pd
from joblib import Parallel, delayed


def read_csv_joblib(
        csv_path,
        row_chunksize,
        column_chunksize=None,
        usecols=None,
        n_rows=None,
        n_columns=None,
        n_jobs=None,
        **kwargs):
    """Read a large CSV file in parallel by splitting it into row/column chunks.

    Each chunk is loaded by a separate joblib worker via `pd.read_csv`;
    the chunks are then concatenated column-wise and finally row-wise.
    Additional keyword arguments (e.g. `sep`) are passed through to `pd.read_csv`.
    """

    if usecols is not None and column_chunksize is not None:
        raise ValueError("`usecols` and `column_chunksize` are mutually exclusive")

    print("* Reading dimensions")
    start = time.time()
    if n_rows is None:
        # read a single column to count the data rows
        n_rows = pd.read_csv(csv_path, usecols=[0], **kwargs).shape[0]
    if n_columns is None:
        # read a single row to count the columns
        df_columns = pd.read_csv(csv_path, nrows=1, **kwargs)
        n_columns = df_columns.shape[1]
        # headers = df_columns.columns
        # dtypes = df_columns.dtypes
    if row_chunksize is None:
        row_chunksize = n_rows
    if column_chunksize is None:
        column_chunksize = n_columns
    print(f"    * duration: {time.time() - start:.02f}sec")

    print("* Deriving intervals")
    start = time.time()
    # chunk boundaries; each consecutive pair (start, stop) defines one chunk
    row_steps = np.append(np.arange(0, n_rows, row_chunksize), n_rows)
    column_steps = np.append(np.arange(0, n_columns, column_chunksize), n_columns)
    print(f"    * row steps: {len(row_steps) - 1}, column steps: {len(column_steps) - 1}")
    print(f"    * duration: {time.time() - start:.02f}sec")

    print("* Loading dataframes")
    start = time.time()
    dfs = Parallel(n_jobs=n_jobs)(
        delayed(pd.read_csv)(
            csv_path,
            # pass a user-supplied `usecols` through unchanged,
            # otherwise select the columns of the current column chunk
            usecols=(usecols if usecols is not None
                     else np.arange(column_steps[c], column_steps[c + 1])),
            # skip all data rows before this chunk; the +1 accounts for the
            # header line (row 0), which is kept so every chunk sees the header
            skiprows=(None if row_steps[r] == 0
                      else np.arange(1, row_steps[r] + 1)),
            nrows=row_steps[r + 1] - row_steps[r],
            **kwargs)
        for r in range(len(row_steps) - 1)
        for c in range(len(column_steps) - 1))
    print(f"    * loaded dataframes: {len(dfs)}")
    print(f"    * duration: {time.time() - start:.02f}sec")

    print("* Concatenating dataframe columns")
    start = time.time()
    n_column_chunks = len(column_steps) - 1
    rows = []
    for r in range(len(row_steps) - 1):
        # all column chunks belonging to row chunk `r`
        row_chunks = dfs[r * n_column_chunks:(r + 1) * n_column_chunks]
        if len(row_chunks) > 0:
            rows.append(pd.concat(row_chunks, axis=1))
    print(f"    * duration: {time.time() - start:.02f}sec")

    print("* Concatenating dataframe rows")
    start = time.time()
    df = pd.concat(rows).reset_index(drop=True)
    print(f"    * duration: {time.time() - start:.02f}sec")

    return df
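
Since tests are still pending, a minimal sanity check could compare the result against a plain pd.read_csv on a small file. This is only a sketch; the file name tiny.csv and the chunk sizes are made up:

import pandas as pd

# write a tiny synthetic CSV (hypothetical file name)
pd.DataFrame({"a": range(20), "b": range(20, 40), "c": range(40, 60)}).to_csv(
    "tiny.csv", index=False)

expected = pd.read_csv("tiny.csv")
actual = read_csv_joblib("tiny.csv", row_chunksize=7, column_chunksize=2, n_jobs=2)

# both reads should yield identical frames (same values, columns, and dtypes)
pd.testing.assert_frame_equal(expected, actual)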