Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ LOG_LEVEL ?= info
DEV_IMAGE_REPO ?= ghcr.io/pgedge
CONTROL_PLANE_IMAGE_REPO ?= host.docker.internal:5000/control-plane
TEST_RERUN_FAILS ?= 0
TEST_DISABLE_CACHE ?= 0
E2E_FIXTURE ?=
E2E_PARALLEL ?= 8
E2E_RUN ?=
Expand Down Expand Up @@ -51,6 +52,9 @@ cluster_test_args=-tags=cluster_test -count=1 -timeout=10m \
$(if $(CLUSTER_TEST_IMAGE_TAG),-image-tag $(CLUSTER_TEST_IMAGE_TAG)) \
$(if $(CLUSTER_TEST_DATA_DIR),-data-dir $(CLUSTER_TEST_DATA_DIR))

test_disable_cache=$(if $(filter 1,$(TEST_DISABLE_CACHE)),-count=1)
workflows_backend_skip=-skip=Test_EtcdBackendE2E/AutoExpiration/StartsWorkflowAndRemoves

# Automatically adds junit output named after the rule, e.g.
# 'test-e2e-results.xml' in CI environment.
gotestsum=$(gobin)/gotestsum \
Expand All @@ -70,7 +74,9 @@ test:
$(gotestsum) \
--format-hide-empty-pkg \
--rerun-fails=$(TEST_RERUN_FAILS) \
--packages='./...'
--packages='./...' \
-- \
$(test_disable_cache)

.PHONY: test-etcd
test-etcd-lifecycle:
Expand All @@ -79,16 +85,25 @@ test-etcd-lifecycle:
--rerun-fails=$(TEST_RERUN_FAILS) \
--packages='./server/internal/etcd/...' \
-- \
$(test_disable_cache) \
-tags=etcd_lifecycle_test

# We skip StartsWorkflowAndRemoves because it contains a race condition that's
# much more prevalent now that we're executing workflows more quickly. This test
# uses the "autoexpire" feature to remove workflows that are older than 1
# millisecond. It starts a workflow, waits for the result, and then waits for
# the workflow to be removed. Occasionally, the workflow gets removed while the
# "waiting for result" step is still polling the workflow status.
.PHONY: test-workflows-backend
test-workflows-backend:
$(gotestsum) \
--format-hide-empty-pkg \
--rerun-fails=$(TEST_RERUN_FAILS) \
--packages='./server/internal/workflows/backend/etcd/...' \
-- \
-tags=workflows_backend_test
$(test_disable_cache) \
-tags=workflows_backend_test \
$(workflows_backend_skip)

.PHONY: test-ci
test-ci:
Expand All @@ -98,7 +113,9 @@ test-ci:
--rerun-fails=$(TEST_RERUN_FAILS) \
--packages='./...' \
-- \
-tags=workflows_backend_test,etcd_lifecycle_test
-count=1 \
-tags=workflows_backend_test,etcd_lifecycle_test \
$(workflows_backend_skip)

