@rjzamora
Last active October 11, 2020 17:16
NVTabular-0.2-Dask-Blog-Tutorial
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# NVTabular + Dask Criteo/DLRM Tutorial\n",
"\n",
"This tutorial provides a simplified, step-by-step version of the preprocessing logic used in [the official dask-criteo benchmark](https://github.com/NVIDIA/NVTabular/blob/main/examples/dask-nvtabular-criteo-benchmark.py). For those who wish to work with a smaller dataset, we recommend the [multi-gpu_dask example notebook](https://github.com/NVIDIA/NVTabular/blob/main/examples/multi-gpu_dask.ipynb) included in the NVTabular repository."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 1 - Define Dask Cluster and Client\n",
"\n",
"Since Dask-CuDF and NVTabular are now tightly integrated, NVTabular will always use Dask for data partitioning and pre-processing. With that said, the default behavior is to utilize a [\"synchronous\"](https://docs.dask.org/en/latest/scheduling.html) task scheduler, which precludes distributed processing. In order to properly utilize a multi-GPU system with NVTabular, there must be a `dask.distributed` cluster deployed on the system.\n",
"\n",
"There are many different ways to create a distributed Dask cluster (and corresponding `Client` object). [This blog article](https://blog.dask.org/2020/07/23/current-state-of-distributed-dask-clusters) provides an excellent overview of the various cluster-deployment utilities available. For multi-node users, the Dask cluster is often deployed through a script or command-line interface, and the NVTabular Dask client can be defined by specifying the address (host and port) of the scheduler:\n",
"\n",
"```python\n",
"from dask.distributed import Client\n",
"\n",
"cluster = \"tcp://MachineA:8786\"\n",
"client = Client(cluster)\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When using a single node/server with multiple GPUs, the [LocalCUDACluster](https://github.com/rapidsai/dask-cuda/blob/628a1d3a422c46bf244a3cfe8b2d3efc7a5c833f/dask_cuda/local_cuda_cluster.py#L44) API (provided by the [Dask-CUDA](https://github.com/rapidsai/dask-cuda) library) is particularly convenient. This class makes it easy to deploy a local GPU-worker cluster with automated device-to-host memory spilling, and (optionally) enables the use of NVLink and InfiniBand-based inter-process communication via UCX. For example, the `LocalCUDACluster` version of the cluster/client creation code would look like this:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask_cuda import LocalCUDACluster\n",
"from dask.distributed import Client\n",
"from nvtabular.utils import device_mem_size\n",
"\n",
"# Choose memory-spilling threshold (in bytes)\n",
"spill_limit = int(device_mem_size(kind=\"total\") * 0.8)\n",
"\n",
"# NOTE: SCRATCH_DIR must be defined beforehand as a directory path\n",
"cluster = LocalCUDACluster(\n",
"    protocol=\"ucx\",  # Comm protocol (\"tcp\" or \"ucx\")\n",
"    local_directory=SCRATCH_DIR,  # Scratch space for Dask workers\n",
"    device_memory_limit=spill_limit,  # Start spilling to host above this\n",
")\n",
"client = Client(cluster)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"By default, this API will respect the `CUDA_VISIBLE_DEVICES` environment variable when creating Dask-CUDA workers, but the user can override this behavior with the `n_workers` argument. Since it is common for ETL-based workflows to exceed the total device memory of a system, the cluster can be initialized with a specific `device_memory_limit` for the worker processes. This option corresponds to the threshold at which a Dask-CUDA worker will begin moving (spilling) input and output data for pending and finished tasks out of device memory, and into host memory. Since the worker can only spill the input and output data of tasks (not the memory a task allocates while it is running), the threshold must be somewhat lower than the actual capacity of a single device.\n",
"\n",
"Regardless of the size of the cluster, or the total amount of device memory, it is typically a good idea to create a memory pool on each of the workers once the cluster is up. This can dramatically reduce the total time required for CUDA memory allocations, and can be accomplished with the `client.run` method:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import rmm\n",
"\n",
"# Choose initial pool size (in bytes).\n",
"# RMM may require the pool size to be a multiple of 256.\n",
"pool_size = (int(device_mem_size(kind=\"total\") * 0.9) // 256) * 256\n",
"\n",
"client.run(\n",
" rmm.reinitialize,\n",
" pool_allocator=True,\n",
" initial_pool_size=pool_size,\n",
")"
]
},
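{
"cell_type": "markdown",
"metadata": {},
"source": [
"The relationship between the two memory settings used above can be sketched in plain Python. This is only an illustration: `memory_budget` is a hypothetical helper (it is not part of NVTabular, Dask-CUDA, or RMM), the 32 GiB device size is an assumed value, and rounding the pool size down to a multiple of 256 bytes is an assumption about what RMM may expect.\n",
"\n",
"```python\n",
"def memory_budget(total_bytes, spill_frac=0.8, pool_frac=0.9, align=256):\n",
"    # Threshold at which a worker would start spilling to host memory.\n",
"    spill_limit = int(total_bytes * spill_frac)\n",
"    # Initial pool size, rounded down to a multiple of `align` bytes.\n",
"    pool_size = (int(total_bytes * pool_frac) // align) * align\n",
"    return spill_limit, pool_size\n",
"\n",
"\n",
"total = 32 * 1024**3  # assumed 32 GiB device (hypothetical value)\n",
"spill_limit, pool_size = memory_budget(total)\n",
"print(spill_limit, pool_size)\n",
"```\n",
"\n",
"With these fractions the spill threshold stays below the device capacity, which leaves headroom for memory that cannot be spilled."
]
},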
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 2 - Define an NVTabular Workflow\n",
"\n",
"The `Workflow` API is the central component of NVTabular for data preprocessing. This API is used to specify a general description of the raw data, and to add both feature-engineering and preprocessing operations to create a GPU-accelerated transformation pipeline. This API is also critical for multi-GPU scaling, because it allows the user to specify a Dask client. In fact, since NVTabular is already built on Dask-CuDF, this `client` argument is the **only** API requirement for multi-GPU scaling. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import nvtabular as nvt\n",
"import nvtabular.ops as ops\n",
"\n",
"# Specify continuous, categorical, and label columns.\n",
"cont_names = [\"I\" + str(x) for x in range(1, 14)]\n",
"cat_names = [\"C\" + str(x) for x in range(1, 27)]\n",
"label_name = [\"label\"]\n",
"\n",
"# Initialize the Workflow.\n",
"# Be sure to pass the Dask client for multi-GPU systems!\n",
"workflow = nvt.Workflow(\n",
"    cat_names=cat_names,\n",
"    cont_names=cont_names,\n",
"    label_name=label_name,\n",
"    client=client,\n",
")\n",
"\n",
"# Add operators without statistics dependencies using `add_feature`\n",
"workflow.add_feature([ops.FillMissing(), ops.Clip(min_value=0), ops.LogOp()])\n",
"\n",
"# Add other operators using `add_preprocess`.\n",
"# Here, we use `Categorify`\n",
"\n",
"# Specify “special” high-cardinality columns\n",
"high_card = (\"C1\", \"C10\", \"C20\", \"C22\")\n",
"# Cache larger statistics on host (rather than device)\n",
"cat_cache = {col: \"host\" if col in high_card else \"device\" for col in cat_names}\n",
"# Use a larger tree reduction for high-cardinality columns\n",
"tree_width = {col: 4 if col in high_card else 1 for col in cat_names}\n",
"workflow.add_preprocess(\n",
"    ops.Categorify(\n",
"        out_path=OUTPUT_PATH,\n",
"        tree_width=tree_width,\n",
"        cat_cache=cat_cache,\n",
"        on_host=True,\n",
"    )\n",
")\n",
"workflow.finalize()"
]
},
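{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the difference between the two kinds of operators more concrete, the following plain-Python sketch imitates what the pipeline above does to a single column. It is a simplified stand-in, not NVTabular code: `preprocess_continuous` and `categorify` are hypothetical helpers, and the fill value of 0, the use of `log(x + 1)`, and the choice to reserve id 0 for missing values are assumptions made for illustration.\n",
"\n",
"```python\n",
"import math\n",
"\n",
"\n",
"def preprocess_continuous(values, fill_val=0.0, min_value=0.0):\n",
"    # Row-wise ops: no statistics about the full column are needed.\n",
"    out = []\n",
"    for v in values:\n",
"        if v is None:  # FillMissing-like step: replace nulls\n",
"            v = fill_val\n",
"        v = max(v, min_value)  # Clip-like step: enforce a lower bound\n",
"        out.append(math.log1p(v))  # LogOp-like step: log(x + 1)\n",
"    return out\n",
"\n",
"\n",
"def categorify(values):\n",
"    # Statistics pass: collect the unique non-null categories first.\n",
"    uniques = sorted({v for v in values if v is not None})\n",
"    # Transform pass: map each category to an integer id (0 = missing).\n",
"    ids = {cat: i + 1 for i, cat in enumerate(uniques)}\n",
"    return [ids.get(v, 0) for v in values]\n",
"\n",
"\n",
"cont_out = preprocess_continuous([None, -3.0, 0.0, 9.0])\n",
"cat_out = categorify(['b', None, 'a', 'b'])\n",
"print(cont_out, cat_out)\n",
"```\n",
"\n",
"The continuous steps can be applied one row at a time, while the categorical encoding needs a full pass over the column before any value can be transformed."
]
},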
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 3 - Define an NVTabular Dataset\n",
"\n",
"The `nvt.Workflow` class requires data to be represented as an `nvt.Dataset`. This convention allows NVTabular to abstract away the raw format of the data, and convert everything to a consistent `dask_cudf.DataFrame` representation. Since the `Dataset` API effectively wraps functions like `dask_cudf.read_parquet`, the syntax is very simple and the computational cost is minimal.\n",
"\n",
"**Important Considerations**:\n",
"\n",
"- A `Dataset` can be initialized with the following objects:\n",
"    - 1+ file/directory paths - An `engine` argument is required to specify the file format (unless the file names end with “csv” or “parquet”)\n",
"    - `cudf.DataFrame` - Internal `ddf` will have 1 partition.\n",
"    - `pandas.DataFrame` - Internal `ddf` will have 1 partition.\n",
"    - `pyarrow.Table` - Internal `ddf` will have 1 partition.\n",
"    - `dask_cudf.DataFrame` - Internal `ddf` will be a shallow copy of the input.\n",
"    - `dask.dataframe.DataFrame` - Internal `ddf` will be a direct Pandas-to-CuDF conversion of the input.\n",
"- For file-based data initialization, the size of the internal `ddf` partitions will be chosen according to the following arguments (in order of precedence):\n",
"    - `part_size` - Desired maximum size of each partition **in bytes**. Note that you can also pass a string here, like `\"2GB\"`.\n",
"    - `part_mem_fraction` - Desired maximum size of each partition as a **fraction of total GPU memory**.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use optimized parquet version of Criteo dataset\n",
"dataset = nvt.Dataset(\n",
"    CRITEO_DATASET_PATH, engine=\"parquet\", part_mem_fraction=0.16\n",
")"
]
},
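{
"cell_type": "markdown",
"metadata": {},
"source": [
"The precedence between `part_size` and `part_mem_fraction` described above can be sketched as follows. This is a minimal illustration rather than NVTabular's implementation: `resolve_part_size` and `UNITS` are hypothetical names, the decimal interpretation of units such as `GB` is an assumption, and the 32 GB device size is a made-up value.\n",
"\n",
"```python\n",
"UNITS = {'KB': 10**3, 'MB': 10**6, 'GB': 10**9}  # assumed decimal units\n",
"\n",
"\n",
"def resolve_part_size(total_device_bytes, part_size=None, part_mem_fraction=None):\n",
"    # `part_size` takes precedence over `part_mem_fraction`.\n",
"    if part_size is not None:\n",
"        if isinstance(part_size, str):\n",
"            # Split a string such as '2GB' into a number and a unit.\n",
"            number, unit = part_size[:-2], part_size[-2:].upper()\n",
"            return int(float(number) * UNITS[unit])\n",
"        return int(part_size)\n",
"    if part_mem_fraction is not None:\n",
"        return int(total_device_bytes * part_mem_fraction)\n",
"    raise ValueError('pass part_size or part_mem_fraction')\n",
"\n",
"\n",
"total = 32 * 10**9  # assumed 32 GB device (hypothetical value)\n",
"from_fraction = resolve_part_size(total, part_mem_fraction=0.16)\n",
"from_string = resolve_part_size(total, part_size='2GB', part_mem_fraction=0.16)\n",
"print(from_fraction, from_string)\n",
"```\n",
"\n",
"In the second call both arguments are given, so the explicit `part_size` wins and the fraction is ignored."
]
},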
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 4 - Execute the Workflow\n",
"\n",
"\n",
"Once the `Dataset` and `Workflow` objects are both defined, the only thing left to do is *execute* the NVTabular `Workflow` using the `apply` method. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from nvtabular.io import Shuffle\n",
"\n",
"workflow.apply(\n",
"    dataset,\n",
"    shuffle=Shuffle.PER_PARTITION,  # Shuffle rows within each partition\n",
"    output_format=\"parquet\",\n",
"    out_files_per_proc=24,\n",
"    output_path=OUTPUT_PATH,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The code shown above will lead to the following execution procedure:\n",
"\n",
"1. Convert `Dataset` to internal `ddf`\n",
"2. Apply `Workflow` Phases\n",
"    1. Compute statistical dependencies on `ddf` (eager)\n",
"    2. Transform `ddf` (lazy)\n",
"    3. Repeat if multiple passes are required\n",
"3. Write to Disk (Optional)\n",
"    1. Shuffle transformed `ddf` partitions\n",
"    2. Append shuffled partitions into `out_files_per_proc` parquet files"
]
},
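{
"cell_type": "markdown",
"metadata": {},
"source": [
"The optional write phase listed above can be imitated with a few lines of plain Python. This is a rough sketch under stated assumptions, not the NVTabular writer: `shuffle_and_write` is a hypothetical helper, Python lists stand in for parquet files, and the round-robin assignment of rows to files is an assumption chosen only to keep the example short.\n",
"\n",
"```python\n",
"import random\n",
"\n",
"\n",
"def shuffle_and_write(partitions, out_files_per_proc, seed=0):\n",
"    rng = random.Random(seed)\n",
"    # One list per output file (stand-ins for parquet files).\n",
"    files = [[] for _ in range(out_files_per_proc)]\n",
"    for part in partitions:\n",
"        rows = list(part)\n",
"        rng.shuffle(rows)  # shuffle within this partition only\n",
"        for i, row in enumerate(rows):\n",
"            # Append the shuffled rows across the output files.\n",
"            files[i % out_files_per_proc].append(row)\n",
"    return files\n",
"\n",
"\n",
"partitions = [range(0, 6), range(6, 12)]\n",
"files = shuffle_and_write(partitions, out_files_per_proc=3)\n",
"print([len(f) for f in files])\n",
"```\n",
"\n",
"Every row ends up in exactly one output file, and each file receives rows from every partition."
]
},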
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Optional - Follow-up Processing with Dask-CuDF\n",
"\n",
"Since all transformations are lazily applied to an underlying Dask-CuDF DataFrame (`ddf`), `apply` will only trigger the complete execution of all transforms if an `output_format` option is specified. If the user elects not to write the data to disk, the transformations will not be executed, but they will be recorded in the task graph (DAG) of the underlying `ddf`. This means the lazily transformed data can be operated on with the Dask-CuDF API, or can be used to initialize a new `Dataset` object for further NVTabular consumption (e.g., loaded into PyTorch or TensorFlow for RecSys training)."
]
},
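{
"cell_type": "markdown",
"metadata": {},
"source": [
"The idea of recording transformations without executing them can be illustrated with a toy class. This is a conceptual sketch only: `LazyColumn` is a hypothetical stand-in for a lazily evaluated collection and does not use the Dask-CuDF or NVTabular APIs.\n",
"\n",
"```python\n",
"class LazyColumn:\n",
"    def __init__(self, data, ops=()):\n",
"        self._data = data\n",
"        self._ops = tuple(ops)\n",
"\n",
"    def map(self, fn):\n",
"        # Record the transform; nothing is executed yet.\n",
"        return LazyColumn(self._data, self._ops + (fn,))\n",
"\n",
"    def compute(self):\n",
"        # Run every recorded transform, in order.\n",
"        out = list(self._data)\n",
"        for fn in self._ops:\n",
"            out = [fn(v) for v in out]\n",
"        return out\n",
"\n",
"\n",
"calls = []\n",
"\n",
"\n",
"def double(v):\n",
"    calls.append(v)  # track when the transform actually runs\n",
"    return v * 2\n",
"\n",
"\n",
"lazy = LazyColumn([1, 2, 3]).map(double)\n",
"calls_before = len(calls)  # no calls so far: only the graph was built\n",
"result = lazy.compute()  # the recorded transform runs here\n",
"print(calls_before, result)\n",
"```\n",
"\n",
"In the same spirit, further operations can be chained onto the lazily transformed data, and the work is only done when a result is requested."
]
},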
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}