Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions server/internal/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand Down Expand Up @@ -67,7 +68,12 @@ type PutOp[V Value] interface {
// WithTTL sets a time-to-live for this value. The value will automatically
// be removed after the TTL has expired.
WithTTL(ttl time.Duration) PutOp[V]
// WithUpdatedVersion will update the version on the value after the put
// operation completes.
WithUpdatedVersion() PutOp[V]
Exec(ctx context.Context) error

VersionUpdater
}

// DeleteOp is an operation that deletes one or more values from storage, and
Expand Down Expand Up @@ -120,3 +126,15 @@ type WatchOp[V Value] interface {
// again.
Close()
}

// VersionUpdater are the methods that an operation can implement to support
// updating the value version after an update. This exists as a separate
// interface to make it usable for runtime type checking.
type VersionUpdater interface {
// UpdateVersionEnabled should return true if this operation should update
// the item's version.
UpdateVersionEnabled() bool
// UpdateVersion should read the previous KVs that it manages from prevKV
// and update its item's versions.
UpdateVersion(prevKVs map[string]*mvccpb.KeyValue)
Comment thread
jason-lynch marked this conversation as resolved.
}
136 changes: 110 additions & 26 deletions server/internal/storage/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ import (
"fmt"
"time"

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)

// PutOp stores a key value pair with an optional time-to-live. This operation
// does not enforce any version constraints.
type putOp[V Value] struct {
client *clientv3.Client
key string
val V
ttl *time.Duration
options []clientv3.OpOption
client *clientv3.Client
key string
val V
ttl *time.Duration
options []clientv3.OpOption
shouldUpdateVersion bool
}

