-
Notifications
You must be signed in to change notification settings - Fork 2
feat: update item version on put #309
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,19 +8,21 @@ import ( | |
| "fmt" | ||
| "time" | ||
|
|
||
| "go.etcd.io/etcd/api/v3/mvccpb" | ||
| clientv3 "go.etcd.io/etcd/client/v3" | ||
| ) | ||
|
|
||
| // PutOp stores a key value pair with an optional time-to-live. This operation | ||
| // does not enforce any version constraints. | ||
| type putOp[V Value] struct { | ||
| client *clientv3.Client | ||
| key string | ||
| val V | ||
| ttl *time.Duration | ||
| options []clientv3.OpOption | ||
| client *clientv3.Client | ||
| key string | ||
| val V | ||
| ttl *time.Duration | ||
| options []clientv3.OpOption | ||
| shouldUpdateVersion bool | ||
| } | ||
|
|
||
| // NewPutOp returns an operation that stores a key value pair with an optional | ||
| // time-to-live. This operation does not enforce any version constraints. | ||
| func NewPutOp[V Value](client *clientv3.Client, key string, val V, options ...clientv3.OpOption) PutOp[V] { | ||
| return &putOp[V]{ | ||
| client: client, | ||
|
|
@@ -43,29 +45,54 @@ func (o *putOp[V]) WithTTL(ttl time.Duration) PutOp[V] { | |
| return o | ||
| } | ||
|
|
||
| func (o *putOp[V]) WithUpdatedVersion() PutOp[V] { | ||
| o.shouldUpdateVersion = true | ||
| o.options = append(o.options, clientv3.WithPrevKV()) | ||
| return o | ||
| } | ||
|
|
||
| func (o *putOp[V]) Exec(ctx context.Context) error { | ||
| ops, err := o.Ops(ctx) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| _, err = o.client.Do(ctx, ops[0]) | ||
| resp, err := o.client.Do(ctx, ops[0]) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to put %q: %w", o.key, err) | ||
| } | ||
| if o.shouldUpdateVersion { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not call
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good question. It's just a slightly different output shape. |
||
| put := resp.Put() | ||
| if put != nil && put.PrevKv != nil { | ||
| o.val.SetVersion(put.PrevKv.Version + 1) | ||
| } else { | ||
| // PrevKV is nil for creates | ||
| o.val.SetVersion(o.val.Version() + 1) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // CreateOp creates a key value pair with an optional time-to-live. This | ||
| // operation will fail with ErrAlreadyExists if the given key already exists. | ||
| func (o *putOp[V]) UpdateVersionEnabled() bool { | ||
| return o.shouldUpdateVersion | ||
| } | ||
|
|
||
| func (o *putOp[V]) UpdateVersion(prevKVs map[string]*mvccpb.KeyValue) { | ||
| updateVersion(o.key, o.val, prevKVs) | ||
| } | ||
|
|
||
| type createOp[V Value] struct { | ||
| client *clientv3.Client | ||
| key string | ||
| val V | ||
| ttl *time.Duration | ||
| options []clientv3.OpOption | ||
| client *clientv3.Client | ||
| key string | ||
| val V | ||
| ttl *time.Duration | ||
| options []clientv3.OpOption | ||
| shouldUpdateVersion bool | ||
| } | ||
|
|
||
| // NewCreateOp returns an operation that creates a key value pair with an | ||
| // optional time-to-live. This operation will fail with ErrAlreadyExists if the | ||
| // given key already exists. | ||
| func NewCreateOp[V Value](client *clientv3.Client, key string, val V, options ...clientv3.OpOption) PutOp[V] { | ||
| return &createOp[V]{ | ||
| client: client, | ||
|
|
@@ -88,6 +115,12 @@ func (o *createOp[V]) WithTTL(ttl time.Duration) PutOp[V] { | |
| return o | ||
| } | ||
|
|
||
| func (o *createOp[V]) WithUpdatedVersion() PutOp[V] { | ||
| o.shouldUpdateVersion = true | ||
| o.options = append(o.options, clientv3.WithPrevKV()) | ||
| return o | ||
| } | ||
|
|
||
| func (o *createOp[V]) Exec(ctx context.Context) error { | ||
| ops, err := o.Ops(ctx) | ||
| if err != nil { | ||
|
|
@@ -103,21 +136,35 @@ func (o *createOp[V]) Exec(ctx context.Context) error { | |
| if !resp.Succeeded { | ||
| return fmt.Errorf("%q: %w", o.key, ErrAlreadyExists) | ||
| } | ||
| if o.shouldUpdateVersion { | ||
| updateVersion(o.key, o.val, extractPrevKVs(resp)) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // UpdateOp updates an existing key value pair with a new value and an optional | ||
| // time-to-live. This operation will fail with ErrValueVersionMismatch if the | ||
| // stored value's version does not match the given value's version. | ||
| func (o *createOp[V]) UpdateVersionEnabled() bool { | ||
| return o.shouldUpdateVersion | ||
| } | ||
|
|
||
| func (o *createOp[V]) UpdateVersion(prevKVs map[string]*mvccpb.KeyValue) { | ||
| updateVersion(o.key, o.val, prevKVs) | ||
| } | ||
|
|
||
| type updateOp[V Value] struct { | ||
| client *clientv3.Client | ||
| key string | ||
| val V | ||
| ttl *time.Duration | ||
| options []clientv3.OpOption | ||
| client *clientv3.Client | ||
| key string | ||
| val V | ||
| ttl *time.Duration | ||
| options []clientv3.OpOption | ||
| shouldUpdateVersion bool | ||
| } | ||
|
|
||
| // NewUpdateOp returns an operation updates an existing key value pair with a | ||
| // new value and an optional time-to-live. This operation will fail with | ||
| // ErrValueVersionMismatch if the stored value's version does not match the | ||
| // given value's version. Note that this operation is equivalent to a create | ||
| // when the item version is 0. | ||
| func NewUpdateOp[V Value](client *clientv3.Client, key string, val V, options ...clientv3.OpOption) PutOp[V] { | ||
| return &updateOp[V]{ | ||
| client: client, | ||
|
|
@@ -142,6 +189,12 @@ func (o *updateOp[V]) WithTTL(ttl time.Duration) PutOp[V] { | |
| return o | ||
| } | ||
|
|
||
| func (o *updateOp[V]) WithUpdatedVersion() PutOp[V] { | ||
| o.shouldUpdateVersion = true | ||
| o.options = append(o.options, clientv3.WithPrevKV()) | ||
| return o | ||
| } | ||
|
|
||
| func (o *updateOp[V]) Exec(ctx context.Context) error { | ||
| ops, err := o.Ops(ctx) | ||
| if err != nil { | ||
|
|
@@ -157,10 +210,21 @@ func (o *updateOp[V]) Exec(ctx context.Context) error { | |
| if !resp.Succeeded { | ||
| return fmt.Errorf("%q: %w", o.key, ErrValueVersionMismatch) | ||
| } | ||
| if o.shouldUpdateVersion { | ||
| updateVersion(o.key, o.val, extractPrevKVs(resp)) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (o *updateOp[V]) UpdateVersionEnabled() bool { | ||
| return o.shouldUpdateVersion | ||
| } | ||
|
|
||
| func (o *updateOp[V]) UpdateVersion(prevKVs map[string]*mvccpb.KeyValue) { | ||
| updateVersion(o.key, o.val, prevKVs) | ||
| } | ||
|
|
||
| func encodeJSON(val any) (string, error) { | ||
| raw, err := json.Marshal(val) | ||
| if err != nil { | ||
|
|
@@ -201,15 +265,14 @@ func putOps[V Value]( | |
| ttl *time.Duration, | ||
| options ...clientv3.OpOption, | ||
| ) ([]clientv3.Op, error) { | ||
| var allOptions []clientv3.OpOption | ||
| allOptions := append([]clientv3.OpOption{}, options...) | ||
| if ttl != nil { | ||
| leaseResp, err := client.Grant(ctx, int64(ttl.Seconds())) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to grant lease for %q: %w", key, err) | ||
| } | ||
| allOptions = append(options, clientv3.WithLease(leaseResp.ID)) | ||
| allOptions = append(allOptions, clientv3.WithLease(leaseResp.ID)) | ||
| } | ||
| allOptions = append(allOptions, options...) | ||
|
|
||
| encoded, err := encodeJSON(val) | ||
| if err != nil { | ||
|
|
@@ -218,3 +281,24 @@ func putOps[V Value]( | |
|
|
||
| return []clientv3.Op{clientv3.OpPut(key, encoded, allOptions...)}, nil | ||
| } | ||
|
|
||
| func extractPrevKVs(resp *clientv3.TxnResponse) map[string]*mvccpb.KeyValue { | ||
| prevKVs := map[string]*mvccpb.KeyValue{} | ||
| for _, r := range resp.Responses { | ||
| put := r.GetResponsePut() | ||
| if put != nil && put.PrevKv != nil { | ||
| prevKVs[string(put.PrevKv.Key)] = put.PrevKv | ||
| } | ||
| } | ||
| return prevKVs | ||
| } | ||
|
|
||
| func updateVersion[V Value](key string, item V, prevKVs map[string]*mvccpb.KeyValue) { | ||
| prev, ok := prevKVs[key] | ||
| if ok { | ||
| item.SetVersion(prev.Version + 1) | ||
| } else { | ||
| // PrevKV is nil for creates | ||
| item.SetVersion(item.Version() + 1) | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.