@ogrisel
Last active September 29, 2017 15:05
Mean target value encoding for categorical variables using dask
#
# XXX: do not use this code, it's broken!
# Use: https://gist.github.com/ogrisel/b6a97ed87939e3b559568ac2f6599cba
#
# See comments.
import os
import os.path as op
from time import time
import dask.dataframe as ddf
import dask.array as da
from dask import delayed, compute
from distributed import Client
def make_categorical_data(n_samples=int(1e7), n_features=10):
    """Generate some random categorical data.

    The default parameters should generate around 1GB of random integer data
    with increasing cardinality along with a normally distributed real valued
    target variable.
    """
    feature_names = ['f_%03d' % i for i in range(n_features)]
    features_series = [
        da.random.randint(low=0, high=(i + 1) * 10, size=n_samples,
                          chunks=n_samples // 10)
        for i in range(n_features)
    ]
    features_series = [
        ddf.from_dask_array(col_data, columns=[feature_name])
        for col_data, feature_name in zip(features_series, feature_names)
    ]
    target = da.random.normal(loc=0, scale=1, size=n_samples,
                              chunks=n_samples // 10)
    target = ddf.from_dask_array(target, columns=['target'])
    data = ddf.concat(features_series + [target], axis=1)
    data = data.repartition(npartitions=10)
    return data


def target_mean_transform(data, feature_colname, target_colname):
    if data[feature_colname].dtype.kind not in ('i', 'O'):
        # Non-categorical variables are kept untransformed:
        return data[feature_colname]

    data = data[[feature_colname, target_colname]]
    target_means = data.groupby(feature_colname).mean()
    mapping = target_means.to_dict()[target_colname]
    return data[feature_colname].map(mapping)


def encode_with_target_mean(data, target_colname='target'):
    """Supervised encoding of categorical variables with per-group target mean.

    All columns that contain integer values are replaced by real valued data
    representing the average target value for each category.
    """
    features_data = data.drop(target_colname, axis=1)
    target_data = data[target_colname]
    return delayed(ddf.concat)(
        [delayed(target_mean_transform)(data, colname, target_colname)
         for colname in features_data.columns] + [target_data],
        axis=1
    )


if __name__ == '__main__':
    # make sure dask uses the distributed scheduler:
    # Start the scheduler and at least one worker with:
    #
    #   $ dask-scheduler
    #   $ dask-worker localhost:8786
    #
    c = Client('localhost:8786')
    original_folder_name = op.abspath('random_categorical_data')
    encoded_folder_name = op.abspath('random_encoded_data')

    if not op.exists(original_folder_name):
        print("Generating random categorical data in", original_folder_name)
        os.mkdir(original_folder_name)
        data = make_categorical_data()
        ddf.to_parquet(original_folder_name, data)

    print("Using data from", original_folder_name)
    data = ddf.read_parquet(original_folder_name)

    print("Encoding categorical variables...")
    encoded = encode_with_target_mean(data, target_colname='target')

    print("Saving encoded data to", encoded_folder_name)
    t0 = time()
    # Repartition to get small parquet files in the output folder.
    encoded = encoded.repartition(npartitions=10)
    compute(delayed(ddf.to_parquet)(encoded_folder_name, encoded))
    print("done in %0.3fs" % (time() - t0))
ogrisel commented Sep 29, 2017

Here is the output of this script:

Using data from /home/ogrisel/tmp/random_categorical_data
Encoding categorical variables...
Saving encoded data to /home/ogrisel/tmp/random_encoded_data
done in 20.269s

ogrisel commented Sep 29, 2017

Here is the task parallel execution log (on my laptop with 4 threads):

[screenshot: task stream panel of the distributed scheduler's bokeh dashboard]

What went well:

  • file-wise / record-wise read_parquet (first tasks in blue) is well parallelized (collection API)
  • column-wise target_mean_transform (in purple) is well parallelized as well (dask.delayed calls)
  • the output folder contains 10 parquet files with the right size as expected

What looks wrong:

  • delayed(ddf.concat) and delayed(ddf.to_parquet) (the two big green blocks) are not parallelized (each is a single function call), but one could hope that the partitions of the underlying dataframe are still scheduled in parallel by the distributed scheduler. I am not sure how such nested parallelism is handled by dask.

Open questions:

  • The imperative and eager calls to to_dict are nested into delayed calls to target_mean_transform: are those nested calls efficiently scheduled by the scheduler? Or are there intermediate results that are recomputed more than necessary?

@mrocklin

You should never have to call delayed on any dask.dataframe operation. These are already lazy. So delayed(ddf.concat) is unnecessary (and counter-productive, see point below).

When you call a delayed function on a dask object, that dask object will be turned into a concrete numpy array or pandas dataframe before being passed to your function. If you want to apply that function independently to every partition then I recommend using the map_partitions method.
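
For illustration, a minimal sketch (not from the thread) of the map_partitions pattern on a toy dask DataFrame with a single hypothetical column x; the callable receives a plain pandas DataFrame per partition and nothing runs until compute:

    import pandas as pd
    import dask.dataframe as ddf

    # Toy dask DataFrame split into 4 partitions (hypothetical example data).
    data = ddf.from_pandas(pd.DataFrame({'x': range(100)}), npartitions=4)

    # The callable receives one plain pandas DataFrame per partition and is
    # applied independently to each; evaluation is deferred until .compute().
    shifted = data.map_partitions(lambda part: part.assign(x=part['x'] + 1))
    print(shifted.compute().head())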

TomAugspurger commented Sep 29, 2017

I think we may want a version of dask.Series.map(val) that works when val is a dask.Series. Taking a look now.

ogrisel commented Sep 29, 2017

So delayed(ddf.concat) is unnecessary

Actually, in my case the columns are generated by delayed calls, so I cannot pass them directly to ddf.concat. I would probably need to introduce an explicit synchronization barrier by calling compute on the output of the [delayed(target_mean_transform)(...)] comprehension inside encode_with_target_mean, but I wanted to make it work in a single call to the scheduler.

It's probably not possible. I will try in a new gist.

When you call a delayed function on a dask object that dask object will be made into a numpy or pandas dataframe before being passed to your function. If you want to apply that function independently to every partition then I recommend using the map_partitions method.

In my case map_partitions does not work, as I need a global groupby for each column (ideally in parallel). I cannot restrict the transformation to be partition-wise.
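
To make that concrete, a tiny illustration (my own, reusing the generated column f_000 and the target column from the script above): the per-category target mean has to aggregate rows from every partition, so it is inherently a global groupby:

    # Global aggregation across all partitions; this cannot be done
    # independently per partition with map_partitions:
    per_category_mean = data['target'].groupby(data['f_000']).mean()  # lazy dask Series
    per_category_mean.compute()  # small pandas Series: one mean per category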

ogrisel commented Sep 29, 2017

I think we may want a version of dask.Series.map(val) that works when val is a dask.Series. Taking a look now.

That would be nice to have when the cardinality of the categorical variables is too big for the full mapping to fit in memory, but I don't think this is a problem for common categorical encoding in practice. Categorical variables with a cardinality in the millions are still small enough to fit in RAM without any problem.

ogrisel commented Sep 29, 2017

Maybe I should just not try to do any nesting by parallelizing the column-wise transforms. The internal calls to group-by and map should already parallelize enough.
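
Roughly, such a flat version of the per-column transform could look like the following sketch (my illustration, not the final gist; it assumes data is the dask DataFrame from the script above and col is the name of one integer column):

    # Per-column encoding with plain collection operations only: the groupby
    # mean is computed into a small in-memory pandas Series and then used as
    # the mapping argument of Series.map.
    means = data['target'].groupby(data[col]).mean().compute()
    data[col] = data[col].map(means)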

ogrisel commented Sep 29, 2017

Ok I have a new version of the example where I removed the column-wise parallelism:

https://gist.github.com/ogrisel/b6a97ed87939e3b559568ac2f6599cba

There is no need for delayed anymore (just plain collection operations), and the tasks panel of the bokeh dashboard makes me think it is already well parallelized this way.

Also, that fixed version only takes ~~10s~~ 13s total instead of 20s.

@TomAugspurger

Maybe I should just not try to do any nesting by parallelizing the column-wise transforms. The internal calls to group-by and map should already parallelize enough.

I think that can still be parallelized, if you're able to fit the data in distributed memory.

def encode_with_target_mean(data, target_colname='target'):
    """Supervised encoding of categorical variables with per-group target mean.

    All columns that contain integer values are replaced by real valued data
    representing the average target value for each category.
    """
    features_data = data.drop(target_colname, axis=1)
    target_data = data[target_colname]

    encode_columns = features_data.select_dtypes(['int', 'object']).dtypes.index

    mappings = [target_data.groupby(features_data[col]).mean()
                for col in encode_columns]
    mappings = compute(*mappings)  # explicit compute here, so dd.persist(data) is useful earlier
    mappings = {m.index.name: m for m in mappings}

    for col in encode_columns:
        features_data[col] = features_data[col].map(mappings[col])
        
    return ddf.concat([features_data, target_data], axis=1)
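
A hypothetical usage sketch for the function above (persist keeps the partitions in distributed memory so the explicit compute on the mappings does not have to re-read the parquet files; the to_parquet call follows the path-first argument order used in the original script):

    data = ddf.read_parquet('random_categorical_data')
    data = data.persist()  # keep the raw data in distributed memory
    encoded = encode_with_target_mean(data, target_colname='target')
    ddf.to_parquet('random_encoded_data', encoded)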

A couple of other changes in the main block (mostly removing delayed calls): https://gist.github.com/436442baeb353c53bcdf61adf6b32823, which runs in

Using data from /private/tmp/random_categorical_data
Encoding categorical variables...
Saving encoded data to /private/tmp/random_encoded_data
done in 17.253s

compared to 24 seconds for the original version.

ogrisel commented Sep 29, 2017

The flat / pure collection API version runs in ~~10s~~ 13s. Column-wise parallelism is probably useless, unless the data has many, many columns on a cluster with many, many workers.

Edit: I did not include the time of computing the mappings which adds ~3s.

ogrisel commented Sep 29, 2017

BTW, in your code you might also want to use features_data.select_dtypes(['int', 'object', 'category']).

ogrisel commented Sep 29, 2017

@TomAugspurger thanks for the parallelized mapping version though. Even if it's slower, it's interesting; I had not thought about that option.