// NewPutOp returns an operation that stores a key value pair with an optional
// time-to-live. This operation does not enforce any version constraints.
func NewPutOp[V Value](client *clientv3.Client, key string, val V, options ...clientv3.OpOption) PutOp[V] {
return &putOp[V]{
client: client,
Expand All @@ -43,29 +45,54 @@ func (o *putOp[V]) WithTTL(ttl time.Duration) PutOp[V] {
return o
}

func (o *putOp[V]) WithUpdatedVersion() PutOp[V] {
o.shouldUpdateVersion = true
o.options = append(o.options, clientv3.WithPrevKV())
return o
}

func (o *putOp[V]) Exec(ctx context.Context) error {
ops, err := o.Ops(ctx)
if err != nil {
return err
}
_, err = o.client.Do(ctx, ops[0])
resp, err := o.client.Do(ctx, ops[0])
if err != nil {
return fmt.Errorf("failed to put %q: %w", o.key, err)
}
if o.shouldUpdateVersion {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not call updateVersion() here like createOp.Exec() and updateOp.Exec() do?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. It's just a slightly different output shape. createOp and updateOp use a transaction internally since we need to apply a condition to the operation. That means both of them produce a slice of responses, even though they're only performing one operation. So we're reusing the same code for those as we do with transactions: we iterate over the responses, extract any Put responses, and produce a map[string]*mvccpb.KeyValue of the previous KVs. I considered reusing those functions here, but since they involve a few transformations, it felt cleaner to just duplicate this if/else.

put := resp.Put()
if put != nil && put.PrevKv != nil {
o.val.SetVersion(put.PrevKv.Version + 1)
} else {
// PrevKV is nil for creates
o.val.SetVersion(o.val.Version() + 1)
}
}

return nil
}

// CreateOp creates a key value pair with an optional time-to-live. This
// operation will fail with ErrAlreadyExists if the given key already exists.
func (o *putOp[V]) UpdateVersionEnabled() bool {
return o.shouldUpdateVersion
}

func (o *putOp[V]) UpdateVersion(prevKVs map[string]*mvccpb.KeyValue) {
updateVersion(o.key, o.val, prevKVs)
}

type createOp[V Value] struct {
client *clientv3.Client
key string
val V
ttl *time.Duration
options []clientv3.OpOption
client *clientv3.Client
key string
val V
ttl *time.Duration
options []clientv3.OpOption
shouldUpdateVersion bool
}

// NewCreateOp returns an operation that creates a key value pair with an
// optional time-to-live. This operation will fail with ErrAlreadyExists if the
// given key already exists.
func NewCreateOp[V Value](client *clientv3.Client, key string, val V, options ...clientv3.OpOption) PutOp[V] {
return &createOp[V]{
client: client,
Expand All @@ -88,6 +115,12 @@ func (o *createOp[V]) WithTTL(ttl time.Duration) PutOp[V] {
return o
}

func (o *createOp[V]) WithUpdatedVersion() PutOp[V] {
o.shouldUpdateVersion = true
o.options = append(o.options, clientv3.WithPrevKV())
return o
}

func (o *createOp[V]) Exec(ctx context.Context) error {
ops, err := o.Ops(ctx)
if err != nil {
Expand All @@ -103,21 +136,35 @@ func (o *createOp[V]) Exec(ctx context.Context) error {
if !resp.Succeeded {
return fmt.Errorf("%q: %w", o.key, ErrAlreadyExists)
}
if o.shouldUpdateVersion {
updateVersion(o.key, o.val, extractPrevKVs(resp))
}

return nil
}

// UpdateOp updates an existing key value pair with a new value and an optional
// time-to-live. This operation will fail with ErrValueVersionMismatch if the
// stored value's version does not match the given value's version.
func (o *createOp[V]) UpdateVersionEnabled() bool {
return o.shouldUpdateVersion
}

func (o *createOp[V]) UpdateVersion(prevKVs map[string]*mvccpb.KeyValue) {
updateVersion(o.key, o.val, prevKVs)
}

type updateOp[V Value] struct {
client *clientv3.Client
key string
val V
ttl *time.Duration
options []clientv3.OpOption
client *clientv3.Client
key string
val V
ttl *time.Duration
options []clientv3.OpOption
shouldUpdateVersion bool
}

// NewUpdateOp returns an operation updates an existing key value pair with a
// new value and an optional time-to-live. This operation will fail with
// ErrValueVersionMismatch if the stored value's version does not match the
// given value's version. Note that this operation is equivalent to a create
// when the item version is 0.
func NewUpdateOp[V Value](client *clientv3.Client, key string, val V, options ...clientv3.OpOption) PutOp[V] {
return &updateOp[V]{
client: client,
Expand All @@ -142,6 +189,12 @@ func (o *updateOp[V]) WithTTL(ttl time.Duration) PutOp[V] {
return o
}

func (o *updateOp[V]) WithUpdatedVersion() PutOp[V] {
o.shouldUpdateVersion = true
o.options = append(o.options, clientv3.WithPrevKV())
return o
}

func (o *updateOp[V]) Exec(ctx context.Context) error {
ops, err := o.Ops(ctx)
if err != nil {
Expand All @@ -157,10 +210,21 @@ func (o *updateOp[V]) Exec(ctx context.Context) error {
if !resp.Succeeded {
return fmt.Errorf("%q: %w", o.key, ErrValueVersionMismatch)
}
if o.shouldUpdateVersion {
updateVersion(o.key, o.val, extractPrevKVs(resp))
}

return nil
}

func (o *updateOp[V]) UpdateVersionEnabled() bool {
return o.shouldUpdateVersion
}

func (o *updateOp[V]) UpdateVersion(prevKVs map[string]*mvccpb.KeyValue) {
updateVersion(o.key, o.val, prevKVs)
}

func encodeJSON(val any) (string, error) {
raw, err := json.Marshal(val)
if err != nil {
Expand Down Expand Up @@ -201,15 +265,14 @@ func putOps[V Value](
ttl *time.Duration,
options ...clientv3.OpOption,
) ([]clientv3.Op, error) {
var allOptions []clientv3.OpOption
allOptions := append([]clientv3.OpOption{}, options...)
if ttl != nil {
leaseResp, err := client.Grant(ctx, int64(ttl.Seconds()))
if err != nil {
return nil, fmt.Errorf("failed to grant lease for %q: %w", key, err)
}
allOptions = append(options, clientv3.WithLease(leaseResp.ID))
allOptions = append(allOptions, clientv3.WithLease(leaseResp.ID))
}
allOptions = append(allOptions, options...)

encoded, err := encodeJSON(val)
if err != nil {
Expand All @@ -218,3 +281,24 @@ func putOps[V Value](

return []clientv3.Op{clientv3.OpPut(key, encoded, allOptions...)}, nil
}

func extractPrevKVs(resp *clientv3.TxnResponse) map[string]*mvccpb.KeyValue {
prevKVs := map[string]*mvccpb.KeyValue{}
for _, r := range resp.Responses {
put := r.GetResponsePut()
if put != nil && put.PrevKv != nil {
prevKVs[string(put.PrevKv.Key)] = put.PrevKv
}
}
return prevKVs
}

func updateVersion[V Value](key string, item V, prevKVs map[string]*mvccpb.KeyValue) {
prev, ok := prevKVs[key]
if ok {
item.SetVersion(prev.Version + 1)
} else {
// PrevKV is nil for creates
item.SetVersion(item.Version() + 1)
}
}
41 changes: 35 additions & 6 deletions server/internal/storage/put_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -14,9 +15,7 @@ import (

func TestPutOp(t *testing.T) {
server := storagetest.NewEtcdTestServer(t)
// defer server.Close()
client := server.Client(t)
// defer client.Close()

t.Run("puts a value", func(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -65,13 +64,24 @@ func TestPutOp(t *testing.T) {
assert.ErrorIs(t, err, storage.ErrNotFound)
assert.Nil(t, val)
})

t.Run("with updated version", func(t *testing.T) {
ctx := t.Context()

item := &TestValue{SomeField: "bar"}
key := uuid.NewString()
assert.NoError(t, storage.NewPutOp(client, key, item).WithUpdatedVersion().Exec(ctx))
assert.Equal(t, int64(1), item.Version())

item.SomeField = "baz"
assert.NoError(t, storage.NewPutOp(client, key, item).WithUpdatedVersion().Exec(ctx))
assert.Equal(t, int64(2), item.Version())
})
}

func TestCreateOp(t *testing.T) {
server := storagetest.NewEtcdTestServer(t)
// defer server.Close()
client := server.Client(t)
// defer client.Close()

t.Run("key does not exist", func(t *testing.T) {
ctx := context.Background()
Expand All @@ -95,13 +105,19 @@ func TestCreateOp(t *testing.T) {
err = storage.NewCreateOp(client, "bar", &TestValue{SomeField: "bar"}).Exec(ctx)
assert.ErrorIs(t, err, storage.ErrAlreadyExists)
})

t.Run("with updated version", func(t *testing.T) {
item := &TestValue{SomeField: "foo"}
ctx := t.Context()
err := storage.NewCreateOp(client, uuid.NewString(), item).WithUpdatedVersion().Exec(ctx)
assert.NoError(t, err)
assert.Equal(t, int64(1), item.Version())
})
}

func TestUpdateOp(t *testing.T) {
server := storagetest.NewEtcdTestServer(t)
// defer server.Close()
client := server.Client(t)
// defer client.Close()

t.Run("valid update", func(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -137,4 +153,17 @@ func TestUpdateOp(t *testing.T) {

assert.ErrorIs(t, err, storage.ErrValueVersionMismatch)
})

t.Run("with updated version", func(t *testing.T) {
ctx := t.Context()

item := &TestValue{SomeField: "bar"}
key := uuid.NewString()
assert.NoError(t, storage.NewCreateOp(client, key, item).WithUpdatedVersion().Exec(ctx))
assert.Equal(t, int64(1), item.Version())

item.SomeField = "baz"
assert.NoError(t, storage.NewUpdateOp(client, key, item).WithUpdatedVersion().Exec(ctx))
assert.Equal(t, int64(2), item.Version())
})
}
Loading