Mean target value encoding for categorical variable using dask (take 2)
import os
import os.path as op
from time import time

import dask.dataframe as ddf
import dask.array as da
from distributed import Client


def make_categorical_data(n_samples=int(1e7), n_features=10, n_partitions=100):
    """Generate some random categorical data

    The default parameters should generate around 1GB of random integer data
    with increasing cardinality along with a normally distributed real valued
    target variable.
    """
    feature_names = ['f_%03d' % i for i in range(n_features)]
    features_series = [
        da.random.randint(low=0, high=(i + 1) * 10, size=n_samples,
                          chunks=n_samples // n_partitions)
        for i in range(n_features)
    ]
    features_series = [
        ddf.from_dask_array(col_data, columns=[feature_name])
        for col_data, feature_name in zip(features_series, feature_names)
    ]
    target = da.random.normal(loc=0, scale=1, size=n_samples,
                              chunks=n_samples // 10)
    target = ddf.from_dask_array(target, columns=['target'])

    data = ddf.concat(features_series + [target], axis=1)
    data = data.repartition(npartitions=n_partitions)
    return data


def target_mean_transform(data, feature_colname, target_colname):
    if data[feature_colname].dtype.kind not in ('i', 'O'):
        # Non-categorical variables are kept untransformed:
        return data[feature_colname]

    data = data[[feature_colname, target_colname]]
    target_means = data.groupby(feature_colname).mean()
    encoded_col = ddf.merge(data[feature_colname].to_frame(), target_means,
                            left_on=feature_colname, right_index=True,
                            how='left')[target_colname].to_frame()
    new_colname = feature_colname + '_mean_' + target_colname
    encoded_col = encoded_col.rename(columns={target_colname: new_colname})

    # Hack: the left join should preserve divisions, which are required for
    # an efficient downstream concat with axis=1 (but is it always true?).
    encoded_col.divisions = data.divisions
    return encoded_col


def encode_with_target_mean(data, target_colname='target'):
    """Supervised encoding of categorical variables with per-group target mean.

    All columns that contain integer values are replaced by real valued data
    representing the average target value for each category.
    """
    features_data = data.drop(target_colname, axis=1)
    target_data = data[target_colname].to_frame()

    # Sequential processing of columns: there is no need to parallelize
    # column-wise as the inner operations will already parallelize
    # record-wise.
    encoded_columns = [target_mean_transform(data, colname, target_colname)
                       for colname in features_data.columns]
    return ddf.concat(encoded_columns + [target_data], axis=1)


if __name__ == '__main__':
    # Make sure dask uses the distributed scheduler: start the scheduler
    # and at least one worker with:
    #
    #   $ dask-scheduler
    #   $ dask-worker localhost:8786
    c = Client('localhost:8786')

    original_folder_name = op.abspath('random_categorical_data')
    encoded_folder_name = op.abspath('random_encoded_data')

    if not op.exists(original_folder_name):
        print("Generating random categorical data in", original_folder_name)
        os.mkdir(original_folder_name)
        data = make_categorical_data(n_partitions=10)
        ddf.to_parquet(data, original_folder_name)

    t0 = time()
    print("Using data from", original_folder_name)
    data = ddf.read_parquet(original_folder_name)
    print("Encoding categorical variables...")
    encoded = encode_with_target_mean(data, target_colname='target')

    print("Saving encoded data to", encoded_folder_name)
    # Repartition to get small parquet files in the output folder:
    # encoded = encoded.repartition(npartitions=10)
    ddf.to_parquet(encoded, encoded_folder_name)
    print("done in %0.3fs" % (time() - t0))
import os
import os.path as op
from gzip import GzipFile

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

DATA_FOLDER = '~/data/criteo'
OUTPUT_FOLDER = '~/data/criteo_parquet'
CHUNK_SIZE = 500 * 1024 ** 2  # read ~500 MiB of uncompressed data at a time

COLUMN_NAMES = ['label']
COLUMN_NAMES += ['num_%02d' % (i + 1) for i in range(13)]
COLUMN_NAMES += ['cat_%02d' % (i + 1) for i in range(26)]
def parse_chunk(data, previous_remainder=b"", missing_value=-1):
    """Parse a chunk of bytes into one int32 row per TSV line

    The trailing partial line (if any) is returned as a remainder to be
    prepended to the next chunk.
    """
    lines = data.splitlines()
    if not lines:
        # Empty chunk (typically the end of the file): keep the pending
        # remainder as-is.
        return (np.empty(shape=(0, len(COLUMN_NAMES)), dtype=np.int32),
                previous_remainder, 0)
    lines[0] = previous_remainder + lines[0]
    # The last line is complete only if the chunk ends with a newline:
    remainder = b"" if data.endswith(b"\n") else lines.pop()

    # Everything can be stored as an integer, even the numerical values.
    parsed_array = np.empty(shape=(len(lines), len(COLUMN_NAMES)),
                            dtype=np.int32)
    parsed_array.fill(missing_value)
    invalid_count = 0
    for i, line in enumerate(lines):
        # splitlines() has already stripped the newline characters:
        fields = line.split(b'\t')
        if len(fields) != len(COLUMN_NAMES):
            invalid_count += 1
            continue
        for j, x in enumerate(fields):
            if not x:
                # Keep the missing_value marker for empty fields.
                continue
            # The label and the 13 numerical features are base 10 integers,
            # the 26 categorical features are hexadecimal hashes.
            parsed_array[i, j] = int(x) if j < 14 else int(x, 16)
    return parsed_array, remainder, invalid_count
def convert_file_to_parquet(filepath, output_folder, compression='SNAPPY'):
    """Convert gzipped TSV data to smaller parquet files

    Gzip does not allow for efficient seek-based file access and TSV
    is IO intensive and slow to parse.

    Converting to compressed parquet files makes it possible to efficiently
    access record-wise or column-wise chunks of the data using the
    dask.dataframe API.
    """
    invalid_lines_count = 0
    filebase, _ = op.splitext(op.basename(filepath))
    output_filepattern = op.join(output_folder, filebase + '_{:04d}.parquet')
    chunk_idx = 0
    print(f"Processing {filepath}...")
    with GzipFile(filepath) as f:
        remainder = b''
        while True:
            try:
                # Process uncompressed chunks of ~500 MiB at a time:
                bytes_chunk = f.read(CHUNK_SIZE)
            except EOFError:
                print("Warning: truncated file:", filepath)
                break
            if not bytes_chunk:
                if not remainder:
                    # End of file and no pending partial line: we are done.
                    break
                # The file does not end with a newline: flush the last
                # partial line as a final one-line chunk.
                bytes_chunk, remainder = remainder + b'\n', b''
            parsed_chunk, remainder, invalid_count = parse_chunk(
                bytes_chunk, previous_remainder=remainder)
            if len(parsed_chunk) == 0:
                break
            invalid_lines_count += invalid_count
            outpath = output_filepattern.format(chunk_idx)
            df = pd.DataFrame(parsed_chunk, columns=COLUMN_NAMES)
            arrow_table = pa.Table.from_pandas(df)
            pq.write_table(arrow_table, outpath, use_dictionary=False,
                           compression=compression)
            print(f"Wrote {outpath}")
            chunk_idx += 1
    if invalid_lines_count:
        print(f"Found {invalid_lines_count} invalid lines in {filepath}")
def convert_folder_to_parquet(input_folder, output_folder):
    input_folder = op.expanduser(input_folder)
    output_folder = op.expanduser(output_folder)
    if not op.exists(output_folder):
        os.makedirs(output_folder)
    for filename in os.listdir(input_folder):
        if not filename.endswith('.gz'):
            continue
        filepath = op.join(input_folder, filename)
        convert_file_to_parquet(filepath, output_folder)


if __name__ == "__main__":
    convert_folder_to_parquet(DATA_FOLDER, OUTPUT_FOLDER)
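Once converted, the parquet files can be opened lazily with the dask.dataframe API (illustrative snippet, not part of the gist; the path is the OUTPUT_FOLDER constant used above):

import os.path as op
import dask.dataframe as ddf

# Lazily open all parquet files written to the output folder:
df = ddf.read_parquet(op.expanduser('~/data/criteo_parquet'))
# Column-wise access only reads the parquet chunks of the selected column:
print(df['label'].mean().compute())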
This is a fixed version of a previous gist that was actually broken: the old version would mistakenly collect intermediate dask dataframes into in-memory pandas dataframes. This new version does not have that problem because it does not try to use dask.delayed at all: each column is processed sequentially and we simply rely on the record-wise parallelism of the underlying dask operations instead. For a single column, the transformation is equivalent to the pandas operation shown below.
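For reference (illustrative snippet, not part of the gist), on an in-memory frame the same encoding is a one-liner with pandas:

import pandas as pd

# Toy frame: one integer-coded categorical feature and a numeric target.
df = pd.DataFrame({'f': [0, 0, 1], 'target': [1.0, 3.0, 2.0]})
df['f_mean_target'] = df.groupby('f')['target'].transform('mean')
# Rows with f == 0 get (1.0 + 3.0) / 2 == 2.0; the f == 1 row gets 2.0.

The dask version instead computes an explicit groupby().mean() aggregate and broadcasts it back with a left join, which maps more naturally onto partitioned execution.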
[The output of the execution and the task log were attached here but are not reproduced.]