diff --git a/CHANGELOG.md b/CHANGELOG.md index b5ed898749..a27a5b7a3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ Changelog for NeoFS Node - Unpaid container's data is deleted now (#3691) - Policer iterates engine-level object list now instead of shard-level (#3862) - SN now ignores `copies_number` field of `object.PutRequest.Body.Init` message (#3830) +- Policer starts from a random offset (#3879) ### Removed - `node.persistent_sessions.path` config option from SN config (#3846) diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index d57354f34d..72371e03b9 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -26,6 +26,16 @@ func NewCursor(cnr cid.ID, obj oid.ID) *Cursor { return &Cursor{shardCursor: shard.NewCursor(cnr, obj)} } +// ContainerID returns the container ID stored in the cursor. +func (c *Cursor) ContainerID() cid.ID { + return c.shardCursor.ContainerID() +} + +// ObjectID returns the object ID stored in the cursor. +func (c *Cursor) ObjectID() oid.ID { + return c.shardCursor.LastObjectID() +} + // ListWithCursor lists physical objects available in the engine starting // from the cursor. It includes regular, tombstone and storage group objects. // Does not include inhumed objects. Use cursor value from the response diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index 948f219bbc..6b863fd854 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -62,15 +62,21 @@ func (oiw *objectsInWork) inWork(addr oid.Address) bool { return ok } -func (oiw *objectsInWork) remove(addr oid.Address) { +func (oiw *objectsInWork) tryAdd(addr oid.Address) bool { oiw.m.Lock() - delete(oiw.objs, addr) - oiw.m.Unlock() + defer oiw.m.Unlock() + + if _, ok := oiw.objs[addr]; ok { + return false + } + + oiw.objs[addr] = struct{}{} + return true } -func (oiw *objectsInWork) add(addr oid.Address) { +func (oiw *objectsInWork) remove(addr oid.Address) { oiw.m.Lock() - oiw.objs[addr] = struct{}{} + delete(oiw.objs, addr) oiw.m.Unlock() } diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index 4fc65b36f0..c5df76dd18 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -80,6 +80,7 @@ func TestConsistency(t *testing.T) { t.Run("startup value", func(t *testing.T) { wp, err := ants.NewPool(100) require.NoError(t, err) + t.Cleanup(wp.Release) var ( mockM = &mockMetrics{} @@ -107,6 +108,7 @@ func TestConsistency(t *testing.T) { t.Run("metrics change", func(t *testing.T) { wp, err := ants.NewPool(100) require.NoError(t, err) + t.Cleanup(wp.Release) var ( cnr = cidtest.ID() @@ -146,6 +148,7 @@ func TestConsistency(t *testing.T) { localNode.setListResulsts(nil, engine.ErrEndOfListing) delayCh <- struct{}{} + delayCh <- struct{}{} require.Eventually(t, func() bool { return mockM.consistency.Load() }, 3*time.Second, 50*time.Millisecond) @@ -241,7 +244,7 @@ func testDefaultREPWithType(t *testing.T, typ object.Type) { }) t.Run("backup", func(t *testing.T) { for i := defaultRep; i < len(nodes); i++ { - testRepCheck(t, defaultRep, localObj, nodes, i, true, allNotFound, expShortage, false, slices.Delete(slices.Clone(nodes), i, i+1)) + testRepCheck(t, defaultRep, localObj, nodes, i, true, allNotFound, expShortage, !broadcast, slices.Delete(slices.Clone(nodes), i, i+1)) } }) }) @@ -251,9 +254,9 @@ func testDefaultREPWithType(t *testing.T, typ object.Type) { expShortage = uint32(len(nodes)) } t.Run("in netmap", func(t *testing.T) { - logBuf := testRepCheck(t, defaultRep, localObj, nodes, -1, true, allNotFound, expShortage, false, nodes) + logBuf := testRepCheck(t, defaultRep, localObj, nodes, -1, true, allNotFound, expShortage, true, nodes) logBuf.AssertContains(testutil.LogEntry{ - Level: zap.InfoLevel, Message: "node outside the container, but nobody stores the object, holding the replica...", Fields: map[string]any{ + Level: zap.InfoLevel, Message: "node outside the container, removing the replica so as not to violate the storage policy...", Fields: map[string]any{ "component": "Object Policer", "object": objAddr.String(), }, @@ -665,6 +668,7 @@ func testRepCheck(t *testing.T, rep uint, localObj objectcore.AddressWithAttribu wp, err := ants.NewPool(100) require.NoError(t, err) + t.Cleanup(wp.Release) localNode := newTestLocalNode() localNode.objList = []objectcore.AddressWithAttributes{localObj} @@ -678,8 +682,6 @@ func testRepCheck(t *testing.T, rep uint, localObj objectcore.AddressWithAttribu mockNet.inNetmap = localInNM } - r := newTestReplicator(t) - conns := newMockAPIConnections() for i := range nodes { if i != localIdx { @@ -687,6 +689,17 @@ func testRepCheck(t *testing.T, rep uint, localObj objectcore.AddressWithAttribu } } + r := newTestReplicator(t) + r.success = true + if expShortage > 0 { + r.successfulCopies = expShortage + r.onTask = func(_ replicator.Task, successfulNodes []netmap.NodeInfo) { + for _, node := range successfulNodes { + conns.setHeadResult(node, localObj.Address, nil) + } + } + } + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) p := New(neofscryptotest.Signer(), WithPool(wp), @@ -704,13 +717,7 @@ func testRepCheck(t *testing.T, rep uint, localObj objectcore.AddressWithAttribu t.Cleanup(cancel) go p.Run(ctx) - require.Eventually(t, func() bool { - return lb.Contains(testutil.LogEntry{ - Level: zap.InfoLevel, Message: "finished local storage cycle", Fields: map[string]any{ - "component": "Object Policer", - "cleanCycle": true, - }}) - }, 3*time.Second, 50*time.Millisecond) + waitForPolicerResult(t, p, lb, mockNet, localNode, localObj.Address, expRedundant, expShortage > 0, r) var taskV = r.task.Load() if expShortage > 0 { @@ -794,8 +801,7 @@ func TestPolicer_Run_EC(t *testing.T) { t.Run(tc.name, func(t *testing.T) { localObj.Attributes = []string{tc.ruleIdx, tc.partIdx, parentOIDAttr} - logBuf := testECCheck(t, rule, localObj, nodes, 0, allOK, false, nil) - logBuf.AssertContains(testutil.LogEntry{ + testECCheckWaitLog(t, rule, localObj, nodes, 0, allOK, testutil.LogEntry{ Level: zap.ErrorLevel, Message: "failed to decode EC part info from attributes, skip object", Fields: map[string]any{"component": "Object Policer", "object": localObj.Address.String(), "error": tc.err}, }) @@ -811,8 +817,7 @@ func TestPolicer_Run_EC(t *testing.T) { mockNet := newMockNetwork() mockNet.setObjectNodesECResult(cnr, parentOID, nodes, rule) - logBuf := testECCheckWithNetwork(t, mockNet, localObj, nodes, 0, allOK, false, nil) - logBuf.AssertContains(testutil.LogEntry{ + testECCheckWithNetworkAndShortage(t, mockNet, localObj, nodes, 0, allOK, false, nil, false, 0, testutil.LogEntry{ Level: zap.ErrorLevel, Message: "received EC parent OID with unexpected len from local storage, skip object", Fields: map[string]any{"component": "Object Policer", "object": localObj.Address.String(), "len": json.Number("31")}, }) @@ -1074,7 +1079,7 @@ func TestPolicer_Run_EC(t *testing.T) { } } -func testECCheck(t *testing.T, rule iec.Rule, localObj objectcore.AddressWithAttributes, nodes []netmap.NodeInfo, localIdx int, headErrs []error, expRedundant bool, expCandidates []netmap.NodeInfo) *testutil.LogBuffer { +func newECMockNetwork(t *testing.T, rule iec.Rule, localObj objectcore.AddressWithAttributes, nodes []netmap.NodeInfo, localIdx int) *mockNetwork { require.Len(t, localObj.Attributes, 3) var sortOID oid.ID @@ -1091,9 +1096,19 @@ func testECCheck(t *testing.T, rule iec.Rule, localObj objectcore.AddressWithAtt mockNet.pubKey = nodes[localIdx].PublicKey() } + return mockNet +} + +func testECCheck(t *testing.T, rule iec.Rule, localObj objectcore.AddressWithAttributes, nodes []netmap.NodeInfo, localIdx int, headErrs []error, expRedundant bool, expCandidates []netmap.NodeInfo) *testutil.LogBuffer { + mockNet := newECMockNetwork(t, rule, localObj, nodes, localIdx) return testECCheckWithNetwork(t, mockNet, localObj, nodes, localIdx, headErrs, expRedundant, expCandidates) } +func testECCheckWaitLog(t *testing.T, rule iec.Rule, localObj objectcore.AddressWithAttributes, nodes []netmap.NodeInfo, localIdx int, headErrs []error, exp testutil.LogEntry) { + mockNet := newECMockNetwork(t, rule, localObj, nodes, localIdx) + testECCheckWithNetworkAndShortage(t, mockNet, localObj, nodes, localIdx, headErrs, false, nil, false, 0, exp) +} + func testECCheckWithNetwork(t *testing.T, mockNet *mockNetwork, localObj objectcore.AddressWithAttributes, nodes []netmap.NodeInfo, localIdx int, headErrs []error, expRedundant bool, expCandidates []netmap.NodeInfo) *testutil.LogBuffer { expShortage := uint32(0) @@ -1105,18 +1120,16 @@ func testECCheckWithNetwork(t *testing.T, mockNet *mockNetwork, localObj objectc } func testECCheckWithNetworkAndShortage(t *testing.T, mockNet *mockNetwork, localObj objectcore.AddressWithAttributes, nodes []netmap.NodeInfo, - localIdx int, headErrs []error, expRedundant bool, expCandidates []netmap.NodeInfo, repSuccess bool, expShortage uint32) *testutil.LogBuffer { + localIdx int, headErrs []error, expRedundant bool, expCandidates []netmap.NodeInfo, repSuccess bool, expShortage uint32, waitLog ...testutil.LogEntry) *testutil.LogBuffer { require.Equal(t, len(nodes), len(headErrs)) wp, err := ants.NewPool(100) require.NoError(t, err) + t.Cleanup(wp.Release) localNode := newTestLocalNode() localNode.objList = []objectcore.AddressWithAttributes{localObj} - r := newTestReplicator(t) - r.success = repSuccess - conns := newMockAPIConnections() for i := range nodes { if i != localIdx { @@ -1124,6 +1137,17 @@ func testECCheckWithNetworkAndShortage(t *testing.T, mockNet *mockNetwork, local } } + r := newTestReplicator(t) + r.success = repSuccess + if repSuccess && expShortage > 0 { + r.successfulCopies = expShortage + r.onTask = func(_ replicator.Task, successfulNodes []netmap.NodeInfo) { + for _, node := range successfulNodes { + conns.setHeadResult(node, localObj.Address, nil) + } + } + } + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) p := New(neofscryptotest.Signer(), WithPool(wp), @@ -1141,13 +1165,12 @@ func testECCheckWithNetworkAndShortage(t *testing.T, mockNet *mockNetwork, local t.Cleanup(cancel) go p.Run(ctx) - require.Eventually(t, func() bool { - return lb.Contains(testutil.LogEntry{ - Level: zap.InfoLevel, Message: "finished local storage cycle", Fields: map[string]any{ - "component": "Object Policer", - "cleanCycle": true, - }}) - }, 3*time.Second, 50*time.Millisecond) + var expLog *testutil.LogEntry + if len(waitLog) > 0 { + expLog = &waitLog[0] + } + + waitForPolicerResult(t, p, lb, mockNet, localNode, localObj.Address, expRedundant, len(expCandidates) > 0, r, expLog) var taskV = r.task.Load() if len(expCandidates) > 0 { @@ -1171,10 +1194,41 @@ func testECCheckWithNetworkAndShortage(t *testing.T, mockNet *mockNetwork, local return lb } +func waitForPolicerResult(t *testing.T, p *Policer, lb *testutil.LogBuffer, mockNet *mockNetwork, localNode *testLocalNode, addr oid.Address, expRedundant bool, expectTask bool, r *testReplicator, waitLog ...*testutil.LogEntry) { + t.Helper() + + var expLog *testutil.LogEntry + if len(waitLog) > 0 { + expLog = waitLog[0] + } + + require.Eventually(t, func() bool { + if p.objsInWork.inWork(addr) { + return false + } + + if expRedundant { + return slices.Equal(localNode.deletedObjects(), []oid.Address{addr}) + } + + if expectTask { + return r.task.Load() != nil + } + + if expLog != nil { + return lb.Contains(*expLog) + } + + return mockNet.totalGetNodesCalls() > 0 + }, 3*time.Second, 50*time.Millisecond) +} + type testReplicator struct { - t *testing.T - task atomic.Value - success bool + t *testing.T + task atomic.Value + success bool + successfulCopies uint32 + onTask func(replicator.Task, []netmap.NodeInfo) } func newTestReplicator(t *testing.T) *testReplicator { @@ -1188,9 +1242,24 @@ func (x *testReplicator) HandleTask(ctx context.Context, task replicator.Task, r require.NotNil(x.t, r) nodes := task.Nodes() - require.NotEmpty(x.t, nodes) - if x.success { - r.SubmitSuccessfulReplication(nodes[0]) + + var successfulNodes []netmap.NodeInfo + if x.success && len(nodes) > 0 { + copies := x.successfulCopies + if copies == 0 { + copies = 1 + } + if copies > uint32(len(nodes)) { + copies = uint32(len(nodes)) + } + + successfulNodes = append(successfulNodes, nodes[:copies]...) + for i := range successfulNodes { + r.SubmitSuccessfulReplication(successfulNodes[i]) + } + } + if x.onTask != nil { + x.onTask(task, successfulNodes) } // Prevent collisions on subsequent iterations @@ -1215,12 +1284,15 @@ type mockNetwork struct { inNetmap bool - getNodes map[getNodesKey]getNodesValue + mtx sync.RWMutex + getNodes map[getNodesKey]getNodesValue + getNodesCalled map[getNodesKey]uint64 } func newMockNetwork() *mockNetwork { return &mockNetwork{ - getNodes: make(map[getNodesKey]getNodesValue), + getNodes: make(map[getNodesKey]getNodesValue), + getNodesCalled: make(map[getNodesKey]uint64), } } @@ -1245,11 +1317,39 @@ func (x *mockNetwork) PublicKey() []byte { } func (x *testLocalNode) ListWithCursor(_ uint32, c *engine.Cursor, _ ...string) ([]objectcore.AddressWithAttributes, *engine.Cursor, error) { - if c != nil || len(x.objList) == 0 { - return nil, c, engine.ErrEndOfListing + if len(x.objList) == 0 { + return nil, nil, engine.ErrEndOfListing + } + + x.delMtx.RLock() + withoutDel := make([]objectcore.AddressWithAttributes, 0, len(x.objList)) + for _, obj := range x.objList { + if _, ok := x.del[obj.Address]; !ok { + withoutDel = append(withoutDel, obj) + } + } + x.delMtx.RUnlock() + + if len(withoutDel) == 0 { + return nil, nil, engine.ErrEndOfListing + } + + if c == nil { + lastObj := withoutDel[len(withoutDel)-1] + return withoutDel, engine.NewCursor(lastObj.Address.Container(), lastObj.Address.Object()), nil + } + cursorAddr := oid.NewAddress(c.ContainerID(), c.ObjectID()) + res := make([]objectcore.AddressWithAttributes, 0, len(withoutDel)) + for _, obj := range withoutDel { + if obj.Address.Compare(cursorAddr) > 0 { + res = append(res, obj) + } } - return x.objList, new(engine.Cursor), nil + if len(res) == 0 { + return nil, nil, engine.ErrEndOfListing + } + return res, engine.NewCursor(res[len(res)-1].Address.Container(), res[len(res)-1].Address.Object()), nil } func (x *testLocalNode) deletedObjects() []oid.Address { @@ -1301,21 +1401,29 @@ func newGetNodesKey(cnr cid.ID, obj oid.ID) getNodesKey { } func (x *mockNetwork) setObjectNodesRepResult(cnr cid.ID, obj oid.ID, nodes []netmap.NodeInfo, rep uint) { + x.mtx.Lock() x.getNodes[newGetNodesKey(cnr, obj)] = getNodesValue{ nodes: nodes, repRules: []uint{rep}, } + x.mtx.Unlock() } func (x *mockNetwork) setObjectNodesECResult(cnr cid.ID, obj oid.ID, nodes []netmap.NodeInfo, rule iec.Rule) { + x.mtx.Lock() x.getNodes[newGetNodesKey(cnr, obj)] = getNodesValue{ nodes: nodes, ecRules: []iec.Rule{rule}, } + x.mtx.Unlock() } func (x *mockNetwork) GetNodesForObject(addr oid.Address) ([][]netmap.NodeInfo, []uint, []iec.Rule, error) { - v, ok := x.getNodes[newGetNodesKey(addr.Container(), addr.Object())] + key := newGetNodesKey(addr.Container(), addr.Object()) + x.mtx.Lock() + x.getNodesCalled[key]++ + v, ok := x.getNodes[key] + x.mtx.Unlock() if !ok { return nil, nil, nil, errors.New("[test] unexpected policy requested") } @@ -1323,6 +1431,17 @@ func (x *mockNetwork) GetNodesForObject(addr oid.Address) ([][]netmap.NodeInfo, return [][]netmap.NodeInfo{v.nodes}, v.repRules, v.ecRules, nil } +func (x *mockNetwork) totalGetNodesCalls() uint64 { + x.mtx.RLock() + defer x.mtx.RUnlock() + + var res uint64 + for _, v := range x.getNodesCalled { + res += v + } + return res +} + type nopNodeLoader struct{} func (nopNodeLoader) ObjectServiceLoad() float64 { @@ -1342,14 +1461,19 @@ func newConnKey(node netmap.NodeInfo, objAddr oid.Address) connObjectKey { } type mockAPIConnections struct { + lock sync.RWMutex head map[connObjectKey]error } func (x *mockAPIConnections) setHeadResult(node netmap.NodeInfo, addr oid.Address, err error) { + x.lock.Lock() + defer x.lock.Unlock() x.head[newConnKey(node, addr)] = err } func (x *mockAPIConnections) headObject(_ context.Context, node netmap.NodeInfo, addr oid.Address, _ bool, _ []string) (object.Object, error) { + x.lock.RLock() + defer x.lock.RUnlock() v, ok := x.head[newConnKey(node, addr)] if !ok { return object.Object{}, errors.New("[test] unexpected conn/object accessed") diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index 1820e7c09c..c94ef300af 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -2,18 +2,22 @@ package policer import ( "context" + "crypto/rand" "errors" "time" iec "github.com/nspcc-dev/neofs-node/internal/ec" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) func (p *Policer) Run(ctx context.Context) { defer func() { + p.checkECPartsWorkerPool.Release() p.log.Info("routine stopped") }() @@ -55,20 +59,49 @@ func (w *boostWindow) record(hadReplicated bool) (withReplication int) { return withReplication } +func randomAddress() oid.Address { + var cnr cid.ID + var obj oid.ID + _, _ = rand.Read(cnr[:]) + _, _ = rand.Read(obj[:]) + var addr oid.Address + addr.SetContainer(cnr) + addr.SetObject(obj) + return addr +} + func (p *Policer) shardPolicyWorker(ctx context.Context) { var ( - addrs []objectcore.AddressWithAttributes - cursor *engine.Cursor - win boostWindow - boosted bool - err error + addrs []objectcore.AddressWithAttributes + cursor *engine.Cursor + win boostWindow + boosted bool + err error + wrapped bool + stopAddr oid.Address ) p.mtx.RLock() t := time.NewTimer(p.repCooldown) p.mtx.RUnlock() + stopAddr = randomAddress() + cursor = engine.NewCursor(stopAddr.Container(), stopAddr.Object()) + + cycleFinished := func() { + cleanCycle := !p.hadToReplicate.Swap(false) + if cleanCycle { + p.metrics.SetPolicerConsistency(true) + } + + p.log.Info("finished local storage cycle", zap.Bool("cleanCycle", cleanCycle)) + + wrapped = false + } + for { + var hadReplicationBeforeReset bool + select { case <-ctx.Done(): return @@ -91,39 +124,44 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { addrs, cursor, err = p.localStorage.ListWithCursor(batchSize, cursor, iec.AttributeRuleIdx, iec.AttributePartIdx, object.FilterParentID) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { - cleanCycle := !p.hadToReplicate.Swap(false) - if cleanCycle { - p.metrics.SetPolicerConsistency(true) + if wrapped { + cycleFinished() + time.Sleep(repCooldown) + } else { + wrapped = true + cursor = nil } - - p.log.Info("finished local storage cycle", zap.Bool("cleanCycle", cleanCycle)) } else { p.log.Warn("failure at object select for replication", zap.Error(err)) + time.Sleep(repCooldown) } - time.Sleep(repCooldown) continue } for i := range addrs { + if wrapped && addrs[i].Address.Compare(stopAddr) > 0 { + hadReplicationBeforeReset = p.hadToReplicate.Load() + cycleFinished() + } + select { case <-ctx.Done(): return default: addr := addrs[i] - if p.objsInWork.inWork(addr.Address) { + if !p.objsInWork.tryAdd(addr.Address) { // do not process an object // that is in work continue } err = p.taskPool.Submit(func() { - p.objsInWork.add(addr.Address) + defer p.objsInWork.remove(addr.Address) p.processObject(ctx, addr) - - p.objsInWork.remove(addr.Address) }) if err != nil { + p.objsInWork.remove(addr.Address) p.log.Warn("pool submission", zap.Error(err)) } } @@ -133,7 +171,8 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { // sliding window. Boost mode transitions only when a strict majority // of the window agrees, so an equal split keeps the current state unchanged. if boostMultiplier > 1 { - withReplication := win.record(p.hadToReplicate.Load()) + hadReplication := hadReplicationBeforeReset || p.hadToReplicate.Load() + withReplication := win.record(hadReplication) if !boosted && withReplication >= boostMajority { p.log.Info("missing replicas detected, entering boost mode",