@daskol
Created September 20, 2023 11:49
Benchmark DALI against PyTorch
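# Not part of the original gist: a usage sketch inferred from the code below.
# Both scripts read an ImageNet-style directory whose location comes from the
# DATA_DIR environment variable (default 'ImageNet100') and expect a 'train'
# subdirectory, so a run would look roughly like
#
#     DATA_DIR=/path/to/ImageNet100 python <script>.py
#
# where <script> stands for either of the two scripts below; the original
# file names are not preserved in this capture.
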
# --- Script 1: DALI-based input pipeline ---
from os import getenv
from pathlib import Path
from typing import Callable

import numpy as np
import nvidia.dali.fn as fn
import nvidia.dali.ops as ops
import torch as T
from nvidia.dali.pipeline import pipeline_def
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy
from nvidia.dali.types import (FLOAT, GRAY, INT64, INTERP_CUBIC, NCHW, RGB,
                               UINT8, Constant, DALIDataType)
from PIL import Image

DATA_DIR = Path(getenv('DATA_DIR', 'ImageNet100'))

IMAGENET_DEFAULT_MEAN = np.array((0.485, 0.456, 0.406))
IMAGENET_DEFAULT_STD = np.array((0.229, 0.224, 0.225))


class RandomGrayScaleConversion:
    def __init__(self, prob: float = 0.2, device: str = 'gpu'):
        self.prob = prob
        self.grayscale = ops.ColorSpaceConversion(device=device,
                                                  image_type=RGB,
                                                  output_type=GRAY)

    @property
    def device(self):
        return self.grayscale.device

    def __call__(self, images):
        do_op = fn.random.coin_flip(probability=self.prob,
                                    dtype=DALIDataType.BOOL)
        if do_op:
            images = self.grayscale(images)
            # Replicate the single gray channel back to three channels (HWC).
            images = fn.cat(images, images, images, axis=2)
        return images


class RandomColorJitter:
    def __init__(self, brightness: float, contrast: float, saturation: float,
                 hue: float, prob: float = 0.8, device: str = 'gpu'):
        assert 0 <= hue <= 0.5
        self.prob = prob
        self.color = ops.ColorTwist(device=device)
        brightness_range = [max(0, 1 - brightness), 1 + brightness]
        self.brightness = ops.random.Uniform(range=brightness_range)
        contrast_range = [max(0, 1 - contrast), 1 + contrast]
        self.contrast = ops.random.Uniform(range=contrast_range)
        saturation_range = [max(0, 1 - saturation), 1 + saturation]
        self.saturation = ops.random.Uniform(range=saturation_range)
        # DALI expects hue in degrees; torchvision uses a fraction of the
        # color wheel, hence the conversion.
        hue = 360 * hue
        hue_range = [-hue, hue]
        self.hue = ops.random.Uniform(range=hue_range)

    @property
    def device(self):
        return self.color.device

    def __call__(self, images):
        do_op = fn.random.coin_flip(probability=self.prob,
                                    dtype=DALIDataType.BOOL)
        if do_op:
            images = self.color(images,
                                brightness=self.brightness(),
                                contrast=self.contrast(),
                                saturation=self.saturation(),
                                hue=self.hue())
        return images


class RandomGaussianBlur:
    def __init__(self, prob: float = 0.5, window_size: int = 23,
                 device: str = 'gpu'):
        self.prob = prob
        self.blur = ops.GaussianBlur(device=device, window_size=window_size)
        self.sigma = ops.random.Uniform(range=[0, 1])

    @property
    def device(self):
        return self.blur.device

    def __call__(self, images):
        do_op = fn.random.coin_flip(probability=self.prob,
                                    dtype=DALIDataType.BOOL)
        if not do_op:
            return images
        # Rescale the uniform sample to sigma in [0.1, 2.0], matching the
        # GaussianBlur transform in the torchvision script.
        sigma = self.sigma() * 1.9 + 0.1
        return self.blur(images, sigma=sigma)


class RandomSolarize:
    def __init__(self, threshold: int = 128, prob: float = 0.0):
        self.prob = prob
        self.threshold = threshold

    def __call__(self, images):
        do_op = fn.random.coin_flip(probability=self.prob,
                                    dtype=DALIDataType.BOOL)
        if not do_op:
            return images
        # Invert pixels at or above the threshold; `True ^ mask` flips the
        # boolean mask so pixels below the threshold pass through unchanged.
        inverted_img = Constant(255, dtype=UINT8) - images
        mask = images >= self.threshold
        return mask * inverted_img + (True ^ mask) * images


class RandomCropMirrorNormalize:
    def __init__(self, proba: float = 0.5, device: str = 'gpu'):
        self.coin = ops.random.CoinFlip(probability=proba)
        # Mean/std are scaled by 255 because the decoded images are uint8 in
        # [0, 255], unlike torchvision's ToTensor output in [0, 1].
        self.cmn = ops.CropMirrorNormalize(device=device,
                                           dtype=FLOAT,
                                           output_layout=NCHW,
                                           mean=255 * IMAGENET_DEFAULT_MEAN,
                                           std=255 * IMAGENET_DEFAULT_STD)

    def __call__(self, images):
        return self.cmn(images, mirror=self.coin())


class NCropAugmentation:
    def __init__(self, transform: Callable, num_crops: int):
        self.transform = transform
        self.num_crops = num_crops

    def __call__(self, x: Image) -> list[T.Tensor]:
        return [self.transform(x) for _ in range(self.num_crops)]


class FullTransformPipeline:
    def __init__(self, transforms: Callable) -> None:
        self.transforms = transforms

    def __call__(self, x: Image) -> list[T.Tensor]:
        out = []
        for transform in self.transforms:
            out.extend(transform(x))
        return out


def make_pipeline(batch_size=512, num_workers=32, device='cpu'):
    augmentations = [
        ops.RandomResizedCrop(device=device,
                              size=224,
                              random_area=(0.2, 1.0),
                              interp_type=INTERP_CUBIC),
        RandomColorJitter(0.4, 0.4, 0.2, 0.1, prob=0.8, device=device),
        RandomGrayScaleConversion(prob=0.2, device=device),
        RandomGaussianBlur(prob=0.5, device=device),
        RandomSolarize(prob=0.1),
        RandomCropMirrorNormalize(0.5, device=device),
    ]

    def augment(images):
        for augmentation in augmentations:
            images = augmentation(images)
        return images

    pipelines = [NCropAugmentation(augment, 2)]
    transform = FullTransformPipeline(pipelines)

    train_path = DATA_DIR / 'train'
    device_memory_padding = 211025920
    host_memory_padding = 140544512

    @pipeline_def(enable_conditionals=True)
    def dali_pipeline():
        inputs, labels = fn.readers.file(file_root=train_path,
                                         prefetch_queue_depth=4,
                                         shuffle_after_epoch=True)
        # Read images and apply transformations and augmentations.
        device_decoder = 'mixed' if device == 'gpu' else device
        images = fn.decoders.image(inputs,
                                   output_type=RGB,
                                   device=device_decoder,
                                   device_memory_padding=device_memory_padding,
                                   host_memory_padding=host_memory_padding)
        augments = transform(images)

        # PyTorch expects labels as INT64.
        if device == 'gpu':
            labels = labels.gpu()
        labels = fn.cast(labels, dtype=INT64, device=device)
        return *augments, labels

    pipeline = dali_pipeline(batch_size=batch_size,
                             num_threads=num_workers,
                             device_id=0)
    pipeline.build()
    return DALIGenericIterator(pipelines=pipeline,
                               last_batch_policy=LastBatchPolicy.DROP,
                               output_map=['aug0', 'aug1', 'label'])
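

# Not part of the original gist: a minimal benchmark driver sketched to mirror
# `test_pipeline` in the PyTorch script below. The function name and iteration
# count are placeholders. Each step of DALIGenericIterator yields a list with
# one dict per pipeline, keyed by the names passed in `output_map`.
def run_dali_benchmark(batch_size=512, num_workers=32, num_iters=100):
    from time import monotonic

    loader = make_pipeline(batch_size, num_workers, device='gpu')
    start = monotonic()
    for ix, batch in zip(range(num_iters), loader):
        data = batch[0]  # single pipeline -> single dict per step
        aug0, aug1 = data['aug0'], data['aug1']
        labels = data['label']
    total = monotonic() - start
    print(f'{ix + 1} iterations in {total:.2f} s '
          f'({total / (ix + 1):.3f} s/iter)')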


# --- Script 2: PyTorch/torchvision baseline ---
import random
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from os import getenv
from pathlib import Path
from time import monotonic
from typing import Callable, Optional, Sequence

import numpy as np
import torch as T
from PIL import Image, ImageFilter, ImageOps
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision.transforms import (ColorJitter, Compose, InterpolationMode,
                                    Normalize, RandomApply, RandomGrayscale,
                                    RandomHorizontalFlip, RandomResizedCrop,
                                    ToTensor)

DATA_DIR = Path(getenv('DATA_DIR', 'ImageNet100'))

IMAGENET_DEFAULT_MEAN = (0.485, 0.456, 0.406)
IMAGENET_DEFAULT_STD = (0.229, 0.224, 0.225)


class IndexedImageFolder(ImageFolder):
    def __getitem__(self, index):
        data = super().__getitem__(index)
        return (index, *data)


class GaussianBlur:
    def __init__(self, sigma: Optional[Sequence[float]] = None):
        if sigma is None:
            sigma = [0.1, 2.0]
        self.sigma = sigma

    def __call__(self, img: Image.Image) -> Image.Image:
        sigma = random.uniform(self.sigma[0], self.sigma[1])
        img = img.filter(ImageFilter.GaussianBlur(radius=sigma))
        return img


class Solarization:
    def __call__(self, img: Image.Image) -> Image.Image:
        return ImageOps.solarize(img)


class NCropAugmentation:
    def __init__(self, transform: Callable, num_crops: int):
        self.transform = transform
        self.num_crops = num_crops

    def __call__(self, x: Image.Image) -> list[T.Tensor]:
        return [self.transform(x) for _ in range(self.num_crops)]


class FullTransformPipeline:
    def __init__(self, transforms: Callable) -> None:
        self.transforms = transforms

    def __call__(self, x: Image.Image) -> list[T.Tensor]:
        out = []
        for transform in self.transforms:
            out.extend(transform(x))
        return out


def make_pipeline(batch_size=512, num_workers=32):
    augmentations = Compose([
        RandomResizedCrop(size=224,
                          scale=(0.2, 1.0),
                          interpolation=InterpolationMode.BICUBIC),
        RandomApply([ColorJitter(0.4, 0.4, 0.2, 0.1)], p=0.8),
        RandomGrayscale(p=0.2),
        RandomApply([GaussianBlur()], p=0.5),
        RandomApply([Solarization()], p=0.1),
        RandomHorizontalFlip(p=0.5),
        ToTensor(),
        Normalize(mean=IMAGENET_DEFAULT_MEAN, std=IMAGENET_DEFAULT_STD),
    ])
    pipelines = [NCropAugmentation(augmentations, 2)]
    transform = FullTransformPipeline(pipelines)

    train_path = DATA_DIR / 'train'
    train_dataset = IndexedImageFolder(train_path, transform)
    return DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
        drop_last=True,
        prefetch_factor=10,
        persistent_workers=True,
    )


@dataclass
class Elapsed:
    seconds: float
    started_at: datetime = field(default_factory=datetime.now)
    finished_at: Optional[datetime] = None

    @property
    def duration(self) -> timedelta:
        return timedelta(seconds=self.seconds)


@contextmanager
def measure():
    # Store the negated start time; adding monotonic() on exit turns it into
    # the elapsed number of seconds.
    elapsed = Elapsed(-monotonic())
    yield elapsed
    elapsed.seconds += monotonic()
    elapsed.finished_at = datetime.now()


def test_pipeline(batch_size=512, num_workers=32, num_iters=247):
    iters = range(num_iters)
    loader = make_pipeline(batch_size, num_workers)

    device = T.device('cpu')
    if T.cuda.is_available():
        device = T.device('cuda')

    timings = np.empty(num_iters + 1)
    with measure() as elapsed:
        timings[0] = monotonic()
        for ix, (index, images, labels) in zip(iters, loader):
            images = [el.to(device) for el in images]
            labels = labels.to(device)
            timings[ix + 1] = monotonic()
    num_iters = ix + 1

    # Keep the start timestamp plus one timestamp per completed iteration.
    timings = timings[:num_iters + 1]
    np.save('timings.npy', timings)

    timings = np.diff(timings)
    print(timings)
    print('mean:       ', timings.mean())
    print('std:        ', timings.std())
    print('total iters:', num_iters)
    print('duration:   ', elapsed.duration)
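

# Not part of the original gist: a hypothetical entry point so the baseline
# can be run directly; the arguments simply repeat the function's defaults.
if __name__ == '__main__':
    test_pipeline(batch_size=512, num_workers=32)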