Skip to content

Instantly share code, notes, and snippets.

@adriantre
Last active November 13, 2019 14:12
Show Gist options
  • Save adriantre/b4eb34f05a21b37070e309d5709caebb to your computer and use it in GitHub Desktop.
Save adriantre/b4eb34f05a21b37070e309d5709caebb to your computer and use it in GitHub Desktop.
from functools import partial
import multiprocessing
import multiprocessing.pool
import warnings

import numpy as np
import pandas as pd
import rasterio
def func_to_run_on_each_row(*args, **kwargs):
    """Placeholder for the per-row worker function.

    This is the function handed to ``df.apply(func=...)``. Before apply
    runs, ``open_dataset_and_call_func`` partially evaluates it with a
    rasterio dataset handle (plus any extra positional args), so at call
    time it receives ``(image_datasource, *extra_args, row)``. In the
    real use case it reads a part of the specified image for each row.
    """
    # NOTE: the original zero-argument stub would raise TypeError as soon
    # as apply() passed it a row; accepting *args/**kwargs keeps the stub
    # callable in the documented wiring while remaining a no-op.
    pass
def open_dataset_and_call_func(
    func_to_call, *args, dataset_path=None, df_inner_func=None, **kwargs
):
    """Open a raster dataset and pass it into a DataFrame-level function.

    Intended for use with multiprocessing, where every process or thread
    needs its own dataset handle: the dataset at ``dataset_path`` is opened
    here, bound onto ``df_inner_func`` via ``functools.partial``, and the
    resulting callable is forwarded as the ``func`` keyword to
    ``func_to_call`` (e.g. ``_run_df_func_on_split``).
    """
    # Everything but the final positional argument is bound onto
    # df_inner_func; the final one (the per-split payload) is the single
    # positional argument given to func_to_call.
    inner_args = args[:-1]
    split_payload = args[-1]
    with rasterio.open(dataset_path) as image_datasource:
        # Bind this worker's own dataset handle (plus extra args) onto the
        # row-level function, so e.g. df.apply(func=bound_inner) sees it.
        bound_inner = partial(df_inner_func, image_datasource, *inner_args)
        result = func_to_call(split_payload, func=bound_inner, **kwargs)
    return result
def _run_df_func_on_split(tup_arg, **kwargs):
split_ind, df_split, df_func_name = tup_arg
# becomes e.g: df_split.apply(func=func_to_run_on_each_row(args), axis='columns')
return (split_ind, getattr(df_split, df_func_name)(**kwargs))
def multiprocessDF(
*args,
df=None,
subset=None,
njobs=-1,
use_threading=False,
intermediate_func_on_each_split=None,
df_func_name=None,
**kwargs
) -> pd.DataFrame:
"""Runs a dataframe function on a dataframe in parallell"""
if njobs == -1:
njobs = multiprocessing.cpu_count()
if use_threading:
pool = multiprocessing.pool.ThreadPool(njobs)
else:
pool = multiprocessing.Pool(processes=njobs)
try:
splits = np.array_split(df[subset], njobs)
except KeyError:
warnings.warn(
"The provided column subset was not found in the dataframe"
)
splits = np.array_split(df, njobs)
pool_data = [
(split_ind, df_split, df_func_name)
for split_ind, df_split in enumerate(splits)
]
if intermediate_func_on_each_split:
results = pool.map(
partial(
intermediate_func_on_each_split, # Function partially evaluated per process/thread
_run_df_func_on_split,
*args,
**kwargs
),
pool_data,
)
else:
results = pool.map(partial(_run_df_func_on_split, **kwargs), pool_data)
pool.close()
pool.join()
results = sorted(results, key=lambda x: x[0])
results = pd.concat([split[1] for split in results], axis="rows")
results = pd.concat([df, results], axis="columns")
return results
if __name__ == '__main__':
    # Example wiring (placeholders — fill in a real frame and paths).
    # NOTE: the original example was a syntax error: a bare `*args` with no
    # trailing comma (and no `args` defined). Extra positional arguments for
    # df_inner_func go in the tuple below.
    my_df = pd.DataFrame(...)
    # args that are sent to df_inner_func, which is applied on each row
    inner_func_args = ()
    result = multiprocessDF(
        *inner_func_args,
        # kwargs used by multiprocessDF
        df=my_df,
        subset=['list', 'of', 'columns', 'to', 'process'],
        intermediate_func_on_each_split=open_dataset_and_call_func,
        # kwargs used by intermediate_func_on_each_split
        dataset_path='path/of/data/to/read',
        # kwargs that define the pandas function to be called on the dataframe
        df_func_name="apply",
        df_inner_func=func_to_run_on_each_row,
        axis="columns",
    )
@adriantre
Copy link
Author

[WIP] The above code is over-the-top generalised.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment