diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index d9f04a627bc5..90fbeb4cf6ca 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -505,6 +505,7 @@ set(ARROW_UTIL_SRCS
     util/counting_semaphore.cc
     util/cpu_info.cc
     util/crc32.cc
+    util/crc32c.cc
     util/debug.cc
     util/decimal.cc
     util/delimiting.cc
diff --git a/cpp/src/arrow/meson.build b/cpp/src/arrow/meson.build
index 48d01db729d7..64cc951b9712 100644
--- a/cpp/src/arrow/meson.build
+++ b/cpp/src/arrow/meson.build
@@ -247,7 +247,7 @@ if needs_lz4
 endif
 
 if needs_snappy
-    arrow_util_srcs += ['util/compression_snappy.cc']
+    arrow_util_srcs += ['util/compression_snappy.cc', 'util/crc32c.cc']
     arrow_util_deps += dependency('snappy', 'Snappy')
 endif
diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc
index 56d69f05d686..e221c1933b42 100644
--- a/cpp/src/arrow/util/compression_snappy.cc
+++ b/cpp/src/arrow/util/compression_snappy.cc
@@ -19,12 +19,15 @@
 #include <cstddef>
 #include <cstdint>
+#include <cstring>
 #include <memory>
+#include <vector>
 #include <snappy.h>
 
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/crc32c.h"
 #include "arrow/util/logging_internal.h"
 #include "arrow/util/macros.h"
@@ -37,7 +40,403 @@ namespace internal {
 namespace {
 
 // ----------------------------------------------------------------------
-// Snappy implementation
+// Snappy framing constants
+
+constexpr uint8_t kChunkTypeCompressedData = 0x00;
+constexpr uint8_t kChunkTypeUncompressedData = 0x01;
+constexpr uint8_t kChunkTypePadding = 0xFEu;
+constexpr uint8_t kChunkTypeStreamIdentifier = 0xFFu;
+
+constexpr size_t kMaxUncompressedChunkSize = 64 * 1024;  // 64 KiB
+constexpr size_t kChunkHeaderSize = 4;    // 1 byte type + 3 bytes length
+constexpr size_t kChunkChecksumSize = 4;  // masked CRC32C
+
+constexpr uint8_t kStreamIdentifierPayload[] = {'s', 'N', 'a', 'P', 'p', 'Y'};
+constexpr size_t kStreamIdentifierPayloadSize = sizeof(kStreamIdentifierPayload);
+
+constexpr uint8_t kStreamHeader[] = {
+    kChunkTypeStreamIdentifier,
+    static_cast<uint8_t>(kStreamIdentifierPayloadSize & 0xFF),
+    static_cast<uint8_t>((kStreamIdentifierPayloadSize >> 8) & 0xFF),
+    static_cast<uint8_t>((kStreamIdentifierPayloadSize >> 16) & 0xFF),
+    's', 'N', 'a', 'P', 'p', 'Y'};
+constexpr size_t kStreamHeaderSize = sizeof(kStreamHeader);
+
+inline uint32_t LoadLittleEndian32(const uint8_t* p) {
+  return static_cast<uint32_t>(p[0]) | (static_cast<uint32_t>(p[1]) << 8) |
+         (static_cast<uint32_t>(p[2]) << 16) | (static_cast<uint32_t>(p[3]) << 24);
+}
+
+inline void StoreLittleEndian32(uint32_t value, uint8_t* out) {
+  out[0] = static_cast<uint8_t>(value & 0xFF);
+  out[1] = static_cast<uint8_t>((value >> 8) & 0xFF);
+  out[2] = static_cast<uint8_t>((value >> 16) & 0xFF);
+  out[3] = static_cast<uint8_t>((value >> 24) & 0xFF);
+}
+
+inline uint32_t LoadLittleEndian24(const uint8_t* p) {
+  return static_cast<uint32_t>(p[0]) | (static_cast<uint32_t>(p[1]) << 8) |
+         (static_cast<uint32_t>(p[2]) << 16);
+}
+
+// ----------------------------------------------------------------------
+// Snappy framed compressor implementation
+
+class SnappyFramedCompressor : public Compressor {
+ public:
+  SnappyFramedCompressor() = default;
+
+  Status Init() {
+    header_emitted_ = false;
+    return Status::OK();
+  }
+
+  Result<CompressResult> Compress(int64_t input_len, const uint8_t* input,
+                                  int64_t output_len, uint8_t* output) override {
+    int64_t bytes_read = 0;
+    int64_t bytes_written = 0;
+
+    uint8_t* dst = output;
+    int64_t dst_remaining = output_len;
+
+    // Emit stream header once at the beginning of the stream.
+    if (!header_emitted_) {
+      if (dst_remaining < static_cast<int64_t>(kStreamHeaderSize)) {
+        // Need a larger output buffer.
+        return CompressResult{0, 0};
+      }
+      std::memcpy(dst, kStreamHeader, kStreamHeaderSize);
+      dst += kStreamHeaderSize;
+      dst_remaining -= static_cast<int64_t>(kStreamHeaderSize);
+      bytes_written += static_cast<int64_t>(kStreamHeaderSize);
+      header_emitted_ = true;
+    }
+
+    while (input_len > 0) {
+      const int64_t chunk_uncompressed =
+          std::min(input_len, static_cast<int64_t>(kMaxUncompressedChunkSize));
+      const size_t max_compressed =
+          snappy::MaxCompressedLength(static_cast<size_t>(chunk_uncompressed));
+      const int64_t required = static_cast<int64_t>(
+          kChunkHeaderSize + kChunkChecksumSize + static_cast<size_t>(max_compressed));
+      if (dst_remaining < required) {
+        // Not enough space to compress even a single full chunk.
+        break;
+      }
+
+      uint8_t* header = dst;
+      uint8_t* checksum_ptr = dst + kChunkHeaderSize;
+      char* compressed_data =
+          reinterpret_cast<char*>(dst + kChunkHeaderSize + kChunkChecksumSize);
+
+      size_t compressed_size = 0;
+      snappy::RawCompress(reinterpret_cast<const char*>(input),
+                          static_cast<size_t>(chunk_uncompressed), compressed_data,
+                          &compressed_size);
+
+      const uint32_t masked_crc =
+          crc32c_masked(input, static_cast<size_t>(chunk_uncompressed));
+      StoreLittleEndian32(masked_crc, checksum_ptr);
+
+      const uint32_t chunk_data_length =
+          kChunkChecksumSize + static_cast<uint32_t>(compressed_size);
+      header[0] = kChunkTypeCompressedData;
+      header[1] = static_cast<uint8_t>(chunk_data_length & 0xFF);
+      header[2] = static_cast<uint8_t>((chunk_data_length >> 8) & 0xFF);
+      header[3] = static_cast<uint8_t>((chunk_data_length >> 16) & 0xFF);
+
+      const int64_t total_chunk_size =
+          static_cast<int64_t>(kChunkHeaderSize + chunk_data_length);
+      dst += total_chunk_size;
+      dst_remaining -= total_chunk_size;
+      bytes_written += total_chunk_size;
+
+      input += chunk_uncompressed;
+      input_len -= chunk_uncompressed;
+      bytes_read += chunk_uncompressed;
+    }
+
+    return CompressResult{bytes_read, bytes_written};
+  }
+
+  Result<FlushResult> Flush(int64_t output_len, uint8_t* output) override {
+    // There is no internal buffering other than the stream header.
+    if (!header_emitted_) {
+      if (output_len < static_cast<int64_t>(kStreamHeaderSize)) {
+        return FlushResult{0, true};
+      }
+      std::memcpy(output, kStreamHeader, kStreamHeaderSize);
+      header_emitted_ = true;
+      return FlushResult{static_cast<int64_t>(kStreamHeaderSize), false};
+    }
+    return FlushResult{0, false};
+  }
+
+  Result<EndResult> End(int64_t output_len, uint8_t* output) override {
+    // For Snappy framed streams there is no explicit end-of-stream marker.
+    // We only ensure the stream header is emitted for an empty stream.
+    if (!header_emitted_) {
+      if (output_len < static_cast<int64_t>(kStreamHeaderSize)) {
+        return EndResult{0, true};
+      }
+      std::memcpy(output, kStreamHeader, kStreamHeaderSize);
+      header_emitted_ = true;
+      return EndResult{static_cast<int64_t>(kStreamHeaderSize), false};
+    }
+    return EndResult{0, false};
+  }
+
+ private:
+  bool header_emitted_ = false;
+};
+
+// ----------------------------------------------------------------------
+// Snappy framed decompressor implementation
+
+class SnappyFramedDecompressor : public Decompressor {
+ public:
+  SnappyFramedDecompressor() = default;
+
+  Status Init() {
+    finished_ = false;
+    input_offset_ = 0;
+    saw_eof_ = false;
+    saw_stream_identifier_ = false;
+    return Status::OK();
+  }
+
+  Result<DecompressResult> Decompress(int64_t input_len, const uint8_t* input,
+                                      int64_t output_len, uint8_t* output) override {
+    // Accept all input for this call and keep it in the internal buffer.
+    if (input_len > 0) {
+      finished_ = false;
+      const auto old_size = static_cast<int64_t>(input_buffer_.size());
+      input_buffer_.resize(static_cast<size_t>(old_size + input_len));
+      std::memcpy(input_buffer_.data() + old_size, input, static_cast<size_t>(input_len));
+    } else if (input == nullptr) {
+      // The Snappy framed format has no explicit end-of-stream marker.
+      // Treat a call with zero input and a null pointer as an explicit
+      // end-of-input signal.
+      saw_eof_ = true;
+    }
+
+    int64_t bytes_read = input_len;
+    int64_t bytes_written = 0;
+
+    // First, exhaust any pending uncompressed data from a previous chunk.
+    while (!uncompressed_buffer_.empty() && bytes_written < output_len) {
+      const int64_t remaining =
+          static_cast<int64_t>(uncompressed_buffer_.size()) - uncompressed_offset_;
+      const int64_t to_copy = std::min(remaining, output_len - bytes_written);
+      if (to_copy > 0) {
+        std::memcpy(output + bytes_written,
+                    uncompressed_buffer_.data() + uncompressed_offset_,
+                    static_cast<size_t>(to_copy));
+        bytes_written += to_copy;
+        uncompressed_offset_ += to_copy;
+      }
+      if (uncompressed_offset_ < static_cast<int64_t>(uncompressed_buffer_.size())) {
+        // Output buffer is full, but the current chunk is not completely written.
+        return DecompressResult{bytes_read, bytes_written, true};
+      }
+      // Current chunk finished.
+      uncompressed_buffer_.clear();
+      uncompressed_offset_ = 0;
+    }
+
+    if (!uncompressed_buffer_.empty()) {
+      // Output buffer is full but we still have pending decoded data.
+      return DecompressResult{bytes_read, bytes_written, true};
+    }
+
+    if (saw_eof_ && input_buffer_.empty()) {
+      if (!saw_stream_identifier_) {
+        return Status::IOError(
+            "Invalid Snappy framed stream: missing stream identifier");
+      }
+      finished_ = true;
+      return DecompressResult{bytes_read, bytes_written, false};
+    }
+
+    // Now parse as many chunks as possible from the input buffer.
+    bool need_more_input = false;
+    while (true) {
+      const int64_t available = static_cast<int64_t>(input_buffer_.size()) -
+                                static_cast<int64_t>(input_offset_);
+      if (available == 0) {
+        break;
+      }
+      if (available < static_cast<int64_t>(kChunkHeaderSize)) {
+        need_more_input = true;
+        break;  // Need more input for a complete header.
+      }
+
+      const uint8_t* header = input_buffer_.data() + input_offset_;
+      const uint8_t chunk_type = header[0];
+      // Length is stored as a 24-bit little-endian value.
+      const uint32_t chunk_length = LoadLittleEndian24(header + 1);
+
+      if (available < static_cast<int64_t>(kChunkHeaderSize + chunk_length)) {
+        // Wait for more input.
+        need_more_input = true;
+        break;
+      }
+
+      const uint8_t* chunk_data = header + kChunkHeaderSize;
+
+      if (chunk_type == kChunkTypeStreamIdentifier) {
+        if (saw_stream_identifier_) {
+          return Status::IOError(
+              "Invalid Snappy framed stream: duplicate stream identifier");
+        }
+        if (chunk_length != kStreamIdentifierPayloadSize ||
+            std::memcmp(chunk_data, kStreamIdentifierPayload,
+                        kStreamIdentifierPayloadSize) != 0) {
+          return Status::IOError("Invalid Snappy framed stream identifier");
+        }
+        saw_stream_identifier_ = true;
+        ConsumeFromInputBuffer(kChunkHeaderSize + chunk_length);
+        continue;
+      }
+
+      if (chunk_type == kChunkTypePadding ||
+          (chunk_type >= 0x80u && chunk_type <= 0xFDu)) {
+        // Skippable chunk types.
+        ConsumeFromInputBuffer(kChunkHeaderSize + chunk_length);
+        continue;
+      }
+
+      if (!saw_stream_identifier_) {
+        return Status::IOError(
+            "Invalid Snappy framed stream: missing stream identifier");
+      }
+
+      if (chunk_type >= 0x02u && chunk_type <= 0x7Fu) {
+        return Status::IOError("Encountered reserved unskippable Snappy framed chunk");
+      }
+
+      if (chunk_type != kChunkTypeCompressedData &&
+          chunk_type != kChunkTypeUncompressedData) {
+        return Status::IOError("Unknown Snappy framed chunk type");
+      }
+
+      if (chunk_length < kChunkChecksumSize) {
+        return Status::IOError("Snappy framed chunk too small for checksum");
+      }
+
+      const uint8_t* checksum_ptr = chunk_data;
+      const uint8_t* payload = chunk_data + kChunkChecksumSize;
+      const uint32_t payload_length =
+          chunk_length - static_cast<uint32_t>(kChunkChecksumSize);
+      const uint32_t expected_masked_crc = LoadLittleEndian32(checksum_ptr);
+
+      // Decode chunk payload into the uncompressed buffer.
+      if (chunk_type == kChunkTypeCompressedData) {
+        size_t uncompressed_size = 0;
+        if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(payload),
+                                           static_cast<size_t>(payload_length),
+                                           &uncompressed_size)) {
+          return Status::IOError("Corrupt Snappy framed compressed chunk");
+        }
+        if (uncompressed_size > kMaxUncompressedChunkSize) {
+          return Status::IOError("Snappy framed chunk exceeds maximum uncompressed size");
+        }
+        uncompressed_buffer_.resize(uncompressed_size);
+        if (!snappy::RawUncompress(reinterpret_cast<const char*>(payload),
+                                   static_cast<size_t>(payload_length),
+                                   reinterpret_cast<char*>(uncompressed_buffer_.data()))) {
+          return Status::IOError("Corrupt Snappy framed compressed chunk");
+        }
+      } else {
+        // Uncompressed data.
+        if (payload_length > kMaxUncompressedChunkSize) {
+          return Status::IOError("Snappy framed chunk exceeds maximum uncompressed size");
+        }
+        uncompressed_buffer_.resize(payload_length);
+        if (payload_length > 0) {
+          std::memcpy(uncompressed_buffer_.data(), payload, payload_length);
+        }
+      }
+
+      const uint32_t actual_masked_crc =
+          crc32c_masked(uncompressed_buffer_.data(), uncompressed_buffer_.size());
+      if (actual_masked_crc != expected_masked_crc) {
+        return Status::IOError("Snappy framed chunk failed CRC32C check");
+      }
+
+      // Consume this chunk from the input buffer now that it has been decoded.
+      ConsumeFromInputBuffer(kChunkHeaderSize + chunk_length);
+
+      // Emit as much of the decoded data as fits into the caller's output buffer.
+      const int64_t to_copy =
+          std::min(static_cast<int64_t>(uncompressed_buffer_.size()),
+                   output_len - bytes_written);
+      if (to_copy > 0) {
+        std::memcpy(output + bytes_written, uncompressed_buffer_.data(),
+                    static_cast<size_t>(to_copy));
+        bytes_written += to_copy;
+        uncompressed_offset_ = to_copy;
+      } else {
+        uncompressed_offset_ = 0;
+      }
+
+      if (uncompressed_offset_ < static_cast<int64_t>(uncompressed_buffer_.size())) {
+        // Output buffer is full, but the chunk is only partially written.
+        return DecompressResult{bytes_read, bytes_written, true};
+      }
+
+      // Entire chunk emitted.
+      uncompressed_buffer_.clear();
+      uncompressed_offset_ = 0;
+    }
+
+    if (saw_eof_ && need_more_input) {
+      return Status::IOError("Truncated Snappy framed stream");
+    }
+
+    finished_ = saw_eof_ && input_buffer_.empty() && uncompressed_buffer_.empty();
+
+    const bool need_more_output =
+        (!uncompressed_buffer_.empty() && bytes_written == output_len);
+
+    return DecompressResult{bytes_read, bytes_written, need_more_output};
+  }
+
+  bool IsFinished() override { return finished_; }
+
+  Status Reset() override {
+    input_buffer_.clear();
+    input_offset_ = 0;
+    uncompressed_buffer_.clear();
+    uncompressed_offset_ = 0;
+    finished_ = false;
+    saw_eof_ = false;
+    saw_stream_identifier_ = false;
+    return Status::OK();
+  }
+
+ private:
+  void ConsumeFromInputBuffer(size_t nbytes) {
+    input_offset_ += nbytes;
+    if (input_offset_ >= input_buffer_.size()) {
+      input_buffer_.clear();
+      input_offset_ = 0;
+    } else if (input_offset_ > input_buffer_.size() / 2) {
+      // Compact the buffer to avoid unbounded growth.
+      const auto remaining = input_buffer_.size() - input_offset_;
+      std::memmove(input_buffer_.data(), input_buffer_.data() + input_offset_, remaining);
+      input_buffer_.resize(remaining);
+      input_offset_ = 0;
+    }
+  }
+
+  std::vector<uint8_t> input_buffer_;
+  size_t input_offset_ = 0;
+  std::vector<uint8_t> uncompressed_buffer_;
+  int64_t uncompressed_offset_ = 0;
+  bool finished_ = false;
+  bool saw_eof_ = false;
+  bool saw_stream_identifier_ = false;
+};
+
+// ----------------------------------------------------------------------
+// Snappy codec (one-shot raw Snappy bitstream)
 
 class SnappyCodec : public Codec {
  public:
@@ -78,11 +477,15 @@ class SnappyCodec : public Codec {
   }
 
   Result<std::shared_ptr<Compressor>> MakeCompressor() override {
-    return Status::NotImplemented("Streaming compression unsupported with Snappy");
+    auto ptr = std::make_shared<SnappyFramedCompressor>();
+    RETURN_NOT_OK(ptr->Init());
+    return ptr;
   }
 
   Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
-    return Status::NotImplemented("Streaming decompression unsupported with Snappy");
+    auto ptr = std::make_shared<SnappyFramedDecompressor>();
+    RETURN_NOT_OK(ptr->Init());
+    return ptr;
   }
 
   Compression::type compression_type() const override { return Compression::SNAPPY; }
diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc
index 5ba93cc291df..e5f6b6baedd8 100644
--- a/cpp/src/arrow/util/compression_test.cc
+++ b/cpp/src/arrow/util/compression_test.cc
@@ -105,7 +105,7 @@ void CheckCodecRoundtrip(std::unique_ptr<Codec>& c1, std::unique_ptr<Codec>& c2,
   }
 }
 
-// Check the streaming compressor against one-shot decompression
+// Check the streaming compressor against a streaming decompressor
 void CheckStreamingCompressor(Codec* codec, const std::vector<uint8_t>& data) {
   std::shared_ptr<Compressor> compressor;
@@ -136,85 +136,148 @@ void CheckStreamingCompressor(Codec* codec, const std::vector<uint8_t>& data) {
     }
     // Once every two iterations, do a flush
    if (do_flush) {
-      Compressor::FlushResult result;
+      Compressor::FlushResult flush_result;
       do {
         output_len = compressed.size() - compressed_size;
         output = compressed.data() + compressed_size;
-        ASSERT_OK_AND_ASSIGN(result, compressor->Flush(output_len, output));
-        ASSERT_LE(result.bytes_written, output_len);
-        compressed_size += result.bytes_written;
-        if (result.should_retry) {
+        ASSERT_OK_AND_ASSIGN(flush_result, compressor->Flush(output_len, output));
+        ASSERT_LE(flush_result.bytes_written, output_len);
+        compressed_size += flush_result.bytes_written;
+        if (flush_result.should_retry) {
          compressed.resize(compressed.capacity() * 2);
        }
-      } while (result.should_retry);
+      } while (flush_result.should_retry);
    }
    do_flush = !do_flush;
  }
 
   // End the compressed stream
-  Compressor::EndResult result;
+  Compressor::EndResult end_result;
   do {
     int64_t output_len = compressed.size() - compressed_size;
     uint8_t* output = compressed.data() + compressed_size;
-    ASSERT_OK_AND_ASSIGN(result, compressor->End(output_len, output));
-    ASSERT_LE(result.bytes_written, output_len);
-    compressed_size += result.bytes_written;
-    if (result.should_retry) {
+    ASSERT_OK_AND_ASSIGN(end_result, compressor->End(output_len, output));
+    ASSERT_LE(end_result.bytes_written, output_len);
+    compressed_size += end_result.bytes_written;
+    if (end_result.should_retry) {
       compressed.resize(compressed.capacity() * 2);
     }
-  } while (result.should_retry);
+  } while (end_result.should_retry);
 
-  // Check decompressing the compressed data
-  std::vector<uint8_t> decompressed(data.size());
-  ASSERT_OK(codec->Decompress(compressed_size, compressed.data(), decompressed.size(),
-                              decompressed.data()));
+  // Check decompressing the compressed data using a streaming decompressor.
+  std::shared_ptr<Decompressor> decompressor;
+  ASSERT_OK_AND_ASSIGN(decompressor, codec->MakeDecompressor());
+
+  // Ensure a non-zero output buffer even when the uncompressed size is 0.
+  std::vector<uint8_t> decompressed(std::max<size_t>(1, data.size()));
+  int64_t decompressed_size = 0;
+  const uint8_t* comp_input = compressed.data();
+  int64_t comp_remaining = compressed_size;
+
+  while (!decompressor->IsFinished()) {
+    const bool at_eof = (comp_remaining == 0);
+    const int64_t input_len =
+        at_eof ? 0 : std::min(comp_remaining, static_cast<int64_t>(23));
+    const uint8_t* input = at_eof ? nullptr : comp_input;
+    int64_t output_len = decompressed.size() - decompressed_size;
+    uint8_t* output = decompressed.data() + decompressed_size;
+    ASSERT_OK_AND_ASSIGN(auto result,
+                         decompressor->Decompress(input_len, input, output_len, output));
+    ASSERT_LE(result.bytes_read, input_len);
+    ASSERT_LE(result.bytes_written, output_len);
+    ASSERT_TRUE(decompressor->IsFinished() || result.need_more_output ||
+                result.bytes_written > 0 || result.bytes_read > 0)
+        << "Decompression not progressing anymore";
+    if (result.need_more_output) {
+      decompressed.resize(decompressed.capacity() * 2);
+    }
+    decompressed_size += result.bytes_written;
+    comp_input += result.bytes_read;
+    comp_remaining -= result.bytes_read;
+  }
+  ASSERT_TRUE(decompressor->IsFinished());
+  ASSERT_EQ(comp_remaining, 0);
+
+  decompressed.resize(decompressed_size);
   ASSERT_EQ(data, decompressed);
 }
 
-// Check the streaming decompressor against one-shot compression
+// Check the streaming decompressor against a streaming compressor
 void CheckStreamingDecompressor(Codec* codec, const std::vector<uint8_t>& data) {
-  // Create compressed data
-  int64_t max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
-  std::vector<uint8_t> compressed(max_compressed_len);
-  int64_t compressed_size;
-  ASSERT_OK_AND_ASSIGN(
-      compressed_size,
-      codec->Compress(data.size(), data.data(), max_compressed_len, compressed.data()));
+  // Create compressed data using the streaming compressor
+  std::shared_ptr<Compressor> compressor;
+  ASSERT_OK_AND_ASSIGN(compressor, codec->MakeCompressor());
+
+  std::vector<uint8_t> compressed(1);
+  int64_t compressed_size = 0;
+  const uint8_t* input = data.data();
+  int64_t remaining = data.size();
+
+  while (remaining > 0) {
+    int64_t input_len = std::min(remaining, static_cast<int64_t>(1111));
+    int64_t output_len = compressed.size() - compressed_size;
+    uint8_t* output = compressed.data() + compressed_size;
+    ASSERT_OK_AND_ASSIGN(auto result,
+                         compressor->Compress(input_len, input, output_len, output));
+    ASSERT_LE(result.bytes_read, input_len);
+    ASSERT_LE(result.bytes_written, output_len);
+    compressed_size += result.bytes_written;
+    input += result.bytes_read;
+    remaining -= result.bytes_read;
+    if (result.bytes_read == 0) {
+      compressed.resize(compressed.capacity() * 2);
+    }
+  }
+
+  // End the compressed stream
+  Compressor::EndResult end_result;
+  do {
+    int64_t output_len = compressed.size() - compressed_size;
+    uint8_t* output = compressed.data() + compressed_size;
+    ASSERT_OK_AND_ASSIGN(end_result, compressor->End(output_len, output));
+    ASSERT_LE(end_result.bytes_written, output_len);
+    compressed_size += end_result.bytes_written;
+    if (end_result.should_retry) {
+      compressed.resize(compressed.capacity() * 2);
+    }
+  } while (end_result.should_retry);
+  compressed.resize(compressed_size);
 
   // Run streaming decompression
   std::shared_ptr<Decompressor> decompressor;
   ASSERT_OK_AND_ASSIGN(decompressor, codec->MakeDecompressor());
-  std::vector<uint8_t> decompressed;
+  std::vector<uint8_t> decompressed(10);
   int64_t decompressed_size = 0;
-  const uint8_t* input = compressed.data();
-  int64_t remaining = compressed.size();
+  const uint8_t* comp_input = compressed.data();
+  int64_t comp_remaining = compressed.size();
 
-  decompressed.resize(10);
   while (!decompressor->IsFinished()) {
-    // Feed a small amount each time
-    int64_t input_len = std::min(remaining, static_cast<int64_t>(23));
+    const bool at_eof = (comp_remaining == 0);
+    const int64_t input_len =
+        at_eof ? 0 : std::min(comp_remaining, static_cast<int64_t>(23));
+    const uint8_t* input = at_eof ? nullptr : comp_input;
     int64_t output_len = decompressed.size() - decompressed_size;
     uint8_t* output = decompressed.data() + decompressed_size;
     ASSERT_OK_AND_ASSIGN(auto result,
                          decompressor->Decompress(input_len, input, output_len, output));
     ASSERT_LE(result.bytes_read, input_len);
     ASSERT_LE(result.bytes_written, output_len);
-    ASSERT_TRUE(result.need_more_output || result.bytes_written > 0 ||
-                result.bytes_read > 0)
+    ASSERT_TRUE(decompressor->IsFinished() || result.need_more_output ||
+                result.bytes_written > 0 || result.bytes_read > 0)
        << "Decompression not progressing anymore";
    if (result.need_more_output) {
      decompressed.resize(decompressed.capacity() * 2);
    }
    decompressed_size += result.bytes_written;
-    input += result.bytes_read;
-    remaining -= result.bytes_read;
+    comp_input += result.bytes_read;
+    comp_remaining -= result.bytes_read;
  }
  ASSERT_TRUE(decompressor->IsFinished());
-  ASSERT_EQ(remaining, 0);
+  ASSERT_EQ(comp_remaining, 0);
 
   // Check the decompressed data
   decompressed.resize(decompressed_size);
@@ -281,16 +344,18 @@ void CheckStreamingRoundtrip(std::shared_ptr<Compressor> compressor,
   int64_t remaining = compressed.size();
 
   while (!decompressor->IsFinished()) {
+    const bool at_eof = (remaining == 0);
     // Feed a varying amount each time
-    int64_t input_len = std::min(remaining, make_buf_size());
+    int64_t input_len = at_eof ? 0 : std::min(remaining, make_buf_size());
+    const uint8_t* in_ptr = at_eof ? nullptr : input;
     int64_t output_len = decompressed.size() - decompressed_size;
     uint8_t* output = decompressed.data() + decompressed_size;
     ASSERT_OK_AND_ASSIGN(
-        auto result, decompressor->Decompress(input_len, input, output_len, output));
+        auto result, decompressor->Decompress(input_len, in_ptr, output_len, output));
     ASSERT_LE(result.bytes_read, input_len);
     ASSERT_LE(result.bytes_written, output_len);
-    ASSERT_TRUE(result.need_more_output || result.bytes_written > 0 ||
-                result.bytes_read > 0)
+    ASSERT_TRUE(decompressor->IsFinished() || result.need_more_output ||
+                result.bytes_written > 0 || result.bytes_read > 0)
        << "Decompression not progressing anymore";
    if (result.need_more_output) {
      decompressed.resize(decompressed.capacity() * 2);
@@ -695,9 +760,6 @@ TEST_P(CodecTest, OutputBufferIsSmall) {
 }
 
 TEST_P(CodecTest, StreamingCompressor) {
-  if (GetCompression() == Compression::SNAPPY) {
-    GTEST_SKIP() << "snappy doesn't support streaming compression";
-  }
   if (GetCompression() == Compression::BZ2) {
     GTEST_SKIP() << "Z2 doesn't support one-shot decompression";
   }
@@ -719,9 +781,6 @@ TEST_P(CodecTest, StreamingDecompressor) {
-  if (GetCompression() == Compression::SNAPPY) {
-    GTEST_SKIP() << "snappy doesn't support streaming decompression.";
-  }
   if (GetCompression() == Compression::BZ2) {
     GTEST_SKIP() << "Z2 doesn't support one-shot compression";
   }
@@ -743,9 +802,6 @@ TEST_P(CodecTest, StreamingRoundtrip) {
-  if (GetCompression() == Compression::SNAPPY) {
-    GTEST_SKIP() << "snappy doesn't support streaming decompression";
-  }
   if (GetCompression() == Compression::LZ4 ||
       GetCompression() == Compression::LZ4_HADOOP) {
     GTEST_SKIP() << "LZ4 raw format doesn't support streaming compression.";
@@ -764,9 +820,6 @@ TEST_P(CodecTest, StreamingDecompressorReuse) {
-  if (GetCompression() == Compression::SNAPPY) {
-    GTEST_SKIP() << "snappy doesn't support streaming decompression";
-  }
   if (GetCompression() == Compression::LZ4 ||
       GetCompression() == Compression::LZ4_HADOOP) {
     GTEST_SKIP() << "LZ4 raw format doesn't support streaming decompression.";
@@ -789,9 +842,6 @@ TEST_P(CodecTest, StreamingDecompressorReuse) {
 TEST_P(CodecTest, StreamingMultiFlush) {
   // Regression test for ARROW-11937
-  if (GetCompression() == Compression::SNAPPY) {
-    GTEST_SKIP() << "snappy doesn't support streaming decompression";
-  }
   if (GetCompression() == Compression::LZ4 ||
       GetCompression() == Compression::LZ4_HADOOP) {
     GTEST_SKIP() << "LZ4 raw format doesn't support streaming decompression.";
diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc
index b06cf2d22430..6c561bbf1e52 100644
--- a/cpp/src/arrow/util/compression_zlib.cc
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -145,6 +145,14 @@ class GZipDecompressor : public Decompressor {
                                       int64_t output_len, uint8_t* output) override {
     static constexpr auto input_limit =
         static_cast<int64_t>(std::numeric_limits<uInt>::max());
+
+    // Some zlib versions return Z_STREAM_ERROR if next_out is NULL, even when
+    // avail_out is 0. Our streaming API uses need_more_output to request a
+    // non-empty buffer in that case.
+    if (output_len == 0) {
+      return DecompressResult{0, 0, true};
+    }
+
     stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
     stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit));
     stream_.next_out = reinterpret_cast<Bytef*>(output);
diff --git a/cpp/src/arrow/util/crc32c.cc b/cpp/src/arrow/util/crc32c.cc
new file mode 100644
index 000000000000..ef044ef9e4c0
--- /dev/null
+++ b/cpp/src/arrow/util/crc32c.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/crc32c.h"
+
+#include <cstddef>
+#include <cstdint>
+
+namespace arrow {
+namespace util {
+
+namespace {
+
+// Castagnoli polynomial, reflected representation.
+constexpr uint32_t kCrc32cPolynomialReflected = 0x82F63B78u;
+
+uint32_t ComputeCrc32c(const uint8_t* data, std::size_t length) {
+  uint32_t crc = 0xFFFFFFFFu;
+  for (std::size_t i = 0; i < length; ++i) {
+    crc ^= static_cast<uint32_t>(data[i]);
+    for (int bit = 0; bit < 8; ++bit) {
+      const uint32_t mask = static_cast<uint32_t>(-(crc & 1u));
+      crc = (crc >> 1) ^ (kCrc32cPolynomialReflected & mask);
+    }
+  }
+  return ~crc;
+}
+
+}  // namespace
+
+uint32_t crc32c_masked(const void* data, std::size_t length) {
+  const auto* bytes = static_cast<const uint8_t*>(data);
+  const uint32_t crc = ComputeCrc32c(bytes, length);
+  // Masking as defined in the Snappy framing format specification.
+  return ((crc >> 15) | (crc << 17)) + 0xa282ead8u;
+}
+
+}  // namespace util
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/crc32c.h b/cpp/src/arrow/util/crc32c.h
new file mode 100644
index 000000000000..950b854fc2f4
--- /dev/null
+++ b/cpp/src/arrow/util/crc32c.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace util {
+
+/// \brief Compute the CRC32C checksum of the given data and return the masked value.
+///
+/// The CRC32C checksum uses the Castagnoli polynomial and is masked following
+/// the Snappy framing specification. The masking is reversible and is intended
+/// to avoid undesirable interactions between the checksum and the data it
+/// protects.
+///
+/// This helper is intended for Snappy framed streams, where each uncompressed
+/// chunk is protected by a masked CRC32C of the uncompressed data.
+ARROW_EXPORT
+uint32_t crc32c_masked(const void* data, std::size_t length);
+
+}  // namespace util
+}  // namespace arrow
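The patch itself does not show how the new streaming Snappy codec is driven from user code. The sketch below (not part of the patch) round-trips a buffer through the public Codec/Compressor/Decompressor API in arrow/util/compression.h, under simplifying assumptions: the helper name RoundTripSnappyFramed is illustrative, the output buffer is assumed large enough for a single Compress call, and error handling is reduced to status propagation. It also demonstrates the end-of-input convention introduced above, where a null input pointer with zero length tells the framed decompressor that no more data will arrive.

// Usage sketch (illustrative, not part of the patch).
#include <cstdint>
#include <vector>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/compression.h"

arrow::Status RoundTripSnappyFramed(const std::vector<uint8_t>& data) {
  using arrow::util::Codec;
  ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(arrow::Compression::SNAPPY));

  // Compress: the framed compressor writes the stream identifier followed by
  // one compressed frame per 64 KiB of input. The buffer sizing here is a
  // rough upper bound assumed sufficient for this sketch.
  ARROW_ASSIGN_OR_RAISE(auto compressor, codec->MakeCompressor());
  std::vector<uint8_t> compressed(data.size() + data.size() / 4 + 1024);
  int64_t compressed_size = 0;
  ARROW_ASSIGN_OR_RAISE(
      auto c, compressor->Compress(static_cast<int64_t>(data.size()), data.data(),
                                   static_cast<int64_t>(compressed.size()),
                                   compressed.data()));
  compressed_size += c.bytes_written;
  ARROW_ASSIGN_OR_RAISE(
      auto e, compressor->End(static_cast<int64_t>(compressed.size()) - compressed_size,
                              compressed.data() + compressed_size));
  compressed_size += e.bytes_written;

  // Decompress: keep feeding data until IsFinished(); once the input is
  // exhausted, pass a null pointer with zero length to signal end-of-input.
  ARROW_ASSIGN_OR_RAISE(auto decompressor, codec->MakeDecompressor());
  std::vector<uint8_t> decompressed(data.size() + 1);
  int64_t decompressed_size = 0;
  const uint8_t* in = compressed.data();
  int64_t in_remaining = compressed_size;
  while (!decompressor->IsFinished()) {
    const uint8_t* chunk = in_remaining > 0 ? in : nullptr;
    ARROW_ASSIGN_OR_RAISE(
        auto d, decompressor->Decompress(
                    in_remaining, chunk,
                    static_cast<int64_t>(decompressed.size()) - decompressed_size,
                    decompressed.data() + decompressed_size));
    in += d.bytes_read;
    in_remaining -= d.bytes_read;
    decompressed_size += d.bytes_written;
    if (d.need_more_output) {
      // Grow the output buffer and retry on the next iteration.
      decompressed.resize(decompressed.size() * 2);
    }
  }
  return arrow::Status::OK();
}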