Created
September 20, 2023 11:49
-
-
Save daskol/7fd557dd3105bd2ebcd331a2a9b0b55a to your computer and use it in GitHub Desktop.
Benchmark DALI against PyTorch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os import getenv | |
from pathlib import Path | |
from typing import Callable | |
import numpy as np | |
import nvidia.dali.fn as fn | |
import nvidia.dali.ops as ops | |
import torch as T | |
from nvidia.dali.pipeline import pipeline_def | |
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy | |
from nvidia.dali.types import (FLOAT, GRAY, INT64, INTERP_CUBIC, NCHW, RGB, | |
UINT8, Constant, DALIDataType) | |
from PIL import Image | |
# Dataset root; override with the DATA_DIR environment variable.
DATA_DIR = Path(getenv('DATA_DIR', 'ImageNet100'))
# Standard ImageNet per-channel normalization statistics (RGB order).
IMAGENET_DEFAULT_MEAN = np.array((0.485, 0.456, 0.406))
IMAGENET_DEFAULT_STD = np.array((0.229, 0.224, 0.225))
class RandomGrayScaleConversion:
    """Randomly convert an image to grayscale, keeping an RGB-like shape.

    With probability `prob` the RGB input is collapsed to one gray channel
    and that channel is replicated three times along the channel axis so
    downstream operators still receive a 3-channel tensor.
    """

    def __init__(self, prob: float = 0.2, device: str = 'gpu'):
        self.prob = prob
        self.grayscale = ops.ColorSpaceConversion(device=device,
                                                  image_type=RGB,
                                                  output_type=GRAY)

    @property
    def device(self):
        return self.grayscale.device

    def __call__(self, images):
        apply_op = fn.random.coin_flip(probability=self.prob,
                                       dtype=DALIDataType.BOOL)
        if apply_op:
            gray = self.grayscale(images)
            # Replicate the single gray channel back to 3 channels (HWC).
            images = fn.cat(gray, gray, gray, axis=2)
        return images
class RandomColorJitter:
    """Randomly jitter brightness, contrast, saturation and hue.

    With probability `prob`, applies a ColorTwist whose factors are drawn
    uniformly: brightness/contrast/saturation from
    [max(0, 1 - x), 1 + x] and hue from [-360 * hue, 360 * hue] degrees,
    where `hue` is given as a fraction of a full turn in [0, 0.5].
    """

    def __init__(self, brightness: float, contrast: float, saturation: float,
                 hue: float, prob: float = 0.8, device: str = 'gpu'):
        # Fix: validate with an explicit exception instead of `assert`,
        # which is silently stripped when Python runs with -O.
        if not 0 <= hue <= 0.5:
            raise ValueError(f'hue must be in [0, 0.5], got {hue}')
        self.prob = prob
        self.color = ops.ColorTwist(device=device)
        # Fix: typo `brightnes_range` -> `brightness_range` (local only).
        brightness_range = [max(0, 1 - brightness), 1 + brightness]
        self.brightness = ops.random.Uniform(range=brightness_range)
        contrast_range = [max(0, 1 - contrast), 1 + contrast]
        self.contrast = ops.random.Uniform(range=contrast_range)
        saturation_range = [max(0, 1 - saturation), 1 + saturation]
        self.saturation = ops.random.Uniform(range=saturation_range)
        # DALI expects hue in degrees; the argument is a fraction of 360.
        hue = 360 * hue
        hue_range = [-hue, hue]
        self.hue = ops.random.Uniform(range=hue_range)

    @property
    def device(self):
        return self.color.device

    def __call__(self, images):
        do_op = fn.random.coin_flip(probability=self.prob,
                                    dtype=DALIDataType.BOOL)
        if do_op:
            images = self.color(images,
                                brightness=self.brightness(),
                                contrast=self.contrast(),
                                saturation=self.saturation(),
                                hue=self.hue())
        return images
