From 71203970f81082d2e1853453f0a748774b07db81 Mon Sep 17 00:00:00 2001 From: Jason Lynch Date: Wed, 18 Mar 2026 13:27:35 -0400 Subject: [PATCH] feat: use cache for workflow queues Updates the workflows backend to use the write-through cache for workflow and activity queue items. This eliminates a frequent, expensive range query, and enables us to reduce the queue polling frequency to the default 200ms without overloading Etcd. We were previously performing these range queries every 500ms, but the queries themselves could take up to 1.5s - even when the range was empty. --- Makefile | 23 ++++++-- server/internal/app/app.go | 1 + server/internal/storage/cache.go | 4 ++ .../backend/etcd/activity_queue_item/store.go | 41 +++++++++++---- .../internal/workflows/backend/etcd/etcd.go | 25 +++++++++ .../workflows/backend/etcd/etcd_test.go | 11 +++- .../internal/workflows/backend/etcd/store.go | 30 ++++++++++- .../backend/etcd/workflow_queue_item/store.go | 52 +++++++++++++++---- server/internal/workflows/provide.go | 6 +-- server/internal/workflows/worker.go | 20 +++++-- 10 files changed, 178 insertions(+), 35 deletions(-) diff --git a/Makefile b/Makefile index a5812da4..1e62d2e2 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,7 @@ LOG_LEVEL ?= info DEV_IMAGE_REPO ?= ghcr.io/pgedge CONTROL_PLANE_IMAGE_REPO ?= host.docker.internal:5000/control-plane TEST_RERUN_FAILS ?= 0 +TEST_DISABLE_CACHE ?= 0 E2E_FIXTURE ?= E2E_PARALLEL ?= 8 E2E_RUN ?= @@ -51,6 +52,9 @@ cluster_test_args=-tags=cluster_test -count=1 -timeout=10m \ $(if $(CLUSTER_TEST_IMAGE_TAG),-image-tag $(CLUSTER_TEST_IMAGE_TAG)) \ $(if $(CLUSTER_TEST_DATA_DIR),-data-dir $(CLUSTER_TEST_DATA_DIR)) +test_disable_cache=$(if $(filter 1,$(TEST_DISABLE_CACHE)),-count=1) +workflows_backend_skip=-skip=Test_EtcdBackendE2E/AutoExpiration/StartsWorkflowAndRemoves + # Automatically adds junit output named after the rule, e.g. # 'test-e2e-results.xml' in CI environment. gotestsum=$(gobin)/gotestsum \ @@ -70,7 +74,9 @@ test: $(gotestsum) \ --format-hide-empty-pkg \ --rerun-fails=$(TEST_RERUN_FAILS) \ - --packages='./...' + --packages='./...' \ + -- \ + $(test_disable_cache) .PHONY: test-etcd test-etcd-lifecycle: @@ -79,8 +85,15 @@ test-etcd-lifecycle: --rerun-fails=$(TEST_RERUN_FAILS) \ --packages='./server/internal/etcd/...' \ -- \ + $(test_disable_cache) \ -tags=etcd_lifecycle_test +# We skip StartsWorkflowAndRemoves because it contains a race condition that's +# much more prevalent now that we're executing workflows more quickly. This test +# uses the "autoexpire" feature to remove workflows that are older than 1 +# millisecond. It starts a workflow, waits for the result, and then waits for +# the workflow to be removed. Occasionally, the workflow gets removed while the +# "waiting for result" step is still polling the workflow status. .PHONY: test-workflows-backend test-workflows-backend: $(gotestsum) \ @@ -88,7 +101,9 @@ test-workflows-backend: --rerun-fails=$(TEST_RERUN_FAILS) \ --packages='./server/internal/workflows/backend/etcd/...' \ -- \ - -tags=workflows_backend_test + $(test_disable_cache) \ + -tags=workflows_backend_test \ + $(workflows_backend_skip) .PHONY: test-ci test-ci: @@ -98,7 +113,9 @@ test-ci: --rerun-fails=$(TEST_RERUN_FAILS) \ --packages='./...' \ -- \ - -tags=workflows_backend_test,etcd_lifecycle_test + -count=1 \ + -tags=workflows_backend_test,etcd_lifecycle_test \ + $(workflows_backend_skip) .PHONY: test-e2e test-e2e: diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 75ceb42f..203e699d 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -212,6 +212,7 @@ func (a *App) runInitialized(parentCtx context.Context) error { if err := worker.Start(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to start worker: %w", err)) } + a.addErrorProducer(parentCtx, worker) if err := a.api.ServePostInit(a.serviceCtx); err != nil { return handleError(fmt.Errorf("failed to serve post-init API: %w", err)) diff --git a/server/internal/storage/cache.go b/server/internal/storage/cache.go index 8e390996..50fff801 100644 --- a/server/internal/storage/cache.go +++ b/server/internal/storage/cache.go @@ -167,6 +167,10 @@ func (c *cache[V]) Stop() { if c.op != nil { c.op.Close() c.op = nil + + c.mu.Lock() + defer c.mu.Unlock() + c.lastWatchRevision = 0 c.items = map[string]*cachedValue{} c.tombstones = nil diff --git a/server/internal/workflows/backend/etcd/activity_queue_item/store.go b/server/internal/workflows/backend/etcd/activity_queue_item/store.go index bf08408f..a2b20825 100644 --- a/server/internal/workflows/backend/etcd/activity_queue_item/store.go +++ b/server/internal/workflows/backend/etcd/activity_queue_item/store.go @@ -1,6 +1,7 @@ package activity_queue_item import ( + "context" "time" "github.com/cschleiden/go-workflows/backend/history" @@ -23,9 +24,12 @@ func (v *Value) UpdateLastLocked() { v.LastLocked = &now } +// Store is a storage implementation for activity queue items. StartCache must +// be called before the store can be used. type Store struct { client *clientv3.Client root string + cache storage.Cache[*Value] } func NewStore(client *clientv3.Client, root string) *Store { @@ -47,36 +51,53 @@ func (s *Store) Key(queue, instanceID, eventID string) string { return storage.Key(s.QueuePrefix(queue), instanceID, eventID) } +func (s *Store) StartCache(ctx context.Context) error { + if s.cache != nil { + return nil + } + s.cache = storage.NewCache(s.client, s.AllQueuesPrefix(), func(item *Value) string { + return s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID) + }) + return s.cache.Start(ctx) +} + +func (s *Store) StopCache() { + if s.cache != nil { + s.cache.Stop() + } +} + +func (s *Store) PropagateErrors(ctx context.Context, ch chan error) { + s.cache.PropagateErrors(ctx, ch) +} + func (s *Store) GetAll() storage.GetMultipleOp[*Value] { - return storage.NewGetPrefixOp[*Value](s.client, s.AllQueuesPrefix()) + return s.cache.GetPrefix(s.AllQueuesPrefix()) } func (s *Store) GetByKey(queue, instanceID, eventID string) storage.GetOp[*Value] { key := s.Key(queue, instanceID, eventID) - return storage.NewGetOp[*Value](s.client, key) + return s.cache.Get(key) } func (s *Store) GetByQueue(queue string) storage.GetMultipleOp[*Value] { prefix := s.QueuePrefix(queue) - return storage.NewGetPrefixOp[*Value](s.client, prefix) + return s.cache.GetPrefix(prefix) } func (s *Store) Create(item *Value) storage.PutOp[*Value] { - key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID) - return storage.NewCreateOp(s.client, key, item) + return s.cache.Create(item) } func (s *Store) Update(item *Value) storage.PutOp[*Value] { - key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID) - return storage.NewUpdateOp(s.client, key, item) + return s.cache.Update(item) } func (s *Store) DeleteByKey(queue, instanceID, eventID string) storage.DeleteOp { key := s.Key(queue, instanceID, eventID) - return storage.NewDeleteKeyOp(s.client, key) + return s.cache.DeleteByKey(key) } func (s *Store) DeleteItem(item *Value) storage.DeleteValueOp[*Value] { - key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID) - return storage.NewDeleteValueOp(s.client, key, item) + return s.cache.DeleteValue(item) } diff --git a/server/internal/workflows/backend/etcd/etcd.go b/server/internal/workflows/backend/etcd/etcd.go index cc077251..8c6fb890 100644 --- a/server/internal/workflows/backend/etcd/etcd.go +++ b/server/internal/workflows/backend/etcd/etcd.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "slices" + "sync" "time" "github.com/cschleiden/go-workflows/backend" @@ -34,6 +35,8 @@ type Backend struct { options *backend.Options workerID string workerInstanceID string + workflowMu sync.Mutex + activityMu sync.Mutex } func NewBackend(store *Store, options *backend.Options, workerID string) *Backend { @@ -45,6 +48,14 @@ func NewBackend(store *Store, options *backend.Options, workerID string) *Backen } } +func (b *Backend) StartCaches(ctx context.Context) error { + return b.store.StartCaches(ctx) +} + +func (b *Backend) StopCaches() { + b.store.StopCaches() +} + func (b *Backend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error { // Check for existing active instance execution instances, err := b.store.WorkflowInstance. @@ -244,6 +255,11 @@ func (b *Backend) PrepareActivityQueues(ctx context.Context, queues []workflow.Q } func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue) (*backend.WorkflowTask, error) { + // This lock reduces unnecessary contention from concurrent calls within the + // same worker. + b.workflowMu.Lock() + defer b.workflowMu.Unlock() + for _, queue := range queues { items, err := b.store.WorkflowQueueItem. GetByQueue(string(queue)). @@ -578,6 +594,11 @@ func (b *Backend) CompleteWorkflowTask( } func (b *Backend) GetActivityTask(ctx context.Context, queues []workflow.Queue) (*backend.ActivityTask, error) { + // This lock reduces unnecessary contention from concurrent calls within the + // same worker. + b.activityMu.Lock() + defer b.activityMu.Unlock() + for _, queue := range queues { items, err := b.store.ActivityQueueItem. GetByQueue(string(queue)). @@ -795,6 +816,10 @@ func (b *Backend) FeatureSupported(feature backend.Feature) bool { return false } +func (b *Backend) Error() <-chan error { + return b.store.Error() +} + func sortPendingEvents(events []*pending_event.Value) { slices.SortStableFunc(events, func(a *pending_event.Value, b *pending_event.Value) int { if a.CreatedAt > 0 && b.CreatedAt > 0 { diff --git a/server/internal/workflows/backend/etcd/etcd_test.go b/server/internal/workflows/backend/etcd/etcd_test.go index 1c92fb08..fed3975b 100644 --- a/server/internal/workflows/backend/etcd/etcd_test.go +++ b/server/internal/workflows/backend/etcd/etcd_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/require" "github.com/cschleiden/go-workflows/backend" "github.com/cschleiden/go-workflows/backend/history" @@ -23,7 +24,10 @@ func Test_EtcdBackend(t *testing.T) { test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend { opts := backend.ApplyOptions(options...) - return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString()) + backend := NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString()) + require.NoError(t, backend.StartCaches(t.Context())) + t.Cleanup(backend.StopCaches) + return backend }, nil) } @@ -33,7 +37,10 @@ func Test_EtcdBackendE2E(t *testing.T) { test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend { opts := backend.ApplyOptions(options...) - return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString()) + backend := NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString()) + require.NoError(t, backend.StartCaches(t.Context())) + t.Cleanup(backend.StopCaches) + return backend }, nil) } diff --git a/server/internal/workflows/backend/etcd/store.go b/server/internal/workflows/backend/etcd/store.go index d6809c4e..2edc2e8e 100644 --- a/server/internal/workflows/backend/etcd/store.go +++ b/server/internal/workflows/backend/etcd/store.go @@ -1,6 +1,11 @@ package etcd import ( + "context" + "fmt" + + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/pgEdge/control-plane/server/internal/storage" "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/activity_lock" "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/activity_queue_item" @@ -10,11 +15,11 @@ import ( "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_instance_lock" "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_instance_sticky" "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_queue_item" - clientv3 "go.etcd.io/etcd/client/v3" ) type Store struct { client *clientv3.Client + errCh chan error ActivityLock *activity_lock.Store ActivityQueueItem *activity_queue_item.Store HistoryEvent *history_event.Store @@ -28,6 +33,7 @@ type Store struct { func NewStore(client *clientv3.Client, root string) *Store { return &Store{ client: client, + errCh: make(chan error, 1), ActivityLock: activity_lock.NewStore(client, root), ActivityQueueItem: activity_queue_item.NewStore(client, root), HistoryEvent: history_event.NewStore(client, root), @@ -42,3 +48,25 @@ func NewStore(client *clientv3.Client, root string) *Store { func (s *Store) Txn(ops ...storage.TxnOperation) storage.Txn { return storage.NewTxn(s.client, ops...) } + +func (s *Store) StartCaches(ctx context.Context) error { + if err := s.WorkflowQueueItem.StartCache(ctx); err != nil { + return fmt.Errorf("failed to start workflow queue item cache: %w", err) + } + if err := s.ActivityQueueItem.StartCache(ctx); err != nil { + return fmt.Errorf("failed to start activity queue item cache: %w", err) + } + s.WorkflowQueueItem.PropagateErrors(ctx, s.errCh) + s.ActivityQueueItem.PropagateErrors(ctx, s.errCh) + + return nil +} + +func (s *Store) StopCaches() { + s.WorkflowQueueItem.StopCache() + s.ActivityQueueItem.StopCache() +} + +func (s *Store) Error() <-chan error { + return s.errCh +} diff --git a/server/internal/workflows/backend/etcd/workflow_queue_item/store.go b/server/internal/workflows/backend/etcd/workflow_queue_item/store.go index cafb6ee0..8e572bc2 100644 --- a/server/internal/workflows/backend/etcd/workflow_queue_item/store.go +++ b/server/internal/workflows/backend/etcd/workflow_queue_item/store.go @@ -1,6 +1,7 @@ package workflow_queue_item import ( + "context" "time" "github.com/cschleiden/go-workflows/backend/metadata" @@ -26,9 +27,12 @@ func (v *Value) UpdateLastLocked() { v.LastLocked = &now } +// Store is a storage implementation for workflow queue items. StartCache must +// be called before the store can be used. type Store struct { client *clientv3.Client root string + cache storage.Cache[*Value] } func NewStore(client *clientv3.Client, root string) *Store { @@ -54,41 +58,67 @@ func (s *Store) Key(queue, instanceID, executionID string) string { return storage.Key(s.InstanceIDPrefix(queue, instanceID), executionID) } +func (s *Store) StartCache(ctx context.Context) error { + if s.cache != nil { + return nil + } + cache := storage.NewCache(s.client, s.AllQueuesPrefix(), func(item *Value) string { + return s.Key( + string(item.Queue), + item.WorkflowInstance.InstanceID, + item.WorkflowInstance.ExecutionID, + ) + }) + if err := cache.Start(ctx); err != nil { + return err + } + s.cache = cache + return nil +} + +func (s *Store) StopCache() { + if s.cache != nil { + s.cache.Stop() + s.cache = nil + } +} + func (s *Store) GetAll() storage.GetMultipleOp[*Value] { - return storage.NewGetPrefixOp[*Value](s.client, s.AllQueuesPrefix()) + return s.cache.GetPrefix(s.AllQueuesPrefix()) +} + +func (s *Store) PropagateErrors(ctx context.Context, ch chan error) { + s.cache.PropagateErrors(ctx, ch) } func (s *Store) GetByKey(queue, instanceID, executionID string) storage.GetOp[*Value] { key := s.Key(queue, instanceID, executionID) - return storage.NewGetOp[*Value](s.client, key) + return s.cache.Get(key) } func (s *Store) GetByInstanceID(queue, instanceID string) storage.GetMultipleOp[*Value] { prefix := s.InstanceIDPrefix(queue, instanceID) - return storage.NewGetPrefixOp[*Value](s.client, prefix) + return s.cache.GetPrefix(prefix) } func (s *Store) GetByQueue(queue string) storage.GetMultipleOp[*Value] { prefix := s.QueuePrefix(queue) - return storage.NewGetPrefixOp[*Value](s.client, prefix) + return s.cache.GetPrefix(prefix) } func (s *Store) Create(item *Value) storage.PutOp[*Value] { - key := s.Key(string(item.Queue), item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID) - return storage.NewCreateOp(s.client, key, item) + return s.cache.Create(item) } func (s *Store) Put(item *Value) storage.PutOp[*Value] { - key := s.Key(string(item.Queue), item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID) - return storage.NewPutOp(s.client, key, item) + return s.cache.Put(item) } func (s *Store) Update(item *Value) storage.PutOp[*Value] { - key := s.Key(string(item.Queue), item.WorkflowInstance.InstanceID, item.WorkflowInstance.ExecutionID) - return storage.NewUpdateOp(s.client, key, item) + return s.cache.Update(item) } func (s *Store) DeleteByKey(queue, instanceID, executionID string) storage.DeleteOp { key := s.Key(queue, instanceID, executionID) - return storage.NewDeleteKeyOp(s.client, key) + return s.cache.DeleteByKey(key) } diff --git a/server/internal/workflows/provide.go b/server/internal/workflows/provide.go index ec66a099..147ac501 100644 --- a/server/internal/workflows/provide.go +++ b/server/internal/workflows/provide.go @@ -28,7 +28,7 @@ func provideWorker(i *do.Injector) { if err != nil { return nil, err } - be, err := do.Invoke[backend.Backend](i) + be, err := do.Invoke[*etcd.Backend](i) if err != nil { return nil, err } @@ -64,7 +64,7 @@ func provideWorkflows(i *do.Injector) { func provideClient(i *do.Injector) { do.Provide(i, func(i *do.Injector) (*client.Client, error) { - be, err := do.Invoke[backend.Backend](i) + be, err := do.Invoke[*etcd.Backend](i) if err != nil { return nil, err } @@ -73,7 +73,7 @@ func provideClient(i *do.Injector) { } func provideBackend(i *do.Injector) { - do.Provide(i, func(i *do.Injector) (backend.Backend, error) { + do.Provide(i, func(i *do.Injector) (*etcd.Backend, error) { cfg, err := do.Invoke[config.Config](i) if err != nil { return nil, err diff --git a/server/internal/workflows/worker.go b/server/internal/workflows/worker.go index dc5e3677..5c6eb94a 100644 --- a/server/internal/workflows/worker.go +++ b/server/internal/workflows/worker.go @@ -3,15 +3,14 @@ package workflows import ( "context" "fmt" - "time" - "github.com/cschleiden/go-workflows/backend" "github.com/cschleiden/go-workflows/worker" "github.com/cschleiden/go-workflows/workflow" "github.com/rs/zerolog" "github.com/samber/do" "github.com/pgEdge/control-plane/server/internal/logging" + "github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd" ) var _ do.Shutdownable = (*Worker)(nil) @@ -26,19 +25,20 @@ type Worker struct { workflows *Workflows ctx context.Context cancel context.CancelFunc + be *etcd.Backend } -func NewWorker(loggerFactory *logging.Factory, be backend.Backend, workflows *Workflows, orch Orchestrator) (*Worker, error) { +func NewWorker(loggerFactory *logging.Factory, be *etcd.Backend, workflows *Workflows, orch Orchestrator) (*Worker, error) { queues, err := orch.WorkerQueues() if err != nil { return nil, fmt.Errorf("failed to get worker queues: %w", err) } + // We're using the default activity and workflow polling intervals here, + // which are both 200ms. opts := worker.DefaultOptions opts.WorkflowQueues = queues opts.ActivityQueues = queues - opts.ActivityPollingInterval = 500 * time.Millisecond - opts.WorkflowPollingInterval = 500 * time.Millisecond w := worker.New(be, &opts) if err := workflows.Register(w); err != nil { @@ -49,6 +49,7 @@ func NewWorker(loggerFactory *logging.Factory, be backend.Backend, workflows *Wo logger: loggerFactory.Logger(logging.ComponentWorkflowsWorker), worker: w, workflows: workflows, + be: be, }, nil } @@ -56,6 +57,9 @@ func (w *Worker) Start(ctx context.Context) error { if w.cancel != nil { return fmt.Errorf("workflows worker already started") } + if err := w.be.StartCaches(ctx); err != nil { + return err + } w.logger.Debug().Msg("starting workflows worker") @@ -79,8 +83,14 @@ func (w *Worker) Shutdown() error { w.cancel() } + w.be.StopCaches() + if err := w.worker.WaitForCompletion(); err != nil { return fmt.Errorf("failed to wait for active tasks to complete: %w", err) } return nil } + +func (w *Worker) Error() <-chan error { + return w.be.Error() +}