Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Changelog for NeoFS Node
- Optimized locking for reputation data collector (#3851)
- Optimized local HEAD request execution (#3783)
- Unpaid container's data is deleted now (#3691)
- Policer iterates engine-level object list now instead of shard-level (#3862)

### Removed
- `node.persistent_sessions.path` config option from SN config (#3846)
Expand Down
4 changes: 3 additions & 1 deletion pkg/core/object/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// AddressWithAttributes groups object's address and its attributes.
// AddressWithAttributes groups object's address, its attributes and the IDs of
// local shards that store the object.
type AddressWithAttributes struct {
Address oid.Address
Type object.Type
Attributes []string
ShardIDs []string
}
118 changes: 70 additions & 48 deletions pkg/local_object_storage/engine/list.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package engine

import (
"slices"

objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// ErrEndOfListing is returned from an object listing with cursor
Expand All @@ -16,15 +16,25 @@ var ErrEndOfListing = shard.ErrEndOfListing
// [StorageEngine.ListWithCursor] and can be reused as a parameter for it for
// subsequent requests.
type Cursor struct {
shardID string
shardCursor *shard.Cursor
}

// NewCursor creates a Cursor positioned at the given container and object.
// The next call to [StorageEngine.ListWithCursor] will return objects strictly
// after this address.
func NewCursor(cnr cid.ID, obj oid.ID) *Cursor {
return &Cursor{shardCursor: shard.NewCursor(cnr, obj)}
}

// ListWithCursor lists physical objects available in the engine starting
// from the cursor. It includes regular, tombstone and storage group objects.
// Does not include inhumed objects. Use cursor value from the response
// for consecutive requests (it's nil when iteration is over).
//
// Objects present on multiple shards are deduplicated: each unique address
// appears exactly once, with [objectcore.AddressWithAttributes.ShardIDs]
// containing the IDs of all shards that hold it.
//
// Optional attrs specifies attributes to include in the result. If object does
// not have requested attribute, corresponding element in the result is empty.
//
Expand All @@ -35,63 +45,75 @@ func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor, attrs ...st
defer elapsed(e.metrics.AddListObjectsDuration)()
}

result := make([]objectcore.AddressWithAttributes, 0, count)

// 1. Get available shards and sort them.
e.mtx.RLock()
shardIDs := make([]string, 0, len(e.shards))
for id := range e.shards {
shardIDs = append(shardIDs, id)
}
e.mtx.RUnlock()

if len(shardIDs) == 0 {
return nil, nil, ErrEndOfListing
}

slices.Sort(shardIDs)

// 2. Prepare cursor object.
if cursor == nil {
cursor = &Cursor{shardID: shardIDs[0]}
cursor = &Cursor{shardCursor: new(shard.Cursor)}
}

// 3. Iterate over available shards. Skip unavailable shards.
for i := range shardIDs {
if len(result) >= int(count) {
break
}
shards := e.unsortedShards()
var result, buf []objectcore.AddressWithAttributes

if shardIDs[i] < cursor.shardID {
cnr, obj := cursor.shardCursor.ContainerID(), cursor.shardCursor.LastObjectID()
for _, sh := range shards {
cursor.shardCursor.Reset(cnr, obj)
res, _, err := sh.ListWithCursor(int(count), cursor.shardCursor, attrs...)
if err != nil || len(res) == 0 {
continue
}

e.mtx.RLock()
shardInstance, ok := e.shards[shardIDs[i]]
e.mtx.RUnlock()
if !ok {
continue
prev := result
result = mergeListResults(buf, result, res, sh.ID().String(), int(count))
if prev != nil {
buf = prev[:0]
}

count := uint32(int(count) - len(result))
var shCursor *shard.Cursor
if shardIDs[i] == cursor.shardID {
shCursor = cursor.shardCursor
}

res, shCursor, err := shardInstance.ListWithCursor(int(count), shCursor, attrs...)
if err != nil {
continue
}

result = append(result, res...)
cursor.shardCursor = shCursor
cursor.shardID = shardIDs[i]
}

if len(result) == 0 {
return nil, nil, ErrEndOfListing
}

last := result[len(result)-1]
cursor.shardCursor.Reset(last.Address.Container(), last.Address.Object())
return result, cursor, nil
}

// mergeListResults merges a sorted accumulated result with a new sorted slice
// of items from a single shard into a single sorted deduplicated slice of at
// most count items. Objects present on multiple shards have their ShardIDs merged.
func mergeListResults(out, a, b []objectcore.AddressWithAttributes, shardID string, count int) []objectcore.AddressWithAttributes {
if len(a) == 0 {
end := min(count, len(b))
for i := range end {
b[i].ShardIDs = []string{shardID}
}
return b[:end]
}
if out == nil {
out = make([]objectcore.AddressWithAttributes, 0, min(count, len(a)+len(b)))
}
i, j := 0, 0
for len(out) < count && (i < len(a) || j < len(b)) {
var cmp int
if i >= len(a) {
cmp = 1
} else if j >= len(b) {
cmp = -1
} else {
cmp = a[i].Address.Compare(b[j].Address)
}

if cmp > 0 {
item := b[j]
item.ShardIDs = []string{shardID}
out = append(out, item)
j++
} else {
item := a[i]
if cmp == 0 {
item.ShardIDs = append(item.ShardIDs, shardID)
j++
}
out = append(out, item)
i++
}
}
return out
}
133 changes: 124 additions & 9 deletions pkg/local_object_storage/engine/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package engine
import (
"errors"
"fmt"
"sort"
"slices"
"strconv"
"testing"

objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -37,7 +39,9 @@ func TestListWithCursor(t *testing.T) {
expected = append(expected, objectcore.AddressWithAttributes{Type: object.TypeRegular, Address: obj.Address()})
}

expected = sortAddresses(expected)
slices.SortFunc(expected, func(a, b objectcore.AddressWithAttributes) int {
return a.Address.Compare(b.Address)
})

addrs, cursor, err := e.ListWithCursor(1, nil)
require.NoError(t, err)
Expand All @@ -55,7 +59,7 @@ func TestListWithCursor(t *testing.T) {
_, _, err = e.ListWithCursor(1, cursor)
require.ErrorIs(t, err, ErrEndOfListing)

got = sortAddresses(got)
got = stripShardIDs(got)
require.Equal(t, expected, got)

t.Run("attributes", func(t *testing.T) {
Expand Down Expand Up @@ -114,19 +118,42 @@ func TestListWithCursor(t *testing.T) {
} {
t.Run(fmt.Sprintf("total=%d,count=%d", totalObjects, count), func(t *testing.T) {
collected := collectListWithCursor(t, s, count, staticAttr, commonAttr, groupAttr, "$Object:ownerID")
require.ElementsMatch(t, exp, collected)
require.ElementsMatch(t, exp, stripShardIDs(collected))
})
}
})
}
})
}

func sortAddresses(addrWithType []objectcore.AddressWithAttributes) []objectcore.AddressWithAttributes {
sort.Slice(addrWithType, func(i, j int) bool {
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
})
return addrWithType
func TestListWithCursor_Dedup(t *testing.T) {
s1 := testNewShard(t, 1)
s2 := testNewShard(t, 2)
e := testNewEngineWithShards(s1, s2)
t.Cleanup(func() { _ = e.Close() })

obj := generateObjectWithCID(cidtest.ID())
require.NoError(t, s1.Put(obj, nil))
require.NoError(t, s2.Put(obj, nil))

res, cursor, err := e.ListWithCursor(2, nil)
require.NoError(t, err)

_, _, err = e.ListWithCursor(2, cursor)
require.ErrorIs(t, err, ErrEndOfListing)

require.Len(t, res, 1)
require.Equal(t, obj.Address(), res[0].Address)
require.Len(t, res[0].ShardIDs, 2)
}

func stripShardIDs(addrs []objectcore.AddressWithAttributes) []objectcore.AddressWithAttributes {
res := make([]objectcore.AddressWithAttributes, len(addrs))
for i, a := range addrs {
a.ShardIDs = nil
res[i] = a
}
return res
}

func collectListWithCursor(t *testing.T, s *StorageEngine, count uint32, attrs ...string) []objectcore.AddressWithAttributes {
Expand All @@ -142,3 +169,91 @@ func collectListWithCursor(t *testing.T, s *StorageEngine, count uint32, attrs .
require.NoError(t, err)
}
}

func TestMergeListResults(t *testing.T) {
var cnr = cid.ID{0xff}

mkItems := func(ids ...oid.ID) []objectcore.AddressWithAttributes {
items := make([]objectcore.AddressWithAttributes, len(ids))
for i, id := range ids {
items[i].Address = oid.NewAddress(cnr, id)
}
return items
}

type shardInput struct {
id string
items []objectcore.AddressWithAttributes
}

mergeAll := func(count int, shards ...shardInput) []objectcore.AddressWithAttributes {
var result []objectcore.AddressWithAttributes
for _, s := range shards {
result = mergeListResults(nil, result, s.items, s.id, count)
}
return result
}

t.Run("nil input", func(t *testing.T) {
require.Empty(t, mergeListResults(nil, nil, nil, "A", 10))
})
t.Run("empty shards", func(t *testing.T) {
require.Empty(t, mergeAll(10,
shardInput{"A", nil},
shardInput{"B", nil},
shardInput{"C", nil},
))
})
t.Run("single shard", func(t *testing.T) {
res := mergeListResults(nil, nil, mkItems(oid.ID{1}, oid.ID{2}, oid.ID{3}), "A", 10)
require.Len(t, res, 3)
for i, r := range res {
require.Equal(t, oid.NewAddress(cnr, oid.ID{byte(i + 1)}), r.Address)
require.Equal(t, []string{"A"}, r.ShardIDs)
}
})
t.Run("two shards no duplicates", func(t *testing.T) {
res := mergeListResults(nil, nil, mkItems(oid.ID{1}, oid.ID{3}, oid.ID{5}), "A", 10)
res = mergeListResults(nil, res, mkItems(oid.ID{2}, oid.ID{4}, oid.ID{6}), "B", 10)
require.Len(t, res, 6)
for i, r := range res {
require.Equal(t, oid.NewAddress(cnr, oid.ID{byte(i + 1)}), r.Address)
}
require.Equal(t, []string{"A"}, res[0].ShardIDs)
require.Equal(t, []string{"B"}, res[1].ShardIDs)
})
t.Run("two shards with duplicates", func(t *testing.T) {
res := mergeListResults(nil, nil, mkItems(oid.ID{1}, oid.ID{2}, oid.ID{3}), "A", 10)
res = mergeListResults(nil, res, mkItems(oid.ID{2}, oid.ID{3}, oid.ID{4}), "B", 10)
require.Len(t, res, 4)
require.Equal(t, []string{"A"}, res[0].ShardIDs) // oid{1}
require.ElementsMatch(t, []string{"A", "B"}, res[1].ShardIDs) // oid{2}
require.ElementsMatch(t, []string{"A", "B"}, res[2].ShardIDs) // oid{3}
require.Equal(t, []string{"B"}, res[3].ShardIDs) // oid{4}
})
t.Run("four shards with duplicates", func(t *testing.T) {
res := mergeAll(10,
shardInput{"A", mkItems(oid.ID{1}, oid.ID{2}, oid.ID{3})},
shardInput{"B", mkItems(oid.ID{2}, oid.ID{3}, oid.ID{4}, oid.ID{5})},
shardInput{"C", mkItems(oid.ID{3}, oid.ID{4}, oid.ID{5})},
shardInput{"D", mkItems(oid.ID{1}, oid.ID{4}, oid.ID{6}, oid.ID{7})},
)
require.Len(t, res, 7)
require.ElementsMatch(t, []string{"A", "D"}, res[0].ShardIDs) // oid{1}
require.ElementsMatch(t, []string{"A", "B"}, res[1].ShardIDs) // oid{2}
require.ElementsMatch(t, []string{"A", "B", "C"}, res[2].ShardIDs) // oid{3}
require.ElementsMatch(t, []string{"B", "C", "D"}, res[3].ShardIDs) // oid{4}
require.ElementsMatch(t, []string{"B", "C"}, res[4].ShardIDs) // oid{5}
})
t.Run("count limits result", func(t *testing.T) {
res := mergeListResults(nil, nil, mkItems(oid.ID{1}, oid.ID{2}, oid.ID{3}), "A", 3)
res = mergeListResults(nil, res, mkItems(oid.ID{1}, oid.ID{4}, oid.ID{5}), "B", 3)
require.Len(t, res, 3)
require.Equal(t, oid.NewAddress(cnr, oid.ID{1}), res[0].Address)
require.Equal(t, oid.NewAddress(cnr, oid.ID{2}), res[1].Address)
require.Equal(t, oid.NewAddress(cnr, oid.ID{3}), res[2].Address)
})
t.Run("count zero", func(t *testing.T) {
require.Empty(t, mergeListResults(nil, nil, mkItems(oid.ID{1}, oid.ID{2}), "A", 0))
})
}
24 changes: 24 additions & 0 deletions pkg/local_object_storage/metabase/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,30 @@ type Cursor struct {
attrsPrefix []byte
}

// NewCursor creates a Cursor positioned at the given container and object.
func NewCursor(cnr cid.ID, obj oid.ID) *Cursor {
return &Cursor{containerID: cnr, lastObjectID: obj}
}

// Reset repositions the cursor to the given container and object, reusing the
// existing attrsPrefix buffer to avoid re-allocation on repeated calls.
func (c *Cursor) Reset(cnr cid.ID, obj oid.ID) {
c.containerID = cnr
c.lastObjectID = obj
// attrsPrefix is intentionally kept: its capacity is reused by
// selectNFromBucket on the next ListWithCursor call.
}

// ContainerID returns the container ID stored in the cursor.
func (c *Cursor) ContainerID() cid.ID {
return c.containerID
}

// LastObjectID returns the last object ID stored in the cursor.
func (c *Cursor) LastObjectID() oid.ID {
return c.lastObjectID
}

// ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests.
Expand Down
Loading
Loading