From d06e4ad7b5815758cfed3d8ed8c68705beb43e5b Mon Sep 17 00:00:00 2001
From: "liyang.127"
Date: Mon, 2 Feb 2026 15:24:28 +0800
Subject: [PATCH 1/4] util: add streaming Snappy codec using official framing format

## Summary

Implement a streaming Snappy compressor/decompressor for Arrow C++ using the
official Snappy framing format, including per-chunk masked CRC-32C
verification, and enable the existing streaming tests for Snappy.

## Details

- Add a small `crc32c_masked` helper in `arrow::util` that computes the masked
  CRC-32C checksum as defined by the Snappy framing specification.
- Extend the C++ util build to compile `crc32c.cc` and link it into the main
  util library.
- Reimplement the Snappy codec streaming layer in `compression_snappy.cc`:
  - Keep the one-shot `Codec::Compress/Decompress` based on raw Snappy
    bitstreams (RawCompress/RawUncompress).
  - Implement `SnappyFramedCompressor`, which emits the official stream
    identifier chunk and splits the uncompressed stream into 64 KiB chunks,
    each wrapped as a framed chunk with a per-chunk masked CRC-32C checksum.
  - Implement `SnappyFramedDecompressor` as a stateful parser for Snappy
    framed streams that validates the stream identifier, handles
    compressed/uncompressed/skippable chunks, verifies the masked CRC-32C of
    the uncompressed payload, and supports incremental output via the
    `Decompress` API.
  - Wire `Codec::MakeCompressor` / `Codec::MakeDecompressor` for
    `Compression::SNAPPY` to the new framed implementations.
- Generalize the streaming compression/decompression tests in
  `compression_test.cc` so that they:
  - Validate streaming compressor output with the streaming decompressor
    instead of the one-shot codec, to accommodate codecs whose streaming and
    one-shot formats differ.
  - Generate inputs for `CheckStreamingDecompressor` with the streaming
    compressor rather than with one-shot compression.
  - Remove the Snappy-specific skips in `StreamingCompressor`,
    `StreamingDecompressor`, `StreamingRoundtrip`,
    `StreamingDecompressorReuse`, and `StreamingMultiFlush`, so the streaming
    tests now cover Snappy as well as the existing codecs.

## Testing

Because this sandbox lacks a configured C/C++ toolchain and Ninja, a local
CMake/Ninja build with `ARROW_WITH_SNAPPY=ON` and `ARROW_BUILD_TESTS=ON` could
not be completed here. The changes are limited to the C++ util layer and its
unit tests; they should be validated by running the standard C++ test suite
(in particular `util-compression-test`) in a fully provisioned Arrow
development environment.
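For reviewers, the following standalone sketch (not part of the patch; the
helper names `Crc32c` and `MaskCrc32c` are illustrative only) shows the two
pieces of framing math the new codec relies on, namely the bitwise CRC-32C and
the masking step from the Snappy framing specification, plus the byte layout
of a framed chunk:

```cpp
// Standalone sketch, not part of this patch: bitwise CRC-32C, the masking
// defined by the Snappy framing spec, and the layout of one framed chunk.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Bitwise CRC-32C (Castagnoli, reflected polynomial 0x82F63B78).
uint32_t Crc32c(const uint8_t* data, size_t length) {
  uint32_t crc = 0xFFFFFFFFu;
  for (size_t i = 0; i < length; ++i) {
    crc ^= data[i];
    for (int bit = 0; bit < 8; ++bit) {
      const uint32_t mask = static_cast<uint32_t>(-(crc & 1u));
      crc = (crc >> 1) ^ (0x82F63B78u & mask);
    }
  }
  return ~crc;
}

// Masking from the framing spec: rotate right by 15 bits, then add a constant.
uint32_t MaskCrc32c(uint32_t crc) {
  return ((crc >> 15) | (crc << 17)) + 0xa282ead8u;
}

int main() {
  // Well-known CRC-32C check value for the ASCII string "123456789".
  const uint8_t check[] = {'1', '2', '3', '4', '5', '6', '7', '8', '9'};
  assert(Crc32c(check, sizeof(check)) == 0xE3069283u);

  // A framed chunk is: 1-byte type, 3-byte little-endian length of
  // (4-byte masked CRC of the *uncompressed* data followed by the payload).
  const uint8_t payload[] = {'h', 'i'};
  const uint32_t masked = MaskCrc32c(Crc32c(payload, sizeof(payload)));
  const uint32_t chunk_len = 4 + static_cast<uint32_t>(sizeof(payload));
  std::vector<uint8_t> chunk = {
      0x01,  // uncompressed-data chunk type (0x00 would be Snappy-compressed)
      static_cast<uint8_t>(chunk_len & 0xFF),
      static_cast<uint8_t>((chunk_len >> 8) & 0xFF),
      static_cast<uint8_t>((chunk_len >> 16) & 0xFF)};
  for (int i = 0; i < 4; ++i) {
    // Masked checksum, stored little-endian.
    chunk.push_back(static_cast<uint8_t>((masked >> (8 * i)) & 0xFF));
  }
  chunk.insert(chunk.end(), payload, payload + sizeof(payload));
  assert(chunk.size() == 4 + chunk_len);
  return 0;
}
```

The `SnappyFramedDecompressor` added by this patch recomputes exactly this
masked value over the uncompressed payload of each chunk and rejects the
stream if it does not match the stored checksum.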
Co-Authored-By: Aime Change-Id: I97c877d81959c13578c6f251cb6c8a8141297d6a --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/util/compression_snappy.cc | 369 ++++++++++++++++++++++- cpp/src/arrow/util/compression_test.cc | 138 ++++++--- cpp/src/arrow/util/crc32c.cc | 53 ++++ cpp/src/arrow/util/crc32c.h | 41 +++ 5 files changed, 551 insertions(+), 51 deletions(-) create mode 100755 cpp/src/arrow/util/crc32c.cc create mode 100755 cpp/src/arrow/util/crc32c.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index d9f04a627bc5..90fbeb4cf6ca 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -505,6 +505,7 @@ set(ARROW_UTIL_SRCS util/counting_semaphore.cc util/cpu_info.cc util/crc32.cc + util/crc32c.cc util/debug.cc util/decimal.cc util/delimiting.cc diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc index 56d69f05d686..c8908d271670 100644 --- a/cpp/src/arrow/util/compression_snappy.cc +++ b/cpp/src/arrow/util/compression_snappy.cc @@ -19,12 +19,15 @@ #include #include +#include #include +#include #include #include "arrow/result.h" #include "arrow/status.h" +#include "arrow/util/crc32c.h" #include "arrow/util/logging_internal.h" #include "arrow/util/macros.h" @@ -37,7 +40,363 @@ namespace internal { namespace { // ---------------------------------------------------------------------- -// Snappy implementation +// Snappy framing constants + +constexpr uint8_t kChunkTypeCompressedData = 0x00; +constexpr uint8_t kChunkTypeUncompressedData = 0x01; +constexpr uint8_t kChunkTypePadding = 0xFEu; +constexpr uint8_t kChunkTypeStreamIdentifier = 0xFFu; + +constexpr size_t kMaxUncompressedChunkSize = 64 * 1024; // 64 KiB +constexpr size_t kChunkHeaderSize = 4; // 1 byte type + 3 bytes length +constexpr size_t kChunkChecksumSize = 4; // masked CRC32C + +constexpr uint8_t kStreamIdentifierPayload[] = {'s', 'N', 'a', 'P', 'p', 'Y'}; +constexpr size_t kStreamIdentifierPayloadSize = sizeof(kStreamIdentifierPayload); + +constexpr uint8_t kStreamHeader[] = {kChunkTypeStreamIdentifier, + static_cast(kStreamIdentifierPayloadSize & 0xFF), + static_cast((kStreamIdentifierPayloadSize >> 8) & 0xFF), + static_cast((kStreamIdentifierPayloadSize >> 16) & 0xFF), + 's', 'N', 'a', 'P', 'p', 'Y'}; +constexpr size_t kStreamHeaderSize = sizeof(kStreamHeader); + +inline uint32_t LoadLittleEndian32(const uint8_t* p) { + return static_cast(p[0]) | (static_cast(p[1]) << 8) | + (static_cast(p[2]) << 16) | (static_cast(p[3]) << 24); +} + +inline void StoreLittleEndian32(uint32_t value, uint8_t* out) { + out[0] = static_cast(value & 0xFF); + out[1] = static_cast((value >> 8) & 0xFF); + out[2] = static_cast((value >> 16) & 0xFF); + out[3] = static_cast((value >> 24) & 0xFF); +} + +// ---------------------------------------------------------------------- +// Snappy framed compressor implementation + +class SnappyFramedCompressor : public Compressor { + public: + SnappyFramedCompressor() = default; + + Status Init() { + header_emitted_ = false; + return Status::OK(); + } + + Result Compress(int64_t input_len, const uint8_t* input, + int64_t output_len, uint8_t* output) override { + int64_t bytes_read = 0; + int64_t bytes_written = 0; + + uint8_t* dst = output; + int64_t dst_remaining = output_len; + + // Emit stream header once at the beginning of the stream. + if (!header_emitted_) { + if (dst_remaining < static_cast(kStreamHeaderSize)) { + // Need a larger output buffer. 
+ return CompressResult{0, 0}; + } + std::memcpy(dst, kStreamHeader, kStreamHeaderSize); + dst += kStreamHeaderSize; + dst_remaining -= static_cast(kStreamHeaderSize); + bytes_written += static_cast(kStreamHeaderSize); + header_emitted_ = true; + } + + while (input_len > 0) { + const int64_t chunk_uncompressed = + std::min(input_len, static_cast(kMaxUncompressedChunkSize)); + const size_t max_compressed = + snappy::MaxCompressedLength(static_cast(chunk_uncompressed)); + const int64_t required = static_cast(kChunkHeaderSize + kChunkChecksumSize + + static_cast(max_compressed)); + if (dst_remaining < required) { + // Not enough space to compress even a single full chunk. + break; + } + + uint8_t* header = dst; + uint8_t* checksum_ptr = dst + kChunkHeaderSize; + char* compressed_data = reinterpret_cast(dst + kChunkHeaderSize + + kChunkChecksumSize); + + size_t compressed_size = 0; + snappy::RawCompress(reinterpret_cast(input), + static_cast(chunk_uncompressed), compressed_data, + &compressed_size); + + const uint32_t masked_crc = + crc32c_masked(input, static_cast(chunk_uncompressed)); + StoreLittleEndian32(masked_crc, checksum_ptr); + + const uint32_t chunk_data_length = + kChunkChecksumSize + static_cast(compressed_size); + header[0] = kChunkTypeCompressedData; + header[1] = static_cast(chunk_data_length & 0xFF); + header[2] = static_cast((chunk_data_length >> 8) & 0xFF); + header[3] = static_cast((chunk_data_length >> 16) & 0xFF); + + const int64_t total_chunk_size = + static_cast(kChunkHeaderSize + chunk_data_length); + dst += total_chunk_size; + dst_remaining -= total_chunk_size; + bytes_written += total_chunk_size; + + input += chunk_uncompressed; + input_len -= chunk_uncompressed; + bytes_read += chunk_uncompressed; + } + + return CompressResult{bytes_read, bytes_written}; + } + + Result Flush(int64_t output_len, uint8_t* output) override { + // There is no internal buffering other than the stream header. + if (!header_emitted_) { + if (output_len < static_cast(kStreamHeaderSize)) { + return FlushResult{0, true}; + } + std::memcpy(output, kStreamHeader, kStreamHeaderSize); + header_emitted_ = true; + return FlushResult{static_cast(kStreamHeaderSize), false}; + } + return FlushResult{0, false}; + } + + Result End(int64_t output_len, uint8_t* output) override { + // For Snappy framed streams there is no explicit end-of-stream marker. + // We only ensure the stream header is emitted for an empty stream. + if (!header_emitted_) { + if (output_len < static_cast(kStreamHeaderSize)) { + return EndResult{0, true}; + } + std::memcpy(output, kStreamHeader, kStreamHeaderSize); + header_emitted_ = true; + return EndResult{static_cast(kStreamHeaderSize), false}; + } + return EndResult{0, false}; + } + + private: + bool header_emitted_ = false; +}; + +// ---------------------------------------------------------------------- +// Snappy framed decompressor implementation + +class SnappyFramedDecompressor : public Decompressor { + public: + SnappyFramedDecompressor() = default; + + Status Init() { + finished_ = false; + input_offset_ = 0; + saw_stream_identifier_ = false; + return Status::OK(); + } + + Result Decompress(int64_t input_len, const uint8_t* input, + int64_t output_len, uint8_t* output) override { + // Accept all input for this call and keep it in the internal buffer. 
+ if (input_len > 0) { + finished_ = false; + const auto old_size = static_cast(input_buffer_.size()); + input_buffer_.resize(static_cast(old_size + input_len)); + std::memcpy(input_buffer_.data() + old_size, input, static_cast(input_len)); + } + + int64_t bytes_read = input_len; + int64_t bytes_written = 0; + + // First, exhaust any pending uncompressed data from a previous chunk. + while (!uncompressed_buffer_.empty() && bytes_written < output_len) { + const int64_t remaining = + static_cast(uncompressed_buffer_.size()) - uncompressed_offset_; + const int64_t to_copy = std::min(remaining, output_len - bytes_written); + if (to_copy > 0) { + std::memcpy(output + bytes_written, + uncompressed_buffer_.data() + uncompressed_offset_, + static_cast(to_copy)); + bytes_written += to_copy; + uncompressed_offset_ += to_copy; + } + if (uncompressed_offset_ < static_cast(uncompressed_buffer_.size())) { + // Output buffer is full, but the current chunk is not completely written. + return DecompressResult{bytes_read, bytes_written, true}; + } + // Current chunk finished. + uncompressed_buffer_.clear(); + uncompressed_offset_ = 0; + } + + // Now parse as many chunks as possible from the input buffer while we have + // output space available. + while (bytes_written < output_len) { + const int64_t available = + static_cast(input_buffer_.size()) - static_cast(input_offset_); + if (available < static_cast(kChunkHeaderSize)) { + break; // Need more input for a complete header. + } + + const uint8_t* header = input_buffer_.data() + input_offset_; + const uint8_t chunk_type = header[0]; + const uint32_t chunk_length = LoadLittleEndian32(header + 1) & 0x00FFFFFFu; + + if (chunk_length > (1u << 24) - 1u) { + return Status::IOError("Invalid Snappy framed chunk length"); + } + + if (available < static_cast(kChunkHeaderSize + chunk_length)) { + // Wait for more input. + break; + } + + const uint8_t* chunk_data = header + kChunkHeaderSize; + + if (chunk_type == kChunkTypeStreamIdentifier) { + if (chunk_length != kStreamIdentifierPayloadSize || + std::memcmp(chunk_data, kStreamIdentifierPayload, + kStreamIdentifierPayloadSize) != 0) { + return Status::IOError("Invalid Snappy framed stream identifier"); + } + saw_stream_identifier_ = true; + ConsumeFromInputBuffer(kChunkHeaderSize + chunk_length); + continue; + } + + if (chunk_type == kChunkTypePadding || (chunk_type >= 0x80u && chunk_type <= 0xFDu)) { + // Skippable chunk types. + ConsumeFromInputBuffer(kChunkHeaderSize + chunk_length); + continue; + } + + if (chunk_type >= 0x02u && chunk_type <= 0x7Fu) { + return Status::IOError("Encountered reserved unskippable Snappy framed chunk"); + } + + if (chunk_type != kChunkTypeCompressedData && + chunk_type != kChunkTypeUncompressedData) { + return Status::IOError("Unknown Snappy framed chunk type"); + } + + if (chunk_length < kChunkChecksumSize) { + return Status::IOError("Snappy framed chunk too small for checksum"); + } + + const uint8_t* checksum_ptr = chunk_data; + const uint8_t* payload = chunk_data + kChunkChecksumSize; + const uint32_t payload_length = chunk_length - static_cast(kChunkChecksumSize); + const uint32_t expected_masked_crc = LoadLittleEndian32(checksum_ptr); + + // Decode chunk payload into the uncompressed buffer. 
+ if (chunk_type == kChunkTypeCompressedData) { + size_t uncompressed_size = 0; + if (!snappy::GetUncompressedLength(reinterpret_cast(payload), + static_cast(payload_length), + &uncompressed_size)) { + return Status::IOError("Corrupt Snappy framed compressed chunk"); + } + if (uncompressed_size > kMaxUncompressedChunkSize) { + return Status::IOError("Snappy framed chunk exceeds maximum uncompressed size"); + } + uncompressed_buffer_.resize(uncompressed_size); + if (!snappy::RawUncompress(reinterpret_cast(payload), + static_cast(payload_length), + reinterpret_cast(uncompressed_buffer_.data()))) { + return Status::IOError("Corrupt Snappy framed compressed chunk"); + } + } else { + // Uncompressed data. + if (payload_length > kMaxUncompressedChunkSize) { + return Status::IOError("Snappy framed chunk exceeds maximum uncompressed size"); + } + uncompressed_buffer_.resize(payload_length); + if (payload_length > 0) { + std::memcpy(uncompressed_buffer_.data(), payload, payload_length); + } + } + + const uint32_t actual_masked_crc = + crc32c_masked(uncompressed_buffer_.data(), uncompressed_buffer_.size()); + if (actual_masked_crc != expected_masked_crc) { + return Status::IOError("Snappy framed chunk failed CRC32C check"); + } + + // Consume this chunk from the input buffer now that it has been decoded. + ConsumeFromInputBuffer(kChunkHeaderSize + chunk_length); + + // Emit as much of the decoded data as fits into the caller's output buffer. + const int64_t to_copy = + std::min(static_cast(uncompressed_buffer_.size()), + output_len - bytes_written); + if (to_copy > 0) { + std::memcpy(output + bytes_written, uncompressed_buffer_.data(), + static_cast(to_copy)); + bytes_written += to_copy; + uncompressed_offset_ = to_copy; + } else { + uncompressed_offset_ = 0; + } + + if (uncompressed_offset_ < static_cast(uncompressed_buffer_.size())) { + // Output buffer is full, but the chunk is only partially written. + return DecompressResult{bytes_read, bytes_written, true}; + } + + // Entire chunk emitted. + uncompressed_buffer_.clear(); + uncompressed_offset_ = 0; + } + + // Heuristic: if we have no buffered input or output, consider the stream finished. + finished_ = input_buffer_.empty() && uncompressed_buffer_.empty(); + + const bool need_more_output = + (!uncompressed_buffer_.empty() && bytes_written == output_len); + + return DecompressResult{bytes_read, bytes_written, need_more_output}; + } + + bool IsFinished() override { return finished_; } + + Status Reset() override { + input_buffer_.clear(); + input_offset_ = 0; + uncompressed_buffer_.clear(); + uncompressed_offset_ = 0; + finished_ = false; + saw_stream_identifier_ = false; + return Status::OK(); + } + + private: + void ConsumeFromInputBuffer(size_t nbytes) { + input_offset_ += nbytes; + if (input_offset_ >= input_buffer_.size()) { + input_buffer_.clear(); + input_offset_ = 0; + } else if (input_offset_ > input_buffer_.size() / 2) { + // Compact the buffer to avoid unbounded growth. 
+ const auto remaining = input_buffer_.size() - input_offset_; + std::memmove(input_buffer_.data(), input_buffer_.data() + input_offset_, remaining); + input_buffer_.resize(remaining); + input_offset_ = 0; + } + } + + std::vector input_buffer_; + size_t input_offset_ = 0; + std::vector uncompressed_buffer_; + int64_t uncompressed_offset_ = 0; + bool finished_ = false; + bool saw_stream_identifier_ = false; +}; + +// ---------------------------------------------------------------------- +// Snappy codec (one-shot raw Snappy bitstream) class SnappyCodec : public Codec { public: @@ -78,11 +437,15 @@ class SnappyCodec : public Codec { } Result> MakeCompressor() override { - return Status::NotImplemented("Streaming compression unsupported with Snappy"); + auto ptr = std::make_shared(); + RETURN_NOT_OK(ptr->Init()); + return ptr; } Result> MakeDecompressor() override { - return Status::NotImplemented("Streaming decompression unsupported with Snappy"); + auto ptr = std::make_shared(); + RETURN_NOT_OK(ptr->Init()); + return ptr; } Compression::type compression_type() const override { return Compression::SNAPPY; } diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index 5ba93cc291df..d92bdf03ecf5 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -105,7 +105,7 @@ void CheckCodecRoundtrip(std::unique_ptr& c1, std::unique_ptr& c2, } } -// Check the streaming compressor against one-shot decompression +// Check the streaming compressor against a streaming decompressor void CheckStreamingCompressor(Codec* codec, const std::vector& data) { std::shared_ptr compressor; @@ -136,71 +136,128 @@ void CheckStreamingCompressor(Codec* codec, const std::vector& data) { } // Once every two iterations, do a flush if (do_flush) { - Compressor::FlushResult result; + Compressor::FlushResult flush_result; do { output_len = compressed.size() - compressed_size; output = compressed.data() + compressed_size; - ASSERT_OK_AND_ASSIGN(result, compressor->Flush(output_len, output)); - ASSERT_LE(result.bytes_written, output_len); - compressed_size += result.bytes_written; - if (result.should_retry) { + ASSERT_OK_AND_ASSIGN(flush_result, compressor->Flush(output_len, output)); + ASSERT_LE(flush_result.bytes_written, output_len); + compressed_size += flush_result.bytes_written; + if (flush_result.should_retry) { compressed.resize(compressed.capacity() * 2); } - } while (result.should_retry); + } while (flush_result.should_retry); } do_flush = !do_flush; } // End the compressed stream - Compressor::EndResult result; + Compressor::EndResult end_result; do { int64_t output_len = compressed.size() - compressed_size; uint8_t* output = compressed.data() + compressed_size; - ASSERT_OK_AND_ASSIGN(result, compressor->End(output_len, output)); - ASSERT_LE(result.bytes_written, output_len); - compressed_size += result.bytes_written; - if (result.should_retry) { + ASSERT_OK_AND_ASSIGN(end_result, compressor->End(output_len, output)); + ASSERT_LE(end_result.bytes_written, output_len); + compressed_size += end_result.bytes_written; + if (end_result.should_retry) { compressed.resize(compressed.capacity() * 2); } - } while (result.should_retry); + } while (end_result.should_retry); + + // Check decompressing the compressed data using a streaming decompressor. 
+ std::shared_ptr decompressor; + ASSERT_OK_AND_ASSIGN(decompressor, codec->MakeDecompressor()); - // Check decompressing the compressed data std::vector decompressed(data.size()); - ASSERT_OK(codec->Decompress(compressed_size, compressed.data(), decompressed.size(), - decompressed.data())); + int64_t decompressed_size = 0; + const uint8_t* comp_input = compressed.data(); + int64_t comp_remaining = compressed_size; + while (!decompressor->IsFinished()) { + int64_t input_len = std::min(comp_remaining, static_cast(23)); + int64_t output_len = decompressed.size() - decompressed_size; + uint8_t* output = decompressed.data() + decompressed_size; + ASSERT_OK_AND_ASSIGN(auto result, + decompressor->Decompress(input_len, comp_input, output_len, output)); + ASSERT_LE(result.bytes_read, input_len); + ASSERT_LE(result.bytes_written, output_len); + ASSERT_TRUE(result.need_more_output || result.bytes_written > 0 || + result.bytes_read > 0) + << "Decompression not progressing anymore"; + if (result.need_more_output) { + decompressed.resize(decompressed.capacity() * 2); + } + decompressed_size += result.bytes_written; + comp_input += result.bytes_read; + comp_remaining -= result.bytes_read; + } + + ASSERT_TRUE(decompressor->IsFinished()); + ASSERT_EQ(comp_remaining, 0); + + decompressed.resize(decompressed_size); ASSERT_EQ(data, decompressed); } -// Check the streaming decompressor against one-shot compression +// Check the streaming decompressor against a streaming compressor void CheckStreamingDecompressor(Codec* codec, const std::vector& data) { - // Create compressed data - int64_t max_compressed_len = codec->MaxCompressedLen(data.size(), data.data()); - std::vector compressed(max_compressed_len); - int64_t compressed_size; - ASSERT_OK_AND_ASSIGN( - compressed_size, - codec->Compress(data.size(), data.data(), max_compressed_len, compressed.data())); + // Create compressed data using the streaming compressor + std::shared_ptr compressor; + ASSERT_OK_AND_ASSIGN(compressor, codec->MakeCompressor()); + + std::vector compressed(1); + int64_t compressed_size = 0; + const uint8_t* input = data.data(); + int64_t remaining = data.size(); + + while (remaining > 0) { + int64_t input_len = std::min(remaining, static_cast(1111)); + int64_t output_len = compressed.size() - compressed_size; + uint8_t* output = compressed.data() + compressed_size; + ASSERT_OK_AND_ASSIGN(auto result, + compressor->Compress(input_len, input, output_len, output)); + ASSERT_LE(result.bytes_read, input_len); + ASSERT_LE(result.bytes_written, output_len); + compressed_size += result.bytes_written; + input += result.bytes_read; + remaining -= result.bytes_read; + if (result.bytes_read == 0) { + compressed.resize(compressed.capacity() * 2); + } + } + + // End the compressed stream + Compressor::EndResult end_result; + do { + int64_t output_len = compressed.size() - compressed_size; + uint8_t* output = compressed.data() + compressed_size; + ASSERT_OK_AND_ASSIGN(end_result, compressor->End(output_len, output)); + ASSERT_LE(end_result.bytes_written, output_len); + compressed_size += end_result.bytes_written; + if (end_result.should_retry) { + compressed.resize(compressed.capacity() * 2); + } + } while (end_result.should_retry); + compressed.resize(compressed_size); // Run streaming decompression std::shared_ptr decompressor; ASSERT_OK_AND_ASSIGN(decompressor, codec->MakeDecompressor()); - std::vector decompressed; + std::vector decompressed(10); int64_t decompressed_size = 0; - const uint8_t* input = compressed.data(); - int64_t remaining = 
compressed.size(); + const uint8_t* comp_input = compressed.data(); + int64_t comp_remaining = compressed.size(); - decompressed.resize(10); while (!decompressor->IsFinished()) { // Feed a small amount each time - int64_t input_len = std::min(remaining, static_cast(23)); + int64_t input_len = std::min(comp_remaining, static_cast(23)); int64_t output_len = decompressed.size() - decompressed_size; uint8_t* output = decompressed.data() + decompressed_size; ASSERT_OK_AND_ASSIGN(auto result, - decompressor->Decompress(input_len, input, output_len, output)); + decompressor->Decompress(input_len, comp_input, output_len, output)); ASSERT_LE(result.bytes_read, input_len); ASSERT_LE(result.bytes_written, output_len); ASSERT_TRUE(result.need_more_output || result.bytes_written > 0 || @@ -210,11 +267,11 @@ void CheckStreamingDecompressor(Codec* codec, const std::vector& data) decompressed.resize(decompressed.capacity() * 2); } decompressed_size += result.bytes_written; - input += result.bytes_read; - remaining -= result.bytes_read; + comp_input += result.bytes_read; + comp_remaining -= result.bytes_read; } ASSERT_TRUE(decompressor->IsFinished()); - ASSERT_EQ(remaining, 0); + ASSERT_EQ(comp_remaining, 0); // Check the decompressed data decompressed.resize(decompressed_size); @@ -695,9 +752,6 @@ TEST_P(CodecTest, OutputBufferIsSmall) { } TEST_P(CodecTest, StreamingCompressor) { - if (GetCompression() == Compression::SNAPPY) { - GTEST_SKIP() << "snappy doesn't support streaming compression"; - } if (GetCompression() == Compression::BZ2) { GTEST_SKIP() << "Z2 doesn't support one-shot decompression"; } @@ -719,9 +773,6 @@ TEST_P(CodecTest, StreamingCompressor) { } TEST_P(CodecTest, StreamingDecompressor) { - if (GetCompression() == Compression::SNAPPY) { - GTEST_SKIP() << "snappy doesn't support streaming decompression."; - } if (GetCompression() == Compression::BZ2) { GTEST_SKIP() << "Z2 doesn't support one-shot compression"; } @@ -743,9 +794,6 @@ TEST_P(CodecTest, StreamingDecompressor) { } TEST_P(CodecTest, StreamingRoundtrip) { - if (GetCompression() == Compression::SNAPPY) { - GTEST_SKIP() << "snappy doesn't support streaming decompression"; - } if (GetCompression() == Compression::LZ4 || GetCompression() == Compression::LZ4_HADOOP) { GTEST_SKIP() << "LZ4 raw format doesn't support streaming compression."; @@ -764,9 +812,6 @@ TEST_P(CodecTest, StreamingRoundtrip) { } TEST_P(CodecTest, StreamingDecompressorReuse) { - if (GetCompression() == Compression::SNAPPY) { - GTEST_SKIP() << "snappy doesn't support streaming decompression"; - } if (GetCompression() == Compression::LZ4 || GetCompression() == Compression::LZ4_HADOOP) { GTEST_SKIP() << "LZ4 raw format doesn't support streaming decompression."; @@ -789,9 +834,6 @@ TEST_P(CodecTest, StreamingDecompressorReuse) { TEST_P(CodecTest, StreamingMultiFlush) { // Regression test for ARROW-11937 - if (GetCompression() == Compression::SNAPPY) { - GTEST_SKIP() << "snappy doesn't support streaming decompression"; - } if (GetCompression() == Compression::LZ4 || GetCompression() == Compression::LZ4_HADOOP) { GTEST_SKIP() << "LZ4 raw format doesn't support streaming decompression."; diff --git a/cpp/src/arrow/util/crc32c.cc b/cpp/src/arrow/util/crc32c.cc new file mode 100755 index 000000000000..ef044ef9e4c0 --- /dev/null +++ b/cpp/src/arrow/util/crc32c.cc @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/crc32c.h"
+
+#include <cstddef>
+#include <cstdint>
+
+namespace arrow {
+namespace util {
+
+namespace {
+
+// Castagnoli polynomial, reflected representation.
+constexpr uint32_t kCrc32cPolynomialReflected = 0x82F63B78u;
+
+uint32_t ComputeCrc32c(const uint8_t* data, std::size_t length) {
+  uint32_t crc = 0xFFFFFFFFu;
+  for (std::size_t i = 0; i < length; ++i) {
+    crc ^= static_cast<uint32_t>(data[i]);
+    for (int bit = 0; bit < 8; ++bit) {
+      const uint32_t mask = static_cast<uint32_t>(-(crc & 1u));
+      crc = (crc >> 1) ^ (kCrc32cPolynomialReflected & mask);
+    }
+  }
+  return ~crc;
+}
+
+} // namespace
+
+uint32_t crc32c_masked(const void* data, std::size_t length) {
+  const auto* bytes = static_cast<const uint8_t*>(data);
+  const uint32_t crc = ComputeCrc32c(bytes, length);
+  // Masking as defined in the Snappy framing format specification.
+  return ((crc >> 15) | (crc << 17)) + 0xa282ead8u;
+}
+
+} // namespace util
+} // namespace arrow
diff --git a/cpp/src/arrow/util/crc32c.h b/cpp/src/arrow/util/crc32c.h
new file mode 100755
index 000000000000..950b854fc2f4
--- /dev/null
+++ b/cpp/src/arrow/util/crc32c.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace util {
+
+/// \brief Compute the CRC32C checksum of the given data and return the masked value.
+///
+/// The CRC32C checksum uses the Castagnoli polynomial and is masked following
+/// the Snappy framing specification. The masking is reversible and is intended
+/// to avoid undesirable interactions between the checksum and the data it
+/// protects.
+///
+/// This helper is intended for Snappy framed streams, where each uncompressed
+/// chunk is protected by a masked CRC32C of the uncompressed data.
+ARROW_EXPORT +uint32_t crc32c_masked(const void* data, std::size_t length); + +} // namespace util +} // namespace arrow From d62cd01b6d6e59c6edbbf3ea20431a8bc881eb4d Mon Sep 17 00:00:00 2001 From: "liyang.127" Date: Sun, 8 Feb 2026 23:01:35 +0800 Subject: [PATCH 2/4] util: fix snappy framed streaming EOF, parsing, and meson build --- cpp/src/arrow/meson.build | 2 +- cpp/src/arrow/util/compression_snappy.cc | 61 ++++++++++++++++++++---- cpp/src/arrow/util/compression_test.cc | 33 ++++++++----- cpp/src/arrow/util/crc32c.cc | 0 cpp/src/arrow/util/crc32c.h | 0 5 files changed, 72 insertions(+), 24 deletions(-) mode change 100755 => 100644 cpp/src/arrow/util/crc32c.cc mode change 100755 => 100644 cpp/src/arrow/util/crc32c.h diff --git a/cpp/src/arrow/meson.build b/cpp/src/arrow/meson.build index 48d01db729d7..64cc951b9712 100644 --- a/cpp/src/arrow/meson.build +++ b/cpp/src/arrow/meson.build @@ -247,7 +247,7 @@ if needs_lz4 endif if needs_snappy - arrow_util_srcs += ['util/compression_snappy.cc'] + arrow_util_srcs += ['util/compression_snappy.cc', 'util/crc32c.cc'] arrow_util_deps += dependency('snappy', 'Snappy') endif diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc index c8908d271670..731fb102b083 100644 --- a/cpp/src/arrow/util/compression_snappy.cc +++ b/cpp/src/arrow/util/compression_snappy.cc @@ -73,6 +73,11 @@ inline void StoreLittleEndian32(uint32_t value, uint8_t* out) { out[3] = static_cast((value >> 24) & 0xFF); } +inline uint32_t LoadLittleEndian24(const uint8_t* p) { + return static_cast(p[0]) | (static_cast(p[1]) << 8) | + (static_cast(p[2]) << 16); +} + // ---------------------------------------------------------------------- // Snappy framed compressor implementation @@ -194,6 +199,7 @@ class SnappyFramedDecompressor : public Decompressor { Status Init() { finished_ = false; input_offset_ = 0; + saw_eof_ = false; saw_stream_identifier_ = false; return Status::OK(); } @@ -206,6 +212,11 @@ class SnappyFramedDecompressor : public Decompressor { const auto old_size = static_cast(input_buffer_.size()); input_buffer_.resize(static_cast(old_size + input_len)); std::memcpy(input_buffer_.data() + old_size, input, static_cast(input_len)); + } else if (input == nullptr) { + // The Snappy framed format has no explicit end-of-stream marker. + // Treat a call with zero input and a null pointer as an explicit + // end-of-input signal. + saw_eof_ = true; } int64_t bytes_read = input_len; @@ -232,31 +243,51 @@ class SnappyFramedDecompressor : public Decompressor { uncompressed_offset_ = 0; } - // Now parse as many chunks as possible from the input buffer while we have - // output space available. - while (bytes_written < output_len) { + if (!uncompressed_buffer_.empty()) { + // Output buffer is full but we still have pending decoded data. + return DecompressResult{bytes_read, bytes_written, true}; + } + + if (saw_eof_ && input_buffer_.empty()) { + if (!saw_stream_identifier_) { + return Status::IOError( + "Invalid Snappy framed stream: missing stream identifier"); + } + finished_ = true; + return DecompressResult{bytes_read, bytes_written, false}; + } + + // Now parse as many chunks as possible from the input buffer. + bool need_more_input = false; + while (true) { const int64_t available = static_cast(input_buffer_.size()) - static_cast(input_offset_); + if (available == 0) { + break; + } if (available < static_cast(kChunkHeaderSize)) { + need_more_input = true; break; // Need more input for a complete header. 
} const uint8_t* header = input_buffer_.data() + input_offset_; const uint8_t chunk_type = header[0]; - const uint32_t chunk_length = LoadLittleEndian32(header + 1) & 0x00FFFFFFu; - - if (chunk_length > (1u << 24) - 1u) { - return Status::IOError("Invalid Snappy framed chunk length"); - } + // Length is stored as a 24-bit little-endian value. + const uint32_t chunk_length = LoadLittleEndian24(header + 1); if (available < static_cast(kChunkHeaderSize + chunk_length)) { // Wait for more input. + need_more_input = true; break; } const uint8_t* chunk_data = header + kChunkHeaderSize; if (chunk_type == kChunkTypeStreamIdentifier) { + if (saw_stream_identifier_) { + return Status::IOError( + "Invalid Snappy framed stream: duplicate stream identifier"); + } if (chunk_length != kStreamIdentifierPayloadSize || std::memcmp(chunk_data, kStreamIdentifierPayload, kStreamIdentifierPayloadSize) != 0) { @@ -273,6 +304,11 @@ class SnappyFramedDecompressor : public Decompressor { continue; } + if (!saw_stream_identifier_) { + return Status::IOError( + "Invalid Snappy framed stream: missing stream identifier"); + } + if (chunk_type >= 0x02u && chunk_type <= 0x7Fu) { return Status::IOError("Encountered reserved unskippable Snappy framed chunk"); } @@ -351,8 +387,11 @@ class SnappyFramedDecompressor : public Decompressor { uncompressed_offset_ = 0; } - // Heuristic: if we have no buffered input or output, consider the stream finished. - finished_ = input_buffer_.empty() && uncompressed_buffer_.empty(); + if (saw_eof_ && need_more_input) { + return Status::IOError("Truncated Snappy framed stream"); + } + + finished_ = saw_eof_ && input_buffer_.empty() && uncompressed_buffer_.empty(); const bool need_more_output = (!uncompressed_buffer_.empty() && bytes_written == output_len); @@ -368,6 +407,7 @@ class SnappyFramedDecompressor : public Decompressor { uncompressed_buffer_.clear(); uncompressed_offset_ = 0; finished_ = false; + saw_eof_ = false; saw_stream_identifier_ = false; return Status::OK(); } @@ -392,6 +432,7 @@ class SnappyFramedDecompressor : public Decompressor { std::vector uncompressed_buffer_; int64_t uncompressed_offset_ = 0; bool finished_ = false; + bool saw_eof_ = false; bool saw_stream_identifier_ = false; }; diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index d92bdf03ecf5..5b2144e0e2a4 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -174,15 +174,18 @@ void CheckStreamingCompressor(Codec* codec, const std::vector& data) { int64_t comp_remaining = compressed_size; while (!decompressor->IsFinished()) { - int64_t input_len = std::min(comp_remaining, static_cast(23)); + const bool at_eof = (comp_remaining == 0); + const int64_t input_len = + at_eof ? 0 : std::min(comp_remaining, static_cast(23)); + const uint8_t* input = at_eof ? 
nullptr : comp_input; int64_t output_len = decompressed.size() - decompressed_size; uint8_t* output = decompressed.data() + decompressed_size; ASSERT_OK_AND_ASSIGN(auto result, - decompressor->Decompress(input_len, comp_input, output_len, output)); + decompressor->Decompress(input_len, input, output_len, output)); ASSERT_LE(result.bytes_read, input_len); ASSERT_LE(result.bytes_written, output_len); - ASSERT_TRUE(result.need_more_output || result.bytes_written > 0 || - result.bytes_read > 0) + ASSERT_TRUE(decompressor->IsFinished() || result.need_more_output || + result.bytes_written > 0 || result.bytes_read > 0) << "Decompression not progressing anymore"; if (result.need_more_output) { decompressed.resize(decompressed.capacity() * 2); @@ -252,16 +255,18 @@ void CheckStreamingDecompressor(Codec* codec, const std::vector& data) int64_t comp_remaining = compressed.size(); while (!decompressor->IsFinished()) { - // Feed a small amount each time - int64_t input_len = std::min(comp_remaining, static_cast(23)); + const bool at_eof = (comp_remaining == 0); + const int64_t input_len = + at_eof ? 0 : std::min(comp_remaining, static_cast(23)); + const uint8_t* input = at_eof ? nullptr : comp_input; int64_t output_len = decompressed.size() - decompressed_size; uint8_t* output = decompressed.data() + decompressed_size; ASSERT_OK_AND_ASSIGN(auto result, - decompressor->Decompress(input_len, comp_input, output_len, output)); + decompressor->Decompress(input_len, input, output_len, output)); ASSERT_LE(result.bytes_read, input_len); ASSERT_LE(result.bytes_written, output_len); - ASSERT_TRUE(result.need_more_output || result.bytes_written > 0 || - result.bytes_read > 0) + ASSERT_TRUE(decompressor->IsFinished() || result.need_more_output || + result.bytes_written > 0 || result.bytes_read > 0) << "Decompression not progressing anymore"; if (result.need_more_output) { decompressed.resize(decompressed.capacity() * 2); @@ -338,16 +343,18 @@ void CheckStreamingRoundtrip(std::shared_ptr compressor, int64_t remaining = compressed.size(); while (!decompressor->IsFinished()) { + const bool at_eof = (remaining == 0); // Feed a varying amount each time - int64_t input_len = std::min(remaining, make_buf_size()); + int64_t input_len = at_eof ? 0 : std::min(remaining, make_buf_size()); + const uint8_t* in_ptr = at_eof ? 
nullptr : input; int64_t output_len = decompressed.size() - decompressed_size; uint8_t* output = decompressed.data() + decompressed_size; ASSERT_OK_AND_ASSIGN( - auto result, decompressor->Decompress(input_len, input, output_len, output)); + auto result, decompressor->Decompress(input_len, in_ptr, output_len, output)); ASSERT_LE(result.bytes_read, input_len); ASSERT_LE(result.bytes_written, output_len); - ASSERT_TRUE(result.need_more_output || result.bytes_written > 0 || - result.bytes_read > 0) + ASSERT_TRUE(decompressor->IsFinished() || result.need_more_output || + result.bytes_written > 0 || result.bytes_read > 0) << "Decompression not progressing anymore"; if (result.need_more_output) { decompressed.resize(decompressed.capacity() * 2); diff --git a/cpp/src/arrow/util/crc32c.cc b/cpp/src/arrow/util/crc32c.cc old mode 100755 new mode 100644 diff --git a/cpp/src/arrow/util/crc32c.h b/cpp/src/arrow/util/crc32c.h old mode 100755 new mode 100644 From 0c1442a71c3cbb47ecbaed398c872ba8f820ea15 Mon Sep 17 00:00:00 2001 From: "liyang.127" Date: Sun, 8 Feb 2026 23:10:50 +0800 Subject: [PATCH 3/4] fix style --- cpp/src/arrow/util/compression_snappy.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc index 731fb102b083..e221c1933b42 100644 --- a/cpp/src/arrow/util/compression_snappy.cc +++ b/cpp/src/arrow/util/compression_snappy.cc @@ -41,7 +41,6 @@ namespace { // ---------------------------------------------------------------------- // Snappy framing constants - constexpr uint8_t kChunkTypeCompressedData = 0x00; constexpr uint8_t kChunkTypeUncompressedData = 0x01; constexpr uint8_t kChunkTypePadding = 0xFEu; From c7b675e9723c80fbe787fb69a184ea8625a7263f Mon Sep 17 00:00:00 2001 From: "liyang.127" Date: Tue, 10 Feb 2026 22:34:57 +0800 Subject: [PATCH 4/4] fix failed ut --- cpp/src/arrow/util/compression_test.cc | 3 ++- cpp/src/arrow/util/compression_zlib.cc | 8 ++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index 5b2144e0e2a4..e5f6b6baedd8 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -168,7 +168,8 @@ void CheckStreamingCompressor(Codec* codec, const std::vector& data) { std::shared_ptr decompressor; ASSERT_OK_AND_ASSIGN(decompressor, codec->MakeDecompressor()); - std::vector decompressed(data.size()); + // Ensure a non-zero output buffer even when the uncompressed size is 0. + std::vector decompressed(std::max(1, data.size())); int64_t decompressed_size = 0; const uint8_t* comp_input = compressed.data(); int64_t comp_remaining = compressed_size; diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index b06cf2d22430..6c561bbf1e52 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -145,6 +145,14 @@ class GZipDecompressor : public Decompressor { int64_t output_len, uint8_t* output) override { static constexpr auto input_limit = static_cast(std::numeric_limits::max()); + + // Some zlib versions return Z_STREAM_ERROR if next_out is NULL, even when + // avail_out is 0. Our streaming API uses need_more_output to request a + // non-empty buffer in that case. 
+    if (output_len == 0) {
+      return DecompressResult{0, 0, true};
+    }
+
     stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
     stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit));
     stream_.next_out = reinterpret_cast<Bytef*>(output);