-
Notifications
You must be signed in to change notification settings - Fork 49
replicator: move EC local/remote logic from policer #3854
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,6 @@ import ( | |
| "context" | ||
| "io" | ||
|
|
||
| putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" | ||
| "github.com/nspcc-dev/neofs-sdk-go/client" | ||
| "github.com/nspcc-dev/neofs-sdk-go/netmap" | ||
| "github.com/nspcc-dev/neofs-sdk-go/object" | ||
|
|
@@ -39,24 +38,24 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) | |
| }() | ||
|
|
||
| var err error | ||
| var prm *putsvc.RemotePutPrm | ||
| var stream io.ReadSeeker | ||
| binReplication := task.obj == nil | ||
| if binReplication { | ||
| b, err := p.localStorage.GetBytes(task.addr) | ||
| var objBin []byte | ||
| if task.obj != nil { | ||
| objBin = task.obj.Marshal() | ||
| stream = bytes.NewReader(objBin) | ||
| } else { | ||
| objBin, err = p.localStorage.GetBytes(task.addr) | ||
| if err != nil { | ||
| p.log.Error("could not get object from local storage", | ||
| zap.Stringer("object", task.addr), | ||
| zap.Error(err)) | ||
|
|
||
| return | ||
| } | ||
| stream = bytes.NewReader(b) | ||
| if len(task.nodes) > 1 { | ||
| stream = client.DemuxReplicatedObject(stream) | ||
| } | ||
| } else { | ||
| prm = new(putsvc.RemotePutPrm).WithObject(task.obj) | ||
| stream = bytes.NewReader(objBin) | ||
| } | ||
| if len(task.nodes) > 1 { | ||
| stream = client.DemuxReplicatedObject(stream) | ||
| } | ||
|
|
||
| for i := 0; task.quantity > 0 && i < len(task.nodes); i++ { | ||
|
|
@@ -71,15 +70,26 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) | |
| zap.Stringer("object", task.addr), | ||
| ) | ||
|
|
||
| if p.localNodeKey.IsLocalNodePublicKey(task.nodes[i].PublicKey()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By the way, how is this possible? We are the node that started the replication process, and we are also the node that should store it? I think I do not understand something — I thought that in the evacuation process, replication is only called when we have already failed to put the object locally |
||
| if task.obj == nil { | ||
| log.Debug("cannot put object to local storage: object not provided in task") | ||
| continue | ||
| } | ||
| if err = p.localStorage.Put(task.obj, objBin); err != nil { | ||
| log.Error("could not put object to local storage", zap.Error(err)) | ||
| continue | ||
| } | ||
| log.Debug("object successfully stored locally") | ||
| task.quantity-- | ||
| res.SubmitSuccessfulReplication(task.nodes[i]) | ||
| continue | ||
| } | ||
|
|
||
| callCtx, cancel := context.WithTimeout(ctx, p.putTimeout) | ||
|
|
||
| if binReplication { | ||
| err = p.remoteSender.ReplicateObjectToNode(callCtx, task.addr.Object(), stream, task.nodes[i]) | ||
| // note that we don't need to reset stream because it is used exactly once | ||
| // according to the client.DemuxReplicatedObject above | ||
| } else { | ||
| err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i])) | ||
| } | ||
| err = p.remoteSender.ReplicateObjectToNode(callCtx, task.addr.Object(), stream, task.nodes[i]) | ||
| // note that we don't need to reset stream because it is used exactly once | ||
| // according to the client.DemuxReplicatedObject above | ||
|
|
||
| cancel() | ||
|
|
||
|
|
@@ -96,16 +106,3 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) | |
| } | ||
| } | ||
| } | ||
|
|
||
| // PutObjectToNode attempts to put given object to the specified node within | ||
| // configured timeout. | ||
| func (p *Replicator) PutObjectToNode(ctx context.Context, obj object.Object, node netmap.NodeInfo) error { | ||
| ctx, cancel := context.WithTimeout(ctx, p.putTimeout) | ||
| defer cancel() | ||
|
|
||
| var pp putsvc.RemotePutPrm | ||
| pp.WithObject(&obj) | ||
| pp.WithNodeInfo(node) | ||
|
|
||
| return p.remoteSender.PutObject(ctx, &pp) | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -8,6 +8,13 @@ import ( | |||||||||||||||||
| "go.uber.org/zap" | ||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
| // LocalNodeKey provides information about the NeoFS network to Replicator for work. | ||||||||||||||||||
| type LocalNodeKey interface { | ||||||||||||||||||
| // IsLocalNodePublicKey checks whether given binary-encoded public key is | ||||||||||||||||||
| // assigned in the network map to a local storage node running [Replicator]. | ||||||||||||||||||
| IsLocalNodePublicKey([]byte) bool | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Frankly, I still have no idea why we're not passing this slice into services directly to perform any comparisons locally. It can't change for a node. But that's a bit different topic, every service has it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At first, I just wanted to pass the key itself, but then I saw that it was taken from netmap, so I assumed that it could change. neofs-node/cmd/neofs-node/netmap.go Lines 139 to 146 in 72d4b6d
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we do not support key hot swap, so it is always a slice for a node that does not restart
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be changed for all services, here we can follow whatever is done everywhere already. |
||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Replicator represents the utility that replicates | ||||||||||||||||||
| // local objects to remote nodes. | ||||||||||||||||||
| type Replicator struct { | ||||||||||||||||||
|
|
@@ -25,6 +32,8 @@ type cfg struct { | |||||||||||||||||
| remoteSender *putsvc.RemoteSender | ||||||||||||||||||
|
|
||||||||||||||||||
| localStorage *engine.StorageEngine | ||||||||||||||||||
|
|
||||||||||||||||||
| localNodeKey LocalNodeKey | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| func defaultCfg() *cfg { | ||||||||||||||||||
|
|
@@ -73,3 +82,10 @@ func WithLocalStorage(v *engine.StorageEngine) Option { | |||||||||||||||||
| c.localStorage = v | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // WithLocalNodeKey provides LocalNodeKey component. | ||||||||||||||||||
| func WithLocalNodeKey(n LocalNodeKey) Option { | ||||||||||||||||||
| return func(c *cfg) { | ||||||||||||||||||
| c.localNodeKey = n | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure this check is practical at all. A node can't detect its own missing part. Even if the netmap changes, the set of nodes is the same from check to recreate. We also have some part already (which is why we're here). So we should replicate this part (if needed) and check for other parts (on other nodes) and then replicate new parts to those remote nodes (if needed).
One thing I can imagine netmap change + a lost part in which case we'd have a part, but a wrong one and we'd like to both push our part to another node and recreate this missing part for ourselves. How practical that is (given that other nodes will recreate/replicate it too) is an open question.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we remove this check then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cthulhu-rider?