class RandomGaussianBlur:
    """Randomly blur images with a Gaussian kernel of random sigma.

    With probability `prob`, applies a Gaussian blur whose sigma is drawn
    uniformly from [0.1, 2.0).
    """

    def __init__(self, prob: float = 0.5, window_size: int = 23,
                 device: str = 'gpu'):
        self.prob = prob
        self.blur = ops.GaussianBlur(device=device, window_size=window_size)
        self.sigma = ops.random.Uniform(range=[0, 1])

    @property
    def device(self):
        return self.blur.device

    def __call__(self, images):
        apply_op = fn.random.coin_flip(probability=self.prob,
                                       dtype=DALIDataType.BOOL)
        if not apply_op:
            return images
        # Map the uniform sample from [0, 1) onto [0.1, 2.0).
        scaled_sigma = self.sigma() * 1.9 + 0.1
        return self.blur(images, sigma=scaled_sigma)
class RandomSolarize:
    """Randomly solarize images: invert pixels at or above a threshold.

    With probability `prob`, every uint8 pixel value >= `threshold` is
    replaced by its inverse (255 - value); other pixels are untouched.
    """

    def __init__(self, threshold: int = 128, prob: float = 0.0):
        self.prob = prob
        self.threshold = threshold

    def __call__(self, images):
        apply_op = fn.random.coin_flip(probability=self.prob,
                                       dtype=DALIDataType.BOOL)
        if not apply_op:
            return images
        inverted = Constant(255, dtype=UINT8) - images
        above = images >= self.threshold
        # Select the inverted value where the mask holds, the original
        # elsewhere; `True ^ above` is the element-wise logical negation.
        return above * inverted + (True ^ above) * images
class RandomCropMirrorNormalize:
    """Normalize images to NCHW float32 with a random horizontal mirror.

    Mirroring happens with probability `proba`; normalization uses the
    standard ImageNet statistics scaled to the 0-255 pixel range.
    """

    def __init__(self, proba: float = 0.5, device: str = 'gpu'):
        # The coin flip drives the `mirror` argument per sample.
        self.coin = ops.random.CoinFlip(probability=proba)
        self.cmn = ops.CropMirrorNormalize(device=device,
                                           dtype=FLOAT,
                                           output_layout=NCHW,
                                           mean=255 * IMAGENET_DEFAULT_MEAN,
                                           std=255 * IMAGENET_DEFAULT_STD)

    def __call__(self, images):
        mirror = self.coin()
        return self.cmn(images, mirror=mirror)
class NCropAugmentation:
    """Apply the same (random) transform `num_crops` times to one input."""

    def __init__(self, transform: Callable, num_crops: int):
        self.transform = transform
        self.num_crops = num_crops

    def __call__(self, x: Image) -> list[T.Tensor]:
        crops = []
        for _ in range(self.num_crops):
            crops.append(self.transform(x))
        return crops
class FullTransformPipeline:
    """Run several multi-output transforms and concatenate their outputs."""

    def __init__(self, transforms: Callable) -> None:
        self.transforms = transforms

    def __call__(self, x: Image) -> list[T.Tensor]:
        return [item for transform in self.transforms
                for item in transform(x)]
