{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Ingesting image datasets for training with PyTorch\n",
"\n",
"This example demonstrates how to work with image training datasets using [Ray Data](data) on a single node.\n",
"\n",
"In this example, we'll use Ray Data to:\n",
"1. Load and preprocess raw images for training.\n",
"2. Load preprocessed images into PyTorch using [Ray Train](train) to train and validate an object classification model.\n",
"\n",
"\n",
"## Before You Begin\n",
"\n",
"Install the following dependencies if you haven't already."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install \"ray[data]\" torchvision awscli"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's first download an image dataset to use. If you already have an image dataset on local disk or in S3, you can skip the rest of this section.\n",
"Otherwise, follow on to download a dataset from Kaggle.\n",
"For this example, we'll use bash to download a [4GB subset of the ImageNet dataset](https://www.kaggle.com/datasets/ifigotin/imagenetmini-1000) and (optionally) upload it to an S3 bucket.\n",
"\n",
"```bash\n",
"$ pip install kaggle awscli\n",
"$ kaggle datasets list -s ifigotin/imagenetmini-1000\n",
"# ref title size lastUpdated downloadCount voteCount usabilityRating \n",
"# -------------------------- -------------------- ---- ------------------- ------------- --------- --------------- \n",
"# ifigotin/imagenetmini-1000 ImageNet 1000 (mini) 4GB 2020-03-10 01:05:11 11779 133 0.375\n",
"$ kaggle datasets download ifigotin/imagenetmini-1000\n",
"$ unzip imagenetmini-1000.zip -d /tmp\n",
"```\n",
"\n",
"The directory structure should look like this:\n",
"```bash\n",
"$ tree /tmp/imagenet-mini\n",
"# ── train\n",
"# │ ├── n01440764\n",
"# │ │ ├── n01440764_10043.JPEG\n",
"# │ │ ├── n01440764_10470.JPEG\n",
"# │ │ ├── n01440764_10744.JPEG\n",
"# ...\n",
"# └── val\n",
"# ├── n01440764\n",
"# │ ├── ILSVRC2012_val_00009111.JPEG\n",
"# │ ├── ILSVRC2012_val_00030740.JPEG\n",
"# ...\n",
"```\n",
"\n",
"If you want to read from S3, run the following commands and pass the S3 URI `s3://imagenetmini-1000/train` in the following Python code.\n",
"Otherwise, you can pass the local path `/tmp/imagenet-mini/train`.\n",
"```bash\n",
"$ aws s3 mb s3://imagenetmini-1000\n",
"$ aws s3 sync /tmp/imagenet-mini s3://imagenetmini-1000\n",
"$ aws s3 ls s3://imagenetmini-1000/train/\n",
"# PRE n01440764/\n",
"# PRE n01443537/\n",
"# PRE n01484850/\n",
"# ...\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Loading the training dataset\n",
"\n",
"Let's start by loading the training dataset and examining the data.\n",
"To speed things up, we'll first download the dataset from S3 to local disk.\n",
"If you're using a multi-node cluster, you can also just read directly from S3.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!aws s3 sync s3://sampl-example-data/imagenetmini-1000/ /tmp/imagenet-mini"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can use {meth}`~ray.data.Dataset.take_batch` to produce a single batch of data.\n",
"Later, we'll apply preprocessing transforms to the images, one batch at a time."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import ray\n",
"from PIL import Image\n",
"\n",
"\n",
"TRAIN_DATASET_URL = \"/tmp/imagenet-mini/train\"\n",
"\n",
"# Uncomment to read directly from S3.\n",
"# This is recommended if you are using multiple nodes for training\n",
"# or if you do not have enough disk space to hold the entire dataset.\n",
"# TRAIN_DATASET_URL = \"s3://imagenetmini-1000/train\"\n",
"\n",
"ds = ray.data.read_images(TRAIN_DATASET_URL, mode=\"RGB\")\n",
"\n",
"batch = ds.take_batch(batch_size=2)\n",
"print(batch)\n",
"# {'image': array([array([[[110, 117, 123],\n",
"# [111, 118, 124],\n",
"# [113, 118, 124],\n",
"# ...,\n",
"# [164, 191, 210]]], dtype=uint8)], dtype=object)}\n",
"\n",
"def show(image_array):\n",
" image = Image.fromarray(image_array)\n",
" display(image) \n",
"\n",
"show(batch[\"image\"][0])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When we pass the `Dataset` to Ray Train, we will receive a {class}`ray.data.DataIterator` on each training worker.\n",
"Let's use that API to iterate through the dataset here.\n",
"We'll limit the dataset to 1000 rows to shorten the execution and we'll use {meth}`ray.data.DataIterator.iter_torch_batches` since we'll be using a PyTorch model later on.\n",
"\n",
"Try commenting back in the code below and see what happens."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# This is the interface that will be used by Ray Train workers.\n",
"it = ds.limit(1000).iterator()\n",
"\n",
"# for torch_batch in it.iter_torch_batches(batch_size=32):\n",
"# print(torch_batch)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Uh-oh, did you get an error above?\n",
"\n",
"You should have seen an error like this:\n",
"```python\n",
"RuntimeError: Numpy array of object dtype cannot be converted to a Torch Tensor. This may because the numpy array is a ragged tensor--it contains items of different sizes. If using `iter_torch_batches()` API, you can pass in a `collate_fn` argument to specify custom logic to convert the Numpy array batch to a Torch tensor batch.\n",
"```\n",
"\n",
"This happens because each of our images is a different size. Later we'll crop all of the images to the same size, so we can revisit this then."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Extracting labels from the pathname\n",
"\n",
"We also want our dataset to include the label for each image.\n",
"For images where the subdirectory name is also the class name, we can use a {class}`~ray.data.datasource.Partitioning` to extract the class name from each filename and attach it as an additional field to each row."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from ray.data.datasource.partitioning import Partitioning\n",
"\n",
"ds = ray.data.read_images(TRAIN_DATASET_URL, mode=\"RGB\", partitioning=Partitioning(\"dir\", field_names=[\"class\"], base_dir=TRAIN_DATASET_URL))\n",
"batch = ds.take_batch(batch_size=2)\n",
"print(batch[\"class\"])\n",
"# ['n01608432' 'n01608432']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you have a different directory structure, then you can instead attach each image's filename using `include_paths=True` and add a custom {meth}`~ray.data.Dataset.map_batches` call to extract the class label."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from ray.data.datasource.partitioning import Partitioning\n",
"import os\n",
"\n",
"ds = ray.data.read_images(TRAIN_DATASET_URL, mode=\"RGB\", include_paths=True)\n",
"batch = ds.take_batch(batch_size=2)\n",
"print(batch[\"path\"])\n",
"# array(['/tmp/imagenet-mini/train/n01560419/n01560419_2363.JPEG',\n",
"# '/tmp/imagenet-mini/train/n01560419/n01560419_2368.JPEG'],\n",
"# dtype=object)\n",
"\n",
"def extract_class_from_basename(batch):\n",
" batch[\"class\"] = [os.path.basename(path).split(\"_\")[0] for path in batch[\"path\"]]\n",
" return batch\n",
"\n",
"ds = ds.map_batches(extract_class_from_basename)\n",
"batch = ds.take_batch(batch_size=2)\n",
"print(batch[\"class\"])\n",
"# array(['n01530575', 'n01530575'], dtype=object)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To extract the class as an integer, we can do another `map_batches` call to convert the string to an integer.\n",
"\n",
"If your class names don't already have an integer in them, we can also assign each class name an integer. To do this, we first call {meth}`~ray.data.Dataset.unique` to get the unique label values and assign each value an integer."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"ds = ray.data.read_images(TRAIN_DATASET_URL, mode=\"RGB\", partitioning=Partitioning(\"dir\", field_names=[\"class\"], base_dir=TRAIN_DATASET_URL))\n",
"# Create a dict mapping from class_name -> integer.\n",
"classes = ds.unique(column=\"class\")\n",
"classes_to_idx = {class_name: idx for idx, class_name in enumerate(classes)}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we call `map_batches` again to convert each image's class from a string to an integer."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Convert the string class names to integers in the dataset.\n",
"def convert_class_to_idx(row, classes_to_idx):\n",
" row[\"class\"] = classes_to_idx[row[\"class\"]]\n",
" return row\n",
"\n",
"ds = ray.data.read_images(TRAIN_DATASET_URL, mode=\"RGB\", partitioning=Partitioning(\"dir\", field_names=[\"class\"], base_dir=TRAIN_DATASET_URL))\n",
"ds = ds.map(convert_class_to_idx, fn_kwargs={\"classes_to_idx\": classes_to_idx})\n",
"batch = ds.take_batch(batch_size=2)\n",
"print(batch[\"class\"])\n",
"# [0 0]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Preprocessing images\n",
"\n",
"Next we'll apply some preprocessing transforms to our images.\n",
"Let's use [torchvision](https://pytorch.org/vision/stable/index.html) to define a preprocessor function that randomly crops and flips an image, then returns the image as a `torch.Tensor`.\n",
"This code matches the spec for the [MLPerf image classification benchmark](https://github.com/mlcommons/training/tree/master/image_classification)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import torch\n",
"import torchvision\n",
"import numpy as np\n",
"\n",
"DEFAULT_IMAGE_SIZE = 224\n",
"\n",
"def crop_and_flip_image(row):\n",
" transform = torchvision.transforms.Compose(\n",
" [ \n",
" torchvision.transforms.RandomResizedCrop(\n",
" antialias=True,\n",
" size=DEFAULT_IMAGE_SIZE,\n",
" scale=(0.05, 1.0),\n",
" ratio=(0.75, 1.33),\n",
" ),\n",
" torchvision.transforms.RandomHorizontalFlip(),\n",
" ] \n",
" )\n",
"\n",
" # Make sure to use torch.tensor here to avoid a copy from numpy.\n",
" row[\"image\"] = transform(\n",
" torch.tensor(np.transpose(row[\"image\"], axes=(2, 0, 1)))\n",
" )\n",
" return row\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, we can try the function out manually on one of the images that we produced earlier."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"print(\"Before\")\n",
"row = ds.take(1)[0]\n",
"show(row[\"image\"])\n",
"\n",
"row = crop_and_flip_image(row)\n",
"print(\"After\")\n",
"cropped_and_flipped_img = np.transpose(\n",
" np.array(row[\"image\"], dtype=np.uint8),\n",
" axes=(1, 2, 0))\n",
"show(cropped_and_flipped_img)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's apply the preprocessor to the entire dataset."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def get_training_dataset(url):\n",
" ds = ray.data.read_images(url, mode=\"RGB\", partitioning=Partitioning(\"dir\", field_names=[\"class\"], base_dir=url))\n",
" ds = ds.map(convert_class_to_idx, fn_kwargs={\"classes_to_idx\": classes_to_idx})\n",
" ds = ds.map(crop_and_flip_image)\n",
" return ds"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now that all of the images have the same dimensions, we're ready to iterate using {meth}`ray.data.DataIterator.iter_torch_batches`.\n",
"\n",
"Note that because we're using a randomized preprocessor, each time we call `DataIterator.iter_torch_batches`, it will produce different results.\n",
"This can be helpful during training, if you want to randomly apply transforms during each epoch."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# This is the interface that will be used by Ray Train workers.\n",
"ds = get_training_dataset(TRAIN_DATASET_URL)\n",
"it = ds.limit(1000).iterator()\n",
"\n",
"num_rows_read = 0\n",
"for torch_batch in it.iter_torch_batches(batch_size=32):\n",
" num_rows_read += len(torch_batch[\"image\"])\n",
"print(f\"Read {num_rows_read} rows.\")"
]
},
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}