Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Changelog for NeoFS Node
- Too early GET/HEAD/RANGE request failure on single SN dial failure (#3840)
- Payment deadlock on IR side (#3842)
- Wrong shard ID entry in FSTree descriptor (#3849, #3861)
- Shard evacuation replicate for EC parts (#3854)

### Changed
- SN returns unsigned responses to requests with API >= `v2.22` (#3785)
Expand Down
1 change: 1 addition & 0 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func initObjectService(c *cfg) {
replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, (*coreClientConstructor)(clientConstructor)),
),
replicator.WithLocalNodeKey(c),
)

c.policer = policer.New(neofsecdsa.Signer(c.key.PrivateKey),
Expand Down
41 changes: 0 additions & 41 deletions pkg/services/object/put/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ type RemoteSender struct {
clientConstructor ClientConstructor
}

// RemotePutPrm groups remote put operation parameters.
type RemotePutPrm struct {
	// node describes the remote storage node the object is sent to.
	node netmap.NodeInfo

	// obj is the transferred object; read by PutObject.
	obj *object.Object
}

func putObjectToNode(ctx context.Context, nodeInfo clientcore.NodeInfo, obj *object.Object,
keyStorage *util.KeyStorage, clientConstructor ClientConstructor, commonPrm *util.CommonPrm) error {
var opts client.PrmObjectPutInit
Expand Down Expand Up @@ -100,40 +93,6 @@ func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *Remot
}
}

// WithNodeInfo sets information about the remote node.
func (p *RemotePutPrm) WithNodeInfo(v netmap.NodeInfo) *RemotePutPrm {
	// Guard against a nil receiver: keep chaining safe, as callers may
	// build the parameters fluently.
	if p == nil {
		return nil
	}
	p.node = v
	return p
}

// WithObject sets transferred object.
func (p *RemotePutPrm) WithObject(v *object.Object) *RemotePutPrm {
	// Guard against a nil receiver: keep chaining safe, as callers may
	// build the parameters fluently.
	if p == nil {
		return nil
	}
	p.obj = v
	return p
}

// PutObject sends object to remote node.
func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
	// Convert the raw netmap element into client-side node info before dialing.
	var info clientcore.NodeInfo
	if err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapCore.Node(p.node)); err != nil {
		return fmt.Errorf("parse client node info: %w", err)
	}

	if err := putObjectToNode(ctx, info, p.obj, s.keyStorage, s.clientConstructor, nil); err != nil {
		return fmt.Errorf("(%T) could not send object: %w", s, err)
	}

	return nil
}

// ReplicateObjectToNode copies binary-encoded NeoFS object from the given
// [io.ReadSeeker] into local storage of the node described by specified
// [netmap.NodeInfo].
Expand Down
51 changes: 21 additions & 30 deletions pkg/services/policer/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

iec "github.com/nspcc-dev/neofs-node/internal/ec"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
Expand Down Expand Up @@ -504,40 +505,30 @@ func (p *Policer) recreateECPart(ctx context.Context, parent object.Object, rule
return
}

for i := range iec.NodeSequenceForPart(partIdx, int(rule.DataPartNum)+int(rule.ParityPartNum), len(sortedNodes)) {
if p.network.IsLocalNodePublicKey(sortedNodes[i].PublicKey()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this check is practical at all. A node can't detect its own missing part. Even if the netmap changes, the set of nodes is the same from check to recreate. We also have some part already (which is why we're here). So we should replicate this part (if needed) and check for other parts (on other nodes) and then replicate new parts to those remote nodes (if needed).

One thing I can imagine netmap change + a lost part in which case we'd have a part, but a wrong one and we'd like to both push our part to another node and recreate this missing part for ourselves. How practical that is (given that other nodes will recreate/replicate it too) is an open question.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove this check then?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err = p.localStorage.Put(&partObj, nil)
if err == nil {
p.log.Info("EC part successfully recreated and put to local storage",
zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()),
zap.Stringer("rule", rule), zap.Int("part_idx", partIdx), zap.Stringer("part_id", partObj.GetID()))
return
}

p.log.Info("failed to put recreated EC part to local storage",
zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()),
zap.Stringer("rule", rule), zap.Int("part_idx", partIdx), zap.Stringer("part_id", partObj.GetID()),
zap.Error(err))
continue
}