.PHONY: test-e2e
test-e2e:
Expand Down
1 change: 1 addition & 0 deletions server/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (a *App) runInitialized(parentCtx context.Context) error {
if err := worker.Start(a.serviceCtx); err != nil {
return handleError(fmt.Errorf("failed to start worker: %w", err))
}
a.addErrorProducer(parentCtx, worker)

if err := a.api.ServePostInit(a.serviceCtx); err != nil {
return handleError(fmt.Errorf("failed to serve post-init API: %w", err))
Expand Down
4 changes: 4 additions & 0 deletions server/internal/storage/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func (c *cache[V]) Stop() {
if c.op != nil {
c.op.Close()
c.op = nil

c.mu.Lock()
defer c.mu.Unlock()

c.lastWatchRevision = 0
c.items = map[string]*cachedValue{}
c.tombstones = nil
Expand Down
41 changes: 31 additions & 10 deletions server/internal/workflows/backend/etcd/activity_queue_item/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package activity_queue_item

import (
"context"
"time"

"github.com/cschleiden/go-workflows/backend/history"
Expand All @@ -23,9 +24,12 @@ func (v *Value) UpdateLastLocked() {
v.LastLocked = &now
}

// Store is a storage implementation for activity queue items. StartCache must
// be called before the store can be used.
type Store struct {
client *clientv3.Client
root string
cache storage.Cache[*Value]
}

func NewStore(client *clientv3.Client, root string) *Store {
Expand All @@ -47,36 +51,53 @@ func (s *Store) Key(queue, instanceID, eventID string) string {
return storage.Key(s.QueuePrefix(queue), instanceID, eventID)
}

func (s *Store) StartCache(ctx context.Context) error {
if s.cache != nil {
return nil
}
s.cache = storage.NewCache(s.client, s.AllQueuesPrefix(), func(item *Value) string {
return s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID)
})
return s.cache.Start(ctx)
}

func (s *Store) StopCache() {
if s.cache != nil {
s.cache.Stop()
}
}

func (s *Store) PropagateErrors(ctx context.Context, ch chan error) {
s.cache.PropagateErrors(ctx, ch)
}

func (s *Store) GetAll() storage.GetMultipleOp[*Value] {
return storage.NewGetPrefixOp[*Value](s.client, s.AllQueuesPrefix())
return s.cache.GetPrefix(s.AllQueuesPrefix())
}

func (s *Store) GetByKey(queue, instanceID, eventID string) storage.GetOp[*Value] {
key := s.Key(queue, instanceID, eventID)
return storage.NewGetOp[*Value](s.client, key)
return s.cache.Get(key)
}

func (s *Store) GetByQueue(queue string) storage.GetMultipleOp[*Value] {
prefix := s.QueuePrefix(queue)
return storage.NewGetPrefixOp[*Value](s.client, prefix)
return s.cache.GetPrefix(prefix)
}

func (s *Store) Create(item *Value) storage.PutOp[*Value] {
key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID)
return storage.NewCreateOp(s.client, key, item)
return s.cache.Create(item)
}

func (s *Store) Update(item *Value) storage.PutOp[*Value] {
key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID)
return storage.NewUpdateOp(s.client, key, item)
return s.cache.Update(item)
}

func (s *Store) DeleteByKey(queue, instanceID, eventID string) storage.DeleteOp {
key := s.Key(queue, instanceID, eventID)
return storage.NewDeleteKeyOp(s.client, key)
return s.cache.DeleteByKey(key)
}

func (s *Store) DeleteItem(item *Value) storage.DeleteValueOp[*Value] {
key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID)
return storage.NewDeleteValueOp(s.client, key, item)
return s.cache.DeleteValue(item)
}
25 changes: 25 additions & 0 deletions server/internal/workflows/backend/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"slices"
"sync"
"time"

"github.com/cschleiden/go-workflows/backend"
Expand Down Expand Up @@ -34,6 +35,8 @@ type Backend struct {
options *backend.Options
workerID string
workerInstanceID string
workflowMu sync.Mutex
activityMu sync.Mutex
}

func NewBackend(store *Store, options *backend.Options, workerID string) *Backend {
Expand All @@ -45,6 +48,14 @@ func NewBackend(store *Store, options *backend.Options, workerID string) *Backen
}
}

func (b *Backend) StartCaches(ctx context.Context) error {
return b.store.StartCaches(ctx)
}

func (b *Backend) StopCaches() {
b.store.StopCaches()
}

func (b *Backend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
// Check for existing active instance execution
instances, err := b.store.WorkflowInstance.
Expand Down Expand Up @@ -244,6 +255,11 @@ func (b *Backend) PrepareActivityQueues(ctx context.Context, queues []workflow.Q
}

func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue) (*backend.WorkflowTask, error) {
// This lock reduces unnecessary contention from concurrent calls within the
// same worker.
b.workflowMu.Lock()
defer b.workflowMu.Unlock()

for _, queue := range queues {
items, err := b.store.WorkflowQueueItem.
GetByQueue(string(queue)).
Expand Down Expand Up @@ -578,6 +594,11 @@ func (b *Backend) CompleteWorkflowTask(
}

func (b *Backend) GetActivityTask(ctx context.Context, queues []workflow.Queue) (*backend.ActivityTask, error) {
// This lock reduces unnecessary contention from concurrent calls within the
// same worker.
b.activityMu.Lock()
defer b.activityMu.Unlock()

for _, queue := range queues {
items, err := b.store.ActivityQueueItem.
GetByQueue(string(queue)).
Expand Down Expand Up @@ -795,6 +816,10 @@ func (b *Backend) FeatureSupported(feature backend.Feature) bool {
return false
}

func (b *Backend) Error() <-chan error {
return b.store.Error()
}

func sortPendingEvents(events []*pending_event.Value) {
slices.SortStableFunc(events, func(a *pending_event.Value, b *pending_event.Value) int {
if a.CreatedAt > 0 && b.CreatedAt > 0 {
Expand Down
11 changes: 9 additions & 2 deletions server/internal/workflows/backend/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/cschleiden/go-workflows/backend"
"github.com/cschleiden/go-workflows/backend/history"
Expand All @@ -23,7 +24,10 @@ func Test_EtcdBackend(t *testing.T) {

test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
opts := backend.ApplyOptions(options...)
return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
backend := NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
require.NoError(t, backend.StartCaches(t.Context()))
t.Cleanup(backend.StopCaches)
return backend
}, nil)
}

Expand All @@ -33,7 +37,10 @@ func Test_EtcdBackendE2E(t *testing.T) {

test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
opts := backend.ApplyOptions(options...)
return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
backend := NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
require.NoError(t, backend.StartCaches(t.Context()))
t.Cleanup(backend.StopCaches)
return backend
}, nil)
}

Expand Down
30 changes: 29 additions & 1 deletion server/internal/workflows/backend/etcd/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package etcd

import (
"context"
"fmt"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/pgEdge/control-plane/server/internal/storage"
"github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/activity_lock"
"github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/activity_queue_item"
Expand All @@ -10,11 +15,11 @@ import (
"github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_instance_lock"
"github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_instance_sticky"
"github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_queue_item"
clientv3 "go.etcd.io/etcd/client/v3"
)

type Store struct {
client *clientv3.Client
errCh chan error
ActivityLock *activity_lock.Store
ActivityQueueItem *activity_queue_item.Store
HistoryEvent *history_event.Store
Expand All @@ -28,6 +33,7 @@ type Store struct {
func NewStore(client *clientv3.Client, root string) *Store {
return &Store{
client: client,
errCh: make(chan error, 1),
ActivityLock: activity_lock.NewStore(client, root),
ActivityQueueItem: activity_queue_item.NewStore(client, root),
HistoryEvent: history_event.NewStore(client, root),
Expand All @@ -42,3 +48,25 @@ func NewStore(client *clientv3.Client, root string) *Store {
func (s *Store) Txn(ops ...storage.TxnOperation) storage.Txn {
return storage.NewTxn(s.client, ops...)
}

func (s *Store) StartCaches(ctx context.Context) error {
if err := s.WorkflowQueueItem.StartCache(ctx); err != nil {
return fmt.Errorf("failed to start workflow queue item cache: %w", err)
}
if err := s.ActivityQueueItem.StartCache(ctx); err != nil {
return fmt.Errorf("failed to start activity queue item cache: %w", err)
}
s.WorkflowQueueItem.PropagateErrors(ctx, s.errCh)
s.ActivityQueueItem.PropagateErrors(ctx, s.errCh)

return nil
}

func (s *Store) StopCaches() {
s.WorkflowQueueItem.StopCache()
s.ActivityQueueItem.StopCache()
}

func (s *Store) Error() <-chan error {
return s.errCh
}
Loading