From 8de84cfc1b2c340a66ac18ab0e3c1337adae021b Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Sat, 21 Mar 2026 16:42:11 -0700 Subject: [PATCH] Add poller autoscaling scale-down-on-idle feature test Validates that the SDK scales down pollers on an idle queue when the server advertises the PollerAutoscaling namespace capability. Uses a gRPC interceptor to track concurrent PollWorkflowTaskQueue RPCs and verifies the count decreases from the initial peak. Skips gracefully when the server doesn't support the capability. Requires Go SDK >= v1.41.2 (includes sdk-go#2239 which adds the capability-based scale-down logic). Made-with: Cursor --- features/features.go | 2 + .../scale_down_on_idle/feature.go | 103 ++++++++++++++++++ go.mod | 6 +- go.sum | 12 +- 4 files changed, 114 insertions(+), 9 deletions(-) create mode 100644 features/poller_scaling/scale_down_on_idle/feature.go diff --git a/features/features.go b/features/features.go index 441964a3..0cfe8f8c 100644 --- a/features/features.go +++ b/features/features.go @@ -32,6 +32,7 @@ import ( deployment_versioning_routing_with_ramp "github.com/temporalio/features/features/deployment_versioning/routing_with_ramp" eager_activity_non_remote_activities_worker "github.com/temporalio/features/features/eager_activity/non_remote_activities_worker" eager_workflow_successful_start "github.com/temporalio/features/features/eager_workflow/successful_start" + poller_scaling_scale_down_on_idle "github.com/temporalio/features/features/poller_scaling/scale_down_on_idle" query_successful_query "github.com/temporalio/features/features/query/successful_query" query_timeout_due_to_no_active_workers "github.com/temporalio/features/features/query/timeout_due_to_no_active_workers" query_unexpected_arguments "github.com/temporalio/features/features/query/unexpected_arguments" @@ -92,6 +93,7 @@ func init() { deployment_versioning_routing_with_ramp.Feature, eager_activity_non_remote_activities_worker.Feature, eager_workflow_successful_start.Feature, + poller_scaling_scale_down_on_idle.Feature, query_successful_query.Feature, query_timeout_due_to_no_active_workers.Feature, query_unexpected_arguments.Feature, diff --git a/features/poller_scaling/scale_down_on_idle/feature.go b/features/poller_scaling/scale_down_on_idle/feature.go new file mode 100644 index 00000000..0595dd08 --- /dev/null +++ b/features/poller_scaling/scale_down_on_idle/feature.go @@ -0,0 +1,103 @@ +package scale_down_on_idle + +import ( + "context" + "fmt" + "strings" + "sync/atomic" + "time" + + "github.com/temporalio/features/harness/go/harness" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" + "google.golang.org/grpc" +) + +var concurrentPolls atomic.Int32 + +var Feature = harness.Feature{ + Workflows: Workflow, + WorkerOptions: worker.Options{ + WorkflowTaskPollerBehavior: worker.NewPollerBehaviorAutoscaling(worker.PollerBehaviorAutoscalingOptions{ + InitialNumberOfPollers: 5, + MinimumNumberOfPollers: 1, + MaximumNumberOfPollers: 10, + }), + }, + ClientOptions: client.Options{ + ConnectionOptions: client.ConnectionOptions{ + DialOptions: []grpc.DialOption{ + grpc.WithChainUnaryInterceptor(pollInterceptor), + }, + }, + }, + Execute: execute, + CheckHistory: harness.NoHistoryCheck, +} + +func pollInterceptor( + ctx context.Context, + method string, + req, reply any, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, +) error { + if strings.HasSuffix(method, "PollWorkflowTaskQueue") { + concurrentPolls.Add(1) + defer concurrentPolls.Add(-1) + } + return invoker(ctx, method, req, reply, cc, opts...) +} + +func execute(ctx context.Context, r *harness.Runner) (client.WorkflowRun, error) { + descNs, err := r.Client.WorkflowService().DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{ + Namespace: r.Namespace, + }) + if err != nil { + return nil, fmt.Errorf("DescribeNamespace: %w", err) + } + caps := descNs.GetNamespaceInfo().GetCapabilities() + if !caps.GetPollerAutoscaling() { + return nil, r.Skip("server does not support poller autoscaling") + } + + // Wait for concurrent polls to reach a peak (at least 3 of the 5 initial pollers). + var peakPolls int32 + err = r.DoUntilEventually(ctx, 500*time.Millisecond, 30*time.Second, func() bool { + current := concurrentPolls.Load() + if current > peakPolls { + peakPolls = current + } + r.Log.Info("Waiting for pollers to start", "current", current, "peak", peakPolls) + return peakPolls >= 3 + }) + if err != nil { + return nil, fmt.Errorf("pollers did not reach expected initial count (peak: %d): %w", peakPolls, err) + } + r.Log.Info("Peak concurrent polls reached", "peak", peakPolls) + + // Wait for concurrent polls to decrease. The queue is idle, so the SDK + // should scale down once it sees the PollerAutoscaling capability on empty + // poll responses and decides to reduce pollers. + var lastSeen int32 + err = r.DoUntilEventually(ctx, 3*time.Second, 120*time.Second, func() bool { + current := concurrentPolls.Load() + r.Log.Info("Waiting for scale-down", "current", current, "peak", peakPolls) + lastSeen = current + return current < peakPolls + }) + if err != nil { + return nil, fmt.Errorf("pollers did not scale down from peak %d (last seen: %d): %w", + peakPolls, lastSeen, err) + } + + r.Log.Info("Pollers scaled down", "peak", peakPolls, "final", lastSeen) + return nil, nil +} + +func Workflow(ctx workflow.Context) error { + return nil +} diff --git a/go.mod b/go.mod index b3af00c9..d5d0d587 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/temporalio/features/features v0.0.0-00010101000000-000000000000 github.com/temporalio/features/harness/go v0.0.0-00010101000000-000000000000 github.com/urfave/cli/v2 v2.25.7 - go.temporal.io/sdk v1.37.0 + go.temporal.io/sdk v1.41.2-0.20260320032056-e6da72d5340d golang.org/x/mod v0.17.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -23,7 +23,7 @@ require ( github.com/golang/mock v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect - github.com/nexus-rpc/sdk-go v0.3.0 // indirect + github.com/nexus-rpc/sdk-go v0.6.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/stretchr/objx v0.5.2 // indirect @@ -31,7 +31,7 @@ require ( github.com/twmb/murmur3 v1.1.8 // indirect github.com/uber-go/tally/v4 v4.1.7 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect - go.temporal.io/api v1.60.0 // indirect + go.temporal.io/api v1.62.5 // indirect go.temporal.io/sdk/contrib/tally v0.2.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index 1d17da0b..bea4c8b5 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nexus-rpc/sdk-go v0.3.0 h1:Y3B0kLYbMhd4C2u00kcYajvmOrfozEtTV/nHSnV57jA= -github.com/nexus-rpc/sdk-go v0.3.0/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ= +github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= +github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= @@ -183,11 +183,11 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA= -go.temporal.io/api v1.60.0 h1:SlRkizt3PXu/J62NWlUNLldHtJhUxfsBRuF4T0KYkgY= -go.temporal.io/api v1.60.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= +go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o= -go.temporal.io/sdk v1.37.0 h1:RbwCkUQuqY4rfCzdrDZF9lgT7QWG/pHlxfZFq0NPpDQ= -go.temporal.io/sdk v1.37.0/go.mod h1:tOy6vGonfAjrpCl6Bbw/8slTgQMiqvoyegRv2ZHPm5M= +go.temporal.io/sdk v1.41.2-0.20260320032056-e6da72d5340d h1:Xa/MXh02phZ4pl5NgfUQ7CYgrbcqeHoc/zh/1TLK7Ok= +go.temporal.io/sdk v1.41.2-0.20260320032056-e6da72d5340d/go.mod h1:XV3YZRD5+KIK2J5eHmB9+wt1PI+exDGQV8gywQNeSeI= go.temporal.io/sdk/contrib/tally v0.2.0 h1:XnTJIQcjOv+WuCJ1u8Ve2nq+s2H4i/fys34MnWDRrOo= go.temporal.io/sdk/contrib/tally v0.2.0/go.mod h1:1kpSuCms/tHeJQDPuuKkaBsMqfHnIIRnCtUYlPNXxuE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=