From 41b24e502f023bb68f2c8c10f901c8aca94bc4f9 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 16 Mar 2026 11:12:03 +0300 Subject: [PATCH] sn/object: Return buffered HEAD response from remote node as is Now proxy SN does not unmarshal object header from remote response and does not form client response from scratch. This reduces the number of allocations for processing. Behavior has changed: now response is signed by SN holding the object, not by the client request's server. This change is acceptable generally and because of https://github.com/nspcc-dev/neofs-node/issues/3396. Refs #3783. Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 1 + internal/protobuf/api.go | 6 + internal/protobuf/codecs.go | 19 +- internal/protobuf/seekers.go | 34 +++- internal/protobuf/seekers_test.go | 204 ++++++++++++++++++++++ pkg/services/object/acl/eacl/v2/object.go | 10 -- pkg/services/object/get/exec.go | 8 +- pkg/services/object/get/get.go | 4 +- pkg/services/object/get/prm.go | 25 +++ pkg/services/object/get/util.go | 8 + pkg/services/object/proto.go | 26 +++ pkg/services/object/server.go | 55 ++++-- 12 files changed, 366 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0290adac5d..f5c1c8ce61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ Changelog for NeoFS Node - Optimized local HEAD request execution (#3783) - Unpaid container's data is deleted now (#3691) - Policer iterates engine-level object list now instead of shard-level (#3862) +- SN now forwards remote SN's response to the client as is (#3877) ### Removed - `node.persistent_sessions.path` config option from SN config (#3846) diff --git a/internal/protobuf/api.go b/internal/protobuf/api.go index 31e0546b88..d80798c863 100644 --- a/internal/protobuf/api.go +++ b/internal/protobuf/api.go @@ -16,6 +16,12 @@ import ( "google.golang.org/protobuf/encoding/protowire" ) +// Common response field numbers. +// TODO: declare in SDK. +const ( + FieldResponseBody = 1 +) + // ParseAPIVersionField parses version.Version from the next field with known // number and type at given offset. Also returns field length. func ParseAPIVersionField(buf []byte, fNum protowire.Number, fTyp protowire.Type) (version.Version, int, error) { diff --git a/internal/protobuf/codecs.go b/internal/protobuf/codecs.go index e55b7a4cb6..99dba8804e 100644 --- a/internal/protobuf/codecs.go +++ b/internal/protobuf/codecs.go @@ -12,15 +12,26 @@ type BufferedCodec struct{} // Marshal implements [encoding.CodecV2]. func (BufferedCodec) Marshal(msg any) (mem.BufferSlice, error) { - if bs, ok := msg.(mem.Buffer); ok { - return mem.BufferSlice{bs}, nil + switch v := msg.(type) { + case mem.BufferSlice: + return v, nil + case mem.Buffer: + return mem.BufferSlice{v}, nil + default: + return encoding.GetCodecV2(proto.Name).Marshal(msg) } - return encoding.GetCodecV2(proto.Name).Marshal(msg) } // Unmarshal implements [encoding.CodecV2]. func (BufferedCodec) Unmarshal(data mem.BufferSlice, msg any) error { - return encoding.GetCodecV2(proto.Name).Unmarshal(data, msg) + switch v := msg.(type) { + case *mem.BufferSlice: + data.Ref() + *v = data + return nil + default: + return encoding.GetCodecV2(proto.Name).Unmarshal(data, msg) + } } // Name implements [encoding.CodecV2]. diff --git a/internal/protobuf/seekers.go b/internal/protobuf/seekers.go index 12281d329b..e57d121a46 100644 --- a/internal/protobuf/seekers.go +++ b/internal/protobuf/seekers.go @@ -14,6 +14,10 @@ import ( // Note that SeekFieldByNumber does not check value of found field, but checks // intermediate ones for correct message traverse. func SeekFieldByNumber(buf []byte, seekNum protowire.Number) (int, int, protowire.Type, error) { + return seekFieldByNumber(buf, seekNum, true) +} + +func seekFieldByNumber(buf []byte, seekNum protowire.Number, ordered bool) (int, int, protowire.Type, error) { if err := checkFieldNumber(seekNum); err != nil { return 0, 0, 0, err } @@ -34,13 +38,15 @@ func SeekFieldByNumber(buf []byte, seekNum protowire.Number) (int, int, protowir if num == seekNum { return off, n, typ, nil } - if num > seekNum { - break - } - if num < prevNum { - return 0, 0, 0, NewUnorderedFieldsError(prevNum, num) + if ordered { + if num > seekNum { + break + } + if num < prevNum { + return 0, 0, 0, NewUnorderedFieldsError(prevNum, num) + } + prevNum = num } - prevNum = num off += n @@ -65,7 +71,21 @@ func SeekFieldByNumber(buf []byte, seekNum protowire.Number) (int, int, protowir // // If there is an error, its text contains num. func GetLENFieldBounds(buf []byte, num protowire.Number) (FieldBounds, error) { - off, tagLn, typ, err := SeekFieldByNumber(buf, num) + return getLENFieldBounds(buf, num, true) +} + +// GetLENFieldBounds seeks LEN field in buf by number and parses its boundaries. +// If field is missing, no error is returned. +// +// If field is repeated, GetLENFieldBoundsUnordered stops on first encounter. +// +// If there is an error, its text contains num. +func GetLENFieldBoundsUnordered(buf []byte, num protowire.Number) (FieldBounds, error) { + return getLENFieldBounds(buf, num, false) +} + +func getLENFieldBounds(buf []byte, num protowire.Number, ordered bool) (FieldBounds, error) { + off, tagLn, typ, err := seekFieldByNumber(buf, num, ordered) if err != nil { return FieldBounds{}, err } diff --git a/internal/protobuf/seekers_test.go b/internal/protobuf/seekers_test.go index 0fcd6c7cb1..e5136e277c 100644 --- a/internal/protobuf/seekers_test.go +++ b/internal/protobuf/seekers_test.go @@ -376,3 +376,207 @@ func TestGetLENFieldBounds(t *testing.T) { require.EqualValues(t, f.From+4, f.ValueFrom) require.EqualValues(t, f.From+len(fld3), f.To) } + +func TestGetLENFieldBoundsUnordered(t *testing.T) { + t.Run("invalid number", func(t *testing.T) { + t.Run("0", func(t *testing.T) { + for _, n := range []protowire.Number{-1, 0, 536870912} { + _, err := iprotobuf.GetLENFieldBoundsUnordered([]byte{}, n) + require.EqualError(t, err, "invalid number "+strconv.Itoa(int(n))) + } + }) + }) + + t.Run("empty buffer", func(t *testing.T) { + f, err := iprotobuf.GetLENFieldBoundsUnordered([]byte{}, 42) + require.NoError(t, err) + require.True(t, f.IsMissing()) + }) + + // #1, VARINT, 1234567890 + fld1 := []byte{8, 210, 133, 216, 204, 4} + // #100, I64, 2345678901 + fld2 := []byte{161, 6, 210, 56, 251, 13, 0, 0, 0, 0} + // #5K, LEN, Hello, world! + fld3 := []byte{194, 184, 2, 13, 72, 101, 108, 108, 111, 44, 32, 119, 111, 114, 108, 100, 33} + // #1KK, I32, 3456789012 + fld4 := []byte{133, 164, 232, 3, 20, 106, 10, 206} + + t.Run("seek failure", func(t *testing.T) { + t.Run("invalid tag", func(t *testing.T) { + t.Run("invalid varint", func(t *testing.T) { + for _, tc := range invalidVarintTestcases { + if len(tc.buf) == 0 { + continue + } + t.Run(tc.name, func(t *testing.T) { + buf := slices.Concat(fld1, tc.buf) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 42) + require.ErrorContains(t, err, "parse tag at offset "+strconv.Itoa(len(fld1))) + require.ErrorContains(t, err, "parse varint") + require.ErrorContains(t, err, tc.err) + }) + } + }) + + t.Run("invalid number", func(t *testing.T) { + t.Run("0", func(t *testing.T) { + buf := slices.Concat(fld1, []byte{2}) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 42) // 0,LEN + require.EqualError(t, err, "parse tag at offset "+strconv.Itoa(len(fld1))+": invalid number 0") + }) + t.Run("negative", func(t *testing.T) { + buf := slices.Concat(fld1, []byte{250, 255, 255, 255, 255, 255, 255, 255, 255, 1}) // -1,LEN + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 42) + require.EqualError(t, err, "parse tag at offset "+strconv.Itoa(len(fld1))+": invalid number -1") + }) + }) + }) + + t.Run("parse intermediate field failure", func(t *testing.T) { + t.Run("varint", func(t *testing.T) { + fld := slices.Concat([]byte{208, 2}, uint64OverflowVarint) + buf := slices.Concat(fld1, fld, fld2, fld3, fld4) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 5000) + require.ErrorContains(t, err, "parse field #42 of VARINT type") + require.ErrorContains(t, err, "variable length integer overflow") + }) + t.Run("I64", func(t *testing.T) { + buf := slices.Concat(fld1, fld2[:len(fld2)-1]) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 5000) + require.EqualError(t, err, "parse field #100 of I64 type: unexpected EOF: need 8 bytes, left 7 in buffer") + }) + t.Run("LEN", func(t *testing.T) { + tag := []byte{210, 2} + t.Run("invalid len", func(t *testing.T) { + for _, tc := range invalidVarintTestcases { + t.Run(tc.name, func(t *testing.T) { + buf := slices.Concat(fld1, tag, tc.buf) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 43) + require.ErrorContains(t, err, "parse field #42 of LEN type") + require.ErrorContains(t, err, "parse varint") + require.ErrorContains(t, err, tc.err) + }) + } + }) + t.Run("buffer overflow", func(t *testing.T) { + t.Run("int overflow", func(t *testing.T) { + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(buf, uint64(math.MaxInt+1)) + + buf = slices.Concat(fld1, tag, buf[:n]) + + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 43) + require.EqualError(t, err, "parse field #42 of LEN type: value "+strconv.FormatUint(math.MaxInt+1, 10)+" overflows int") + }) + + for i, tc := range varintTestcases { + if tc.val == 0 || tc.val > 1<<20 { + continue + } + + buf := slices.Concat(tc.buf, make([]byte, tc.val-1)) + buf = slices.Concat(fld1, tag, buf) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 43) + require.EqualError(t, err, "parse field #42 of LEN type: unexpected EOF: need "+strconv.FormatUint(tc.val, 10)+" bytes, left "+strconv.FormatUint(tc.val-1, 10)+" in buffer", i) + } + }) + }) + t.Run("SGROUP", func(t *testing.T) { + buf := slices.Concat(fld1, []byte{211, 2}) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 43) + require.EqualError(t, err, "parse field #42 of SGROUP type: type is not supported") + }) + t.Run("EGROUP", func(t *testing.T) { + buf := slices.Concat(fld1, []byte{212, 2}) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 43) + require.EqualError(t, err, "parse field #42 of EGROUP type: type is not supported") + }) + t.Run("I32", func(t *testing.T) { + buf := slices.Concat(fld1, fld4[:len(fld4)-1]) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 1_000_001) + require.EqualError(t, err, "parse field #1000000 of I32 type: unexpected EOF: need 4 bytes, left 3 in buffer") + }) + }) + }) + + t.Run("wrong type", func(t *testing.T) { + _, err := iprotobuf.GetLENFieldBoundsUnordered([]byte{208, 2}, 42) + require.EqualError(t, err, "wrong type of field #42: expected LEN, got VARINT") + }) + + t.Run("parse failure", func(t *testing.T) { + tag := []byte{210, 2} + t.Run("invalid len", func(t *testing.T) { + for _, tc := range invalidVarintTestcases { + t.Run(tc.name, func(t *testing.T) { + buf := slices.Concat(fld1, tag, tc.buf) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 42) + require.ErrorContains(t, err, "parse field #42 of LEN type") + require.ErrorContains(t, err, "parse varint") + require.ErrorContains(t, err, tc.err) + }) + } + }) + t.Run("buffer overflow", func(t *testing.T) { + t.Run("int overflow", func(t *testing.T) { + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(buf, uint64(math.MaxInt+1)) + + buf = slices.Concat(fld1, tag, buf[:n]) + + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 42) + require.EqualError(t, err, "parse field #42 of LEN type: value "+strconv.FormatUint(math.MaxInt+1, 10)+" overflows int") + }) + + for i, tc := range varintTestcases { + if tc.val == 0 || tc.val > 1<<20 { + continue + } + + buf := slices.Concat(tc.buf, make([]byte, tc.val-1)) + buf = slices.Concat(fld1, tag, buf) + _, err := iprotobuf.GetLENFieldBoundsUnordered(buf, 42) + require.EqualError(t, err, "parse field #42 of LEN type: unexpected EOF: need "+strconv.FormatUint(tc.val, 10)+" bytes, left "+strconv.FormatUint(tc.val-1, 10)+" in buffer", i) + } + }) + }) + + message := slices.Concat(fld1, fld2, fld3, fld4) + + t.Run("missing", func(t *testing.T) { + for _, n := range []protowire.Number{2, 99, 101, 4999, 50001, 999_999, 1_000_001} { + f, err := iprotobuf.GetLENFieldBoundsUnordered(message, n) + require.NoError(t, err, n) + require.True(t, f.IsMissing()) + } + }) + + f, err := iprotobuf.GetLENFieldBoundsUnordered(fld3, 5000) + require.NoError(t, err) + require.False(t, f.IsMissing()) + require.EqualValues(t, 0, f.From) + require.EqualValues(t, 4, f.ValueFrom) + require.EqualValues(t, len(fld3), f.To) + + f, err = iprotobuf.GetLENFieldBoundsUnordered(slices.Concat(fld1, fld2, fld3), 5000) + require.NoError(t, err) + require.False(t, f.IsMissing()) + require.EqualValues(t, len(fld1)+len(fld2), f.From) + require.EqualValues(t, f.From+4, f.ValueFrom) + require.EqualValues(t, f.From+len(fld3), f.To) + + f, err = iprotobuf.GetLENFieldBoundsUnordered(message, 5_000) + require.NoError(t, err) + require.False(t, f.IsMissing()) + require.EqualValues(t, len(fld1)+len(fld2), f.From) + require.EqualValues(t, f.From+4, f.ValueFrom) + require.EqualValues(t, f.From+len(fld3), f.To) + + f, err = iprotobuf.GetLENFieldBoundsUnordered(slices.Concat(fld2, fld1, fld4, fld3), 5000) + require.NoError(t, err) + require.False(t, f.IsMissing()) + require.EqualValues(t, len(fld1)+len(fld2)+len(fld4), f.From) + require.EqualValues(t, f.From+4, f.ValueFrom) + require.EqualValues(t, f.From+len(fld3), f.To) +} diff --git a/pkg/services/object/acl/eacl/v2/object.go b/pkg/services/object/acl/eacl/v2/object.go index 167b3f944e..4ffc02b40e 100644 --- a/pkg/services/object/acl/eacl/v2/object.go +++ b/pkg/services/object/acl/eacl/v2/object.go @@ -12,7 +12,6 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" "github.com/nspcc-dev/neofs-sdk-go/version" - "google.golang.org/protobuf/encoding/protowire" ) type sysObjHdr struct { @@ -108,21 +107,12 @@ func headersFromBinaryObjectHeader(buf []byte, cnr cid.ID, id *oid.ID) ([]eaclSD res := make([]eaclSDK.Header, 0, 10) var off int - var prevNum protowire.Number for { num, typ, n, err := iprotobuf.ParseTag(buf[off:]) if err != nil { return nil, err } - if num < prevNum { - return nil, iprotobuf.NewUnorderedFieldsError(prevNum, num) - } - if num == prevNum && num != protoobject.FieldHeaderAttributes { - return nil, iprotobuf.NewRepeatedFieldError(num) - } - prevNum = num - off += n switch num { diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 86cb5325f7..6fa898d3cb 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -61,6 +61,10 @@ type execCtx struct { // If an error occurs after that, the stream is already corrupted and // no retry should be attempted. headerWritten bool + + forwardHeadFn HeadRequestForwarder + + submitBufferedResponseFn SubmitBufferedResponseFunc } type execOption func(*execCtx) @@ -73,9 +77,11 @@ const ( statusNotFound ) -func headOnly() execOption { +func headOnly(forwardRequestFn HeadRequestForwarder, submitBufferedResponseFn SubmitBufferedResponseFunc) execOption { return func(c *execCtx) { c.head = true + c.forwardHeadFn = forwardRequestFn + c.submitBufferedResponseFn = submitBufferedResponseFn } } diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 2313462651..a62005fbf1 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -259,7 +259,7 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error { } if len(repRules) > 0 { - err := s.get(ctx, prm.commonPrm, headOnly(), withPreSortedContainerNodes(nodeLists[:len(repRules)], repRules)).err + err := s.get(ctx, prm.commonPrm, headOnly(prm.forwardRequestFn, prm.submitBufferedResponseFn), withPreSortedContainerNodes(nodeLists[:len(repRules)], repRules)).err if len(ecRules) == 0 || !errors.Is(err, apistatus.ErrObjectNotFound) { return err } @@ -275,7 +275,7 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error { for i := range ecRules { repRules[i] = uint(ecRules[i].DataPartNum + ecRules[i].ParityPartNum) } - return s.get(ctx, prm.commonPrm, headOnly(), withPreSortedContainerNodes(ecNodeLists, repRules)).err + return s.get(ctx, prm.commonPrm, headOnly(prm.forwardRequestFn, prm.submitBufferedResponseFn), withPreSortedContainerNodes(ecNodeLists, repRules)).err } return s.copyECObjectHeader(ctx, prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(), diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index 4ccb3193e0..77091fd225 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -10,6 +10,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "google.golang.org/grpc/mem" ) // Prm groups parameters of Get service call. @@ -40,12 +41,23 @@ type RangeHashPrm struct { type RequestForwarder func(context.Context, coreclient.MultiAddressClient) (*object.Object, error) type RangeRequestForwarder func(context.Context, coreclient.MultiAddressClient) ([][]byte, error) +// HeadRequestForwarder forwards particular HEAD request to remote node and +// returns buffered response. +type HeadRequestForwarder = func(context.Context, coreclient.MultiAddressClient) (mem.BufferSlice, error) + +// SubmitBufferedResponseFunc is a callback accepting buffered response. +type SubmitBufferedResponseFunc = func(mem.BufferSlice) + // HeadPrm groups parameters of Head service call. type HeadPrm struct { commonPrm buffer []byte submitLenFn func(int) + + forwardRequestFn HeadRequestForwarder + + submitBufferedResponseFn func(mem.BufferSlice) } type commonPrm struct { @@ -152,3 +164,16 @@ func (p *HeadPrm) WithBuffer(buffer []byte, submitLenFn func(int)) { p.buffer = buffer p.submitLenFn = submitLenFn } + +// SetRequestForwarder sets function called to transmit a request to a remote +// node. Once response is received without error, it is passed to handler which +// must be set via [HeadPrm.SetSubmitBufferedResponseFunc]. +func (p *HeadPrm) SetRequestForwarder(f HeadRequestForwarder) { + p.forwardRequestFn = f +} + +// SetSubmitBufferedResponseFunc sets function called with buffered response +// binary if any. +func (p *HeadPrm) SetSubmitBufferedResponseFunc(f SubmitBufferedResponseFunc) { + p.submitBufferedResponseFn = f +} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index de0d35289f..d26cbc8be8 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -177,6 +177,14 @@ func (c *clientCacheWrapper) get(ctx context.Context, info coreclient.NodeInfo) } func (c *clientWrapper) getObject(exec *execCtx) (*object.Object, io.ReadCloser, error) { + if exec.headOnly() && exec.forwardHeadFn != nil { + respBuf, err := exec.forwardHeadFn(exec.ctx, c.client) + if err == nil { + exec.submitBufferedResponseFn(respBuf) + } + return nil, nil, err + } + if exec.isForwardingEnabled() { obj, err := exec.prm.forwarder(exec.ctx, c.client) return obj, nil, err diff --git a/pkg/services/object/proto.go b/pkg/services/object/proto.go index c10b456a64..08f80afa19 100644 --- a/pkg/services/object/proto.go +++ b/pkg/services/object/proto.go @@ -9,6 +9,7 @@ import ( neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" "github.com/nspcc-dev/neofs-sdk-go/object" + protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" "github.com/nspcc-dev/neofs-sdk-go/version" "google.golang.org/protobuf/encoding/protowire" ) @@ -176,6 +177,31 @@ func shiftHeaderInHeadResponseBuffer(respBuf, hdrBuf []byte, sigf, hdrf iprotobu return bodyf } +func seekHeaderInHeadResponseBuffer(buf []byte) (int, int, error) { + f, err := iprotobuf.GetLENFieldBoundsUnordered(buf, iprotobuf.FieldResponseBody) + if err != nil || f.IsMissing() { + return -1, 0, err + } + + buf = buf[f.ValueFrom:f.To] + off := f.ValueFrom + + f, err = iprotobuf.GetLENFieldBoundsUnordered(buf, protoobject.FieldHeadResponseBodyHeader) + if err != nil || f.IsMissing() { + return -1, 0, err + } + + buf = buf[f.ValueFrom:f.To] + off += f.ValueFrom + + f, err = iprotobuf.GetLENFieldBoundsUnordered(buf, protoobject.FieldHeaderWithSignatureHeader) + if err != nil || f.IsMissing() { + return -1, 0, err + } + + return off + f.ValueFrom, f.To - f.ValueFrom, nil +} + var headResponseBufferPool = iprotobuf.NewBufferPool(headResponseBufferLen) func getBufferForHeadResponse() (*iprotobuf.MemBuffer, []byte) { diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index f4778a6614..dc7d881e77 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -51,6 +51,7 @@ import ( "github.com/nspcc-dev/tzhash/tz" "github.com/panjf2000/ants/v2" "google.golang.org/grpc" + "google.golang.org/grpc/mem" ) // Handlers represents storage node's internal handler Object service op @@ -611,8 +612,8 @@ func (s *Server) Head(context.Context, *protoobject.HeadRequest) (*protoobject.H } // HeadBuffered serves req and returns response as either -// [*protoobject.HeadResponse] or [mem.Buffer]. The buffer must be freed -// eventually. +// [*protoobject.HeadResponse], [mem.BufferSlice] or [mem.Buffer]. All buffers +// must be freed eventually. func (s *Server) HeadBuffered(ctx context.Context, req *protoobject.HeadRequest) any { var ( err error @@ -671,15 +672,44 @@ func (s *Server) HeadBuffered(ctx context.Context, req *protoobject.HeadRequest) p.WithBuffer(hdrBuf, func(ln int) { hdrLen = ln }) + var proxyRespBuf mem.BufferSlice + p.SetSubmitBufferedResponseFunc(func(respBuf mem.BufferSlice) { proxyRespBuf = respBuf }) + err = s.handlers.Head(ctx, p) if err != nil { return s.makeStatusHeadResponse(err, needSignResp) } - buffered := hdrLen >= 0 - + var buffered bool var sigf, hdrf iprotobuf.FieldBounds - if buffered { + if proxyRespBuf != nil { + if !recheckEACL { + return proxyRespBuf + } + + var respBuf []byte + if len(proxyRespBuf) == 1 { + respBuf = proxyRespBuf[0].ReadOnlyData() + } else { + // This concats all buffers. It is definitely possible to loop over ReadOnlyData(), but that's not trivial. + // And object headers fit into single buffer mostly. + respBuf = proxyRespBuf.Materialize() + } + + off, ln, err := seekHeaderInHeadResponseBuffer(respBuf) + if err != nil { + return s.makeStatusHeadResponse(fmt.Errorf("seek header field in proxy response: %w", err), needSignResp) + } + + if off >= 0 { + hdrBuf = respBuf[off:] + hdrf.To = ln + } else { + hdrBuf = []byte{} + } + + buffered = true + } else if buffered = hdrLen >= 0; buffered { _, sigf, hdrf, err = iobject.GetNonPayloadFieldBounds(hdrBuf[:hdrLen]) if err != nil { return s.makeStatusHeadResponse(err, needSignResp) @@ -698,6 +728,10 @@ func (s *Server) HeadBuffered(ctx context.Context, req *protoobject.HeadRequest) err = eACLErr(reqInfo, err) // defer return s.makeStatusHeadResponse(err, needSignResp) } + + if proxyRespBuf != nil { + return proxyRespBuf + } } if !buffered { @@ -774,7 +808,7 @@ func convertHeadPrm(signer ecdsa.PrivateKey, req *protoobject.HeadRequest, resp if meta == nil { return getsvc.HeadPrm{}, errors.New("missing meta header") } - p.SetRequestForwarder(func(ctx context.Context, c client.MultiAddressClient) (*object.Object, error) { + p.SetRequestForwarder(func(ctx context.Context, c client.MultiAddressClient) (mem.BufferSlice, error) { var err error onceResign.Do(func() { req.MetaHeader = &protosession.RequestMetaHeader{ @@ -788,10 +822,11 @@ func convertHeadPrm(signer ecdsa.PrivateKey, req *protoobject.HeadRequest, resp return nil, err } - var hdr *object.Object - return hdr, c.ForEachGRPCConn(ctx, func(ctx context.Context, conn *grpc.ClientConn) error { - var err error - hdr, err = getHeaderFromRemoteNode(ctx, conn, req, addr.Object()) + var respBuf mem.BufferSlice + return respBuf, c.ForEachGRPCConn(ctx, func(ctx context.Context, conn *grpc.ClientConn) error { + // following is protoobject.objectServiceClient.Head() with custom response codec + var err = conn.Invoke(ctx, protoobject.ObjectService_Head_FullMethodName, req, &respBuf, + grpc.StaticMethod(), grpc.ForceCodecV2(iprotobuf.BufferedCodec{})) return err // TODO: log error }) })