diff --git a/docs/user-guide/gpu.md b/docs/user-guide/gpu.md index 3317bdf065..2cecdd3dbb 100644 --- a/docs/user-guide/gpu.md +++ b/docs/user-guide/gpu.md @@ -1,15 +1,18 @@ # Using GPUs with Zarr -Zarr can use GPUs to accelerate your workload by running `zarr.Config.enable_gpu`. +Zarr can use GPUs to accelerate your workload by enabling GPU buffers and codecs +with `zarr.config.enable_gpu()`. !!! note - `zarr-python` currently supports reading the ndarray data into device (GPU) - memory as the final stage of the codec pipeline. Data will still be read into - or copied to host (CPU) memory for encoding and decoding. + With `enable_gpu()`: - In the future, codecs will be available compressing and decompressing data on - the GPU, avoiding the need to move data between the host and device for - compression and decompression. + - array reads return `cupy.ndarray` values + - the default Zarr v3 `zstd` codec is replaced with a GPU-backed nvCOMP + implementation + - Blosc chunks can be decoded on the GPU when `cname="zstd"` and `shuffle` + is `bitshuffle` or `noshuffle` + + Blosc GPU acceleration currently applies to decode only. 
## Reading data into device memory diff --git a/pyproject.toml b/pyproject.toml index 068caa1f0d..53f056856d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,6 +68,7 @@ remote = [ ] gpu = [ "cupy-cuda12x", + "nvidia-nvcomp-cu12", ] cli = ["typer"] # Testing extras @@ -154,6 +155,9 @@ omit = [ [tool.hatch] version.source = "vcs" +[tool.hatch.version.raw-options] +fallback_version = "3.1.0" + [tool.hatch.build] hooks.vcs.version-file = "src/zarr/_version.py" diff --git a/src/zarr/codecs/__init__.py b/src/zarr/codecs/__init__.py index 4c621290e7..9beebbf295 100644 --- a/src/zarr/codecs/__init__.py +++ b/src/zarr/codecs/__init__.py @@ -3,6 +3,7 @@ from zarr.codecs.blosc import BloscCname, BloscCodec, BloscShuffle from zarr.codecs.bytes import BytesCodec, Endian from zarr.codecs.crc32c_ import Crc32cCodec +from zarr.codecs.gpu import NvcompBloscCodec, NvcompZstdCodec from zarr.codecs.gzip import GzipCodec from zarr.codecs.numcodecs import ( BZ2, @@ -41,6 +42,8 @@ "Crc32cCodec", "Endian", "GzipCodec", + "NvcompBloscCodec", + "NvcompZstdCodec", "ShardingCodec", "ShardingCodecIndexLocation", "TransposeCodec", diff --git a/src/zarr/codecs/gpu.py b/src/zarr/codecs/gpu.py new file mode 100644 index 0000000000..495a8e5ce3 --- /dev/null +++ b/src/zarr/codecs/gpu.py @@ -0,0 +1,723 @@ +from __future__ import annotations + +import asyncio +import struct +from dataclasses import dataclass +from functools import cached_property +from typing import TYPE_CHECKING, Any, Literal + +import numcodecs +import numpy as np +from numcodecs.zstd import Zstd as NumcodecsZstd +from packaging.version import Version + +from zarr.abc.codec import BytesBytesCodec +from zarr.codecs.blosc import BloscCname, BloscCodec, BloscShuffle, CName, Shuffle +from zarr.codecs.zstd import parse_checksum, parse_zstd_level +from zarr.core.buffer.cpu import as_numpy_array_wrapper +from zarr.core.common import JSON, concurrent_map, parse_named_configuration +from zarr.core.config import config +from 
zarr.registry import register_codec + +if TYPE_CHECKING: + from collections.abc import Iterable + from typing import Self + + from zarr.core.array_spec import ArraySpec + from zarr.core.buffer import Buffer + +try: + import cupy as cp +except ImportError: # pragma: no cover + cp = None + +try: + from nvidia import nvcomp +except ImportError: # pragma: no cover + nvcomp = None + +_BLOSC_MAX_OVERHEAD = 16 +_BLOSC_FLAG_DOSHUFFLE = 0x01 +_BLOSC_FLAG_MEMCPYED = 0x02 +_BLOSC_FLAG_DOBITSHUFFLE = 0x04 +_BLOSC_FLAG_DONT_SPLIT = 0x10 +_BLOSC_COMPFORMAT_ZSTD = 4 +_BLOSC_MIN_BUFFERSIZE = 128 +_BLOSC_MAX_SPLITS = 16 +_DEFAULT_BLOSC_BITUNSHUFFLE_MAX_BYTES = 100 * 1024 * 1024 + + +def _parse_positive_int(value: int, *, name: str) -> int: + if not isinstance(value, int): + raise TypeError(f"{name} must be an int. Got {type(value)} instead.") + if value <= 0: + raise ValueError(f"{name} must be greater than 0. Got {value}.") + return value + + +@dataclass(frozen=True) +class _BloscSegmentPlan: + dest_offset: int + length: int + raw_source: "cp.ndarray | None" = None + decode_index: int | None = None + + +@dataclass(frozen=True) +class _BloscBitshufflePlan: + typesize: int + blocksize: int + full_block_bytes: int + tail_bytes: int + + +@dataclass(frozen=True) +class _BloscChunkPlan: + index: int + scratch: "cp.ndarray" + chunk_spec: ArraySpec + segments: tuple[_BloscSegmentPlan, ...] + bitshuffle: _BloscBitshufflePlan | None + + +class _NvcompZstdMixin: + def _require_nvcomp(self, *, codec_name: str, operation: Literal["encode", "decode"]) -> None: + if nvcomp is None: + raise RuntimeError( + f"{codec_name} requires `nvidia-nvcomp-cu12` to {operation} GPU-backed zstd chunks." 
+ ) + + @cached_property + def _nvcomp_zstd_codec(self) -> nvcomp.Codec: + assert cp is not None + assert nvcomp is not None + device = cp.cuda.Device() + stream = cp.cuda.get_current_stream() + return nvcomp.Codec( + algorithm="Zstd", + bitstream_kind=nvcomp.BitstreamKind.RAW, + device_id=device.id, + cuda_stream=stream.ptr, + ) + + @staticmethod + def _coerce_nvcomp_output(array: Any) -> "cp.ndarray": + out = cp.asarray(array) + if out.dtype != np.dtype("B"): + out = out.view(np.dtype("B")) + return out + + async def _run_nvcomp_zstd_batch( + self, + arrays: Iterable["cp.ndarray"], + *, + operation: Literal["decode", "encode"], + ) -> list["cp.ndarray"]: + array_list = list(arrays) + if not array_list: + return [] + + outputs = getattr(self._nvcomp_zstd_codec, operation)(nvcomp.as_arrays(array_list)) + event = cp.cuda.Event() + event.record() + await asyncio.to_thread(event.synchronize) + return [self._coerce_nvcomp_output(output) for output in outputs] + + +@dataclass(frozen=True) +class NvcompZstdCodec(_NvcompZstdMixin, BytesBytesCodec): + """GPU-backed zstd codec using nvCOMP for batched encode/decode.""" + + is_fixed_size = True + + level: int = 0 + checksum: bool = False + + def __init__(self, *, level: int = 0, checksum: bool = False) -> None: + _numcodecs_version = Version(numcodecs.__version__) + if _numcodecs_version < Version("0.13.0"): + raise RuntimeError( + "numcodecs version >= 0.13.0 is required to use the zstd codec. " + f"Version {_numcodecs_version} is currently installed." 
+ ) + + object.__setattr__(self, "level", parse_zstd_level(level)) + object.__setattr__(self, "checksum", parse_checksum(checksum)) + + @classmethod + def from_dict(cls, data: dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration(data, "zstd") + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> dict[str, JSON]: + return {"name": "zstd", "configuration": {"level": self.level, "checksum": self.checksum}} + + @cached_property + def _cpu_zstd_codec(self) -> NumcodecsZstd: + return NumcodecsZstd.from_config({"level": self.level, "checksum": self.checksum}) + + async def _decode_single_cpu( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return await asyncio.to_thread( + as_numpy_array_wrapper, + self._cpu_zstd_codec.decode, + chunk_bytes, + chunk_spec.prototype, + ) + + async def _encode_single_cpu( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return await asyncio.to_thread( + as_numpy_array_wrapper, + self._cpu_zstd_codec.encode, + chunk_bytes, + chunk_spec.prototype, + ) + + async def decode( + self, + chunks_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + batch = list(chunks_and_specs) + results: list[Buffer | None] = [None] * len(batch) + gpu_entries: list[tuple[int, cp.ndarray, ArraySpec]] = [] + cpu_entries: list[tuple[int, Buffer, ArraySpec]] = [] + + for index, (chunk_bytes, chunk_spec) in enumerate(batch): + if chunk_bytes is None: + continue + source = chunk_bytes.as_array_like() + if cp is not None and isinstance(source, cp.ndarray): + gpu_entries.append((index, source, chunk_spec)) + else: + cpu_entries.append((index, chunk_bytes, chunk_spec)) + + if gpu_entries: + self._require_nvcomp(codec_name=type(self).__name__, operation="decode") + outputs = await self._run_nvcomp_zstd_batch( + (source for _, source, _ in gpu_entries), operation="decode" + ) + for (index, _, chunk_spec), output in 
zip(gpu_entries, outputs, strict=True): + results[index] = chunk_spec.prototype.buffer.from_array_like(output) + + if cpu_entries: + cpu_outputs = await concurrent_map( + [(chunk_bytes, chunk_spec) for _, chunk_bytes, chunk_spec in cpu_entries], + self._decode_single_cpu, + config.get("async.concurrency"), + ) + for (index, _, _), output in zip(cpu_entries, cpu_outputs, strict=True): + results[index] = output + + return results + + async def encode( + self, + chunks_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + batch = list(chunks_and_specs) + results: list[Buffer | None] = [None] * len(batch) + gpu_entries: list[tuple[int, cp.ndarray, ArraySpec]] = [] + cpu_entries: list[tuple[int, Buffer, ArraySpec]] = [] + + for index, (chunk_bytes, chunk_spec) in enumerate(batch): + if chunk_bytes is None: + continue + source = chunk_bytes.as_array_like() + if cp is not None and isinstance(source, cp.ndarray): + gpu_entries.append((index, source, chunk_spec)) + else: + cpu_entries.append((index, chunk_bytes, chunk_spec)) + + if gpu_entries: + self._require_nvcomp(codec_name=type(self).__name__, operation="encode") + outputs = await self._run_nvcomp_zstd_batch( + (source for _, source, _ in gpu_entries), operation="encode" + ) + for (index, _, chunk_spec), output in zip(gpu_entries, outputs, strict=True): + results[index] = chunk_spec.prototype.buffer.from_array_like(output) + + if cpu_entries: + cpu_outputs = await concurrent_map( + [(chunk_bytes, chunk_spec) for _, chunk_bytes, chunk_spec in cpu_entries], + self._encode_single_cpu, + config.get("async.concurrency"), + ) + for (index, _, _), output in zip(cpu_entries, cpu_outputs, strict=True): + results[index] = output + + return results + + def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: + raise NotImplementedError + + +class NvcompBloscCodec(_NvcompZstdMixin, BloscCodec): + """GPU Blosc decoder (zstd + {bitshuffle, noshuffle}) using nvCOMP.""" 
+ + _default_bitshuffle_max_bytes = _DEFAULT_BLOSC_BITUNSHUFFLE_MAX_BYTES + bitshuffle_max_bytes: int + + def __init__( + self, + *, + typesize: int | None = None, + cname: BloscCname | CName = BloscCname.zstd, + clevel: int = 5, + shuffle: BloscShuffle | Shuffle | None = None, + blocksize: int = 0, + bitshuffle_max_bytes: int | None = None, + ) -> None: + super().__init__( + typesize=typesize, + cname=cname, + clevel=clevel, + shuffle=shuffle, + blocksize=blocksize, + ) + if bitshuffle_max_bytes is None: + bitshuffle_max_bytes = config.get( + "gpu.blosc_bitshuffle_max_bytes", + type(self)._default_bitshuffle_max_bytes, + ) + object.__setattr__( + self, + "bitshuffle_max_bytes", + _parse_positive_int(bitshuffle_max_bytes, name="bitshuffle_max_bytes"), + ) + + @staticmethod + def _bitunshuffle(src: "cp.ndarray", *, typesize: int, n_elements: int) -> "cp.ndarray": + bits = cp.unpackbits(src, bitorder="little") + matrix = bits.reshape((typesize * 8, n_elements)) + out_bits = matrix.T.reshape((-1,)) + return cp.packbits(out_bits, bitorder="little") + + @staticmethod + def _bitunshuffle_blocks( + src: "cp.ndarray", + *, + typesize: int, + blocksize: int, + nblocks: int, + ) -> "cp.ndarray": + bits = cp.unpackbits(src, bitorder="little") + matrix = bits.reshape((nblocks, typesize * 8, blocksize // typesize)) + out_bits = matrix.transpose((0, 2, 1)).reshape((-1,)) + return cp.packbits(out_bits, bitorder="little") + + @cached_property + def _scatter_segments_kernel(self) -> cp.RawKernel: + return cp.RawKernel( + r""" + extern "C" __global__ + void scatter_segments( + const unsigned long long* src_ptrs, + const long long* dst_offsets, + const long long* lengths, + unsigned char* out + ) { + const long long seg = static_cast(blockIdx.x); + const unsigned char* src = + reinterpret_cast(src_ptrs[seg]); + unsigned char* dst = out + dst_offsets[seg]; + const long long len = lengths[seg]; + for (long long i = threadIdx.x; i < len; i += blockDim.x) { + dst[i] = src[i]; + } + } + 
""", + "scatter_segments", + ) + + @cached_property + def _parse_split_headers_kernel(self) -> cp.RawKernel: + return cp.RawKernel( + r""" + extern "C" __global__ + void parse_split_headers( + const unsigned char* source, + const int* bstarts, + const int* split_counts, + const long long* split_bases, + const long long cbytes, + long long* src_offsets, + long long* src_lengths, + int* error_code + ) { + if (threadIdx.x != 0) { + return; + } + const int block = static_cast(blockIdx.x); + long long p = static_cast(bstarts[block]); + const int nsplits = split_counts[block]; + const long long base = split_bases[block]; + + for (int i = 0; i < nsplits; ++i) { + if (p < 0 || p + 4 > cbytes) { + atomicCAS(error_code, 0, 1); + return; + } + + const unsigned int csize_u = + static_cast(source[p]) | + (static_cast(source[p + 1]) << 8) | + (static_cast(source[p + 2]) << 16) | + (static_cast(source[p + 3]) << 24); + const int csize = static_cast(csize_u); + p += 4; + + if (csize < 0) { + atomicCAS(error_code, 0, 2); + return; + } + if (p + static_cast(csize) > cbytes) { + atomicCAS(error_code, 0, 3); + return; + } + + src_offsets[base + i] = p; + src_lengths[base + i] = csize; + p += static_cast(csize); + } + } + """, + "parse_split_headers", + ) + + @staticmethod + def _split_count(*, typesize: int, blocksize: int, dont_split: bool, leftover_block: bool) -> int: + if ( + (not dont_split) + and (typesize <= _BLOSC_MAX_SPLITS) + and ((blocksize // typesize) >= _BLOSC_MIN_BUFFERSIZE) + and (not leftover_block) + ): + return typesize + return 1 + + def _scatter_segments( + self, + chunk_plan: _BloscChunkPlan, + decoded_splits: list["cp.ndarray"], + ) -> None: + if not chunk_plan.segments: + return + + src_ptrs = np.empty((len(chunk_plan.segments),), dtype=np.uintp) + dst_offsets = np.empty((len(chunk_plan.segments),), dtype=np.int64) + lengths = np.empty((len(chunk_plan.segments),), dtype=np.int64) + + for idx, segment in enumerate(chunk_plan.segments): + if segment.raw_source is 
not None: + source = segment.raw_source + else: + assert segment.decode_index is not None + source = decoded_splits[segment.decode_index] + if source.size != segment.length: + raise ValueError( + "Invalid Blosc payload: split decode size mismatch. " + f"Expected {segment.length} bytes, got {source.size}." + ) + src_ptrs[idx] = source.data.ptr + dst_offsets[idx] = segment.dest_offset + lengths[idx] = segment.length + + threads = 256 + self._scatter_segments_kernel( + (len(chunk_plan.segments),), + (threads,), + ( + cp.asarray(src_ptrs), + cp.asarray(dst_offsets), + cp.asarray(lengths), + chunk_plan.scratch, + ), + ) + + def _apply_bitshuffle(self, chunk_plan: _BloscChunkPlan) -> None: + bitshuffle = chunk_plan.bitshuffle + if bitshuffle is None: + return + + full_block_bytes = bitshuffle.full_block_bytes + if full_block_bytes > 0: + window_blocks = max(1, self.bitshuffle_max_bytes // bitshuffle.blocksize) + window_bytes = window_blocks * bitshuffle.blocksize + for start in range(0, full_block_bytes, window_bytes): + stop = min(start + window_bytes, full_block_bytes) + nblocks = (stop - start) // bitshuffle.blocksize + chunk_plan.scratch[start:stop] = self._bitunshuffle_blocks( + chunk_plan.scratch[start:stop], + typesize=bitshuffle.typesize, + blocksize=bitshuffle.blocksize, + nblocks=nblocks, + ) + + if bitshuffle.tail_bytes == 0: + return + + tail_start = full_block_bytes + tail_stop = tail_start + bitshuffle.tail_bytes + if bitshuffle.tail_bytes % bitshuffle.typesize != 0: + raise ValueError( + "Invalid bitshuffle payload: " + f"block bytes {bitshuffle.tail_bytes} not divisible by typesize {bitshuffle.typesize}." + ) + n_elements = bitshuffle.tail_bytes // bitshuffle.typesize + if n_elements % 8 != 0: + raise ValueError( + "NvcompBloscCodec only supports bitshuffle blocks with element count divisible by 8." 
+ ) + chunk_plan.scratch[tail_start:tail_stop] = self._bitunshuffle( + chunk_plan.scratch[tail_start:tail_stop], + typesize=bitshuffle.typesize, + n_elements=n_elements, + ) + + def _parse_split_headers( + self, + source: "cp.ndarray", + *, + bstarts: np.ndarray, + split_counts: np.ndarray, + split_bases: np.ndarray, + cbytes: int, + ) -> tuple[np.ndarray, np.ndarray]: + total_splits = int(split_counts.sum()) + src_offsets = cp.empty((total_splits,), dtype=cp.int64) + src_lengths = cp.empty((total_splits,), dtype=cp.int64) + error_code = cp.zeros((1,), dtype=cp.int32) + + self._parse_split_headers_kernel( + (len(bstarts),), + (1,), + ( + source, + cp.asarray(bstarts), + cp.asarray(split_counts), + cp.asarray(split_bases), + np.int64(cbytes), + src_offsets, + src_lengths, + error_code, + ), + ) + + error_code_host = int(cp.asnumpy(error_code)[0]) + if error_code_host == 1: + raise ValueError("Invalid Blosc payload: split header outside payload.") + if error_code_host in (2, 3): + raise ValueError("Invalid Blosc payload: split data outside payload.") + + return cp.asnumpy(src_offsets), cp.asnumpy(src_lengths) + + def _build_chunk_plan( + self, + *, + index: int, + source: "cp.ndarray", + chunk_spec: ArraySpec, + decode_inputs: list["cp.ndarray"], + ) -> _BloscChunkPlan | Buffer: + if source.size < _BLOSC_MAX_OVERHEAD: + raise ValueError( + f"Invalid Blosc payload: expected at least {_BLOSC_MAX_OVERHEAD} bytes, got {source.size}." + ) + + header = memoryview(cp.asnumpy(source[:_BLOSC_MAX_OVERHEAD])) + _, _, flags, typesize, nbytes, blocksize, cbytes = struct.unpack_from("> 5 + + if do_shuffle: + raise ValueError("NvcompBloscCodec does not support byte-shuffle Blosc chunks.") + if compformat != _BLOSC_COMPFORMAT_ZSTD: + raise ValueError( + "NvcompBloscCodec only supports Blosc chunks with cname='zstd'. " + f"Got compformat={compformat}." 
+ ) + + if is_memcpyed: + if cbytes < _BLOSC_MAX_OVERHEAD + nbytes: + raise ValueError("Invalid memcpyed Blosc payload: missing raw bytes after header.") + raw = source[_BLOSC_MAX_OVERHEAD : _BLOSC_MAX_OVERHEAD + nbytes] + return chunk_spec.prototype.buffer.from_array_like(raw.copy()) + + if blocksize == 0: + raise ValueError("Invalid Blosc payload: blocksize must be > 0.") + if nbytes % typesize != 0: + raise ValueError( + f"Invalid Blosc payload: nbytes={nbytes} is not divisible by typesize={typesize}." + ) + + leftover = nbytes % blocksize + nblocks = nbytes // blocksize + (1 if leftover else 0) + + bstarts_offset = _BLOSC_MAX_OVERHEAD + bstarts_size = nblocks * 4 + bstarts_end = bstarts_offset + bstarts_size + if cbytes < bstarts_end: + raise ValueError("Invalid Blosc payload: missing block-start table.") + + bstarts = np.frombuffer( + cp.asnumpy(source[bstarts_offset:bstarts_end]).tobytes(), dtype=" 0) else blocksize + leftover_block = block_index == nblocks - 1 and leftover > 0 + nsplits = self._split_count( + typesize=typesize, + blocksize=bsize, + dont_split=dont_split, + leftover_block=leftover_block, + ) + if bsize % nsplits != 0: + raise ValueError( + f"Invalid Blosc payload: blocksize {bsize} not divisible by nsplits {nsplits}." 
+ ) + + split_counts[block_index] = nsplits + split_bases[block_index] = total_splits + total_splits += nsplits + + src_offsets, src_lengths = self._parse_split_headers( + source, + bstarts=bstarts, + split_counts=split_counts, + split_bases=split_bases, + cbytes=cbytes, + ) + + segments: list[_BloscSegmentPlan] = [] + split_index_global = 0 + for block_index in range(nblocks): + bsize = leftover if (block_index == nblocks - 1 and leftover > 0) else blocksize + leftover_block = block_index == nblocks - 1 and leftover > 0 + nsplits = self._split_count( + typesize=typesize, + blocksize=bsize, + dont_split=dont_split, + leftover_block=leftover_block, + ) + neblock = bsize // nsplits + block_start = block_index * blocksize + for split_index in range(nsplits): + csize = int(src_lengths[split_index_global]) + src_offset = int(src_offsets[split_index_global]) + split = source[src_offset : src_offset + csize] + dest_offset = block_start + split_index * neblock + if csize == neblock: + segments.append( + _BloscSegmentPlan( + dest_offset=dest_offset, + length=neblock, + raw_source=split, + ) + ) + else: + decode_index = len(decode_inputs) + decode_inputs.append(split) + segments.append( + _BloscSegmentPlan( + dest_offset=dest_offset, + length=neblock, + decode_index=decode_index, + ) + ) + split_index_global += 1 + + bitshuffle = None + if do_bitshuffle: + bitshuffle = _BloscBitshufflePlan( + typesize=typesize, + blocksize=blocksize, + full_block_bytes=(nblocks - (1 if leftover else 0)) * blocksize, + tail_bytes=leftover, + ) + + return _BloscChunkPlan( + index=index, + scratch=scratch, + chunk_spec=chunk_spec, + segments=tuple(segments), + bitshuffle=bitshuffle, + ) + + async def decode( + self, + chunks_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + batch = list(chunks_and_specs) + results: list[Buffer | None] = [None] * len(batch) + decode_inputs: list[cp.ndarray] = [] + gpu_chunk_plans: list[_BloscChunkPlan] = [] + cpu_entries: 
list[tuple[int, Buffer, ArraySpec]] = [] + + for index, (chunk_bytes, chunk_spec) in enumerate(batch): + if chunk_bytes is None: + continue + + source = chunk_bytes.as_array_like() + if cp is None or not isinstance(source, cp.ndarray): + cpu_entries.append((index, chunk_bytes, chunk_spec)) + continue + + self._require_nvcomp(codec_name=type(self).__name__, operation="decode") + plan_or_buffer = self._build_chunk_plan( + index=index, + source=source, + chunk_spec=chunk_spec, + decode_inputs=decode_inputs, + ) + if isinstance(plan_or_buffer, _BloscChunkPlan): + gpu_chunk_plans.append(plan_or_buffer) + else: + results[index] = plan_or_buffer + + decoded_splits: list[cp.ndarray] = [] + if decode_inputs: + decoded_splits = await self._run_nvcomp_zstd_batch(decode_inputs, operation="decode") + + for chunk_plan in gpu_chunk_plans: + self._scatter_segments(chunk_plan, decoded_splits) + self._apply_bitshuffle(chunk_plan) + results[chunk_plan.index] = chunk_plan.chunk_spec.prototype.buffer.from_array_like( + chunk_plan.scratch + ) + + if cpu_entries: + cpu_outputs = await concurrent_map( + [(chunk_bytes, chunk_spec) for _, chunk_bytes, chunk_spec in cpu_entries], + super()._decode_single, + config.get("async.concurrency"), + ) + for (index, _, _), output in zip(cpu_entries, cpu_outputs, strict=True): + results[index] = output + + return results + + +register_codec("zstd", NvcompZstdCodec, qualname="zarr.codecs.gpu.NvcompZstdCodec") +register_codec("blosc", NvcompBloscCodec, qualname="zarr.codecs.gpu.NvcompBloscCodec") diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 564d0e915a..11673cbece 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -28,7 +28,6 @@ from zarr.codecs._v2 import V2Codec from zarr.codecs.bytes import BytesCodec from zarr.codecs.vlen_utf8 import VLenBytesCodec, VLenUTF8Codec -from zarr.codecs.zstd import ZstdCodec from zarr.core._info import ArrayInfo from zarr.core.array_spec import ArrayConfig, ArrayConfigLike, 
parse_array_config from zarr.core.attributes import Attributes @@ -128,6 +127,7 @@ _parse_array_array_codec, _parse_array_bytes_codec, _parse_bytes_bytes_codec, + get_codec_class, get_pipeline_class, ) from zarr.storage._common import StorePath, ensure_no_existing_node, make_store_path @@ -5040,9 +5040,9 @@ def default_compressors_v3(dtype: ZDType[Any, Any]) -> tuple[BytesBytesCodec, .. """ Given a data type, return the default compressors for that data type. - This is just a tuple containing ``ZstdCodec`` + This is just a tuple containing an instance of the configured ``zstd`` codec class. """ - return (ZstdCodec(),) + return (cast("BytesBytesCodec", get_codec_class("zstd")()),) def default_serializer_v3(dtype: ZDType[Any, Any]) -> ArrayBytesCodec: diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index f8f8ea4f5f..c75ccc60e5 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -64,7 +64,18 @@ def enable_gpu(self) -> ConfigSet: Configure Zarr to use GPUs where possible. 
""" return self.set( - {"buffer": "zarr.buffer.gpu.Buffer", "ndbuffer": "zarr.buffer.gpu.NDBuffer"} + { + "buffer": "zarr.buffer.gpu.Buffer", + "ndbuffer": "zarr.buffer.gpu.NDBuffer", + "codecs": { + "blosc": "zarr.codecs.gpu.NvcompBloscCodec", + "zstd": "zarr.codecs.gpu.NvcompZstdCodec", + }, + "codec_pipeline": { + "path": "zarr.core.codec_pipeline.BatchedCodecPipeline", + "batch_size": 65536, + }, + } ) @@ -101,6 +112,7 @@ def enable_gpu(self) -> ConfigSet: "async": {"concurrency": 10, "timeout": None}, "threading": {"max_workers": None}, "json_indent": 2, + "gpu": {"blosc_bitshuffle_max_bytes": 100 * 1024 * 1024}, "codec_pipeline": { "path": "zarr.core.codec_pipeline.BatchedCodecPipeline", "batch_size": 1, diff --git a/tests/test_codecs/test_codecs.py b/tests/test_codecs/test_codecs.py index fa2017876e..7da8462259 100644 --- a/tests/test_codecs/test_codecs.py +++ b/tests/test_codecs/test_codecs.py @@ -16,12 +16,14 @@ GzipCodec, ShardingCodec, TransposeCodec, + ZstdCodec, ) from zarr.core.buffer import default_buffer_prototype from zarr.core.indexing import BasicSelection, decode_morton, morton_order_iter from zarr.core.metadata.v3 import ArrayV3Metadata from zarr.dtype import UInt8 from zarr.errors import ZarrUserWarning +from zarr.registry import register_codec from zarr.storage import StorePath if TYPE_CHECKING: @@ -401,3 +403,22 @@ async def test_resize(store: Store) -> None: assert await store.get(f"{path}/0.1", prototype=default_buffer_prototype()) is not None assert await store.get(f"{path}/1.0", prototype=default_buffer_prototype()) is None assert await store.get(f"{path}/1.1", prototype=default_buffer_prototype()) is None + + +def test_uses_default_codec() -> None: + class MyZstdCodec(ZstdCodec): + pass + + register_codec("zstd", MyZstdCodec) + + with zarr.config.set( + {"codecs": {"zstd": f"{MyZstdCodec.__module__}.{MyZstdCodec.__qualname__}"}} + ): + a = zarr.create_array( + StorePath(zarr.storage.MemoryStore(), path="mycodec"), + shape=(10, 10), + 
chunks=(10, 10), + dtype="int32", + ) + assert a.metadata.zarr_format == 3 + assert isinstance(a.metadata.codecs[-1], MyZstdCodec) diff --git a/tests/test_codecs/test_nvcomp.py b/tests/test_codecs/test_nvcomp.py new file mode 100644 index 0000000000..44930b715c --- /dev/null +++ b/tests/test_codecs/test_nvcomp.py @@ -0,0 +1,235 @@ +from __future__ import annotations + +import contextlib +import typing +import warnings +from collections.abc import Iterator + +import numpy as np +import pytest + +import zarr +from zarr.abc.store import Store +from zarr.buffer.gpu import buffer_prototype +from zarr.codecs import NvcompZstdCodec, ZstdCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.storage import StorePath +from zarr.testing.utils import gpu_test + +if typing.TYPE_CHECKING: + from zarr.core.common import JSON + + +@gpu_test +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("checksum", [True, False]) +@pytest.mark.parametrize( + "selection", + [ + (slice(None), slice(None)), + (slice(4, None), slice(4, None)), + ], +) +def test_nvcomp_zstd(store: Store, checksum: bool, selection: tuple[slice, slice]) -> None: + import cupy as cp + + with zarr.config.enable_gpu(): + data = cp.arange(0, 256, dtype="uint16").reshape((16, 16)) + + a = zarr.create_array( + StorePath(store, path="nvcomp_zstd"), + shape=data.shape, + chunks=(4, 4), + dtype=data.dtype, + fill_value=0, + compressors=NvcompZstdCodec(level=0, checksum=checksum), + ) + + a[selection] = data[selection] + + if selection == (slice(None), slice(None)): + cp.testing.assert_array_equal(data[selection], a[selection]) + cp.testing.assert_array_equal(data[:, :], a[:, :]) + else: + assert a.nchunks_initialized < a.nchunks + expected = cp.full(data.shape, a.fill_value) + expected[selection] = data[selection] + cp.testing.assert_array_equal(expected[selection], a[selection]) + cp.testing.assert_array_equal(expected[:, :], a[:, :]) + + +@gpu_test 
+# NOTE(review): this test imports cupy unconditionally; unless a GPU marker
+# (e.g. @gpu_test) appears just above this hunk, it will fail on CPU-only runners — confirm.
+@pytest.mark.parametrize("host_encode", [True, False])
+def test_gpu_codec_compatibility(host_encode: bool) -> None:
+    import cupy as cp
+
+    @contextlib.contextmanager
+    def gpu_context() -> Iterator[None]:
+        with zarr.config.enable_gpu():
+            yield
+
+    if host_encode:
+        # CPU write / GPU read: reading host-encoded chunks under enable_gpu() warns.
+        write_ctx: contextlib.AbstractContextManager[None] = contextlib.nullcontext()
+        read_ctx: contextlib.AbstractContextManager[None] = gpu_context()
+        write_data = np.arange(16, dtype="int32").reshape(4, 4)
+        read_data = cp.array(write_data)
+        xp = cp
+        expected_warning: pytest.WarningsRecorder | contextlib.AbstractContextManager[None] = (
+            pytest.warns(zarr.errors.ZarrUserWarning)
+        )
+    else:
+        # GPU write / CPU read: round-trip back to host memory; no warning expected.
+        write_ctx = gpu_context()
+        read_ctx = contextlib.nullcontext()
+        write_data = cp.arange(16, dtype="int32").reshape(4, 4)
+        read_data = write_data.get()
+        xp = np
+        expected_warning = contextlib.nullcontext()
+
+    store = zarr.storage.MemoryStore()
+
+    with write_ctx:
+        z = zarr.create_array(
+            store=store,
+            shape=write_data.shape,
+            chunks=(4, 4),
+            dtype=write_data.dtype,
+        )
+        z[:] = write_data
+
+    with read_ctx, expected_warning:
+        z = zarr.open_array(store=store, mode="r")
+        result = z[:]
+        assert isinstance(result, type(read_data))
+        xp.testing.assert_array_equal(result, read_data)
+
+
+@gpu_test
+def test_uses_default_codec() -> None:
+    with zarr.config.enable_gpu():
+        a = zarr.create_array(
+            StorePath(zarr.storage.MemoryStore(), path="nvcomp_zstd"),
+            shape=(10, 10),
+            chunks=(10, 10),
+            dtype="int32",
+        )
+        assert a.metadata.zarr_format == 3
+        assert isinstance(a.metadata.codecs[-1], NvcompZstdCodec)
+
+
+@gpu_test
+def test_nvcomp_zstd_batched_decode_uses_single_nvcomp_call(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    import cupy as cp
+
+    calls = 0
+    original = NvcompZstdCodec._run_nvcomp_zstd_batch
+
+    # Wrapper counts nvCOMP batch invocations: all 16 chunks should decode in one call.
+    async def counted(
+        self: NvcompZstdCodec,
+        arrays: typing.Iterable[typing.Any],
+        *,
+        operation: str,
+    ) -> list[typing.Any]:
+        nonlocal calls
+        calls += 1
+        return await original(self, arrays, operation=typing.cast("typing.Any", operation))
+
+    monkeypatch.setattr(NvcompZstdCodec, "_run_nvcomp_zstd_batch", counted)
+
+    src = np.arange(32 * 32, dtype=np.int32).reshape(32, 32)
+    store = zarr.storage.MemoryStore()
+    z = zarr.create_array(
+        store=store,
+        shape=src.shape,
+        chunks=(8, 8),
+        dtype=src.dtype,
+        compressors=ZstdCodec(level=0, checksum=False),
+    )
+    z[:, :] = src
+
+    with zarr.config.enable_gpu(), warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=zarr.errors.ZarrUserWarning)
+        zr = zarr.open_array(store=store, mode="r")
+        out = zr[:, :]
+
+    assert calls == 1
+    assert isinstance(out, cp.ndarray)
+    cp.testing.assert_array_equal(out, cp.asarray(src))
+
+
+def test_invalid_raises() -> None:
+    with pytest.raises(ValueError):
+        NvcompZstdCodec(level=100, checksum=False)
+
+    with pytest.raises(TypeError):
+        NvcompZstdCodec(level="100", checksum=False)  # type: ignore[arg-type]
+
+    with pytest.raises(TypeError):
+        NvcompZstdCodec(checksum="False")  # type: ignore[arg-type]
+
+
+def test_nvcomp_from_dict() -> None:
+    config: dict[str, JSON] = {
+        "name": "zstd",
+        "configuration": {
+            "level": 0,
+            "checksum": False,
+        },
+    }
+    codec = NvcompZstdCodec.from_dict(config)
+    assert codec.level == 0
+    assert codec.checksum is False
+
+
+def test_compute_encoded_chunk_size() -> None:
+    codec = NvcompZstdCodec(level=0, checksum=False)
+    with pytest.raises(NotImplementedError):
+        codec.compute_encoded_size(
+            _input_byte_length=0,
+            _chunk_spec=ArraySpec(
+                shape=(10, 10),
+                dtype=zarr.core.dtype.Int32(),
+                fill_value=0,
+                config=ArrayConfig(order="C", write_empty_chunks=False),
+                prototype=buffer_prototype,
+            ),
+        )
+
+
+@pytest.mark.asyncio
+async def test_nvcomp_zstd_decode_none() -> None:
+    codec = NvcompZstdCodec(level=0, checksum=False)
+    chunks_and_specs = [
+        (
+            None,
+            ArraySpec(
+                shape=(10, 10),
+                dtype=zarr.core.dtype.Int32(),
+                fill_value=0,
+                config=ArrayConfig(order="C", write_empty_chunks=False),
+                prototype=buffer_prototype,
+            ),
+        )
+    ]
+    result = await codec.decode(chunks_and_specs)
+    assert result == [None]
+
+
+@pytest.mark.asyncio
+async def test_nvcomp_zstd_encode_none() -> None:
+    codec = NvcompZstdCodec(level=0, checksum=False)
+    chunks_and_specs = [
+        (
+            None,
+            ArraySpec(
+                shape=(10, 10),
+                dtype=zarr.core.dtype.Int32(),
+                fill_value=0,
+                config=ArrayConfig(order="C", write_empty_chunks=False),
+                prototype=buffer_prototype,
+            ),
+        )
+    ]
+    result = await codec.encode(chunks_and_specs)
+    assert result == [None]
diff --git a/tests/test_codecs/test_nvcomp_blosc.py b/tests/test_codecs/test_nvcomp_blosc.py
new file mode 100644
index 0000000000..a35369079a
--- /dev/null
+++ b/tests/test_codecs/test_nvcomp_blosc.py
@@ -0,0 +1,294 @@
+from __future__ import annotations
+
+import warnings
+
+import numpy as np
+import pytest
+
+import zarr
+from zarr.codecs import NvcompBloscCodec
+from zarr.codecs.blosc import BloscCodec
+from zarr.errors import ZarrUserWarning
+from zarr.testing.utils import gpu_test
+
+
+def test_nvcomp_blosc_bitshuffle_max_bytes_validation() -> None:
+    codec = NvcompBloscCodec(bitshuffle_max_bytes=1024)
+    assert codec.bitshuffle_max_bytes == 1024
+
+    with pytest.raises(ValueError):
+        NvcompBloscCodec(bitshuffle_max_bytes=0)
+
+    with pytest.raises(TypeError):
+        NvcompBloscCodec(bitshuffle_max_bytes="1024")  # type: ignore[arg-type]
+
+
+@gpu_test
+@pytest.mark.parametrize("shuffle", ["bitshuffle", "noshuffle"])
+def test_nvcomp_blosc_decode_supported(shuffle: str) -> None:
+    import cupy as cp
+
+    src = np.arange(256, dtype=np.float32).reshape(16, 16)
+    store = zarr.storage.MemoryStore()
+    z = zarr.create_array(
+        store=store,
+        shape=src.shape,
+        chunks=(8, 8),
+        dtype=src.dtype,
+        compressors=BloscCodec(cname="zstd", shuffle=shuffle),
+    )
+    z[:, :] = src
+
+    with zarr.config.enable_gpu(), warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=ZarrUserWarning)
+        zr = zarr.open_array(store=store, mode="r")
+        out = zr[:, :]
+
+    assert isinstance(out, cp.ndarray)
+    cp.testing.assert_array_equal(out, cp.asarray(src))
+
+
+@gpu_test
+def test_nvcomp_blosc_decode_raises_on_byte_shuffle() -> None:
+    src = np.arange(64, dtype=np.float32).reshape(8, 8)
+    store = zarr.storage.MemoryStore()
+    z = zarr.create_array(
+        store=store,
+        shape=src.shape,
+        chunks=(8, 8),
+        dtype=src.dtype,
+        compressors=BloscCodec(cname="zstd", shuffle="shuffle"),
+    )
+    z[:, :] = src
+
+    with zarr.config.enable_gpu(), warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=ZarrUserWarning)
+        zr = zarr.open_array(store=store, mode="r")
+        with pytest.raises(ValueError, match="byte-shuffle"):
+            _ = zr[:, :]
+
+
+@gpu_test
+def test_nvcomp_blosc_decode_supported_non_float32() -> None:
+    import cupy as cp
+
+    src = np.arange(64, dtype=np.float64).reshape(8, 8)
+    store = zarr.storage.MemoryStore()
+    z = zarr.create_array(
+        store=store,
+        shape=src.shape,
+        chunks=(8, 8),
+        dtype=src.dtype,
+        compressors=BloscCodec(cname="zstd", shuffle="bitshuffle"),
+    )
+    z[:, :] = src
+
+    with zarr.config.enable_gpu(), warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=ZarrUserWarning)
+        zr = zarr.open_array(store=store, mode="r")
+        out = zr[:, :]
+
+    assert isinstance(out, cp.ndarray)
+    cp.testing.assert_array_equal(out, cp.asarray(src))
+
+
+@gpu_test
+def test_nvcomp_blosc_decode_raises_on_non_zstd() -> None:
+    src = np.arange(64, dtype=np.float32).reshape(8, 8)
+    store = zarr.storage.MemoryStore()
+    z = zarr.create_array(
+        store=store,
+        shape=src.shape,
+        chunks=(8, 8),
+        dtype=src.dtype,
+        compressors=BloscCodec(cname="lz4", shuffle="noshuffle"),
+    )
+    z[:, :] = src
+
+    with zarr.config.enable_gpu(), warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=ZarrUserWarning)
+        zr = zarr.open_array(store=store, mode="r")
+        with pytest.raises(ValueError, match="cname='zstd'"):
+            _ = zr[:, :]
+
+
+@gpu_test
+def test_nvcomp_blosc_decode_batches_multi_block_frames(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    import cupy as cp
+
+    calls = 0
+    original = NvcompBloscCodec._run_nvcomp_zstd_batch
+
+    async def counted(
+        self: NvcompBloscCodec,
+        arrays: object,
+        *,
+        operation: str,
+    ) -> list[object]:
+        nonlocal calls
+        calls += 1
+        return await original(self, arrays, operation=operation)
+
+    monkeypatch.setattr(NvcompBloscCodec, "_run_nvcomp_zstd_batch", counted)
+
+    src = np.arange(2048, dtype=np.float32)
+    store = zarr.storage.MemoryStore()
+    z = zarr.create_array(
+        store=store,
+        shape=src.shape,
+        chunks=(2048,),
+        dtype=src.dtype,
+        compressors=BloscCodec(cname="zstd", shuffle="noshuffle", blocksize=256),
+    )
+    z[:] = src
+
+    with zarr.config.enable_gpu(), warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=ZarrUserWarning)
+        zr = zarr.open_array(store=store, mode="r")
+        out = zr[:]
+
+    assert calls == 1
+    assert isinstance(out, cp.ndarray)
+    cp.testing.assert_array_equal(out, cp.asarray(src))
+
+
+@gpu_test
+def test_nvcomp_blosc_decode_avoids_full_chunk_host_copy(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    import cupy as cp
+
+    original = cp.asnumpy
+    copied_sizes: list[int] = []
+
+    def counted(array: object) -> object:
+        size = int(getattr(array, "size", -1))
+        copied_sizes.append(size)
+        if size > 1024:
+            raise AssertionError(f"unexpected large host copy of {size} bytes")
+        return original(array)
+
+    monkeypatch.setattr(cp, "asnumpy", counted)
+
+    rng = np.random.default_rng(0)
+    src = rng.standard_normal(8192, dtype=np.float32)
+    store = zarr.storage.MemoryStore()
+    z = zarr.create_array(
+        store=store,
+        shape=src.shape,
+        chunks=(8192,),
+        dtype=src.dtype,
+        compressors=BloscCodec(cname="zstd", shuffle="noshuffle", blocksize=256),
+    )
+    z[:] = src
+
+    with zarr.config.enable_gpu(), warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=ZarrUserWarning)
+        zr = zarr.open_array(store=store, mode="r")
+        out = zr[:]
+
+    assert copied_sizes
+    assert isinstance(out, cp.ndarray)
+    np.testing.assert_array_equal(original(out), src)
+
+
+@gpu_test
+def test_nvcomp_blosc_decode_batches_multi_chunk_frames(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    import cupy as cp
+
+    calls = 0
+    scatter_calls = 0
+    original = NvcompBloscCodec._run_nvcomp_zstd_batch
+    original_scatter = NvcompBloscCodec._scatter_segments
+
+    async def counted(
+        self: NvcompBloscCodec,
+        arrays: object,
+        *,
+        operation: str,
+    ) -> list[object]:
+        nonlocal calls
+        calls += 1
+        return await original(self, arrays, operation=operation)
+
+    def counted_scatter(
+        self: NvcompBloscCodec,
+        chunk_plan: object,
+        decoded_splits: object,
+    ) -> None:
+        nonlocal scatter_calls
+        scatter_calls += 1
+        original_scatter(self, chunk_plan, decoded_splits)
+
+    monkeypatch.setattr(NvcompBloscCodec, "_run_nvcomp_zstd_batch", counted)
+    monkeypatch.setattr(NvcompBloscCodec, "_scatter_segments", counted_scatter)
+
+    src = np.arange(4096, dtype=np.float32).reshape(64, 64)
+    store = zarr.storage.MemoryStore()
+    z = zarr.create_array(
+        store=store,
+        shape=src.shape,
+        chunks=(16, 16),
+        dtype=src.dtype,
+        compressors=BloscCodec(cname="zstd", shuffle="noshuffle", blocksize=256),
+    )
+    z[:, :] = src
+
+    with zarr.config.enable_gpu(), warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=ZarrUserWarning)
+        zr = zarr.open_array(store=store, mode="r")
+        out = zr[:, :]
+
+    assert calls == 1
+    assert scatter_calls == z.nchunks
+    assert isinstance(out, cp.ndarray)
+    cp.testing.assert_array_equal(out, cp.asarray(src))
+
+
+@gpu_test
+def test_nvcomp_blosc_bitshuffle_windowing_batches_full_blocks(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    import cupy as cp
+
+    calls = 0
+    original = NvcompBloscCodec._bitunshuffle_blocks
+
+    def counted(
+        src: object,
+        *,
+        typesize: int,
+        blocksize: int,
+        nblocks: int,
+    ) -> object:
+        nonlocal calls
+        calls += 1
+        return original(src, typesize=typesize, blocksize=blocksize, nblocks=nblocks)
+
+    monkeypatch.setattr(NvcompBloscCodec, "_bitunshuffle_blocks", staticmethod(counted))
+
+    src = np.arange(1024, dtype=np.float32)
+    store = zarr.storage.MemoryStore()
+    z = zarr.create_array(
+        store=store,
+        shape=src.shape,
+        chunks=(1024,),
+        dtype=src.dtype,
+        compressors=BloscCodec(cname="zstd", shuffle="bitshuffle", blocksize=256),
+    )
+    z[:] = src
+
+    with zarr.config.enable_gpu(), zarr.config.set(
+        {"gpu.blosc_bitshuffle_max_bytes": 512}
+    ), warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=ZarrUserWarning)
+        zr = zarr.open_array(store=store, mode="r")
+        out = zr[:]
+
+    assert calls == src.nbytes // 512
+    assert isinstance(out, cp.ndarray)
+    cp.testing.assert_array_equal(out, cp.asarray(src))
diff --git a/tests/test_config.py b/tests/test_config.py
index c3102e8efe..2479437645 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -291,6 +291,18 @@ def test_config_buffer_backwards_compatibility_gpu() -> None:
         get_ndbuffer_class()
 
 
+@pytest.mark.gpu
+def test_enable_gpu_sets_gpu_codecs() -> None:
+    with zarr.config.enable_gpu():
+        assert config.get("buffer") == "zarr.buffer.gpu.Buffer"
+        assert config.get("ndbuffer") == "zarr.buffer.gpu.NDBuffer"
+        assert config.get("codecs.blosc") == "zarr.codecs.gpu.NvcompBloscCodec"
+        assert config.get("codecs.zstd") == "zarr.codecs.gpu.NvcompZstdCodec"
+        assert config.get("gpu.blosc_bitshuffle_max_bytes") == 100 * 1024 * 1024
+        assert config.get("codec_pipeline.path") == "zarr.core.codec_pipeline.BatchedCodecPipeline"
+        assert config.get("codec_pipeline.batch_size") == 65536
+
+
 @pytest.mark.filterwarnings("error")
 def test_warning_on_missing_codec_config() -> None:
     class NewCodec(BytesCodec):