From fa750607ed07e56dcc09ca3cfc312a14cdfbb6f7 Mon Sep 17 00:00:00 2001 From: OA jder bot Date: Fri, 27 Feb 2026 10:37:03 -0500 Subject: [PATCH 1/6] Add nvCOMP-backed GPU decode for Blosc zstd --- docs/user-guide/gpu.md | 11 +- pyproject.toml | 1 + src/zarr/codecs/__init__.py | 2 + src/zarr/codecs/gpu.py | 220 +++++++++++++++++++++++++ src/zarr/core/config.py | 6 +- tests/test_codecs/test_nvcomp_blosc.py | 96 +++++++++++ tests/test_config.py | 8 + 7 files changed, 337 insertions(+), 7 deletions(-) create mode 100644 src/zarr/codecs/gpu.py create mode 100644 tests/test_codecs/test_nvcomp_blosc.py diff --git a/docs/user-guide/gpu.md b/docs/user-guide/gpu.md index 3317bdf065..791c2094a7 100644 --- a/docs/user-guide/gpu.md +++ b/docs/user-guide/gpu.md @@ -3,13 +3,12 @@ Zarr can use GPUs to accelerate your workload by running `zarr.Config.enable_gpu`. !!! note - `zarr-python` currently supports reading the ndarray data into device (GPU) - memory as the final stage of the codec pipeline. Data will still be read into - or copied to host (CPU) memory for encoding and decoding. + GPU codec acceleration is currently available for the Blosc codec when all of + the following hold: - In the future, codecs will be available compressing and decompressing data on - the GPU, avoiding the need to move data between the host and device for - compression and decompression. + - `cname="zstd"` + - `shuffle` is `bitshuffle` or `noshuffle` + - array dtype is `float32` ## Reading data into device memory diff --git a/pyproject.toml b/pyproject.toml index 068caa1f0d..70812cb5ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,6 +68,7 @@ remote = [ ] gpu = [ "cupy-cuda12x", + "nvidia-nvcomp-cu12", ] cli = ["typer"] # Testing extras diff --git a/src/zarr/codecs/__init__.py b/src/zarr/codecs/__init__.py index 4c621290e7..192ddd7efe 100644 --- a/src/zarr/codecs/__init__.py +++ b/src/zarr/codecs/__init__.py @@ -3,6 +3,7 @@ from zarr.codecs.blosc import BloscCname, BloscCodec, BloscShuffle from zarr.codecs.bytes import BytesCodec, Endian from zarr.codecs.crc32c_ import Crc32cCodec +from zarr.codecs.gpu import NvcompBloscCodec from zarr.codecs.gzip import GzipCodec from zarr.codecs.numcodecs import ( BZ2, @@ -41,6 +42,7 @@ "Crc32cCodec", "Endian", "GzipCodec", + "NvcompBloscCodec", "ShardingCodec", "ShardingCodecIndexLocation", "TransposeCodec", diff --git a/src/zarr/codecs/gpu.py b/src/zarr/codecs/gpu.py new file mode 100644 index 0000000000..b0fa95209a --- /dev/null +++ b/src/zarr/codecs/gpu.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +import struct +from functools import cached_property +from typing import TYPE_CHECKING + +import numpy as np + +from zarr.codecs.blosc import BloscCodec +from zarr.registry import register_codec + +if TYPE_CHECKING: + from zarr.core.array_spec import ArraySpec + from zarr.core.buffer import Buffer + +try: + import cupy as cp +except ImportError: # pragma: no cover + cp = None + +try: + from nvidia import nvcomp +except ImportError: # pragma: no cover + nvcomp = None + +_BLOSC_MAX_OVERHEAD = 16 +_BLOSC_FLAG_DOSHUFFLE = 0x01 +_BLOSC_FLAG_MEMCPYED = 0x02 +_BLOSC_FLAG_DOBITSHUFFLE = 0x04 +_BLOSC_FLAG_DONT_SPLIT = 0x10 +_BLOSC_COMPFORMAT_ZSTD = 4 +_BLOSC_MIN_BUFFERSIZE = 128 +_BLOSC_MAX_SPLITS = 16 + + +def _read_i32(source: "cp.ndarray", offset: int) -> int: + raw = cp.asnumpy(source[offset : offset + 4]).tobytes() + return int(struct.unpack(" nvcomp.Codec: + assert cp is not None + assert nvcomp is not None + device = cp.cuda.Device() + stream = cp.cuda.get_current_stream() + return nvcomp.Codec( + algorithm="Zstd", + bitstream_kind=nvcomp.BitstreamKind.RAW, + device_id=device.id, + cuda_stream=stream.ptr, + ) + + @staticmethod + def _ensure_supported_dtype(chunk_spec: ArraySpec) -> None: + dtype = np.dtype(chunk_spec.dtype.to_native_dtype()) + if dtype != np.dtype("float32"): + raise ValueError( + "NvcompBloscCodec only supports float32 for GPU decode. " + f"Got dtype={dtype!s}." + ) + + @staticmethod + def _bitunshuffle(src: "cp.ndarray", *, typesize: int, n_elements: int) -> "cp.ndarray": + bits = cp.unpackbits(src, bitorder="little") + matrix = bits.reshape((typesize * 8, n_elements)) + out_bits = matrix.T.reshape((-1,)) + return cp.packbits(out_bits, bitorder="little") + + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + source = chunk_bytes.as_array_like() + if cp is None or not isinstance(source, cp.ndarray): + return await super()._decode_single(chunk_bytes, chunk_spec) + if nvcomp is None: + raise RuntimeError( + "NvcompBloscCodec requires `nvidia-nvcomp-cu12` to decode Blosc zstd chunks on GPU." + ) + + if source.size < _BLOSC_MAX_OVERHEAD: + raise ValueError( + f"Invalid Blosc payload: expected at least {_BLOSC_MAX_OVERHEAD} bytes, got {source.size}." + ) + + header = cp.asnumpy(source[:_BLOSC_MAX_OVERHEAD]).tobytes() + _, _, flags, typesize, nbytes, blocksize, cbytes = struct.unpack("> 5 + + self._ensure_supported_dtype(chunk_spec) + + if do_shuffle: + raise ValueError("NvcompBloscCodec does not support byte-shuffle Blosc chunks.") + if compformat != _BLOSC_COMPFORMAT_ZSTD: + raise ValueError( + "NvcompBloscCodec only supports Blosc chunks with cname='zstd'. " + f"Got compformat={compformat}." + ) + + if is_memcpyed: + if cbytes < _BLOSC_MAX_OVERHEAD + nbytes: + raise ValueError( + "Invalid memcpyed Blosc payload: missing raw bytes after header." + ) + raw = source[_BLOSC_MAX_OVERHEAD : _BLOSC_MAX_OVERHEAD + nbytes] + return chunk_spec.prototype.buffer.from_array_like(raw.copy()) + + if blocksize == 0: + raise ValueError("Invalid Blosc payload: blocksize must be > 0.") + if nbytes % typesize != 0: + raise ValueError( + f"Invalid Blosc payload: nbytes={nbytes} is not divisible by typesize={typesize}." + ) + + leftover = nbytes % blocksize + nblocks = nbytes // blocksize + (1 if leftover else 0) + + bstarts_offset = _BLOSC_MAX_OVERHEAD + bstarts_size = nblocks * 4 + bstarts_end = bstarts_offset + bstarts_size + if cbytes < bstarts_end: + raise ValueError("Invalid Blosc payload: missing block-start table.") + + bstarts = np.frombuffer( + cp.asnumpy(source[bstarts_offset:bstarts_end]).tobytes(), + dtype=" 0) else blocksize + leftover_block = bi == nblocks - 1 and leftover > 0 + + if ( + (not dont_split) + and (typesize <= _BLOSC_MAX_SPLITS) + and ((blocksize // typesize) >= _BLOSC_MIN_BUFFERSIZE) + and (not leftover_block) + ): + nsplits = typesize + else: + nsplits = 1 + + if bsize % nsplits != 0: + raise ValueError( + f"Invalid Blosc payload: blocksize {bsize} not divisible by nsplits {nsplits}." + ) + neblock = bsize // nsplits + + block_tmp = cp.empty((bsize,), dtype=cp.uint8) + decode_inputs: list[cp.ndarray] = [] + decode_targets: list[tuple[int, int]] = [] + + p = int(bstarts[bi]) + for si in range(nsplits): + if p < 0 or (p + 4) > cbytes: + raise ValueError("Invalid Blosc payload: split header outside payload.") + csize = _read_i32(source, p) + p += 4 + if csize < 0 or (p + csize) > cbytes: + raise ValueError("Invalid Blosc payload: split data outside payload.") + split = source[p : p + csize] + p += csize + + start = si * neblock + end = start + neblock + if csize == neblock: + block_tmp[start:end] = split + else: + decode_inputs.append(split) + decode_targets.append((start, end)) + + if decode_inputs: + decoded = self._zstd_codec.decode(nvcomp.as_arrays(decode_inputs)) + cp.cuda.get_current_stream().synchronize() + for dec, (start, end) in zip(decoded, decode_targets, strict=True): + block_tmp[start:end] = cp.asarray(dec, dtype=cp.uint8) + + if do_bitshuffle: + if bsize % typesize != 0: + raise ValueError( + f"Invalid bitshuffle payload: block bytes {bsize} not divisible by typesize {typesize}." + ) + n_elements = bsize // typesize + if n_elements % 8 != 0: + raise ValueError( + "NvcompBloscCodec only supports bitshuffle blocks with element count divisible by 8." + ) + block_out = self._bitunshuffle( + block_tmp, + typesize=typesize, + n_elements=n_elements, + ) + else: + block_out = block_tmp + + block_start = bi * blocksize + output[block_start : block_start + bsize] = block_out + + return chunk_spec.prototype.buffer.from_array_like(output) + + +register_codec("blosc", NvcompBloscCodec, qualname="zarr.codecs.gpu.NvcompBloscCodec") diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index f8f8ea4f5f..ac89bdbef1 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -64,7 +64,11 @@ def enable_gpu(self) -> ConfigSet: Configure Zarr to use GPUs where possible. """ return self.set( - {"buffer": "zarr.buffer.gpu.Buffer", "ndbuffer": "zarr.buffer.gpu.NDBuffer"} + { + "buffer": "zarr.buffer.gpu.Buffer", + "ndbuffer": "zarr.buffer.gpu.NDBuffer", + "codecs": {"blosc": "zarr.codecs.gpu.NvcompBloscCodec"}, + } ) diff --git a/tests/test_codecs/test_nvcomp_blosc.py b/tests/test_codecs/test_nvcomp_blosc.py new file mode 100644 index 0000000000..12eb47125c --- /dev/null +++ b/tests/test_codecs/test_nvcomp_blosc.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import warnings + +import numpy as np +import pytest + +import zarr +from zarr.codecs.blosc import BloscCodec +from zarr.errors import ZarrUserWarning +from zarr.testing.utils import gpu_test + + +@gpu_test +@pytest.mark.parametrize("shuffle", ["bitshuffle", "noshuffle"]) +def test_nvcomp_blosc_decode_supported(shuffle: str) -> None: + import cupy as cp + + src = np.arange(256, dtype=np.float32).reshape(16, 16) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(8, 8), + dtype=src.dtype, + compressors=BloscCodec(cname="zstd", shuffle=shuffle), + ) + z[:, :] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + out = zr[:, :] + + assert isinstance(out, cp.ndarray) + cp.testing.assert_array_equal(out, cp.asarray(src)) + + +@gpu_test +def test_nvcomp_blosc_decode_raises_on_byte_shuffle() -> None: + src = np.arange(64, dtype=np.float32).reshape(8, 8) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(8, 8), + dtype=src.dtype, + compressors=BloscCodec(cname="zstd", shuffle="shuffle"), + ) + z[:, :] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + with pytest.raises(ValueError, match="byte-shuffle"): + _ = zr[:, :] + + +@gpu_test +def test_nvcomp_blosc_decode_raises_on_non_float32() -> None: + src = np.arange(64, dtype=np.float64).reshape(8, 8) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(8, 8), + dtype=src.dtype, + compressors=BloscCodec(cname="zstd", shuffle="bitshuffle"), + ) + z[:, :] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + with pytest.raises(ValueError, match="float32"): + _ = zr[:, :] + + +@gpu_test +def test_nvcomp_blosc_decode_raises_on_non_zstd() -> None: + src = np.arange(64, dtype=np.float32).reshape(8, 8) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(8, 8), + dtype=src.dtype, + compressors=BloscCodec(cname="lz4", shuffle="noshuffle"), + ) + z[:, :] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + with pytest.raises(ValueError, match="cname='zstd'"): + _ = zr[:, :] diff --git a/tests/test_config.py b/tests/test_config.py index c3102e8efe..6fc993f901 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -291,6 +291,14 @@ def test_config_buffer_backwards_compatibility_gpu() -> None: get_ndbuffer_class() +@pytest.mark.gpu +def test_enable_gpu_sets_gpu_blosc_codec() -> None: + with zarr.config.enable_gpu(): + assert config.get("buffer") == "zarr.buffer.gpu.Buffer" + assert config.get("ndbuffer") == "zarr.buffer.gpu.NDBuffer" + assert config.get("codecs.blosc") == "zarr.codecs.gpu.NvcompBloscCodec" + + @pytest.mark.filterwarnings("error") def test_warning_on_missing_codec_config() -> None: class NewCodec(BytesCodec): From 96e2f752b9483ac36cc891eb7390f8af9556c0d9 Mon Sep 17 00:00:00 2001 From: OA jder bot Date: Fri, 27 Feb 2026 10:45:01 -0500 Subject: [PATCH 2/6] Set hatch-vcs fallback version for downstream xarray compatibility --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 70812cb5ed..53f056856d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -155,6 +155,9 @@ omit = [ [tool.hatch] version.source = "vcs" +[tool.hatch.version.raw-options] +fallback_version = "3.1.0" + [tool.hatch.build] hooks.vcs.version-file = "src/zarr/_version.py" From 06a7289822cbc1e58abb26ad0a432ac40a00ae2a Mon Sep 17 00:00:00 2001 From: Jesse Rusak Date: Thu, 2 Apr 2026 20:14:19 -0400 Subject: [PATCH 3/6] Add batched GPU zstd and blosc decode paths --- src/zarr/codecs/__init__.py | 3 +- src/zarr/codecs/gpu.py | 410 ++++++++++++++++++++----- src/zarr/core/array.py | 6 +- src/zarr/core/config.py | 9 +- tests/test_codecs/test_codecs.py | 21 ++ tests/test_codecs/test_nvcomp.py | 232 ++++++++++++++ tests/test_codecs/test_nvcomp_blosc.py | 86 +++++- tests/test_config.py | 5 +- 8 files changed, 688 insertions(+), 84 deletions(-) create mode 100644 tests/test_codecs/test_nvcomp.py diff --git a/src/zarr/codecs/__init__.py b/src/zarr/codecs/__init__.py index 192ddd7efe..9beebbf295 100644 --- a/src/zarr/codecs/__init__.py +++ b/src/zarr/codecs/__init__.py @@ -3,7 +3,7 @@ from zarr.codecs.blosc import BloscCname, BloscCodec, BloscShuffle from zarr.codecs.bytes import BytesCodec, Endian from zarr.codecs.crc32c_ import Crc32cCodec -from zarr.codecs.gpu import NvcompBloscCodec +from zarr.codecs.gpu import NvcompBloscCodec, NvcompZstdCodec from zarr.codecs.gzip import GzipCodec from zarr.codecs.numcodecs import ( BZ2, @@ -43,6 +43,7 @@ "Endian", "GzipCodec", "NvcompBloscCodec", + "NvcompZstdCodec", "ShardingCodec", "ShardingCodecIndexLocation", "TransposeCodec", diff --git a/src/zarr/codecs/gpu.py b/src/zarr/codecs/gpu.py index b0fa95209a..1a15509229 100644 --- a/src/zarr/codecs/gpu.py +++ b/src/zarr/codecs/gpu.py @@ -1,15 +1,28 @@ from __future__ import annotations +import asyncio import struct +from dataclasses import dataclass from functools import cached_property -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Literal +import numcodecs import numpy as np +from numcodecs.zstd import Zstd as NumcodecsZstd +from packaging.version import Version +from zarr.abc.codec import BytesBytesCodec from zarr.codecs.blosc import BloscCodec +from zarr.codecs.zstd import parse_checksum, parse_zstd_level +from zarr.core.buffer.cpu import as_numpy_array_wrapper +from zarr.core.common import JSON, concurrent_map, parse_named_configuration +from zarr.core.config import config from zarr.registry import register_codec if TYPE_CHECKING: + from collections.abc import Iterable + from typing import Self + from zarr.core.array_spec import ArraySpec from zarr.core.buffer import Buffer @@ -38,11 +51,40 @@ def _read_i32(source: "cp.ndarray", offset: int) -> int: return int(struct.unpack(" None: + if nvcomp is None: + raise RuntimeError( + f"{codec_name} requires `nvidia-nvcomp-cu12` to {operation} GPU-backed zstd chunks." + ) @cached_property - def _zstd_codec(self) -> nvcomp.Codec: + def _nvcomp_zstd_codec(self) -> nvcomp.Codec: assert cp is not None assert nvcomp is not None device = cp.cuda.Device() @@ -55,14 +97,166 @@ def _zstd_codec(self) -> nvcomp.Codec: ) @staticmethod - def _ensure_supported_dtype(chunk_spec: ArraySpec) -> None: - dtype = np.dtype(chunk_spec.dtype.to_native_dtype()) - if dtype != np.dtype("float32"): - raise ValueError( - "NvcompBloscCodec only supports float32 for GPU decode. " - f"Got dtype={dtype!s}." + def _coerce_nvcomp_output(array: Any) -> "cp.ndarray": + out = cp.asarray(array) + if out.dtype != np.dtype("B"): + out = out.view(np.dtype("B")) + return out + + async def _run_nvcomp_zstd_batch( + self, + arrays: Iterable["cp.ndarray"], + *, + operation: Literal["decode", "encode"], + ) -> list["cp.ndarray"]: + array_list = list(arrays) + if not array_list: + return [] + + outputs = getattr(self._nvcomp_zstd_codec, operation)(nvcomp.as_arrays(array_list)) + event = cp.cuda.Event() + event.record() + await asyncio.to_thread(event.synchronize) + return [self._coerce_nvcomp_output(output) for output in outputs] + + +@dataclass(frozen=True) +class NvcompZstdCodec(_NvcompZstdMixin, BytesBytesCodec): + """GPU-backed zstd codec using nvCOMP for batched encode/decode.""" + + is_fixed_size = True + + level: int = 0 + checksum: bool = False + + def __init__(self, *, level: int = 0, checksum: bool = False) -> None: + _numcodecs_version = Version(numcodecs.__version__) + if _numcodecs_version < Version("0.13.0"): + raise RuntimeError( + "numcodecs version >= 0.13.0 is required to use the zstd codec. " + f"Version {_numcodecs_version} is currently installed." ) + object.__setattr__(self, "level", parse_zstd_level(level)) + object.__setattr__(self, "checksum", parse_checksum(checksum)) + + @classmethod + def from_dict(cls, data: dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration(data, "zstd") + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> dict[str, JSON]: + return {"name": "zstd", "configuration": {"level": self.level, "checksum": self.checksum}} + + @cached_property + def _cpu_zstd_codec(self) -> NumcodecsZstd: + return NumcodecsZstd.from_config({"level": self.level, "checksum": self.checksum}) + + async def _decode_single_cpu( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return await asyncio.to_thread( + as_numpy_array_wrapper, + self._cpu_zstd_codec.decode, + chunk_bytes, + chunk_spec.prototype, + ) + + async def _encode_single_cpu( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return await asyncio.to_thread( + as_numpy_array_wrapper, + self._cpu_zstd_codec.encode, + chunk_bytes, + chunk_spec.prototype, + ) + + async def decode( + self, + chunks_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + batch = list(chunks_and_specs) + results: list[Buffer | None] = [None] * len(batch) + gpu_entries: list[tuple[int, cp.ndarray, ArraySpec]] = [] + cpu_entries: list[tuple[int, Buffer, ArraySpec]] = [] + + for index, (chunk_bytes, chunk_spec) in enumerate(batch): + if chunk_bytes is None: + continue + source = chunk_bytes.as_array_like() + if cp is not None and isinstance(source, cp.ndarray): + gpu_entries.append((index, source, chunk_spec)) + else: + cpu_entries.append((index, chunk_bytes, chunk_spec)) + + if gpu_entries: + self._require_nvcomp(codec_name=type(self).__name__, operation="decode") + outputs = await self._run_nvcomp_zstd_batch( + (source for _, source, _ in gpu_entries), operation="decode" + ) + for (index, _, chunk_spec), output in zip(gpu_entries, outputs, strict=True): + results[index] = chunk_spec.prototype.buffer.from_array_like(output) + + if cpu_entries: + cpu_outputs = await concurrent_map( + [(chunk_bytes, chunk_spec) for _, chunk_bytes, chunk_spec in cpu_entries], + self._decode_single_cpu, + config.get("async.concurrency"), + ) + for (index, _, _), output in zip(cpu_entries, cpu_outputs, strict=True): + results[index] = output + + return results + + async def encode( + self, + chunks_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + batch = list(chunks_and_specs) + results: list[Buffer | None] = [None] * len(batch) + gpu_entries: list[tuple[int, cp.ndarray, ArraySpec]] = [] + cpu_entries: list[tuple[int, Buffer, ArraySpec]] = [] + + for index, (chunk_bytes, chunk_spec) in enumerate(batch): + if chunk_bytes is None: + continue + source = chunk_bytes.as_array_like() + if cp is not None and isinstance(source, cp.ndarray): + gpu_entries.append((index, source, chunk_spec)) + else: + cpu_entries.append((index, chunk_bytes, chunk_spec)) + + if gpu_entries: + self._require_nvcomp(codec_name=type(self).__name__, operation="encode") + outputs = await self._run_nvcomp_zstd_batch( + (source for _, source, _ in gpu_entries), operation="encode" + ) + for (index, _, chunk_spec), output in zip(gpu_entries, outputs, strict=True): + results[index] = chunk_spec.prototype.buffer.from_array_like(output) + + if cpu_entries: + cpu_outputs = await concurrent_map( + [(chunk_bytes, chunk_spec) for _, chunk_bytes, chunk_spec in cpu_entries], + self._encode_single_cpu, + config.get("async.concurrency"), + ) + for (index, _, _), output in zip(cpu_entries, cpu_outputs, strict=True): + results[index] = output + + return results + + def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: + raise NotImplementedError + + +class NvcompBloscCodec(_NvcompZstdMixin, BloscCodec): + """GPU Blosc decoder (zstd + {bitshuffle, noshuffle}) using nvCOMP.""" + @staticmethod def _bitunshuffle(src: "cp.ndarray", *, typesize: int, n_elements: int) -> "cp.ndarray": bits = cp.unpackbits(src, bitorder="little") @@ -70,19 +264,46 @@ def _bitunshuffle(src: "cp.ndarray", *, typesize: int, n_elements: int) -> "cp.n out_bits = matrix.T.reshape((-1,)) return cp.packbits(out_bits, bitorder="little") - async def _decode_single( - self, - chunk_bytes: Buffer, - chunk_spec: ArraySpec, - ) -> Buffer: - source = chunk_bytes.as_array_like() - if cp is None or not isinstance(source, cp.ndarray): - return await super()._decode_single(chunk_bytes, chunk_spec) - if nvcomp is None: - raise RuntimeError( - "NvcompBloscCodec requires `nvidia-nvcomp-cu12` to decode Blosc zstd chunks on GPU." + @staticmethod + def _split_count(*, typesize: int, blocksize: int, dont_split: bool, leftover_block: bool) -> int: + if ( + (not dont_split) + and (typesize <= _BLOSC_MAX_SPLITS) + and ((blocksize // typesize) >= _BLOSC_MIN_BUFFERSIZE) + and (not leftover_block) + ): + return typesize + return 1 + + @staticmethod + def _finalize_block(block_plan: _BloscBlockPlan) -> "cp.ndarray": + if not block_plan.do_bitshuffle: + return block_plan.block_tmp + if block_plan.bsize % block_plan.typesize != 0: + raise ValueError( + "Invalid bitshuffle payload: " + f"block bytes {block_plan.bsize} not divisible by typesize {block_plan.typesize}." + ) + n_elements = block_plan.bsize // block_plan.typesize + if n_elements % 8 != 0: + raise ValueError( + "NvcompBloscCodec only supports bitshuffle blocks with element count divisible by 8." ) + return NvcompBloscCodec._bitunshuffle( + block_plan.block_tmp, + typesize=block_plan.typesize, + n_elements=n_elements, + ) + def _build_chunk_plan( + self, + *, + index: int, + source: "cp.ndarray", + chunk_spec: ArraySpec, + decode_inputs: list["cp.ndarray"], + decode_targets: list[_BloscSplitTarget], + ) -> _BloscChunkPlan | Buffer: if source.size < _BLOSC_MAX_OVERHEAD: raise ValueError( f"Invalid Blosc payload: expected at least {_BLOSC_MAX_OVERHEAD} bytes, got {source.size}." @@ -102,8 +323,6 @@ async def _decode_single( dont_split = (flags & _BLOSC_FLAG_DONT_SPLIT) != 0 compformat = (flags & 0xE0) >> 5 - self._ensure_supported_dtype(chunk_spec) - if do_shuffle: raise ValueError("NvcompBloscCodec does not support byte-shuffle Blosc chunks.") if compformat != _BLOSC_COMPFORMAT_ZSTD: @@ -114,9 +333,7 @@ async def _decode_single( if is_memcpyed: if cbytes < _BLOSC_MAX_OVERHEAD + nbytes: - raise ValueError( - "Invalid memcpyed Blosc payload: missing raw bytes after header." - ) + raise ValueError("Invalid memcpyed Blosc payload: missing raw bytes after header.") raw = source[_BLOSC_MAX_OVERHEAD : _BLOSC_MAX_OVERHEAD + nbytes] return chunk_spec.prototype.buffer.from_array_like(raw.copy()) @@ -143,33 +360,36 @@ async def _decode_single( ) output = cp.empty((nbytes,), dtype=cp.uint8) - - for bi in range(nblocks): - bsize = leftover if (bi == nblocks - 1 and leftover > 0) else blocksize - leftover_block = bi == nblocks - 1 and leftover > 0 - - if ( - (not dont_split) - and (typesize <= _BLOSC_MAX_SPLITS) - and ((blocksize // typesize) >= _BLOSC_MIN_BUFFERSIZE) - and (not leftover_block) - ): - nsplits = typesize - else: - nsplits = 1 - + block_plans: list[_BloscBlockPlan] = [] + + for block_index in range(nblocks): + bsize = leftover if (block_index == nblocks - 1 and leftover > 0) else blocksize + leftover_block = block_index == nblocks - 1 and leftover > 0 + nsplits = self._split_count( + typesize=typesize, + blocksize=bsize, + dont_split=dont_split, + leftover_block=leftover_block, + ) if bsize % nsplits != 0: raise ValueError( f"Invalid Blosc payload: blocksize {bsize} not divisible by nsplits {nsplits}." ) - neblock = bsize // nsplits + neblock = bsize // nsplits block_tmp = cp.empty((bsize,), dtype=cp.uint8) - decode_inputs: list[cp.ndarray] = [] - decode_targets: list[tuple[int, int]] = [] + block_plan = _BloscBlockPlan( + block_tmp=block_tmp, + output=output, + output_start=block_index * blocksize, + bsize=bsize, + do_bitshuffle=do_bitshuffle, + typesize=typesize, + ) + block_plans.append(block_plan) - p = int(bstarts[bi]) - for si in range(nsplits): + p = int(bstarts[block_index]) + for split_index in range(nsplits): if p < 0 or (p + 4) > cbytes: raise ValueError("Invalid Blosc payload: split header outside payload.") csize = _read_i32(source, p) @@ -179,42 +399,82 @@ async def _decode_single( split = source[p : p + csize] p += csize - start = si * neblock - end = start + neblock + start = split_index * neblock + stop = start + neblock if csize == neblock: - block_tmp[start:end] = split + block_tmp[start:stop] = split else: decode_inputs.append(split) - decode_targets.append((start, end)) - - if decode_inputs: - decoded = self._zstd_codec.decode(nvcomp.as_arrays(decode_inputs)) - cp.cuda.get_current_stream().synchronize() - for dec, (start, end) in zip(decoded, decode_targets, strict=True): - block_tmp[start:end] = cp.asarray(dec, dtype=cp.uint8) - - if do_bitshuffle: - if bsize % typesize != 0: - raise ValueError( - f"Invalid bitshuffle payload: block bytes {bsize} not divisible by typesize {typesize}." - ) - n_elements = bsize // typesize - if n_elements % 8 != 0: - raise ValueError( - "NvcompBloscCodec only supports bitshuffle blocks with element count divisible by 8." + decode_targets.append( + _BloscSplitTarget(block_plan=block_plan, start=start, stop=stop) ) - block_out = self._bitunshuffle( - block_tmp, - typesize=typesize, - n_elements=n_elements, - ) + + return _BloscChunkPlan( + index=index, + output=output, + chunk_spec=chunk_spec, + block_plans=tuple(block_plans), + ) + + async def decode( + self, + chunks_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + batch = list(chunks_and_specs) + results: list[Buffer | None] = [None] * len(batch) + decode_inputs: list[cp.ndarray] = [] + decode_targets: list[_BloscSplitTarget] = [] + gpu_chunk_plans: list[_BloscChunkPlan] = [] + cpu_entries: list[tuple[int, Buffer, ArraySpec]] = [] + + for index, (chunk_bytes, chunk_spec) in enumerate(batch): + if chunk_bytes is None: + continue + + source = chunk_bytes.as_array_like() + if cp is None or not isinstance(source, cp.ndarray): + cpu_entries.append((index, chunk_bytes, chunk_spec)) + continue + + self._require_nvcomp(codec_name=type(self).__name__, operation="decode") + plan_or_buffer = self._build_chunk_plan( + index=index, + source=source, + chunk_spec=chunk_spec, + decode_inputs=decode_inputs, + decode_targets=decode_targets, + ) + if isinstance(plan_or_buffer, _BloscChunkPlan): + gpu_chunk_plans.append(plan_or_buffer) else: - block_out = block_tmp + results[index] = plan_or_buffer + + if decode_inputs: + decoded_splits = await self._run_nvcomp_zstd_batch(decode_inputs, operation="decode") + for decoded, decode_target in zip(decoded_splits, decode_targets, strict=True): + decode_target.block_plan.block_tmp[decode_target.start : decode_target.stop] = decoded + + for chunk_plan in gpu_chunk_plans: + for block_plan in chunk_plan.block_plans: + block_out = self._finalize_block(block_plan) + block_plan.output[ + block_plan.output_start : block_plan.output_start + block_plan.bsize + ] = block_out + results[chunk_plan.index] = chunk_plan.chunk_spec.prototype.buffer.from_array_like( + chunk_plan.output + ) - block_start = bi * blocksize - output[block_start : block_start + bsize] = block_out + if cpu_entries: + cpu_outputs = await concurrent_map( + [(chunk_bytes, chunk_spec) for _, chunk_bytes, chunk_spec in cpu_entries], + super()._decode_single, + config.get("async.concurrency"), + ) + for (index, _, _), output in zip(cpu_entries, cpu_outputs, strict=True): + results[index] = output - return chunk_spec.prototype.buffer.from_array_like(output) + return results +register_codec("zstd", NvcompZstdCodec, qualname="zarr.codecs.gpu.NvcompZstdCodec") register_codec("blosc", NvcompBloscCodec, qualname="zarr.codecs.gpu.NvcompBloscCodec") diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 564d0e915a..11673cbece 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -28,7 +28,6 @@ from zarr.codecs._v2 import V2Codec from zarr.codecs.bytes import BytesCodec from zarr.codecs.vlen_utf8 import VLenBytesCodec, VLenUTF8Codec -from zarr.codecs.zstd import ZstdCodec from zarr.core._info import ArrayInfo from zarr.core.array_spec import ArrayConfig, ArrayConfigLike, parse_array_config from zarr.core.attributes import Attributes @@ -128,6 +127,7 @@ _parse_array_array_codec, _parse_array_bytes_codec, _parse_bytes_bytes_codec, + get_codec_class, get_pipeline_class, ) from zarr.storage._common import StorePath, ensure_no_existing_node, make_store_path @@ -5040,9 +5040,9 @@ def default_compressors_v3(dtype: ZDType[Any, Any]) -> tuple[BytesBytesCodec, .. """ Given a data type, return the default compressors for that data type. - This is just a tuple containing ``ZstdCodec`` + This is just a tuple containing an instance of the configured ``zstd`` codec class. """ - return (ZstdCodec(),) + return (cast("BytesBytesCodec", get_codec_class("zstd")()),) def default_serializer_v3(dtype: ZDType[Any, Any]) -> ArrayBytesCodec: diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index ac89bdbef1..0ad9147f04 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -67,7 +67,14 @@ def enable_gpu(self) -> ConfigSet: { "buffer": "zarr.buffer.gpu.Buffer", "ndbuffer": "zarr.buffer.gpu.NDBuffer", - "codecs": {"blosc": "zarr.codecs.gpu.NvcompBloscCodec"}, + "codecs": { + "blosc": "zarr.codecs.gpu.NvcompBloscCodec", + "zstd": "zarr.codecs.gpu.NvcompZstdCodec", + }, + "codec_pipeline": { + "path": "zarr.core.codec_pipeline.BatchedCodecPipeline", + "batch_size": 65536, + }, } ) diff --git a/tests/test_codecs/test_codecs.py b/tests/test_codecs/test_codecs.py index fa2017876e..7da8462259 100644 --- a/tests/test_codecs/test_codecs.py +++ b/tests/test_codecs/test_codecs.py @@ -16,12 +16,14 @@ GzipCodec, ShardingCodec, TransposeCodec, + ZstdCodec, ) from zarr.core.buffer import default_buffer_prototype from zarr.core.indexing import BasicSelection, decode_morton, morton_order_iter from zarr.core.metadata.v3 import ArrayV3Metadata from zarr.dtype import UInt8 from zarr.errors import ZarrUserWarning +from zarr.registry import register_codec from zarr.storage import StorePath if TYPE_CHECKING: @@ -401,3 +403,22 @@ async def test_resize(store: Store) -> None: assert await store.get(f"{path}/0.1", prototype=default_buffer_prototype()) is not None assert await store.get(f"{path}/1.0", prototype=default_buffer_prototype()) is None assert await store.get(f"{path}/1.1", prototype=default_buffer_prototype()) is None + + +def test_uses_default_codec() -> None: + class MyZstdCodec(ZstdCodec): + pass + + register_codec("zstd", MyZstdCodec) + + with zarr.config.set( + {"codecs": {"zstd": f"{MyZstdCodec.__module__}.{MyZstdCodec.__qualname__}"}} + ): + a = zarr.create_array( + StorePath(zarr.storage.MemoryStore(), path="mycodec"), + shape=(10, 10), + chunks=(10, 10), + dtype="int32", + ) + assert a.metadata.zarr_format == 3 + assert isinstance(a.metadata.codecs[-1], MyZstdCodec) diff --git a/tests/test_codecs/test_nvcomp.py b/tests/test_codecs/test_nvcomp.py new file mode 100644 index 0000000000..7b8ee0d9e2 --- /dev/null +++ b/tests/test_codecs/test_nvcomp.py @@ -0,0 +1,232 @@ +from __future__ import annotations + +import contextlib +import typing +import warnings +from collections.abc import Iterator + +import numpy as np +import pytest + +import zarr +from zarr.abc.store import Store +from zarr.buffer.gpu import buffer_prototype +from zarr.codecs import NvcompZstdCodec, ZstdCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.storage import StorePath +from zarr.testing.utils import gpu_test + +if typing.TYPE_CHECKING: + from zarr.core.common import JSON + + +@gpu_test +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("checksum", [True, False]) +@pytest.mark.parametrize( + "selection", + [ + (slice(None), slice(None)), + (slice(4, None), slice(4, None)), + ], +) +def test_nvcomp_zstd(store: Store, checksum: bool, selection: tuple[slice, slice]) -> None: + import cupy as cp + + with zarr.config.enable_gpu(): + data = cp.arange(0, 256, dtype="uint16").reshape((16, 16)) + + a = zarr.create_array( + StorePath(store, path="nvcomp_zstd"), + shape=data.shape, + chunks=(4, 4), + dtype=data.dtype, + fill_value=0, + compressors=NvcompZstdCodec(level=0, checksum=checksum), + ) + + a[selection] = data[selection] + + if selection == (slice(None), slice(None)): + cp.testing.assert_array_equal(data[selection], a[selection]) + cp.testing.assert_array_equal(data[:, :], a[:, :]) + else: + assert a.nchunks_initialized < a.nchunks + expected = cp.full(data.shape, a.fill_value) + expected[selection] = data[selection] + cp.testing.assert_array_equal(expected[selection], a[selection]) + cp.testing.assert_array_equal(expected[:, :], a[:, :]) + + +@gpu_test +@pytest.mark.parametrize("host_encode", [True, False]) +def test_gpu_codec_compatibility(host_encode: bool) -> None: + import cupy as cp + + @contextlib.contextmanager + def gpu_context() -> Iterator[None]: + with zarr.config.enable_gpu(): + yield + + if host_encode: + write_ctx: contextlib.AbstractContextManager[None] = contextlib.nullcontext() + read_ctx: contextlib.AbstractContextManager[None] = gpu_context() + write_data = np.arange(16, dtype="int32").reshape(4, 4) + read_data = cp.array(write_data) + xp = cp + expected_warning: pytest.WarningsRecorder | contextlib.AbstractContextManager[None] = ( + pytest.warns(zarr.errors.ZarrUserWarning) + ) + else: + write_ctx = gpu_context() + read_ctx = contextlib.nullcontext() + write_data = cp.arange(16, dtype="int32").reshape(4, 4) + read_data = write_data.get() + xp = np + expected_warning = contextlib.nullcontext() + + store = zarr.storage.MemoryStore() + + with write_ctx: + z = zarr.create_array( + store=store, + shape=write_data.shape, + chunks=(4, 4), + dtype=write_data.dtype, + ) + z[:] = write_data + + with read_ctx, expected_warning: + z = zarr.open_array(store=store, mode="r") + result = z[:] + assert isinstance(result, type(read_data)) + xp.testing.assert_array_equal(result, read_data) + + +@gpu_test +def test_uses_default_codec() -> None: + with zarr.config.enable_gpu(): + a = zarr.create_array( + StorePath(zarr.storage.MemoryStore(), path="nvcomp_zstd"), + shape=(10, 10), + chunks=(10, 10), + dtype="int32", + ) + assert a.metadata.zarr_format == 3 + assert isinstance(a.metadata.codecs[-1], NvcompZstdCodec) + + +@gpu_test +def test_nvcomp_zstd_batched_decode_uses_single_nvcomp_call( + monkeypatch: pytest.MonkeyPatch, +) -> None: + calls = 0 + original = NvcompZstdCodec._run_nvcomp_zstd_batch + + async def counted( + self: NvcompZstdCodec, + arrays: typing.Iterable[typing.Any], + *, + operation: str, + ) -> list[typing.Any]: + nonlocal calls + calls += 1 + return await original(self, arrays, operation=typing.cast("typing.Any", operation)) + + monkeypatch.setattr(NvcompZstdCodec, "_run_nvcomp_zstd_batch", counted) + + src = np.arange(32 * 32, dtype=np.int32).reshape(32, 32) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(8, 8), + dtype=src.dtype, + compressors=ZstdCodec(level=0, checksum=False), + ) + z[:, :] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=zarr.errors.ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + out = zr[:, :] + + assert calls == 1 + np.testing.assert_array_equal(np.asarray(out), src) + + +def test_invalid_raises() -> None: + with pytest.raises(ValueError): + NvcompZstdCodec(level=100, checksum=False) + + with pytest.raises(TypeError): + NvcompZstdCodec(level="100", checksum=False) # type: ignore[arg-type] + + with pytest.raises(TypeError): + NvcompZstdCodec(checksum="False") # type: ignore[arg-type] + + +def test_nvcomp_from_dict() -> None: + config: dict[str, JSON] = { + "name": "zstd", + "configuration": { + "level": 0, + "checksum": False, + }, + } + codec = NvcompZstdCodec.from_dict(config) + assert codec.level == 0 + assert codec.checksum is False + + +def test_compute_encoded_chunk_size() -> None: + codec = NvcompZstdCodec(level=0, checksum=False) + with pytest.raises(NotImplementedError): + codec.compute_encoded_size( + _input_byte_length=0, + _chunk_spec=ArraySpec( + shape=(10, 10), + dtype=zarr.core.dtype.Int32(), + fill_value=0, + config=ArrayConfig(order="C", write_empty_chunks=False), + prototype=buffer_prototype, + ), + ) + + +@pytest.mark.asyncio +async def test_nvcomp_zstd_decode_none() -> None: + codec = NvcompZstdCodec(level=0, checksum=False) + chunks_and_specs = [ + ( + None, + ArraySpec( + shape=(10, 10), + dtype=zarr.core.dtype.Int32(), + fill_value=0, + config=ArrayConfig(order="C", write_empty_chunks=False), + prototype=buffer_prototype, + ), + ) + ] + result = await codec.decode(chunks_and_specs) + assert result == [None] + + +@pytest.mark.asyncio +async def test_nvcomp_zstd_encode_none() -> None: + codec = NvcompZstdCodec(level=0, checksum=False) + chunks_and_specs = [ + ( + None, + ArraySpec( + shape=(10, 10), + dtype=zarr.core.dtype.Int32(), + fill_value=0, + config=ArrayConfig(order="C", write_empty_chunks=False), + prototype=buffer_prototype, + ), + ) + ] + result = await codec.encode(chunks_and_specs) + assert result == [None] diff --git a/tests/test_codecs/test_nvcomp_blosc.py b/tests/test_codecs/test_nvcomp_blosc.py index 12eb47125c..7b727f1b4e 100644 --- a/tests/test_codecs/test_nvcomp_blosc.py +++ b/tests/test_codecs/test_nvcomp_blosc.py @@ -6,6 +6,7 @@ import pytest import zarr +from zarr.codecs import NvcompBloscCodec from zarr.codecs.blosc import BloscCodec from zarr.errors import ZarrUserWarning from zarr.testing.utils import gpu_test @@ -57,7 +58,7 @@ def test_nvcomp_blosc_decode_raises_on_byte_shuffle() -> None: @gpu_test -def test_nvcomp_blosc_decode_raises_on_non_float32() -> None: +def test_nvcomp_blosc_decode_supported_non_float32() -> None: src = np.arange(64, dtype=np.float64).reshape(8, 8) store = zarr.storage.MemoryStore() z = zarr.create_array( @@ -72,8 +73,9 @@ def test_nvcomp_blosc_decode_raises_on_non_float32() -> None: with zarr.config.enable_gpu(), warnings.catch_warnings(): warnings.filterwarnings("ignore", category=ZarrUserWarning) zr = zarr.open_array(store=store, mode="r") - with pytest.raises(ValueError, match="float32"): - _ = zr[:, :] + out = zr[:, :] + + np.testing.assert_array_equal(np.asarray(out), src) @gpu_test @@ -94,3 +96,81 @@ def test_nvcomp_blosc_decode_raises_on_non_zstd() -> None: zr = zarr.open_array(store=store, mode="r") with pytest.raises(ValueError, match="cname='zstd'"): _ = zr[:, :] + + +@gpu_test +def test_nvcomp_blosc_decode_batches_multi_block_frames( + monkeypatch: pytest.MonkeyPatch, +) -> None: + calls = 0 + original = NvcompBloscCodec._run_nvcomp_zstd_batch + + async def counted( + self: NvcompBloscCodec, + arrays: object, + *, + operation: str, + ) -> list[object]: + nonlocal calls + calls += 1 + return await original(self, arrays, operation=operation) + + monkeypatch.setattr(NvcompBloscCodec, "_run_nvcomp_zstd_batch", counted) + + src = np.arange(2048, dtype=np.float32) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(2048,), + dtype=src.dtype, + compressors=BloscCodec(cname="zstd", shuffle="noshuffle", blocksize=256), + ) + z[:] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + out = zr[:] + + assert calls == 1 + np.testing.assert_array_equal(np.asarray(out), src) + + +@gpu_test +def test_nvcomp_blosc_decode_batches_multi_chunk_frames( + monkeypatch: pytest.MonkeyPatch, +) -> None: + calls = 0 + original = NvcompBloscCodec._run_nvcomp_zstd_batch + + async def counted( + self: NvcompBloscCodec, + arrays: object, + *, + operation: str, + ) -> list[object]: + nonlocal calls + calls += 1 + return await original(self, arrays, operation=operation) + + monkeypatch.setattr(NvcompBloscCodec, "_run_nvcomp_zstd_batch", counted) + + src = np.arange(4096, dtype=np.float32).reshape(64, 64) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(16, 16), + dtype=src.dtype, + compressors=BloscCodec(cname="zstd", shuffle="noshuffle", blocksize=256), + ) + z[:, :] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + out = zr[:, :] + + assert calls == 1 + np.testing.assert_array_equal(np.asarray(out), src) diff --git a/tests/test_config.py b/tests/test_config.py index 6fc993f901..0827eb221f 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -292,11 +292,14 @@ def test_config_buffer_backwards_compatibility_gpu() -> None: @pytest.mark.gpu -def test_enable_gpu_sets_gpu_blosc_codec() -> None: +def test_enable_gpu_sets_gpu_codecs() -> None: with zarr.config.enable_gpu(): assert config.get("buffer") == "zarr.buffer.gpu.Buffer" assert config.get("ndbuffer") == "zarr.buffer.gpu.NDBuffer" assert config.get("codecs.blosc") == "zarr.codecs.gpu.NvcompBloscCodec" + assert config.get("codecs.zstd") == "zarr.codecs.gpu.NvcompZstdCodec" + assert config.get("codec_pipeline.path") == "zarr.core.codec_pipeline.BatchedCodecPipeline" + assert config.get("codec_pipeline.batch_size") == 65536 @pytest.mark.filterwarnings("error") From 80477e01d527551d1a85ea776bf5a852365e5193 Mon Sep 17 00:00:00 2001 From: OA jder bot Date: Thu, 2 Apr 2026 20:26:34 -0400 Subject: [PATCH 4/6] fix --- docs/user-guide/gpu.md | 16 ++++++++++------ tests/test_codecs/test_nvcomp.py | 5 ++++- tests/test_codecs/test_nvcomp_blosc.py | 15 ++++++++++++--- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/docs/user-guide/gpu.md b/docs/user-guide/gpu.md index 791c2094a7..2cecdd3dbb 100644 --- a/docs/user-guide/gpu.md +++ b/docs/user-guide/gpu.md @@ -1,14 +1,18 @@ # Using GPUs with Zarr -Zarr can use GPUs to accelerate your workload by running `zarr.Config.enable_gpu`. +Zarr can use GPUs to accelerate your workload by enabling GPU buffers and codecs +with `zarr.config.enable_gpu()`. !!! note - GPU codec acceleration is currently available for the Blosc codec when all of - the following hold: + With `enable_gpu()`: - - `cname="zstd"` - - `shuffle` is `bitshuffle` or `noshuffle` - - array dtype is `float32` + - array reads return `cupy.ndarray` values + - the default Zarr v3 `zstd` codec is replaced with a GPU-backed nvCOMP + implementation + - Blosc chunks can be decoded on the GPU when `cname="zstd"` and `shuffle` + is `bitshuffle` or `noshuffle` + + Blosc GPU acceleration currently applies to decode only. ## Reading data into device memory diff --git a/tests/test_codecs/test_nvcomp.py b/tests/test_codecs/test_nvcomp.py index 7b8ee0d9e2..44930b715c 100644 --- a/tests/test_codecs/test_nvcomp.py +++ b/tests/test_codecs/test_nvcomp.py @@ -120,6 +120,8 @@ def test_uses_default_codec() -> None: def test_nvcomp_zstd_batched_decode_uses_single_nvcomp_call( monkeypatch: pytest.MonkeyPatch, ) -> None: + import cupy as cp + calls = 0 original = NvcompZstdCodec._run_nvcomp_zstd_batch @@ -152,7 +154,8 @@ async def counted( out = zr[:, :] assert calls == 1 - np.testing.assert_array_equal(np.asarray(out), src) + assert isinstance(out, cp.ndarray) + cp.testing.assert_array_equal(out, cp.asarray(src)) def test_invalid_raises() -> None: diff --git a/tests/test_codecs/test_nvcomp_blosc.py b/tests/test_codecs/test_nvcomp_blosc.py index 7b727f1b4e..57d6ad7c4f 100644 --- a/tests/test_codecs/test_nvcomp_blosc.py +++ b/tests/test_codecs/test_nvcomp_blosc.py @@ -59,6 +59,8 @@ def test_nvcomp_blosc_decode_raises_on_byte_shuffle() -> None: @gpu_test def test_nvcomp_blosc_decode_supported_non_float32() -> None: + import cupy as cp + src = np.arange(64, dtype=np.float64).reshape(8, 8) store = zarr.storage.MemoryStore() z = zarr.create_array( @@ -75,7 +77,8 @@ def test_nvcomp_blosc_decode_supported_non_float32() -> None: zr = zarr.open_array(store=store, mode="r") out = zr[:, :] - np.testing.assert_array_equal(np.asarray(out), src) + assert isinstance(out, cp.ndarray) + cp.testing.assert_array_equal(out, cp.asarray(src)) @gpu_test @@ -102,6 +105,8 @@ def test_nvcomp_blosc_decode_raises_on_non_zstd() -> None: def test_nvcomp_blosc_decode_batches_multi_block_frames( monkeypatch: pytest.MonkeyPatch, ) -> None: + import cupy as cp + calls = 0 original = NvcompBloscCodec._run_nvcomp_zstd_batch @@ -134,13 +139,16 @@ async def counted( out = zr[:] assert calls == 1 - np.testing.assert_array_equal(np.asarray(out), src) + assert isinstance(out, cp.ndarray) + cp.testing.assert_array_equal(out, cp.asarray(src)) @gpu_test def test_nvcomp_blosc_decode_batches_multi_chunk_frames( monkeypatch: pytest.MonkeyPatch, ) -> None: + import cupy as cp + calls = 0 original = NvcompBloscCodec._run_nvcomp_zstd_batch @@ -173,4 +181,5 @@ async def counted( out = zr[:, :] assert calls == 1 - np.testing.assert_array_equal(np.asarray(out), src) + assert isinstance(out, cp.ndarray) + cp.testing.assert_array_equal(out, cp.asarray(src)) From b8ae2078315c1acf7e7e22935bf3cab87c9435bb Mon Sep 17 00:00:00 2001 From: OA jder bot Date: Fri, 3 Apr 2026 16:01:13 -0400 Subject: [PATCH 5/6] Reduce GPU Blosc decode kernel overhead --- src/zarr/codecs/gpu.py | 261 +++++++++++++++++++------ src/zarr/core/config.py | 1 + tests/test_codecs/test_nvcomp_blosc.py | 69 +++++++ tests/test_config.py | 1 + 4 files changed, 270 insertions(+), 62 deletions(-) diff --git a/src/zarr/codecs/gpu.py b/src/zarr/codecs/gpu.py index 1a15509229..2d747ae5b3 100644 --- a/src/zarr/codecs/gpu.py +++ b/src/zarr/codecs/gpu.py @@ -12,7 +12,7 @@ from packaging.version import Version from zarr.abc.codec import BytesBytesCodec -from zarr.codecs.blosc import BloscCodec +from zarr.codecs.blosc import BloscCname, BloscCodec, BloscShuffle, CName, Shuffle from zarr.codecs.zstd import parse_checksum, parse_zstd_level from zarr.core.buffer.cpu import as_numpy_array_wrapper from zarr.core.common import JSON, concurrent_map, parse_named_configuration @@ -44,36 +44,40 @@ _BLOSC_COMPFORMAT_ZSTD = 4 _BLOSC_MIN_BUFFERSIZE = 128 _BLOSC_MAX_SPLITS = 16 +_DEFAULT_BLOSC_BITUNSHUFFLE_MAX_BYTES = 100 * 1024 * 1024 -def _read_i32(source: "cp.ndarray", offset: int) -> int: - raw = cp.asnumpy(source[offset : offset + 4]).tobytes() - return int(struct.unpack(" int: + if not isinstance(value, int): + raise TypeError(f"{name} must be an int. Got {type(value)} instead.") + if value <= 0: + raise ValueError(f"{name} must be greater than 0. Got {value}.") + return value @dataclass(frozen=True) -class _BloscBlockPlan: - block_tmp: "cp.ndarray" - output: "cp.ndarray" - output_start: int - bsize: int - do_bitshuffle: bool - typesize: int +class _BloscSegmentPlan: + dest_offset: int + length: int + raw_source: "cp.ndarray | None" = None + decode_index: int | None = None @dataclass(frozen=True) -class _BloscSplitTarget: - block_plan: _BloscBlockPlan - start: int - stop: int +class _BloscBitshufflePlan: + typesize: int + blocksize: int + full_block_bytes: int + tail_bytes: int @dataclass(frozen=True) class _BloscChunkPlan: index: int - output: "cp.ndarray" + scratch: "cp.ndarray" chunk_spec: ArraySpec - block_plans: tuple[_BloscBlockPlan, ...] + segments: tuple[_BloscSegmentPlan, ...] + bitshuffle: _BloscBitshufflePlan | None class _NvcompZstdMixin: @@ -257,6 +261,37 @@ def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) class NvcompBloscCodec(_NvcompZstdMixin, BloscCodec): """GPU Blosc decoder (zstd + {bitshuffle, noshuffle}) using nvCOMP.""" + _default_bitshuffle_max_bytes = _DEFAULT_BLOSC_BITUNSHUFFLE_MAX_BYTES + bitshuffle_max_bytes: int + + def __init__( + self, + *, + typesize: int | None = None, + cname: BloscCname | CName = BloscCname.zstd, + clevel: int = 5, + shuffle: BloscShuffle | Shuffle | None = None, + blocksize: int = 0, + bitshuffle_max_bytes: int | None = None, + ) -> None: + super().__init__( + typesize=typesize, + cname=cname, + clevel=clevel, + shuffle=shuffle, + blocksize=blocksize, + ) + if bitshuffle_max_bytes is None: + bitshuffle_max_bytes = config.get( + "gpu.blosc_bitshuffle_max_bytes", + type(self)._default_bitshuffle_max_bytes, + ) + object.__setattr__( + self, + "bitshuffle_max_bytes", + _parse_positive_int(bitshuffle_max_bytes, name="bitshuffle_max_bytes"), + ) + @staticmethod def _bitunshuffle(src: "cp.ndarray", *, typesize: int, n_elements: int) -> "cp.ndarray": bits = cp.unpackbits(src, bitorder="little") @@ -264,6 +299,43 @@ def _bitunshuffle(src: "cp.ndarray", *, typesize: int, n_elements: int) -> "cp.n out_bits = matrix.T.reshape((-1,)) return cp.packbits(out_bits, bitorder="little") + @staticmethod + def _bitunshuffle_blocks( + src: "cp.ndarray", + *, + typesize: int, + blocksize: int, + nblocks: int, + ) -> "cp.ndarray": + bits = cp.unpackbits(src, bitorder="little") + matrix = bits.reshape((nblocks, typesize * 8, blocksize // typesize)) + out_bits = matrix.transpose((0, 2, 1)).reshape((-1,)) + return cp.packbits(out_bits, bitorder="little") + + @cached_property + def _scatter_segments_kernel(self) -> cp.RawKernel: + return cp.RawKernel( + r""" + extern "C" __global__ + void scatter_segments( + const unsigned long long* src_ptrs, + const long long* dst_offsets, + const long long* lengths, + unsigned char* out + ) { + const long long seg = static_cast(blockIdx.x); + const unsigned char* src = + reinterpret_cast(src_ptrs[seg]); + unsigned char* dst = out + dst_offsets[seg]; + const long long len = lengths[seg]; + for (long long i = threadIdx.x; i < len; i += blockDim.x) { + dst[i] = src[i]; + } + } + """, + "scatter_segments", + ) + @staticmethod def _split_count(*, typesize: int, blocksize: int, dont_split: bool, leftover_block: bool) -> int: if ( @@ -275,23 +347,82 @@ def _split_count(*, typesize: int, blocksize: int, dont_split: bool, leftover_bl return typesize return 1 - @staticmethod - def _finalize_block(block_plan: _BloscBlockPlan) -> "cp.ndarray": - if not block_plan.do_bitshuffle: - return block_plan.block_tmp - if block_plan.bsize % block_plan.typesize != 0: + def _scatter_segments( + self, + chunk_plan: _BloscChunkPlan, + decoded_splits: list["cp.ndarray"], + ) -> None: + if not chunk_plan.segments: + return + + src_ptrs = np.empty((len(chunk_plan.segments),), dtype=np.uintp) + dst_offsets = np.empty((len(chunk_plan.segments),), dtype=np.int64) + lengths = np.empty((len(chunk_plan.segments),), dtype=np.int64) + + for idx, segment in enumerate(chunk_plan.segments): + if segment.raw_source is not None: + source = segment.raw_source + else: + assert segment.decode_index is not None + source = decoded_splits[segment.decode_index] + if source.size != segment.length: + raise ValueError( + "Invalid Blosc payload: split decode size mismatch. " + f"Expected {segment.length} bytes, got {source.size}." + ) + src_ptrs[idx] = source.data.ptr + dst_offsets[idx] = segment.dest_offset + lengths[idx] = segment.length + + threads = 256 + self._scatter_segments_kernel( + (len(chunk_plan.segments),), + (threads,), + ( + cp.asarray(src_ptrs), + cp.asarray(dst_offsets), + cp.asarray(lengths), + chunk_plan.scratch, + ), + ) + + def _apply_bitshuffle(self, chunk_plan: _BloscChunkPlan) -> None: + bitshuffle = chunk_plan.bitshuffle + if bitshuffle is None: + return + + full_block_bytes = bitshuffle.full_block_bytes + if full_block_bytes > 0: + window_blocks = max(1, self.bitshuffle_max_bytes // bitshuffle.blocksize) + window_bytes = window_blocks * bitshuffle.blocksize + for start in range(0, full_block_bytes, window_bytes): + stop = min(start + window_bytes, full_block_bytes) + nblocks = (stop - start) // bitshuffle.blocksize + chunk_plan.scratch[start:stop] = self._bitunshuffle_blocks( + chunk_plan.scratch[start:stop], + typesize=bitshuffle.typesize, + blocksize=bitshuffle.blocksize, + nblocks=nblocks, + ) + + if bitshuffle.tail_bytes == 0: + return + + tail_start = full_block_bytes + tail_stop = tail_start + bitshuffle.tail_bytes + if bitshuffle.tail_bytes % bitshuffle.typesize != 0: raise ValueError( "Invalid bitshuffle payload: " - f"block bytes {block_plan.bsize} not divisible by typesize {block_plan.typesize}." + f"block bytes {bitshuffle.tail_bytes} not divisible by typesize {bitshuffle.typesize}." ) - n_elements = block_plan.bsize // block_plan.typesize + n_elements = bitshuffle.tail_bytes // bitshuffle.typesize if n_elements % 8 != 0: raise ValueError( "NvcompBloscCodec only supports bitshuffle blocks with element count divisible by 8." ) - return NvcompBloscCodec._bitunshuffle( - block_plan.block_tmp, - typesize=block_plan.typesize, + chunk_plan.scratch[tail_start:tail_stop] = self._bitunshuffle( + chunk_plan.scratch[tail_start:tail_stop], + typesize=bitshuffle.typesize, n_elements=n_elements, ) @@ -302,19 +433,21 @@ def _build_chunk_plan( source: "cp.ndarray", chunk_spec: ArraySpec, decode_inputs: list["cp.ndarray"], - decode_targets: list[_BloscSplitTarget], ) -> _BloscChunkPlan | Buffer: if source.size < _BLOSC_MAX_OVERHEAD: raise ValueError( f"Invalid Blosc payload: expected at least {_BLOSC_MAX_OVERHEAD} bytes, got {source.size}." ) - header = cp.asnumpy(source[:_BLOSC_MAX_OVERHEAD]).tobytes() - _, _, flags, typesize, nbytes, blocksize, cbytes = struct.unpack(" 0) else blocksize @@ -377,43 +510,53 @@ def _build_chunk_plan( ) neblock = bsize // nsplits - block_tmp = cp.empty((bsize,), dtype=cp.uint8) - block_plan = _BloscBlockPlan( - block_tmp=block_tmp, - output=output, - output_start=block_index * blocksize, - bsize=bsize, - do_bitshuffle=do_bitshuffle, - typesize=typesize, - ) - block_plans.append(block_plan) - p = int(bstarts[block_index]) + block_start = block_index * blocksize for split_index in range(nsplits): if p < 0 or (p + 4) > cbytes: raise ValueError("Invalid Blosc payload: split header outside payload.") - csize = _read_i32(source, p) + csize = int(struct.unpack_from(" cbytes: raise ValueError("Invalid Blosc payload: split data outside payload.") split = source[p : p + csize] p += csize - start = split_index * neblock - stop = start + neblock + dest_offset = block_start + split_index * neblock if csize == neblock: - block_tmp[start:stop] = split + segments.append( + _BloscSegmentPlan( + dest_offset=dest_offset, + length=neblock, + raw_source=split, + ) + ) else: + decode_index = len(decode_inputs) decode_inputs.append(split) - decode_targets.append( - _BloscSplitTarget(block_plan=block_plan, start=start, stop=stop) + segments.append( + _BloscSegmentPlan( + dest_offset=dest_offset, + length=neblock, + decode_index=decode_index, + ) ) + bitshuffle = None + if do_bitshuffle: + bitshuffle = _BloscBitshufflePlan( + typesize=typesize, + blocksize=blocksize, + full_block_bytes=(nblocks - (1 if leftover else 0)) * blocksize, + tail_bytes=leftover, + ) + return _BloscChunkPlan( index=index, - output=output, + scratch=scratch, chunk_spec=chunk_spec, - block_plans=tuple(block_plans), + segments=tuple(segments), + bitshuffle=bitshuffle, ) async def decode( @@ -423,7 +566,6 @@ async def decode( batch = list(chunks_and_specs) results: list[Buffer | None] = [None] * len(batch) decode_inputs: list[cp.ndarray] = [] - decode_targets: list[_BloscSplitTarget] = [] gpu_chunk_plans: list[_BloscChunkPlan] = [] cpu_entries: list[tuple[int, Buffer, ArraySpec]] = [] @@ -442,26 +584,21 @@ async def decode( source=source, chunk_spec=chunk_spec, decode_inputs=decode_inputs, - decode_targets=decode_targets, ) if isinstance(plan_or_buffer, _BloscChunkPlan): gpu_chunk_plans.append(plan_or_buffer) else: results[index] = plan_or_buffer + decoded_splits: list[cp.ndarray] = [] if decode_inputs: decoded_splits = await self._run_nvcomp_zstd_batch(decode_inputs, operation="decode") - for decoded, decode_target in zip(decoded_splits, decode_targets, strict=True): - decode_target.block_plan.block_tmp[decode_target.start : decode_target.stop] = decoded for chunk_plan in gpu_chunk_plans: - for block_plan in chunk_plan.block_plans: - block_out = self._finalize_block(block_plan) - block_plan.output[ - block_plan.output_start : block_plan.output_start + block_plan.bsize - ] = block_out + self._scatter_segments(chunk_plan, decoded_splits) + self._apply_bitshuffle(chunk_plan) results[chunk_plan.index] = chunk_plan.chunk_spec.prototype.buffer.from_array_like( - chunk_plan.output + chunk_plan.scratch ) if cpu_entries: diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index 0ad9147f04..c75ccc60e5 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -112,6 +112,7 @@ def enable_gpu(self) -> ConfigSet: "async": {"concurrency": 10, "timeout": None}, "threading": {"max_workers": None}, "json_indent": 2, + "gpu": {"blosc_bitshuffle_max_bytes": 100 * 1024 * 1024}, "codec_pipeline": { "path": "zarr.core.codec_pipeline.BatchedCodecPipeline", "batch_size": 1, diff --git a/tests/test_codecs/test_nvcomp_blosc.py b/tests/test_codecs/test_nvcomp_blosc.py index 57d6ad7c4f..deadb992ec 100644 --- a/tests/test_codecs/test_nvcomp_blosc.py +++ b/tests/test_codecs/test_nvcomp_blosc.py @@ -12,6 +12,17 @@ from zarr.testing.utils import gpu_test +def test_nvcomp_blosc_bitshuffle_max_bytes_validation() -> None: + codec = NvcompBloscCodec(bitshuffle_max_bytes=1024) + assert codec.bitshuffle_max_bytes == 1024 + + with pytest.raises(ValueError): + NvcompBloscCodec(bitshuffle_max_bytes=0) + + with pytest.raises(TypeError): + NvcompBloscCodec(bitshuffle_max_bytes="1024") # type: ignore[arg-type] + + @gpu_test @pytest.mark.parametrize("shuffle", ["bitshuffle", "noshuffle"]) def test_nvcomp_blosc_decode_supported(shuffle: str) -> None: @@ -150,7 +161,9 @@ def test_nvcomp_blosc_decode_batches_multi_chunk_frames( import cupy as cp calls = 0 + scatter_calls = 0 original = NvcompBloscCodec._run_nvcomp_zstd_batch + original_scatter = NvcompBloscCodec._scatter_segments async def counted( self: NvcompBloscCodec, @@ -162,7 +175,17 @@ async def counted( calls += 1 return await original(self, arrays, operation=operation) + def counted_scatter( + self: NvcompBloscCodec, + chunk_plan: object, + decoded_splits: object, + ) -> None: + nonlocal scatter_calls + scatter_calls += 1 + original_scatter(self, chunk_plan, decoded_splits) + monkeypatch.setattr(NvcompBloscCodec, "_run_nvcomp_zstd_batch", counted) + monkeypatch.setattr(NvcompBloscCodec, "_scatter_segments", counted_scatter) src = np.arange(4096, dtype=np.float32).reshape(64, 64) store = zarr.storage.MemoryStore() @@ -181,5 +204,51 @@ async def counted( out = zr[:, :] assert calls == 1 + assert scatter_calls == z.nchunks + assert isinstance(out, cp.ndarray) + cp.testing.assert_array_equal(out, cp.asarray(src)) + + +@gpu_test +def test_nvcomp_blosc_bitshuffle_windowing_batches_full_blocks( + monkeypatch: pytest.MonkeyPatch, +) -> None: + import cupy as cp + + calls = 0 + original = NvcompBloscCodec._bitunshuffle_blocks + + def counted( + src: object, + *, + typesize: int, + blocksize: int, + nblocks: int, + ) -> object: + nonlocal calls + calls += 1 + return original(src, typesize=typesize, blocksize=blocksize, nblocks=nblocks) + + monkeypatch.setattr(NvcompBloscCodec, "_bitunshuffle_blocks", staticmethod(counted)) + + src = np.arange(1024, dtype=np.float32) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(1024,), + dtype=src.dtype, + compressors=BloscCodec(cname="zstd", shuffle="bitshuffle", blocksize=256), + ) + z[:] = src + + with zarr.config.enable_gpu(), zarr.config.set( + {"gpu.blosc_bitshuffle_max_bytes": 512} + ), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + out = zr[:] + + assert calls == src.nbytes // 512 assert isinstance(out, cp.ndarray) cp.testing.assert_array_equal(out, cp.asarray(src)) diff --git a/tests/test_config.py b/tests/test_config.py index 0827eb221f..2479437645 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -298,6 +298,7 @@ def test_enable_gpu_sets_gpu_codecs() -> None: assert config.get("ndbuffer") == "zarr.buffer.gpu.NDBuffer" assert config.get("codecs.blosc") == "zarr.codecs.gpu.NvcompBloscCodec" assert config.get("codecs.zstd") == "zarr.codecs.gpu.NvcompZstdCodec" + assert config.get("gpu.blosc_bitshuffle_max_bytes") == 100 * 1024 * 1024 assert config.get("codec_pipeline.path") == "zarr.core.codec_pipeline.BatchedCodecPipeline" assert config.get("codec_pipeline.batch_size") == 65536 From f59c571deda1f895c60b5f390f8cebf8f09673d1 Mon Sep 17 00:00:00 2001 From: OA jder bot Date: Fri, 3 Apr 2026 19:23:01 -0400 Subject: [PATCH 6/6] Avoid full host copies in GPU Blosc metadata parsing --- src/zarr/codecs/gpu.py | 144 +++++++++++++++++++++---- tests/test_codecs/test_nvcomp_blosc.py | 40 +++++++ 2 files changed, 165 insertions(+), 19 deletions(-) diff --git a/src/zarr/codecs/gpu.py b/src/zarr/codecs/gpu.py index 2d747ae5b3..495a8e5ce3 100644 --- a/src/zarr/codecs/gpu.py +++ b/src/zarr/codecs/gpu.py @@ -336,6 +336,61 @@ def _scatter_segments_kernel(self) -> cp.RawKernel: "scatter_segments", ) + @cached_property + def _parse_split_headers_kernel(self) -> cp.RawKernel: + return cp.RawKernel( + r""" + extern "C" __global__ + void parse_split_headers( + const unsigned char* source, + const int* bstarts, + const int* split_counts, + const long long* split_bases, + const long long cbytes, + long long* src_offsets, + long long* src_lengths, + int* error_code + ) { + if (threadIdx.x != 0) { + return; + } + const int block = static_cast(blockIdx.x); + long long p = static_cast(bstarts[block]); + const int nsplits = split_counts[block]; + const long long base = split_bases[block]; + + for (int i = 0; i < nsplits; ++i) { + if (p < 0 || p + 4 > cbytes) { + atomicCAS(error_code, 0, 1); + return; + } + + const unsigned int csize_u = + static_cast(source[p]) | + (static_cast(source[p + 1]) << 8) | + (static_cast(source[p + 2]) << 16) | + (static_cast(source[p + 3]) << 24); + const int csize = static_cast(csize_u); + p += 4; + + if (csize < 0) { + atomicCAS(error_code, 0, 2); + return; + } + if (p + static_cast(csize) > cbytes) { + atomicCAS(error_code, 0, 3); + return; + } + + src_offsets[base + i] = p; + src_lengths[base + i] = csize; + p += static_cast(csize); + } + } + """, + "parse_split_headers", + ) + @staticmethod def _split_count(*, typesize: int, blocksize: int, dont_split: bool, leftover_block: bool) -> int: if ( @@ -426,6 +481,43 @@ def _apply_bitshuffle(self, chunk_plan: _BloscChunkPlan) -> None: n_elements=n_elements, ) + def _parse_split_headers( + self, + source: "cp.ndarray", + *, + bstarts: np.ndarray, + split_counts: np.ndarray, + split_bases: np.ndarray, + cbytes: int, + ) -> tuple[np.ndarray, np.ndarray]: + total_splits = int(split_counts.sum()) + src_offsets = cp.empty((total_splits,), dtype=cp.int64) + src_lengths = cp.empty((total_splits,), dtype=cp.int64) + error_code = cp.zeros((1,), dtype=cp.int32) + + self._parse_split_headers_kernel( + (len(bstarts),), + (1,), + ( + source, + cp.asarray(bstarts), + cp.asarray(split_counts), + cp.asarray(split_bases), + np.int64(cbytes), + src_offsets, + src_lengths, + error_code, + ), + ) + + error_code_host = int(cp.asnumpy(error_code)[0]) + if error_code_host == 1: + raise ValueError("Invalid Blosc payload: split header outside payload.") + if error_code_host in (2, 3): + raise ValueError("Invalid Blosc payload: split data outside payload.") + + return cp.asnumpy(src_offsets), cp.asnumpy(src_lengths) + def _build_chunk_plan( self, *, @@ -439,15 +531,12 @@ def _build_chunk_plan( f"Invalid Blosc payload: expected at least {_BLOSC_MAX_OVERHEAD} bytes, got {source.size}." ) - host_source = memoryview(cp.asnumpy(source)) - _, _, flags, typesize, nbytes, blocksize, cbytes = struct.unpack_from( - " 0) else blocksize @@ -509,19 +598,35 @@ def _build_chunk_plan( f"Invalid Blosc payload: blocksize {bsize} not divisible by nsplits {nsplits}." ) + split_counts[block_index] = nsplits + split_bases[block_index] = total_splits + total_splits += nsplits + + src_offsets, src_lengths = self._parse_split_headers( + source, + bstarts=bstarts, + split_counts=split_counts, + split_bases=split_bases, + cbytes=cbytes, + ) + + segments: list[_BloscSegmentPlan] = [] + split_index_global = 0 + for block_index in range(nblocks): + bsize = leftover if (block_index == nblocks - 1 and leftover > 0) else blocksize + leftover_block = block_index == nblocks - 1 and leftover > 0 + nsplits = self._split_count( + typesize=typesize, + blocksize=bsize, + dont_split=dont_split, + leftover_block=leftover_block, + ) neblock = bsize // nsplits - p = int(bstarts[block_index]) block_start = block_index * blocksize for split_index in range(nsplits): - if p < 0 or (p + 4) > cbytes: - raise ValueError("Invalid Blosc payload: split header outside payload.") - csize = int(struct.unpack_from(" cbytes: - raise ValueError("Invalid Blosc payload: split data outside payload.") - split = source[p : p + csize] - p += csize - + csize = int(src_lengths[split_index_global]) + src_offset = int(src_offsets[split_index_global]) + split = source[src_offset : src_offset + csize] dest_offset = block_start + split_index * neblock if csize == neblock: segments.append( @@ -541,6 +646,7 @@ def _build_chunk_plan( decode_index=decode_index, ) ) + split_index_global += 1 bitshuffle = None if do_bitshuffle: diff --git a/tests/test_codecs/test_nvcomp_blosc.py b/tests/test_codecs/test_nvcomp_blosc.py index deadb992ec..a35369079a 100644 --- a/tests/test_codecs/test_nvcomp_blosc.py +++ b/tests/test_codecs/test_nvcomp_blosc.py @@ -154,6 +154,46 @@ async def counted( cp.testing.assert_array_equal(out, cp.asarray(src)) +@gpu_test +def test_nvcomp_blosc_decode_avoids_full_chunk_host_copy( + monkeypatch: pytest.MonkeyPatch, +) -> None: + import cupy as cp + + original = cp.asnumpy + copied_sizes: list[int] = [] + + def counted(array: object) -> object: + size = int(getattr(array, "size", -1)) + copied_sizes.append(size) + if size > 1024: + raise AssertionError(f"unexpected large host copy of {size} bytes") + return original(array) + + monkeypatch.setattr(cp, "asnumpy", counted) + + rng = np.random.default_rng(0) + src = rng.standard_normal(8192, dtype=np.float32) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(8192,), + dtype=src.dtype, + compressors=BloscCodec(cname="zstd", shuffle="noshuffle", blocksize=256), + ) + z[:] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + out = zr[:] + + assert copied_sizes + assert isinstance(out, cp.ndarray) + np.testing.assert_array_equal(original(out), src) + + @gpu_test def test_nvcomp_blosc_decode_batches_multi_chunk_frames( monkeypatch: pytest.MonkeyPatch,