err = p.replicator.PutObjectToNode(ctx, partObj, sortedNodes[i])
if err == nil {
p.log.Info("EC part successfully recreated and put to remote node",
zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()),
zap.Stringer("rule", rule), zap.Int("part_idx", partIdx), zap.Stringer("part_id", partObj.GetID()),
zap.String("node_pub", netmap.StringifyPublicKey(sortedNodes[i])))
return
}
totalParts := int(rule.DataPartNum) + int(rule.ParityPartNum)
seq := slices.Collect(iec.NodeSequenceForPart(partIdx, totalParts, len(sortedNodes)))
nodes := make([]netmap.NodeInfo, len(seq))
for i, idx := range seq {
nodes[i] = sortedNodes[idx]
}

if errors.Is(err, ctx.Err()) {
return
}
var repRes singleReplication
var task replicator.Task
task.SetObject(&partObj)
task.SetObjectAddress(oid.NewAddress(parent.GetContainerID(), partObj.GetID()))
task.SetCopiesNumber(1)
task.SetNodes(nodes)
p.replicator.HandleTask(ctx, task, &repRes)

p.log.Info("failed to put recreated EC part to remote node",
if repRes.done {
p.log.Info("EC part successfully recreated",
zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()),
zap.Stringer("rule", rule), zap.Int("part_idx", partIdx), zap.Stringer("part_id", partObj.GetID()),
zap.String("node_pub", netmap.StringifyPublicKey(sortedNodes[i])), zap.Error(err))
zap.Strings("node", repRes.netAddresses))
} else {
p.log.Info("failed to put recreated EC part on any node",
zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()),
zap.Stringer("rule", rule), zap.Int("part_idx", partIdx), zap.Stringer("part_id", partObj.GetID()))
}
}

Expand Down
1 change: 0 additions & 1 deletion pkg/services/policer/policer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type nodeLoader interface {
// interface of [replicator.Replicator] used by [Policer] for overriding in tests.
type replicatorIface interface {
HandleTask(context.Context, replicator.Task, replicator.TaskResult)
PutObjectToNode(context.Context, object.Object, netmapsdk.NodeInfo) error
}

// interface of [engine.StorageEngine] used by [Policer] for overriding in tests.
Expand Down
8 changes: 4 additions & 4 deletions pkg/services/policer/policer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,10 +1138,6 @@ func (x *testReplicator) HandleTask(ctx context.Context, task replicator.Task, r
_ = x.task.CompareAndSwap(nil, task)
}

// PutObjectToNode is a no-op stub that satisfies the policer's replicator
// interface in tests; it always reports success.
func (x *testReplicator) PutObjectToNode(context.Context, object.Object, netmap.NodeInfo) error {
	return nil
}

type testLocalNode struct {
objList []objectcore.AddressWithAttributes

Expand Down Expand Up @@ -1185,6 +1181,10 @@ func (x *mockNetwork) IsLocalNodeInNetmap() bool {
return x.inNetmap
}

// PublicKey returns the public key bytes configured on the mock.
func (x *mockNetwork) PublicKey() []byte {
	return x.pubKey
}

func (x *testLocalNode) ListWithCursor(_ uint32, c *engine.Cursor, _ ...string) ([]objectcore.AddressWithAttributes, *engine.Cursor, error) {
if c != nil || len(x.objList) == 0 {
return nil, c, engine.ErrEndOfListing
Expand Down
59 changes: 28 additions & 31 deletions pkg/services/replicator/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"io"

putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
Expand Down Expand Up @@ -39,24 +38,24 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
}()

var err error
var prm *putsvc.RemotePutPrm
var stream io.ReadSeeker
binReplication := task.obj == nil
if binReplication {
b, err := p.localStorage.GetBytes(task.addr)
var objBin []byte
if task.obj != nil {
objBin = task.obj.Marshal()
stream = bytes.NewReader(objBin)
} else {
objBin, err = p.localStorage.GetBytes(task.addr)
if err != nil {
p.log.Error("could not get object from local storage",
zap.Stringer("object", task.addr),
zap.Error(err))

return
}
stream = bytes.NewReader(b)
if len(task.nodes) > 1 {
stream = client.DemuxReplicatedObject(stream)
}
} else {
prm = new(putsvc.RemotePutPrm).WithObject(task.obj)
stream = bytes.NewReader(objBin)
}
if len(task.nodes) > 1 {
stream = client.DemuxReplicatedObject(stream)
}

for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
Expand All @@ -71,15 +70,26 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
zap.Stringer("object", task.addr),
)

if p.localNodeKey.IsLocalNodePublicKey(task.nodes[i].PublicKey()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, how is this possible? We are the node that started the replication process, and we are also the node that should store it? I think I'm missing something — I thought that during evacuation, replication is only invoked after we have already failed to put the object locally.

if task.obj == nil {
log.Debug("cannot put object to local storage: object not provided in task")
continue
}
if err = p.localStorage.Put(task.obj, objBin); err != nil {
log.Error("could not put object to local storage", zap.Error(err))
continue
}
log.Debug("object successfully stored locally")
task.quantity--
res.SubmitSuccessfulReplication(task.nodes[i])
continue
}

callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)

if binReplication {
err = p.remoteSender.ReplicateObjectToNode(callCtx, task.addr.Object(), stream, task.nodes[i])
// note that we don't need to reset stream because it is used exactly once
// according to the client.DemuxReplicatedObject above
} else {
err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
}
err = p.remoteSender.ReplicateObjectToNode(callCtx, task.addr.Object(), stream, task.nodes[i])
// note that we don't need to reset stream because it is used exactly once
// according to the client.DemuxReplicatedObject above

cancel()

Expand All @@ -96,16 +106,3 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
}
}
}