def make_pipeline(batch_size=512, num_workers=32, device='cpu'):
    """Build a DALI training iterator yielding two augmented views per image.

    The returned DALIGenericIterator produces batches mapped to keys
    'aug0' and 'aug1' (two augmented crops of each sample) and 'label'
    (class ids cast to INT64 for PyTorch). Incomplete batches are dropped.

    :param batch_size: samples per batch.
    :param num_workers: number of DALI worker threads.
    :param device: 'cpu' or 'gpu' — where decoding and augmentation run.
    """
    augmentations = [
        ops.RandomResizedCrop(device=device,
                              size=224,
                              random_area=(0.2, 1.0),
                              interp_type=INTERP_CUBIC),
        RandomColorJitter(0.4, 0.4, 0.2, 0.1, prob=0.8, device=device),
        RandomGrayScaleConversion(prob=0.2, device=device),
        RandomGaussianBlur(prob=0.5, device=device),
        RandomSolarize(prob=0.1),
        RandomCropMirrorNormalize(0.5, device=device),
    ]

    def augment(images):
        # Apply every augmentation in sequence.
        for augmentation in augmentations:
            images = augmentation(images)
        return images

    pipelines = [NCropAugmentation(augment, 2)]
    transform = FullTransformPipeline(pipelines)
    train_path = DATA_DIR / 'train'
    # Decoder memory paddings; presumably taken from NVIDIA's reference
    # ImageNet examples — TODO confirm these suit the target GPU.
    device_memory_padding = 211025920
    host_memory_padding = 140544512

    # Fix: renamed the inner function — it previously shadowed this
    # enclosing `make_pipeline`, which was confusing and error-prone.
    @pipeline_def(enable_conditionals=True)
    def define_graph():
        inputs, labels = fn.readers.file(file_root=train_path,
                                         prefetch_queue_depth=4,
                                         shuffle_after_epoch=True)
        # Decode on the GPU ('mixed') only when the pipeline targets it.
        device_decoder = 'mixed' if device == 'gpu' else device
        images = fn.decoders.image(inputs,
                                   output_type=RGB,
                                   device=device_decoder,
                                   device_memory_padding=device_memory_padding,
                                   host_memory_padding=host_memory_padding)
        augments = transform(images)
        # PyTorch expects labels as INT64, co-located with the images.
        if device == 'gpu':
            labels = labels.gpu()
        labels = fn.cast(labels, dtype=INT64, device=device)
        return *augments, labels

    pipeline = define_graph(batch_size=batch_size,
                            num_threads=num_workers,
                            device_id=0)
    pipeline.build()
    return DALIGenericIterator(pipelines=pipeline,
                               last_batch_policy=LastBatchPolicy.DROP,
                               output_map=['aug0', 'aug1', 'label'])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
from contextlib import contextmanager | |
from dataclasses import dataclass, field | |
from datetime import datetime, timedelta | |
from os import getenv | |
from pathlib import Path | |
from time import monotonic | |
from typing import Callable, Optional, Sequence | |
import numpy as np | |
import torch as T | |
from PIL import Image, ImageFilter, ImageOps | |
from torch.utils.data import DataLoader | |
from torchvision.datasets import ImageFolder | |
from torchvision.transforms import (ColorJitter, Compose, InterpolationMode, | |
Normalize, RandomApply, RandomGrayscale, | |
RandomHorizontalFlip, RandomResizedCrop, | |
ToTensor) | |
# Dataset root; override with the DATA_DIR environment variable.
DATA_DIR = Path(getenv('DATA_DIR', 'ImageNet100'))
# Standard ImageNet per-channel normalization statistics (RGB order).
IMAGENET_DEFAULT_MEAN = (0.485, 0.456, 0.406)
IMAGENET_DEFAULT_STD = (0.229, 0.224, 0.225)
class IndexedImageFolder(ImageFolder):
    """ImageFolder that prepends the sample index to each item."""

    def __getitem__(self, index):
        # ImageFolder yields (sample, target); add the index in front so
        # callers can track which sample produced each batch element.
        sample, target = super().__getitem__(index)
        return index, sample, target
class GaussianBlur:
    """Blur a PIL image with a Gaussian kernel of random radius.

    :param sigma: two-element [low, high] range from which the blur
        radius is drawn uniformly; defaults to [0.1, 2.0].
    """

    # Fix: the parameter defaults to None, so per PEP 484 the annotation
    # must be Optional[...] rather than a bare Sequence[float].
    def __init__(self, sigma: Optional[Sequence[float]] = None):
        self.sigma = [0.1, 2.0] if sigma is None else sigma

    def __call__(self, img: Image) -> Image:
        radius = random.uniform(self.sigma[0], self.sigma[1])
        return img.filter(ImageFilter.GaussianBlur(radius=radius))
class Solarization:
    """Solarize a PIL image (invert pixels above PIL's default threshold)."""

    def __call__(self, img: Image) -> Image:
        solarized = ImageOps.solarize(img)
        return solarized
class NCropAugmentation:
    """Produce `num_crops` independently augmented views of one input."""

    def __init__(self, transform: Callable, num_crops: int):
        self.transform = transform
        self.num_crops = num_crops

    def __call__(self, x: Image) -> list[T.Tensor]:
        views = []
        while len(views) < self.num_crops:
            views.append(self.transform(x))
        return views
