@benfasoli
Last active July 9, 2019 22:14
Split-apply-combine strategy for data mining in parallel with Pandas DataFrames
#!/usr/bin/env python
from multiprocessing import cpu_count, Pool
import pandas as pd


def parallel_apply(df, fun, n_chunks: int = None):
    """Apply function to batches of dataframe rows

    Parameters
    ----------
    df : pd.DataFrame
        Source DataFrame which is split into n_chunks and passed as the first
        argument to fun
    fun : function
        Function that returns a pandas DataFrame or Series. It must be
        picklable (defined at module level) so it can be sent to the workers
    n_chunks : int, optional
        Number of chunks and of parallel processes to use. If None, defaults
        to the number of cpus available
    """
    n_chunks = n_chunks or cpu_count()
    n_rows = len(df)
    # Ceiling division: the smallest chunk size that covers every row
    n_rows_per_chunk = -(-n_rows // n_chunks)
    # Clamp each break to n_rows so trailing chunks are empty, never reversed
    breaks = [min(x * n_rows_per_chunk, n_rows) for x in range(n_chunks)]
    breaks.append(n_rows)
    df_chunks = [df.iloc[breaks[i]:breaks[i + 1]] for i in range(n_chunks)]
    # The context manager cleans up the pool once map() has returned
    with Pool(n_chunks) as pool:
        res = pool.map(fun, df_chunks)
    return pd.concat(res)


def fun(chunk):
    """Function applied to each chunk of the DataFrame

    This example adds 1 to every value in the x column
    """
    chunk.x += 1
    return chunk
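

# The guard keeps worker processes from re-running the example on platforms
# that start them with "spawn" (Windows, macOS)
if __name__ == '__main__':
    df = pd.DataFrame([
        {'x': 1, 'y': 1},
        {'x': 3, 'y': 3},
        {'x': 4, 'y': 4}
    ], index=['a', 'b', 'c'])
    print(df)
    #    x  y
    # a  1  1
    # b  3  3
    # c  4  4

    result = parallel_apply(df, fun)
    print(result)
    #    x  y
    # a  2  1
    # b  4  3
    # c  5  4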
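
The row split can be checked without pandas or a process pool. This is a minimal sketch, not part of the gist: `chunk_breaks` and `chunk_sizes` are hypothetical helper names, and it assumes chunk sizes come from ceiling division with each boundary clamped to the row count.

```python
def chunk_breaks(n_rows, n_chunks):
    """Return n_chunks + 1 slice boundaries that cover range(n_rows)."""
    # Ceiling division without importing math
    rows_per_chunk = -(-n_rows // n_chunks)
    # Clamp to n_rows so surplus chunks come out empty instead of reversed
    breaks = [min(i * rows_per_chunk, n_rows) for i in range(n_chunks)]
    return breaks + [n_rows]


def chunk_sizes(n_rows, n_chunks):
    """Return the number of rows that land in each chunk."""
    breaks = chunk_breaks(n_rows, n_chunks)
    return [breaks[i + 1] - breaks[i] for i in range(n_chunks)]


if __name__ == '__main__':
    print(chunk_breaks(10, 4))  # [0, 3, 6, 9, 10]
    print(chunk_sizes(3, 8))    # [1, 1, 1, 0, 0, 0, 0, 0]
```

When there are more chunks than rows, the surplus chunks are empty DataFrames, so the function passed to `parallel_apply` should tolerate empty input.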