Last active
November 13, 2019 14:12
-
-
Save adriantre/b4eb34f05a21b37070e309d5709caebb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import partial | |
import multiprocessing | |
import warnings | |
import rasterio | |
import pandas as pd | |
import numpy as np | |
def func_to_run_on_each_row():
    """Placeholder for the per-row worker handed to ``df.apply``.

    It is the function referenced as ``df_inner_func`` and called as
    ``df.apply(func=func_to_run_on_each_row)``; in the original use case
    it read a part of the specified image for each row.
    """
    pass
def open_dataset_and_call_func(
    func_to_call, *args, dataset_path=None, df_inner_func=None, **kwargs
):
    """Open a dataset inside the worker and bind it into ``df_inner_func``.

    Each process/thread needs its own dataset handle, so the dataset at
    ``dataset_path`` is opened here and partially applied as the first
    argument of ``df_inner_func`` (e.g. the function passed to
    ``df.apply(func=...)``). The last positional argument is forwarded
    to ``func_to_call``; all earlier positional arguments are bound into
    ``df_inner_func``.
    """
    # Everything except the final positional argument belongs to the
    # inner function; the final one (the df split tuple, partially
    # evaluated by the caller) goes to func_to_call itself.
    inner_args = args[:-1]
    last_arg = args[-1]
    with rasterio.open(dataset_path) as datasource:
        # Bind the per-worker dataset handle so every row callback
        # reuses this process's own handle.
        bound_inner = partial(df_inner_func, datasource, *inner_args)
        return func_to_call(last_arg, func=bound_inner, **kwargs)
def _run_df_func_on_split(tup_arg, **kwargs): | |
split_ind, df_split, df_func_name = tup_arg | |
# becomes e.g: df_split.apply(func=func_to_run_on_each_row(args), axis='columns') | |
return (split_ind, getattr(df_split, df_func_name)(**kwargs)) | |
def multiprocessDF(
    *args,
    df=None,
    subset=None,
    njobs=-1,
    use_threading=False,
    intermediate_func_on_each_split=None,
    df_func_name=None,
    **kwargs
) -> pd.DataFrame:
    """Run a dataframe method on a dataframe in parallel.

    The dataframe (optionally restricted to the ``subset`` columns) is
    split into ``njobs`` chunks; the method named ``df_func_name`` is
    executed on each chunk in a process (or thread) pool, and the
    per-chunk results are restored to their original order, stacked
    row-wise, and concatenated column-wise onto the input ``df``.

    Parameters
    ----------
    *args : positional arguments forwarded to
        ``intermediate_func_on_each_split`` (and from there to the inner
        row function) when that wrapper is given.
    df : pd.DataFrame to process.
    subset : optional list of column names to restrict the work to;
        falls back (with a warning) to the full dataframe when missing.
    njobs : pool size; ``-1`` means one worker per CPU.
    use_threading : use a thread pool instead of a process pool.
    intermediate_func_on_each_split : optional per-worker wrapper (e.g.
        ``open_dataset_and_call_func``) that must eventually call its
        first argument, ``_run_df_func_on_split``.
    df_func_name : name of the dataframe method to call (e.g. "apply").
    **kwargs : keyword arguments forwarded to the wrapper / method.
    """
    if njobs == -1:
        njobs = multiprocessing.cpu_count()

    if use_threading:
        # BUG FIX: `multiprocessing.pool` is a submodule that a bare
        # `import multiprocessing` does not reliably bind as an
        # attribute; import ThreadPool explicitly instead.
        from multiprocessing.pool import ThreadPool
        pool = ThreadPool(njobs)
    else:
        pool = multiprocessing.Pool(processes=njobs)

    try:
        try:
            splits = np.array_split(df[subset], njobs)
        except KeyError:
            # Best-effort fallback: process every column instead.
            warnings.warn(
                "The provided column subset was not found in the dataframe"
            )
            splits = np.array_split(df, njobs)

        # Tag each split with its index so order can be restored below.
        pool_data = [
            (split_ind, df_split, df_func_name)
            for split_ind, df_split in enumerate(splits)
        ]

        if intermediate_func_on_each_split:
            worker = partial(
                intermediate_func_on_each_split,  # partially evaluated per process/thread
                _run_df_func_on_split,
                *args,
                **kwargs
            )
        else:
            worker = partial(_run_df_func_on_split, **kwargs)
        results = pool.map(worker, pool_data)
    finally:
        # BUG FIX: always reclaim the pool, even when splitting or a
        # worker raises (the original leaked it on any exception).
        pool.close()
        pool.join()

    results = sorted(results, key=lambda item: item[0])
    stacked = pd.concat([split for _, split in results], axis="rows")
    return pd.concat([df, stacked], axis="columns")
if __name__ == '__main__':
    # Illustrative usage only — the original had a stray `*args` with no
    # trailing comma (a SyntaxError) referencing an undefined name;
    # positional args destined for df_inner_func would be listed first.
    my_df = pd.DataFrame(...)  # placeholder: build the real dataframe here
    result = multiprocessDF(
        # kwargs used by multiprocessDF
        df=my_df,
        subset=['list', 'of', 'columns', 'to', 'process'],
        intermediate_func_on_each_split=open_dataset_and_call_func,
        # kwargs used by intermediate_func_on_each_split
        dataset_path='path/of/data/to/read',
        # kwargs that define the pandas function to be called on the dataframe
        df_func_name="apply",
        df_inner_func=func_to_run_on_each_row,
        axis="columns",
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
[WIP] The above code is over-the-top generalised.