class FullTransformPipeline:
    """Run several multi-output transforms and concatenate their outputs."""

    def __init__(self, transforms: Callable) -> None:
        self.transforms = transforms

    def __call__(self, x: Image) -> list[T.Tensor]:
        result = []
        for stage in self.transforms:
            result += stage(x)
        return result
def make_pipeline(batch_size=512, num_workers=32):
    """Build a PyTorch DataLoader yielding two augmented crops per image.

    Mirrors the DALI pipeline's augmentation recipe using torchvision
    transforms. Each item is (index, [crop0, crop1], label); incomplete
    batches are dropped.
    """
    base_transform = Compose([
        RandomResizedCrop(size=224,
                          scale=(0.2, 1.0),
                          interpolation=InterpolationMode.BICUBIC),
        RandomApply([ColorJitter(0.4, 0.4, 0.2, 0.1)], p=0.8),
        RandomGrayscale(p=0.2),
        RandomApply([GaussianBlur()], p=0.5),
        RandomApply([Solarization()], p=0.1),
        RandomHorizontalFlip(p=0.5),
        ToTensor(),
        Normalize(mean=IMAGENET_DEFAULT_MEAN, std=IMAGENET_DEFAULT_STD),
    ])
    # Two independent augmentations per sample, flattened into one list.
    transform = FullTransformPipeline([NCropAugmentation(base_transform, 2)])
    dataset = IndexedImageFolder(DATA_DIR / 'train', transform)
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
        drop_last=True,
        prefetch_factor=10,
        persistent_workers=True,
    )
@dataclass
class Elapsed:
    """Result record of a wall-clock measurement (see `measure`)."""

    # Measured duration in seconds; finalized when the context exits.
    seconds: float
    # Wall-clock time at which the measurement started.
    started_at: datetime = field(default_factory=datetime.now)
    # Wall-clock time at which the measurement finished; None while running.
    finished_at: Optional[datetime] = None

    @property
    def duration(self) -> timedelta:
        """Measured duration as a `timedelta`."""
        secs = self.seconds
        return timedelta(seconds=secs)
@contextmanager
def measure():
    """Measure wall-clock time spent in the enclosed `with` block.

    Yields an `Elapsed` whose `seconds` and `finished_at` fields are
    filled in once the block exits normally.
    """
    # Store the negated start stamp; adding the end stamp on exit turns
    # `seconds` into the elapsed duration.
    start = monotonic()
    elapsed = Elapsed(-start)
    yield elapsed
    end = monotonic()
    elapsed.seconds += end
    elapsed.finished_at = datetime.now()
def test_pipeline(batch_size=512, num_workers=32, num_iters=247):
    """Benchmark the PyTorch data pipeline.

    Runs up to `num_iters` batches through the loader, moving each batch
    to the available device, records a monotonic timestamp after every
    iteration, saves the raw timestamps to 'timings.npy' and prints
    per-iteration statistics.
    """
    iters = range(num_iters)
    loader = make_pipeline(batch_size, num_workers)
    device = T.device('cpu')
    if T.cuda.is_available():
        device = T.device('cuda')
    # One timestamp before the first batch plus one after each iteration.
    timings = np.empty(num_iters + 1)
    # Fix: initialize so an empty loader no longer raises NameError on `ix`.
    completed = 0
    with measure() as elapsed:
        timings[0] = monotonic()
        for ix, (index, images, labels) in zip(iters, loader):
            images = [el.to(device) for el in images]
            labels = labels.to(device)
            timings[ix + 1] = monotonic()
            completed = ix + 1
    num_iters = completed
    # Fix: keep num_iters + 1 timestamps so np.diff yields num_iters
    # intervals; the original sliced off the final timestamp and lost
    # the last iteration's duration.
    timings = timings[:num_iters + 1]
    np.save('timings.npy', timings)
    timings = np.diff(timings)
    print(timings)
    print('mean: ', timings.mean())
    print('std: ', timings.std())
    print('total iters:', num_iters)
    print('duration: ', elapsed.duration)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment