From 2cd021aa8cbc589137c1ab6b18abf905a3f121e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Cumplido?= Date: Wed, 18 Mar 2026 14:48:12 +0100 Subject: [PATCH 1/3] GH-49548: [C++][FlightRPC] Decouple Flight Serialize/Deserialize from gRPC transport --- cpp/src/arrow/flight/flight_internals_test.cc | 72 ++++++ .../arrow/flight/serialization_internal.cc | 232 +++++++++++++++++ cpp/src/arrow/flight/serialization_internal.h | 8 + .../transport/grpc/serialization_internal.cc | 244 ++---------------- cpp/src/arrow/flight/types.cc | 4 + cpp/src/arrow/flight/types.h | 3 + 6 files changed, 337 insertions(+), 226 deletions(-) diff --git a/cpp/src/arrow/flight/flight_internals_test.cc b/cpp/src/arrow/flight/flight_internals_test.cc index bb14ddd6655e..e77d2d3f4257 100644 --- a/cpp/src/arrow/flight/flight_internals_test.cc +++ b/cpp/src/arrow/flight/flight_internals_test.cc @@ -23,13 +23,18 @@ #include #include +#include "arrow/buffer.h" #include "arrow/flight/client_cookie_middleware.h" #include "arrow/flight/client_middleware.h" #include "arrow/flight/cookie_internal.h" #include "arrow/flight/serialization_internal.h" +#include "arrow/flight/server.h" #include "arrow/flight/test_util.h" +#include "arrow/flight/transport.h" #include "arrow/flight/transport/grpc/util_internal.h" #include "arrow/flight/types.h" +#include "arrow/ipc/reader.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/string.h" @@ -730,6 +735,73 @@ TEST(GrpcTransport, FlightDataDeserialize) { #endif } +// ---------------------------------------------------------------------- +// Transport-agnostic serialization roundtrip tests + +TEST(FlightSerialization, RoundtripPayloadWithBody) { + // Use RecordBatchStream to generate FlightPayloads + auto schema = arrow::schema({arrow::field("a", arrow::int32())}); + auto arr = ArrayFromJSON(arrow::int32(), "[1, 2, 3]"); + auto batch = RecordBatch::Make(schema, 3, {arr}); + auto reader = RecordBatchReader::Make({batch}).ValueOrDie(); + RecordBatchStream stream(std::move(reader)); + + // Get a FlightPayload from the stream + ASSERT_OK_AND_ASSIGN(auto schema_payload, stream.GetSchemaPayload()); + ASSERT_OK_AND_ASSIGN(auto flight_payload, stream.Next()); + + // Add app_metadata to the flight payload + flight_payload.app_metadata = Buffer::FromString("test-metadata"); + + // Serialize FlightPayload to BufferVector + ASSERT_OK_AND_ASSIGN(auto buffers, internal::SerializePayloadToBuffers(flight_payload)); + ASSERT_GT(buffers.size(), 0); + + // Concatenate to a single buffer for deserialization and deserialize. + ASSERT_OK_AND_ASSIGN(auto concat, ConcatenateBuffers(buffers)); + ASSERT_OK_AND_ASSIGN(auto data, internal::DeserializeFlightData(concat)); + + // Verify IPC metadata (data_header) is present + ASSERT_NE(data.metadata, nullptr); + ASSERT_GT(data.metadata->size(), 0); + + // Verify app_metadata + ASSERT_NE(data.app_metadata, nullptr); + ASSERT_EQ(data.app_metadata->ToString(), "test-metadata"); + + // Verify body and message are present + ASSERT_NE(data.body, nullptr); + ASSERT_GT(data.body->size(), 0); + ASSERT_OK_AND_ASSIGN(auto message, data.OpenMessage()); + ASSERT_NE(message, nullptr); + // Also verify the RecordBatch roundtrips correctly + ASSERT_OK_AND_ASSIGN(auto result_batch, + ipc::ReadRecordBatch(*message, schema, /*dictionaries=*/nullptr, + ipc::IpcReadOptions::Defaults())); + ASSERT_TRUE(result_batch->Equals(*batch)); +} + +TEST(FlightSerialization, RoundtripMetadataOnly) { + // A metadata-only payload (no IPC body, no descriptor) + auto app_meta = Buffer::FromString("metadata-only-message"); + + FlightPayload payload; + payload.app_metadata = std::move(app_meta); + + // Serialize + ASSERT_OK_AND_ASSIGN(auto buffers, internal::SerializePayloadToBuffers(payload)); + ASSERT_OK_AND_ASSIGN(auto concat, ConcatenateBuffers(buffers)); + + // Deserialize + ASSERT_OK_AND_ASSIGN(auto data, internal::DeserializeFlightData(concat)); + + // Verify: no descriptor, no IPC metadata, just app_metadata + ASSERT_EQ(data.descriptor, nullptr); + ASSERT_EQ(data.metadata, nullptr); + ASSERT_NE(data.app_metadata, nullptr); + ASSERT_EQ(data.app_metadata->ToString(), "metadata-only-message"); +} + // ---------------------------------------------------------------------- // Transport abstraction tests diff --git a/cpp/src/arrow/flight/serialization_internal.cc b/cpp/src/arrow/flight/serialization_internal.cc index 604375311d30..a09007581498 100644 --- a/cpp/src/arrow/flight/serialization_internal.cc +++ b/cpp/src/arrow/flight/serialization_internal.cc @@ -17,18 +17,24 @@ #include "arrow/flight/serialization_internal.h" +#include #include #include #include +#include +#include +#include #include "arrow/buffer.h" #include "arrow/flight/protocol_internal.h" #include "arrow/io/memory.h" +#include "arrow/ipc/message.h" #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/result.h" #include "arrow/status.h" +#include "arrow/util/logging_internal.h" // Lambda helper & CTAD template @@ -612,6 +618,232 @@ Status ToProto(const CloseSessionResult& result, pb::CloseSessionResult* pb_resu return Status::OK(); } +namespace { +using google::protobuf::internal::WireFormatLite; +using google::protobuf::io::ArrayOutputStream; +using google::protobuf::io::CodedInputStream; +using google::protobuf::io::CodedOutputStream; +static constexpr int64_t kInt32Max = std::numeric_limits::max(); +const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0}; + +// Update the sizes of our Protobuf fields based on the given IPC payload. +arrow::Status IpcMessageHeaderSize(const arrow::ipc::IpcPayload& ipc_msg, bool has_body, + size_t* header_size, int32_t* metadata_size) { + DCHECK_LE(ipc_msg.metadata->size(), kInt32Max); + *metadata_size = static_cast(ipc_msg.metadata->size()); + + // 1 byte for metadata tag + *header_size += 1 + WireFormatLite::LengthDelimitedSize(*metadata_size); + + // 2 bytes for body tag + if (has_body) { + // We write the body tag in the header but not the actual body data + *header_size += 2 + WireFormatLite::LengthDelimitedSize(ipc_msg.body_length) - + ipc_msg.body_length; + } + + return arrow::Status::OK(); +} + +bool ReadBytesZeroCopy(const std::shared_ptr& source_data, + CodedInputStream* input, std::shared_ptr* out) { + uint32_t length; + if (!input->ReadVarint32(&length)) { + return false; + } + auto buf = + SliceBuffer(source_data, input->CurrentPosition(), static_cast(length)); + *out = buf; + return input->Skip(static_cast(length)); +} + +} // namespace + +arrow::Result SerializePayloadToBuffers(const FlightPayload& msg) { + // Size of the IPC body (protobuf: data_body) + size_t body_size = 0; + // Size of the Protobuf "header" (everything except for the body) + size_t header_size = 0; + // Size of IPC header metadata (protobuf: data_header) + int32_t metadata_size = 0; + + // Write the descriptor if present + int32_t descriptor_size = 0; + if (msg.descriptor != nullptr) { + DCHECK_LE(msg.descriptor->size(), kInt32Max); + descriptor_size = static_cast(msg.descriptor->size()); + header_size += 1 + WireFormatLite::LengthDelimitedSize(descriptor_size); + } + + // App metadata tag if appropriate + int32_t app_metadata_size = 0; + if (msg.app_metadata && msg.app_metadata->size() > 0) { + DCHECK_LE(msg.app_metadata->size(), kInt32Max); + app_metadata_size = static_cast(msg.app_metadata->size()); + header_size += 1 + WireFormatLite::LengthDelimitedSize(app_metadata_size); + } + + const arrow::ipc::IpcPayload& ipc_msg = msg.ipc_message; + // No data in this payload (metadata-only). + bool has_ipc = ipc_msg.type != ipc::MessageType::NONE; + bool has_body = has_ipc ? ipc::Message::HasBody(ipc_msg.type) : false; + + if (has_ipc) { + DCHECK(has_body || ipc_msg.body_length == 0); + ARROW_RETURN_NOT_OK( + IpcMessageHeaderSize(ipc_msg, has_body, &header_size, &metadata_size)); + body_size = static_cast(ipc_msg.body_length); + } + + // TODO(wesm): messages over 2GB unlikely to be yet supported + // Validated in WritePayload since returning error here causes gRPC to fail an assertion + DCHECK_LE(body_size, kInt32Max); + + // Allocate and initialize buffers + arrow::BufferVector buffers; + ARROW_ASSIGN_OR_RAISE(auto header_buf, arrow::AllocateBuffer(header_size)); + + // Force the header_stream to be destructed, which actually flushes + // the data into the slice. + { + ArrayOutputStream header_writer(const_cast(header_buf->mutable_data()), + static_cast(header_size)); + CodedOutputStream header_stream(&header_writer); + + // Write descriptor + if (msg.descriptor != nullptr) { + WireFormatLite::WriteTag(pb::FlightData::kFlightDescriptorFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream); + header_stream.WriteVarint32(descriptor_size); + header_stream.WriteRawMaybeAliased(msg.descriptor->data(), + static_cast(msg.descriptor->size())); + } + + // Write header + if (has_ipc) { + WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream); + header_stream.WriteVarint32(metadata_size); + header_stream.WriteRawMaybeAliased(ipc_msg.metadata->data(), + static_cast(ipc_msg.metadata->size())); + } + + // Write app metadata + if (app_metadata_size > 0) { + WireFormatLite::WriteTag(pb::FlightData::kAppMetadataFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream); + header_stream.WriteVarint32(app_metadata_size); + header_stream.WriteRawMaybeAliased(msg.app_metadata->data(), + static_cast(msg.app_metadata->size())); + } + + if (has_body) { + // Write body tag + WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream); + header_stream.WriteVarint32(static_cast(body_size)); + + // Enqueue body buffers for writing, without copying + for (const auto& buffer : ipc_msg.body_buffers) { + // Buffer may be null when the row length is zero, or when all + // entries are invalid. + if (!buffer || buffer->size() == 0) continue; + buffers.push_back(buffer); + + // Write padding if not multiple of 8 + const auto remainder = static_cast( + bit_util::RoundUpToMultipleOf8(buffer->size()) - buffer->size()); + if (remainder) { + buffers.push_back(std::make_shared(kPaddingBytes, remainder)); + } + } + } + + DCHECK_EQ(static_cast(header_size), header_stream.ByteCount()); + } + // Once header is written we add it as the first buffer in the output vector. + buffers.insert(buffers.begin(), std::move(header_buf)); + + return buffers; +} + +// Read internal::FlightData from arrow::Buffer containing FlightData +// protobuf without copying +arrow::Result DeserializeFlightData( + const std::shared_ptr& buffer) { + if (!buffer) { + return Status::Invalid("No payload"); + } + + arrow::flight::internal::FlightData out; + + auto buffer_length = static_cast(buffer->size()); + CodedInputStream pb_stream(buffer->data(), buffer_length); + + pb_stream.SetTotalBytesLimit(buffer_length); + + // This is the bytes remaining when using CodedInputStream like this + while (pb_stream.BytesUntilTotalBytesLimit()) { + const uint32_t tag = pb_stream.ReadTag(); + const int field_number = WireFormatLite::GetTagFieldNumber(tag); + switch (field_number) { + case pb::FlightData::kFlightDescriptorFieldNumber: { + pb::FlightDescriptor pb_descriptor; + uint32_t length; + if (!pb_stream.ReadVarint32(&length)) { + return Status::Invalid("Unable to parse length of FlightDescriptor"); + } + // Can't use ParseFromCodedStream as this reads the entire + // rest of the stream into the descriptor command field. + std::string buffer; + if (!pb_stream.ReadString(&buffer, length)) { + return Status::Invalid("Unable to read FlightDescriptor from protobuf"); + } + if (!pb_descriptor.ParseFromString(buffer)) { + return Status::Invalid("Unable to parse FlightDescriptor"); + } + arrow::flight::FlightDescriptor descriptor; + ARROW_RETURN_NOT_OK( + arrow::flight::internal::FromProto(pb_descriptor, &descriptor)); + out.descriptor = std::make_unique(descriptor); + } break; + case pb::FlightData::kDataHeaderFieldNumber: { + if (!ReadBytesZeroCopy(buffer, &pb_stream, &out.metadata)) { + return Status::Invalid("Unable to read FlightData metadata"); + } + } break; + case pb::FlightData::kAppMetadataFieldNumber: { + if (!ReadBytesZeroCopy(buffer, &pb_stream, &out.app_metadata)) { + return Status::Invalid("Unable to read FlightData application metadata"); + } + } break; + case pb::FlightData::kDataBodyFieldNumber: { + if (!ReadBytesZeroCopy(buffer, &pb_stream, &out.body)) { + return Status::Invalid("Unable to read FlightData body"); + } + } break; + default: { + // Unknown field. We should skip it for compatibility. + if (!WireFormatLite::SkipField(&pb_stream, tag)) { + return Status::Invalid("Could not skip unknown field tag in FlightData"); + } + break; + } + } + } + + // TODO(wesm): Where and when should we verify that the FlightData is not + // malformed? + + // Set the default value for an unspecified FlightData body. The other + // fields can be null if they're unspecified. + if (out.body == nullptr) { + out.body = std::make_shared(nullptr, 0); + } + + return out; +} + } // namespace internal } // namespace flight } // namespace arrow diff --git a/cpp/src/arrow/flight/serialization_internal.h b/cpp/src/arrow/flight/serialization_internal.h index 4d07efad8150..6b307d5162ad 100644 --- a/cpp/src/arrow/flight/serialization_internal.h +++ b/cpp/src/arrow/flight/serialization_internal.h @@ -182,6 +182,14 @@ ARROW_FLIGHT_EXPORT Status ToProto(const CloseSessionResult& result, Status ToPayload(const FlightDescriptor& descr, std::shared_ptr* out); +/// \brief Serialize a FlightPayload to a vector of buffers. +ARROW_FLIGHT_EXPORT +arrow::Result SerializePayloadToBuffers(const FlightPayload& msg); + +/// \brief Deserialize FlightData from a contiguous buffer. +arrow::Result DeserializeFlightData( + const std::shared_ptr& buffer); + // We want to reuse RecordBatchStreamReader's implementation while // (1) Adapting it to the Flight message format // (2) Allowing pure-metadata messages before data is sent diff --git a/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc b/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc index d4848c5077d1..ee13a8e2022f 100644 --- a/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc +++ b/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc @@ -17,10 +17,7 @@ #include "arrow/flight/transport/grpc/serialization_internal.h" -// todo cleanup includes - #include -#include #include #include #include @@ -32,10 +29,6 @@ # pragma warning(disable : 4267) #endif -#include -#include -#include - #include #include #include @@ -49,9 +42,7 @@ #include "arrow/flight/serialization_internal.h" #include "arrow/flight/transport.h" #include "arrow/flight/transport/grpc/util_internal.h" -#include "arrow/ipc/message.h" #include "arrow/ipc/writer.h" -#include "arrow/util/bit_util.h" #include "arrow/util/logging_internal.h" namespace arrow { @@ -61,28 +52,10 @@ namespace grpc { namespace pb = arrow::flight::protocol; -static constexpr int64_t kInt32Max = std::numeric_limits::max(); -using google::protobuf::internal::WireFormatLite; -using google::protobuf::io::ArrayOutputStream; -using google::protobuf::io::CodedInputStream; -using google::protobuf::io::CodedOutputStream; - using ::grpc::ByteBuffer; namespace { -bool ReadBytesZeroCopy(const std::shared_ptr& source_data, - CodedInputStream* input, std::shared_ptr* out) { - uint32_t length; - if (!input->ReadVarint32(&length)) { - return false; - } - auto buf = - SliceBuffer(source_data, input->CurrentPosition(), static_cast(length)); - *out = buf; - return input->Skip(static_cast(length)); -} - // Internal wrapper for gRPC ByteBuffer so its memory can be exposed to Arrow // consumers with zero-copy class GrpcBuffer : public MutableBuffer { @@ -176,142 +149,29 @@ arrow::Result<::grpc::Slice> SliceFromBuffer(const std::shared_ptr& buf) return slice; } -const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0}; - -// Update the sizes of our Protobuf fields based on the given IPC payload. -::grpc::Status IpcMessageHeaderSize(const arrow::ipc::IpcPayload& ipc_msg, bool has_body, - size_t* header_size, int32_t* metadata_size) { - DCHECK_LE(ipc_msg.metadata->size(), kInt32Max); - *metadata_size = static_cast(ipc_msg.metadata->size()); - - // 1 byte for metadata tag - *header_size += 1 + WireFormatLite::LengthDelimitedSize(*metadata_size); - - // 2 bytes for body tag - if (has_body) { - // We write the body tag in the header but not the actual body data - *header_size += 2 + WireFormatLite::LengthDelimitedSize(ipc_msg.body_length) - - ipc_msg.body_length; - } - - return ::grpc::Status::OK; -} - } // namespace ::grpc::Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out, bool* own_buffer) { - // Size of the IPC body (protobuf: data_body) - size_t body_size = 0; - // Size of the Protobuf "header" (everything except for the body) - size_t header_size = 0; - // Size of IPC header metadata (protobuf: data_header) - int32_t metadata_size = 0; - - // Write the descriptor if present - int32_t descriptor_size = 0; - if (msg.descriptor != nullptr) { - DCHECK_LE(msg.descriptor->size(), kInt32Max); - descriptor_size = static_cast(msg.descriptor->size()); - header_size += 1 + WireFormatLite::LengthDelimitedSize(descriptor_size); + // Retrieve BufferVector from the transport-agnostic serialization. + auto buffers_result = arrow::flight::internal::SerializePayloadToBuffers(msg); + if (!buffers_result.ok()) { + return ToGrpcStatus(buffers_result.status()); } - // App metadata tag if appropriate - int32_t app_metadata_size = 0; - if (msg.app_metadata && msg.app_metadata->size() > 0) { - DCHECK_LE(msg.app_metadata->size(), kInt32Max); - app_metadata_size = static_cast(msg.app_metadata->size()); - header_size += 1 + WireFormatLite::LengthDelimitedSize(app_metadata_size); - } - - const arrow::ipc::IpcPayload& ipc_msg = msg.ipc_message; - // No data in this payload (metadata-only). - bool has_ipc = ipc_msg.type != ipc::MessageType::NONE; - bool has_body = has_ipc ? ipc::Message::HasBody(ipc_msg.type) : false; - - if (has_ipc) { - DCHECK(has_body || ipc_msg.body_length == 0); - GRPC_RETURN_NOT_GRPC_OK( - IpcMessageHeaderSize(ipc_msg, has_body, &header_size, &metadata_size)); - body_size = static_cast(ipc_msg.body_length); - } - - // TODO(wesm): messages over 2GB unlikely to be yet supported - // Validated in WritePayload since returning error here causes gRPC to fail an assertion - DCHECK_LE(body_size, kInt32Max); - - // Allocate and initialize slices std::vector<::grpc::Slice> slices; - slices.emplace_back(header_size); - - // Force the header_stream to be destructed, which actually flushes - // the data into the slice. - { - ArrayOutputStream header_writer(const_cast(slices[0].begin()), - static_cast(slices[0].size())); - CodedOutputStream header_stream(&header_writer); - - // Write descriptor - if (msg.descriptor != nullptr) { - WireFormatLite::WriteTag(pb::FlightData::kFlightDescriptorFieldNumber, - WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream); - header_stream.WriteVarint32(descriptor_size); - header_stream.WriteRawMaybeAliased(msg.descriptor->data(), - static_cast(msg.descriptor->size())); - } - - // Write header - if (has_ipc) { - WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber, - WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream); - header_stream.WriteVarint32(metadata_size); - header_stream.WriteRawMaybeAliased(ipc_msg.metadata->data(), - static_cast(ipc_msg.metadata->size())); - } - - // Write app metadata - if (app_metadata_size > 0) { - WireFormatLite::WriteTag(pb::FlightData::kAppMetadataFieldNumber, - WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream); - header_stream.WriteVarint32(app_metadata_size); - header_stream.WriteRawMaybeAliased(msg.app_metadata->data(), - static_cast(msg.app_metadata->size())); - } - - if (has_body) { - // Write body tag - WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber, - WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream); - header_stream.WriteVarint32(static_cast(body_size)); - - // Enqueue body buffers for writing, without copying - for (const auto& buffer : ipc_msg.body_buffers) { - // Buffer may be null when the row length is zero, or when all - // entries are invalid. - if (!buffer || buffer->size() == 0) continue; - - ::grpc::Slice slice; - auto status = SliceFromBuffer(buffer).Value(&slice); - if (ARROW_PREDICT_FALSE(!status.ok())) { - // This will likely lead to abort as gRPC cannot recover from an error here - return ToGrpcStatus(status); - } - slices.push_back(std::move(slice)); - - // Write padding if not multiple of 8 - const auto remainder = static_cast( - bit_util::RoundUpToMultipleOf8(buffer->size()) - buffer->size()); - if (remainder) { - slices.emplace_back(kPaddingBytes, remainder); - } - } + slices.reserve(buffers_result->size()); + for (const auto& buffer : *buffers_result) { + ::grpc::Slice slice; + auto status = SliceFromBuffer(buffer).Value(&slice); + if (ARROW_PREDICT_FALSE(!status.ok())) { + // This will likely lead to abort as gRPC cannot recover from an error here + return ToGrpcStatus(status); } - - DCHECK_EQ(static_cast(header_size), header_stream.ByteCount()); + slices.push_back(std::move(slice)); } - // Hand off the slices to the returned ByteBuffer - *out = ::grpc::ByteBuffer(slices.data(), slices.size()); + *out = ByteBuffer(slices.data(), slices.size()); *own_buffer = true; return ::grpc::Status::OK; } @@ -324,84 +184,16 @@ ::grpc::Status FlightDataDeserialize(ByteBuffer* buffer, return {::grpc::StatusCode::INTERNAL, "No payload"}; } - // Reset fields in case the caller reuses a single allocation - out->descriptor = nullptr; - out->app_metadata = nullptr; - out->metadata = nullptr; - out->body = nullptr; - std::shared_ptr wrapped_buffer; GRPC_RETURN_NOT_OK(GrpcBuffer::Wrap(buffer, &wrapped_buffer)); - - auto buffer_length = static_cast(wrapped_buffer->size()); - CodedInputStream pb_stream(wrapped_buffer->data(), buffer_length); - - pb_stream.SetTotalBytesLimit(buffer_length); - - // This is the bytes remaining when using CodedInputStream like this - while (pb_stream.BytesUntilTotalBytesLimit()) { - const uint32_t tag = pb_stream.ReadTag(); - const int field_number = WireFormatLite::GetTagFieldNumber(tag); - switch (field_number) { - case pb::FlightData::kFlightDescriptorFieldNumber: { - pb::FlightDescriptor pb_descriptor; - uint32_t length; - if (!pb_stream.ReadVarint32(&length)) { - return {::grpc::StatusCode::INTERNAL, - "Unable to parse length of FlightDescriptor"}; - } - // Can't use ParseFromCodedStream as this reads the entire - // rest of the stream into the descriptor command field. - std::string buffer; - if (!pb_stream.ReadString(&buffer, length)) { - return {::grpc::StatusCode::INTERNAL, - "Unable to read FlightDescriptor from protobuf"}; - } - if (!pb_descriptor.ParseFromString(buffer)) { - return {::grpc::StatusCode::INTERNAL, "Unable to parse FlightDescriptor"}; - } - arrow::flight::FlightDescriptor descriptor; - GRPC_RETURN_NOT_OK( - arrow::flight::internal::FromProto(pb_descriptor, &descriptor)); - out->descriptor = std::make_unique(descriptor); - } break; - case pb::FlightData::kDataHeaderFieldNumber: { - if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->metadata)) { - return {::grpc::StatusCode::INTERNAL, "Unable to read FlightData metadata"}; - } - } break; - case pb::FlightData::kAppMetadataFieldNumber: { - if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->app_metadata)) { - return {::grpc::StatusCode::INTERNAL, - "Unable to read FlightData application metadata"}; - } - } break; - case pb::FlightData::kDataBodyFieldNumber: { - if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->body)) { - return {::grpc::StatusCode::INTERNAL, "Unable to read FlightData body"}; - } - } break; - default: { - // Unknown field. We should skip it for compatibility. - if (!WireFormatLite::SkipField(&pb_stream, tag)) { - return {::grpc::StatusCode::INTERNAL, - "Could not skip unknown field tag in FlightData"}; - } - break; - } - } - } + // Release gRPC memory now that Arrow Buffer holds its own reference. buffer->Clear(); - // TODO(wesm): Where and when should we verify that the FlightData is not - // malformed? - - // Set the default value for an unspecified FlightData body. The other - // fields can be null if they're unspecified. - if (out->body == nullptr) { - out->body = std::make_shared(nullptr, 0); + auto result = arrow::flight::internal::DeserializeFlightData(wrapped_buffer); + if (!result.ok()) { + return ToGrpcStatus(result.status()); } - + *out = result.MoveValueUnsafe(); return ::grpc::Status::OK; } diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc index 8166513d4e3f..495425c4aebf 100644 --- a/cpp/src/arrow/flight/types.cc +++ b/cpp/src/arrow/flight/types.cc @@ -886,6 +886,10 @@ Status FlightPayload::Validate() const { return Status::OK(); } +arrow::Result FlightPayload::SerializeToBuffers() const { + return internal::SerializePayloadToBuffers(*this); +} + std::string ActionType::ToString() const { return arrow::internal::JoinToString(""); diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h index d498ac67f7a7..fdd1881a6370 100644 --- a/cpp/src/arrow/flight/types.h +++ b/cpp/src/arrow/flight/types.h @@ -904,6 +904,9 @@ struct ARROW_FLIGHT_EXPORT FlightPayload { /// \brief Check that the payload can be written to the wire. Status Validate() const; + + /// \brief Serialize this payload to a vector of buffers. + arrow::Result SerializeToBuffers() const; }; // A wrapper around arrow.flight.protocol.PutResult is not defined From bb735a8b9b86040669773a74f12ca0c8816e0e01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Cumplido?= Date: Wed, 18 Mar 2026 15:17:24 +0100 Subject: [PATCH 2/3] Fix ASAN/UBSAN test failure --- cpp/src/arrow/flight/flight_internals_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/flight/flight_internals_test.cc b/cpp/src/arrow/flight/flight_internals_test.cc index e77d2d3f4257..a66ac210b861 100644 --- a/cpp/src/arrow/flight/flight_internals_test.cc +++ b/cpp/src/arrow/flight/flight_internals_test.cc @@ -775,8 +775,9 @@ TEST(FlightSerialization, RoundtripPayloadWithBody) { ASSERT_OK_AND_ASSIGN(auto message, data.OpenMessage()); ASSERT_NE(message, nullptr); // Also verify the RecordBatch roundtrips correctly + ipc::DictionaryMemo dict_memo; ASSERT_OK_AND_ASSIGN(auto result_batch, - ipc::ReadRecordBatch(*message, schema, /*dictionaries=*/nullptr, + ipc::ReadRecordBatch(*message, schema, &dict_memo, ipc::IpcReadOptions::Defaults())); ASSERT_TRUE(result_batch->Equals(*batch)); } From e852d6694527fe92ab5f09923b15cedbbc349589 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Cumplido?= Date: Wed, 18 Mar 2026 15:59:16 +0100 Subject: [PATCH 3/3] Add ARROW_FLIGHT_EXPORT for Windows symbol visibility --- cpp/src/arrow/flight/serialization_internal.h | 1 + cpp/src/arrow/flight/transport.h | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/flight/serialization_internal.h b/cpp/src/arrow/flight/serialization_internal.h index 6b307d5162ad..896ca5b67567 100644 --- a/cpp/src/arrow/flight/serialization_internal.h +++ b/cpp/src/arrow/flight/serialization_internal.h @@ -187,6 +187,7 @@ ARROW_FLIGHT_EXPORT arrow::Result SerializePayloadToBuffers(const FlightPayload& msg); /// \brief Deserialize FlightData from a contiguous buffer. +ARROW_FLIGHT_EXPORT arrow::Result DeserializeFlightData( const std::shared_ptr& buffer); diff --git a/cpp/src/arrow/flight/transport.h b/cpp/src/arrow/flight/transport.h index 4ce50534023f..6b9694318915 100644 --- a/cpp/src/arrow/flight/transport.h +++ b/cpp/src/arrow/flight/transport.h @@ -76,7 +76,7 @@ class FlightStatusDetail; namespace internal { /// Internal, not user-visible type used for memory-efficient reads -struct FlightData { +struct ARROW_FLIGHT_EXPORT FlightData { /// Used only for puts, may be null std::unique_ptr descriptor;