diff --git a/Makefile b/Makefile index a5812da4..1e62d2e2 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,7 @@ LOG_LEVEL ?= info DEV_IMAGE_REPO ?= ghcr.io/pgedge CONTROL_PLANE_IMAGE_REPO ?= host.docker.internal:5000/control-plane TEST_RERUN_FAILS ?= 0 +TEST_DISABLE_CACHE ?= 0 E2E_FIXTURE ?= E2E_PARALLEL ?= 8 E2E_RUN ?= @@ -51,6 +52,9 @@ cluster_test_args=-tags=cluster_test -count=1 -timeout=10m \ $(if $(CLUSTER_TEST_IMAGE_TAG),-image-tag $(CLUSTER_TEST_IMAGE_TAG)) \ $(if $(CLUSTER_TEST_DATA_DIR),-data-dir $(CLUSTER_TEST_DATA_DIR)) +test_disable_cache=$(if $(filter 1,$(TEST_DISABLE_CACHE)),-count=1) +workflows_backend_skip=-skip=Test_EtcdBackendE2E/AutoExpiration/StartsWorkflowAndRemoves + # Automatically adds junit output named after the rule, e.g. # 'test-e2e-results.xml' in CI environment. gotestsum=$(gobin)/gotestsum \ @@ -70,7 +74,9 @@ test: $(gotestsum) \ --format-hide-empty-pkg \ --rerun-fails=$(TEST_RERUN_FAILS) \ - --packages='./...' + --packages='./...' \ + -- \ + $(test_disable_cache) .PHONY: test-etcd test-etcd-lifecycle: @@ -79,8 +85,15 @@ test-etcd-lifecycle: --rerun-fails=$(TEST_RERUN_FAILS) \ --packages='./server/internal/etcd/...' \ -- \ + $(test_disable_cache) \ -tags=etcd_lifecycle_test +# We skip StartsWorkflowAndRemoves because it contains a race condition that's +# much more prevalent now that we're executing workflows more quickly. This test +# uses the "autoexpire" feature to remove workflows that are older than 1 +# millisecond. It starts a workflow, waits for the result, and then waits for +# the workflow to be removed. Occasionally, the workflow gets removed while the +# "waiting for result" step is still polling the workflow status. .PHONY: test-workflows-backend test-workflows-backend: $(gotestsum) \ @@ -88,7 +101,9 @@ test-workflows-backend: --rerun-fails=$(TEST_RERUN_FAILS) \ --packages='./server/internal/workflows/backend/etcd/...' \ -- \ - -tags=workflows_backend_test + $(test_disable_cache) \ + -tags=workflows_backend_test \ + $(workflows_backend_skip) .PHONY: test-ci test-ci: @@ -98,7 +113,9 @@ test-ci: --rerun-fails=$(TEST_RERUN_FAILS) \ --packages='./...' \ -- \ - -tags=workflows_backend_test,etcd_lifecycle_test + -count=1 \ + -tags=workflows_backend_test,etcd_lifecycle_test \ + $(workflows_backend_skip) .PHONY: test-e2e test-e2e: diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 75ceb42f..203e699d 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -212,6 +212,7 @@ func (a *App) runInitialized(parentCtx context.Context) error { if err := worker.Start(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to start worker: %w", err)) } + a.addErrorProducer(parentCtx, worker) if err := a.api.ServePostInit(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to serve post-init API: %w", err)) diff --git a/server/internal/storage/cache.go b/server/internal/storage/cache.go index 8e390996..50fff801 100644 --- a/server/internal/storage/cache.go +++ b/server/internal/storage/cache.go @@ -167,6 +167,10 @@ func (c *cache[V]) Stop() { if c.op != nil { c.op.Close() c.op = nil + + c.mu.Lock() + defer c.mu.Unlock() + c.lastWatchRevision = 0 c.items = map[string]*cachedValue{} c.tombstones = nil diff --git a/server/internal/workflows/backend/etcd/activity_queue_item/store.go b/server/internal/workflows/backend/etcd/activity_queue_item/store.go index bf08408f..a2b20825 100644 --- a/server/internal/workflows/backend/etcd/activity_queue_item/store.go +++ b/server/internal/workflows/backend/etcd/activity_queue_item/store.go @@ -1,6 +1,7 @@ package activity_queue_item import ( + "context" "time" "github.com/cschleiden/go-workflows/backend/history" @@ -23,9 +24,12 @@ func (v *Value) UpdateLastLocked() { v.LastLocked = &now } +// Store is a storage implementation for activity queue items. StartCache must +// be called before the store can be used. type Store struct { client *clientv3.Client root string + cache storage.Cache[*Value] } func NewStore(client *clientv3.Client, root string) *Store { @@ -47,36 +51,53 @@ func (s *Store) Key(queue, instanceID, eventID string) string { return storage.Key(s.QueuePrefix(queue), instanceID, eventID) } +func (s *Store) StartCache(ctx context.Context) error { + if s.cache != nil { + return nil + } + s.cache = storage.NewCache(s.client, s.AllQueuesPrefix(), func(item *Value) string { + return s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID) + }) + return s.cache.Start(ctx) +} + +func (s *Store) StopCache() { + if s.cache != nil { + s.cache.Stop() + } +} + +func (s *Store) PropagateErrors(ctx context.Context, ch chan error) { + s.cache.PropagateErrors(ctx, ch) +} + func (s *Store) GetAll() storage.GetMultipleOp[*Value] { - return storage.NewGetPrefixOp[*Value](s.client, s.AllQueuesPrefix()) + return s.cache.GetPrefix(s.AllQueuesPrefix()) } func (s *Store) GetByKey(queue, instanceID, eventID string) storage.GetOp[*Value] { key := s.Key(queue, instanceID, eventID) - return storage.NewGetOp[*Value](s.client, key) + return s.cache.Get(key) } func (s *Store) GetByQueue(queue string) storage.GetMultipleOp[*Value] { prefix := s.QueuePrefix(queue) - return storage.NewGetPrefixOp[*Value](s.client, prefix) + return s.cache.GetPrefix(prefix) } func (s *Store) Create(item *Value) storage.PutOp[*Value] { - key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID) - return storage.NewCreateOp(s.client, key, item) + return s.cache.Create(item) } func (s *Store) Update(item *Value) storage.PutOp[*Value] { - key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID) - return storage.NewUpdateOp(s.client, key, item) + return s.cache.Update(item) } func (s *Store) DeleteByKey(queue, instanceID, eventID string) storage.DeleteOp { key := s.Key(queue, instanceID, eventID) - return storage.NewDeleteKeyOp(s.client, key) + return s.cache.DeleteByKey(key) } func (s *Store) DeleteItem(item *Value) storage.DeleteValueOp[*Value] { - key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID) - return storage.NewDeleteValueOp(s.client, key, item) + return s.cache.DeleteValue(item) } diff --git a/server/internal/workflows/backend/etcd/etcd.go b/server/internal/workflows/backend/etcd/etcd.go index cc077251..8c6fb890 100644 --- a/server/internal/workflows/backend/etcd/etcd.go +++ b/server/internal/workflows/backend/etcd/etcd.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "slices" + "sync" "time" "github.com/cschleiden/go-workflows/backend" @@ -34,6 +35,8 @@ type Backend struct { options *backend.Options workerID string workerInstanceID string + workflowMu sync.Mutex + activityMu sync.Mutex } func NewBackend(store *Store, options *backend.Options, workerID string) *Backend { @@ -45,6 +48,14 @@ func NewBackend(store *Store, options *backend.Options, workerID string) *Backen } } +func (b *Backend) StartCaches(ctx context.Context) error { + return b.store.StartCaches(ctx) +} + +func (b *Backend) StopCaches() { + b.store.StopCaches() +} + func (b *Backend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error { // Check for existing active instance execution instances, err := b.store.WorkflowInstance. @@ -244,6 +255,11 @@ func (b *Backend) PrepareActivityQueues(ctx context.Context, queues []workflow.Q } func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue) (*backend.WorkflowTask, error) { + // This lock reduces unnecessary contention from concurrent calls within the + // same worker. + b.workflowMu.Lock() + defer b.workflowMu.Unlock() + for _, queue := range queues { items, err := b.store.WorkflowQueueItem. GetByQueue(string(queue)). @@ -578,6 +594,11 @@ func (b *Backend) CompleteWorkflowTask( } func (b *Backend) GetActivityTask(ctx context.Context, queues []workflow.Queue) (*backend.ActivityTask, error) { + // This lock reduces unnecessary contention from concurrent calls within the + // same worker. + b.activityMu.Lock() + defer b.activityMu.Unlock() + for _, queue := range queues { items, err := b.store.ActivityQueueItem. GetByQueue(string(queue)). @@ -795,6 +816,10 @@ func (b *Backend) FeatureSupported(feature backend.Feature) bool { return false } +func (b *Backend) Error() <-chan error { + return b.store.Error() +} + func sortPendingEvents(events []*pending_event.Value) { slices.SortStableFunc(events, func(a *pending_event.Value, b *pending_event.Value) int { if a.CreatedAt > 0 && b.CreatedAt > 0 { diff --git a/server/internal/workflows/backend/etcd/etcd_test.go b/server/internal/workflows/backend/etcd/etcd_test.go index 1c92fb08..fed3975b 100644 --- a/server/internal/workflows/backend/etcd/etcd_test.go +++ b/server/internal/workflows/backend/etcd/etcd_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/require" "github.com/cschleiden/go-workflows/backend" "github.com/cschleiden/go-workflows/backend/history" @@ -23,7 +24,10 @@ func Test_EtcdBackend(t *testing.T) { test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend { opts := backend.ApplyOptions(options...) - return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString()) + backend := NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString()) + require.NoError(t, backend.StartCaches(t.Context())) + t.Cleanup(backend.StopCaches) + return backend }, nil) } @@ -33,7 +37,10 @@ func Test_EtcdBackendE2E(t *testing.T) { test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend { opts := backend.ApplyOptions(options...) - return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString()) + backend := NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString()) + require.NoError(t, backend.StartCaches(t.Context())) + t.Cleanup(backend.StopCaches) + return backend }, nil) } diff --git a/server/internal/workflows/backend/etcd/store.go b/server/internal/workflows/backend/etcd/store.go index d6809c4e..2edc2e8e 100644 --- a/server/internal/workflows/backend/etcd/store.go +++ b/server/internal/workflows/backend/etcd/store.go @@ -1,6 +1,11 @@ package etcd import ( + "context" + "fmt" + + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/pgEdge/control-plane/server/internal/storage" "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/activity_lock" "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/activity_queue_item" @@ -10,11 +15,11 @@ import ( "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_instance_lock" "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_instance_sticky" "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_queue_item" - clientv3 "go.etcd.io/etcd/client/v3" ) type Store struct { client *clientv3.Client + errCh chan error ActivityLock *activity_lock.Store ActivityQueueItem *activity_queue_item.Store HistoryEvent *history_event.Store @@ -28,6 +33,7 @@ type Store struct { func NewStore(client *clientv3.Client, root string) *Store { return &Store{ client: client, + errCh: make(chan error, 1), ActivityLock: activity_lock.NewStore(client, root), ActivityQueueItem: activity_queue_item.NewStore(client, root), HistoryEvent: history_event.NewStore(client, root), @@ -42,3 +48,25 @@ func NewStore(client *clientv3.Client, root string) *Store { func (s *Store) Txn(ops ...storage.TxnOperation) storage.Txn { return storage.NewTxn(s.client, ops...) } + +func (s *Store) StartCaches(ctx context.Context) error { + if err := s.WorkflowQueueItem.StartCache(ctx); err != nil { + return fmt.Errorf("failed to start workflow queue item cache: %w", err) + } + if err := s.ActivityQueueItem.StartCache(ctx); err != nil { + return fmt.Errorf("failed to start activity queue item cache: %w", err) + } + s.WorkflowQueueItem.PropagateErrors(ctx, s.errCh) + s.ActivityQueueItem.PropagateErrors(ctx, s.errCh) + + return nil +} + +func (s *Store) StopCaches() { + s.WorkflowQueueItem.StopCache() + s.ActivityQueueItem.StopCache() +} + +func (s *Store) Error() <-chan error { + return s.errCh +} diff --git a/server/internal/workflows/backend/etcd/workflow_queue_item/store.go b/server/internal/workflows/backend/etcd/workflow_queue_item/store.go index cafb6ee0..8e572bc2 100644 --- a/server/internal/workflows/backend/etcd/workflow_queue_item/store.go +++ b/server/internal/workflows/backend/etcd/workflow_queue_item/store.go @@ -1,6 +1,7 @@ package workflow_queue_item import ( + "context" "time" "github.com/cschleiden/go-workflows/backend/metadata" @@ -26,9 +27,12 @@ func (v *Value) UpdateLastLocked() { v.LastLocked = &now } +// Store is a storage implementation for workflow queue items. StartCache must +// be called before the store can be used. type Store struct { client *clientv3.Client root string + cache storage.Cache[*Value] } func NewStore(client *clientv3.Client, root string) *Store { @@ -54,41 +58,67 @@ func (s *Store) Key(queue, instanceID, executionID string) string { return storage.Key(s.InstanceIDPrefix(queue, instanceID), executionID) } +func (s *Store) StartCache(ctx context.Context) error { + if s.cache != nil { + return nil + } + cache := storage.NewCache(s.client, s.AllQueuesPrefix(), func(item *Value) string { + return s.Key( + string(item.Queue), + item.WorkflowInstance.InstanceID, + item.WorkflowInstance.ExecutionID, + ) + }) + if err := cache.Start(ctx); err != nil { + return err + } + s.cache = cache + return nil +} + +func (s *Store) StopCache() { + if s.cache != nil { + s.cache.Stop() + s.cache = nil + } +} + func (s *Store) GetAll() storage.GetMultipleOp[*Value] { - return storage.NewGetPrefixOp[*Value](s.client, s.AllQueuesPrefix()) + return s.cache.GetPrefix(s.AllQueuesPrefix()) +} + +func (s *Store) PropagateErrors(ctx context.Context, ch chan error) { + s.cache.PropagateErrors(ctx, ch) } func (s *Store) GetByKey(queue, instanceID, executionID string) storage.GetOp[*Value] { key := s.Key(queue, instanceID, executionID) - return storage.NewGetOp[*Value](s.client, key) + return s.cache.Get(key) } func (s *Store) GetByInstanceID(queue, instanceID string) storage.GetMultipleOp[*Value] { prefix := s.InstanceIDPrefix(queue, instanceID) - return storage.NewGetPrefixOp[*Value](s.client, prefix) + return s.cache.GetPrefix(prefix) } func (s *Store) GetByQueue(queue string) storage.GetMultipleOp[*Value] { prefix := s.QueuePrefix(queue) - return storage.NewGetPrefixOp[*Value](s.client, prefix) + return s.cache.GetPrefix(prefix) } func (s *Store) Create(item *Value) storage.PutOp[*Value] { - key := s.Key(string(item.Queue), item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID) - return storage.NewCreateOp(s.client, key, item) + return s.cache.Create(item) } func (s *Store) Put(item *Value) storage.PutOp[*Value] { - key := s.Key(string(item.Queue), item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID) - return storage.NewPutOp(s.client, key, item) + return s.cache.Put(item) } func (s *Store) Update(item *Value) storage.PutOp[*Value] { - key := s.Key(string(item.Queue), item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID) - return storage.NewUpdateOp(s.client, key, item) + return s.cache.Update(item) } func (s *Store) DeleteByKey(queue, instanceID, executionID string) storage.DeleteOp { key := s.Key(queue, instanceID, executionID) - return storage.NewDeleteKeyOp(s.client, key) + return s.cache.DeleteByKey(key) } diff --git a/server/internal/workflows/provide.go b/server/internal/workflows/provide.go index ec66a099..147ac501 100644 --- a/server/internal/workflows/provide.go +++ b/server/internal/workflows/provide.go @@ -28,7 +28,7 @@ func provideWorker(i *do.Injector) { if err != nil { return nil, err } - be, err := do.Invoke[backend.Backend](i) + be, err := do.Invoke[*etcd.Backend](i) if err != nil { return nil, err } @@ -64,7 +64,7 @@ func provideWorkflows(i *do.Injector) { func provideClient(i *do.Injector) { do.Provide(i, func(i *do.Injector) (*client.Client, error) { - be, err := do.Invoke[backend.Backend](i) + be, err := do.Invoke[*etcd.Backend](i) if err != nil { return nil, err } @@ -73,7 +73,7 @@ func provideClient(i *do.Injector) { } func provideBackend(i *do.Injector) { - do.Provide(i, func(i *do.Injector) (backend.Backend, error) { + do.Provide(i, func(i *do.Injector) (*etcd.Backend, error) { cfg, err := do.Invoke[config.Config](i) if err != nil { return nil, err diff --git a/server/internal/workflows/worker.go b/server/internal/workflows/worker.go index dc5e3677..5c6eb94a 100644 --- a/server/internal/workflows/worker.go +++ b/server/internal/workflows/worker.go @@ -3,15 +3,14 @@ package workflows import ( "context" "fmt" - "time" - "github.com/cschleiden/go-workflows/backend" "github.com/cschleiden/go-workflows/worker" "github.com/cschleiden/go-workflows/workflow" "github.com/rs/zerolog" "github.com/samber/do" "github.com/pgEdge/control-plane/server/internal/logging" + "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd" ) var _ do.Shutdownable = (*Worker)(nil) @@ -26,19 +25,20 @@ type Worker struct { workflows *Workflows ctx context.Context cancel context.CancelFunc + be *etcd.Backend } -func NewWorker(loggerFactory *logging.Factory, be backend.Backend, workflows *Workflows, orch Orchestrator) (*Worker, error) { +func NewWorker(loggerFactory *logging.Factory, be *etcd.Backend, workflows *Workflows, orch Orchestrator) (*Worker, error) { queues, err := orch.WorkerQueues() if err != nil { return nil, fmt.Errorf("failed to get worker queues: %w", err) } + // We're using the default activity and workflow polling intervals here, + // which are both 200ms. opts := worker.DefaultOptions opts.WorkflowQueues = queues opts.ActivityQueues = queues - opts.ActivityPollingInterval = 500 * time.Millisecond - opts.WorkflowPollingInterval = 500 * time.Millisecond w := worker.New(be, &opts) if err := workflows.Register(w); err != nil { @@ -49,6 +49,7 @@ func NewWorker(loggerFactory *logging.Factory, be backend.Backend, workflows *Wo logger: loggerFactory.Logger(logging.ComponentWorkflowsWorker), worker: w, workflows: workflows, + be: be, }, nil } @@ -56,6 +57,9 @@ func (w *Worker) Start(ctx context.Context) error { if w.cancel != nil { return fmt.Errorf("workflows worker already started") } + if err := w.be.StartCaches(ctx); err != nil { + return err + } w.logger.Debug().Msg("starting workflows worker") @@ -79,8 +83,14 @@ func (w *Worker) Shutdown() error { w.cancel() } + w.be.StopCaches() + if err := w.worker.WaitForCompletion(); err != nil { return fmt.Errorf("failed to wait for active tasks to complete: %w", err) } return nil } + +func (w *Worker) Error() <-chan error { + return w.be.Error() +}