@TomAugspurger
Created January 28, 2025 12:12
"""
Test reading (and writing) Zarr data with host and GPU.
$ python gputest.py gpu read
gpu read 2.96
$ python gputest.py cpu read
cpu read 1.24
"""
import argparse
import dataclasses
import time

import cupy as cp
import numcodecs
import numcodecs.registry
from kvikio.nvcomp_codec import ensure_contiguous_ndarray_like

import zarr
import zarr.core.buffer.gpu
import zarr.registry
import zarr.storage
from zarr.abc.codec import BytesBytesCodec
from zarr.core.array_spec import ArraySpec
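
# Assumes a CUDA-capable GPU with cupy and kvikio (for nvCOMP) installed, plus
# zarr-python 3.x for the ``zarr.abc.codec`` / ``zarr.create_array`` APIs.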

@dataclasses.dataclass(frozen=True)
class GPUZstdCodec(BytesBytesCodec):
    name: str = "zstd"
    configuration: dict = dataclasses.field(default_factory=dict)
    is_fixed_size = True
    level: int = 0
    checksum: bool = False

    def __post_init__(self) -> None:
        # Resolve kvikio's nvCOMP batch codec from the numcodecs registry; it
        # owns the CUDA stream and the zstd algorithm handle used below.
        object.__setattr__(
            self,
            "_codec",
            numcodecs.registry.get_codec({"id": "nvcomp_batch", "algorithm": "zstd"}),
        )
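
    # The encode path below drives nvCOMP's batched compression API directly
    # through the kvikio codec's ``_algo`` and ``_stream`` attributes. These
    # are private internals, so this is tied to the kvikio version in use.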
    async def _encode_single(
        self, chunk_bytes: zarr.core.buffer.Buffer, chunk_spec: ArraySpec
    ) -> zarr.core.buffer.Buffer:
        bufs = [chunk_bytes.as_array_like()]
        num_chunks = len(bufs)
        bufs = [cp.asarray(ensure_contiguous_ndarray_like(b)) for b in bufs]
        buf_sizes = [b.size * b.itemsize for b in bufs]
        max_chunk_size = max(buf_sizes)

        # Get temp and output buffer sizes.
        temp_size = self._codec._algo.get_compress_temp_size(num_chunks, max_chunk_size)
        comp_chunk_size = self._codec._algo.get_compress_chunk_size(max_chunk_size)

        # Prepare data and size buffers.
        # uncomp_chunks is used as a container that stores pointers to the
        # actual chunks. nvCOMP requires this and the sizes buffer to be in
        # GPU memory.
        uncomp_chunks = cp.array([b.data.ptr for b in bufs], dtype=cp.uintp)
        uncomp_chunk_sizes = cp.array(buf_sizes, dtype=cp.uint64)
        temp_buf = cp.empty(temp_size, dtype=cp.uint8)
        comp_chunks = cp.empty((num_chunks, comp_chunk_size), dtype=cp.uint8)
        # Array of pointers to each compressed chunk.
        comp_chunk_ptrs = cp.array([c.data.ptr for c in comp_chunks], dtype=cp.uintp)
        # Resulting compressed chunk sizes (filled in by the compress call).
        comp_chunk_sizes = cp.empty(num_chunks, dtype=cp.uint64)
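
        # Although nvCOMP's interface is batched, zarr hands this codec one
        # chunk at a time, so num_chunks is always 1 here; the batch-shaped
        # tables above just mirror the underlying API.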
        self._codec._algo.compress(
            uncomp_chunks,
            uncomp_chunk_sizes,
            max_chunk_size,
            num_chunks,
            temp_buf,
            comp_chunk_ptrs,
            comp_chunk_sizes,
            self._codec._stream,
        )
        # Compression is asynchronous on the codec's stream; wait for it.
        self._codec._stream.synchronize()

        # Keep only the first comp_chunk_sizes[0] bytes: the rest of the
        # fixed-size output buffer is unused padding.
        result = comp_chunks[0, : int(comp_chunk_sizes[0])]
        assert isinstance(result, cp.ndarray)
        return chunk_spec.prototype.buffer.from_array_like(result.view("b"))
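
    # Decoding reuses the kvikio codec's high-level ``decode``; only the
    # encode path is reimplemented, presumably so the compressed chunk stays
    # on the device instead of being staged through host memory (an
    # assumption about kvikio's default encode behavior).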
    async def _decode_single(
        self, chunk_bytes: zarr.core.buffer.Buffer, chunk_spec: ArraySpec
    ) -> zarr.core.buffer.Buffer:
        return chunk_spec.prototype.buffer.from_array_like(
            self._codec.decode(chunk_bytes.as_array_like()).view("b")
        )
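
# Note: the codec registers under the name "zstd" (see ``register_codec`` in
# ``main``), so arrays whose metadata names the standard zstd codec are
# transparently decompressed on the GPU.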

def parse_args(args=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("device", choices=["gpu", "cpu"])
    parser.add_argument("task", choices=["write", "read"])
    return parser.parse_args(args)

def main(args=None):
    args = parse_args(args)
    gpu = args.device == "gpu"
    write = args.task == "write"

    if gpu:
        # Swap the GPU codec in for "zstd" and have zarr allocate device
        # (cupy-backed) buffers when reading.
        zarr.registry.register_codec("zstd", GPUZstdCodec)
        zarr.config.enable_gpu()
        zarr.config.config["codecs"]["zstd"] = "__main__.GPUZstdCodec"
        store = zarr.storage.LocalStore("/tmp/data/test-gpu.zarr")
        xp = cp
    else:
        store = zarr.storage.LocalStore("/tmp/data/test-cpu.zarr")
        import numpy as xp
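
    # From here on the benchmark body is device-agnostic: ``xp`` is cupy on
    # the GPU path and numpy on the CPU path.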
    t0 = time.time()
    if write:
        z = zarr.create_array(
            store,
            name="a",
            shape=(10_000, 10_000),
            chunks=(1_000, 1_000),
            dtype="float32",
            overwrite=True,
        )
        # Write chunk-by-chunk: each (1000, 1000) slice lines up exactly with
        # one chunk of the array.
        for i in range(10):
            for j in range(10):
                src = xp.random.uniform(size=(1000, 1000))
                xs = slice(i * 1000, (i + 1) * 1000)
                ys = slice(j * 1000, (j + 1) * 1000)
                z[xs, ys] = src
    else:
        g = zarr.open_group(store)
        z = g["a"]
        arr = z[:]
        if not gpu:
            # Do the final host-to-device copy here so both paths end with
            # the data on the GPU.
            cp.asarray(arr)
    t1 = time.time()
    print(f"{args.device} {args.task} {t1 - t0:0.2f}")


if __name__ == "__main__":
    main()