// PutObjectToNode attempts to put given object to the specified node within
// configured timeout.
func (p *Replicator) PutObjectToNode(ctx context.Context, obj object.Object, node netmap.NodeInfo) error {
ctx, cancel := context.WithTimeout(ctx, p.putTimeout)
defer cancel()

var pp putsvc.RemotePutPrm
pp.WithObject(&obj)
pp.WithNodeInfo(node)

return p.remoteSender.PutObject(ctx, &pp)
}
16 changes: 16 additions & 0 deletions pkg/services/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import (
"go.uber.org/zap"
)

// LocalNodeKey provides information about the NeoFS network to Replicator for work.
type LocalNodeKey interface {
// IsLocalNodePublicKey checks whether given binary-encoded public key is
// assigned in the network map to a local storage node running [Replicator].
IsLocalNodePublicKey([]byte) bool
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Frankly, I still have no idea why we're not passing this slice into services directly to perform any comparisons locally. It can't change for a node. But that's a bit different topic, every service has it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first, I just wanted to pass the key itself, but then I saw that it was taken from netmap, so I assumed that it could change.

func nodeKeyFromNetmap(c *cfg) []byte {
ni, ok := c.cfgNetmap.state.getNodeInfo()
if ok {
return ni.PublicKey()
}
return nil
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not support key hot swap, so it is always a slice for a node that does not restart

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be changed for all services, here we can follow whatever is done everywhere already.

}

// Replicator represents the utility that replicates
// local objects to remote nodes.
type Replicator struct {
Expand All @@ -25,6 +32,8 @@ type cfg struct {
remoteSender *putsvc.RemoteSender

localStorage *engine.StorageEngine

localNodeKey LocalNodeKey
}

func defaultCfg() *cfg {
Expand Down Expand Up @@ -73,3 +82,10 @@ func WithLocalStorage(v *engine.StorageEngine) Option {
c.localStorage = v
}
}

// WithLocalNodeKey provides LocalNodeKey component.
func WithLocalNodeKey(k LocalNodeKey) Option {
	// Stored on cfg; consulted by the replicator to detect itself among
	// replication target nodes.
	return func(c *cfg) {
		c.localNodeKey = k
	}
}
Loading