From 507cae35e6ab73599f4c6ca5e43fd1805e3d05e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Sun, 1 Mar 2026 22:17:43 -0800 Subject: [PATCH 01/12] feat(orchestrator): add Status field to Result struct MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- pkg/orchestrator/result.go | 1 + pkg/orchestrator/result_public_test.go | 60 ++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/pkg/orchestrator/result.go b/pkg/orchestrator/result.go index fbb4841..b293122 100644 --- a/pkg/orchestrator/result.go +++ b/pkg/orchestrator/result.go @@ -27,6 +27,7 @@ const ( type Result struct { Changed bool Data map[string]any + Status Status } // TaskResult records the full execution details of a task. diff --git a/pkg/orchestrator/result_public_test.go b/pkg/orchestrator/result_public_test.go index 59ba13f..82a4615 100644 --- a/pkg/orchestrator/result_public_test.go +++ b/pkg/orchestrator/result_public_test.go @@ -76,6 +76,66 @@ func (s *ResultPublicTestSuite) TestReportSummary() { } } +func (s *ResultPublicTestSuite) TestResultStatusField() { + tests := []struct { + name string + result *orchestrator.Result + wantStatus orchestrator.Status + wantChange bool + }{ + { + name: "changed result carries status", + result: &orchestrator.Result{ + Changed: true, + Data: map[string]any{"hostname": "web-01"}, + Status: orchestrator.StatusChanged, + }, + wantStatus: orchestrator.StatusChanged, + wantChange: true, + }, + { + name: "unchanged result carries status", + result: &orchestrator.Result{ + Changed: false, + Status: orchestrator.StatusUnchanged, + }, + wantStatus: orchestrator.StatusUnchanged, + wantChange: false, + }, + { + name: "failed result carries status", + result: &orchestrator.Result{ + Changed: false, + Status: orchestrator.StatusFailed, + }, + wantStatus: orchestrator.StatusFailed, + wantChange: false, + }, + { + name: "skipped result carries status", + result: &orchestrator.Result{ + Changed: false, + Status: orchestrator.StatusSkipped, + }, + wantStatus: orchestrator.StatusSkipped, + wantChange: false, + }, + { + name: "zero value has empty status", + result: &orchestrator.Result{}, + wantStatus: "", + wantChange: false, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + s.Equal(tt.wantStatus, tt.result.Status) + s.Equal(tt.wantChange, tt.result.Changed) + }) + } +} + func (s *ResultPublicTestSuite) TestResultsGet() { tests := []struct { name string From 22a0156564ceee73f04b49abe7955d97cda55b1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 09:01:07 -0800 Subject: [PATCH 02/12] feat(orchestrator): store Result in results map for all task paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the runner only stored a Result for successfully completed tasks. Skipped and failed tasks had no entry in the results map, which prevented guards from inspecting their status. Now every code path (dep-failed skip, OnlyIfChanged skip, guard-false skip, failure, and success) stores a Result with the appropriate Status field set. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- pkg/orchestrator/runner.go | 20 ++- pkg/orchestrator/runner_test.go | 244 ++++++++++++++++++++++++++++++++ 2 files changed, 260 insertions(+), 4 deletions(-) diff --git a/pkg/orchestrator/runner.go b/pkg/orchestrator/runner.go index afce9e5..8709eed 100644 --- a/pkg/orchestrator/runner.go +++ b/pkg/orchestrator/runner.go @@ -237,6 +237,7 @@ func (r *runner) runTask( for _, dep := range t.deps { if r.failed[dep.name] { r.failed[t.name] = true + r.results[t.name] = &Result{Status: StatusSkipped} r.mu.Unlock() tr := TaskResult{ @@ -268,6 +269,10 @@ func (r *runner) runTask( r.mu.Unlock() if !anyChanged { + r.mu.Lock() + r.results[t.name] = &Result{Status: StatusSkipped} + r.mu.Unlock() + tr := TaskResult{ Name: t.name, Status: StatusSkipped, @@ -287,6 +292,10 @@ func (r *runner) runTask( r.mu.Unlock() if !shouldRun { + r.mu.Lock() + r.results[t.name] = &Result{Status: StatusSkipped} + r.mu.Unlock() + tr := TaskResult{ Name: t.name, Status: StatusSkipped, @@ -335,6 +344,7 @@ func (r *runner) runTask( if err != nil { r.mu.Lock() r.failed[t.name] = true + r.results[t.name] = &Result{Status: StatusFailed} r.mu.Unlock() tr := TaskResult{ @@ -349,15 +359,17 @@ func (r *runner) runTask( return tr } - r.mu.Lock() - r.results[t.name] = result - r.mu.Unlock() - status := StatusUnchanged if result.Changed { status = StatusChanged } + result.Status = status + + r.mu.Lock() + r.results[t.name] = result + r.mu.Unlock() + tr := TaskResult{ Name: t.name, Status: status, diff --git a/pkg/orchestrator/runner_test.go b/pkg/orchestrator/runner_test.go index ccf7549..c00c80b 100644 --- a/pkg/orchestrator/runner_test.go +++ b/pkg/orchestrator/runner_test.go @@ -1,9 +1,13 @@ package orchestrator import ( + "context" + "fmt" "testing" "github.com/stretchr/testify/suite" + + "github.com/osapi-io/osapi-sdk/pkg/osapi" ) type RunnerTestSuite struct { @@ -68,3 +72,243 @@ func (s *RunnerTestSuite) TestLevelize() { }) } } + +func (s *RunnerTestSuite) TestRunTaskStoresResultForAllPaths() { + tests := []struct { + name string + setup func() *Plan + taskName string + wantStatus Status + }{ + { + name: "OnlyIfChanged skip stores StatusSkipped", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + // dep returns Changed=false, so child with + // OnlyIfChanged should be skipped. + dep := plan.TaskFunc("dep", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: false}, nil + }) + + child := plan.TaskFunc("child", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: true}, nil + }) + child.DependsOn(dep) + child.OnlyIfChanged() + + return plan + }, + taskName: "child", + wantStatus: StatusSkipped, + }, + { + name: "failed task stores StatusFailed", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + plan.TaskFunc("failing", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return nil, fmt.Errorf("deliberate error") + }) + + return plan + }, + taskName: "failing", + wantStatus: StatusFailed, + }, + { + name: "guard-false skip stores StatusSkipped", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + plan.TaskFunc("guarded", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: true}, nil + }).When(func(_ Results) bool { + return false + }) + + return plan + }, + taskName: "guarded", + wantStatus: StatusSkipped, + }, + { + name: "dependency-failed skip stores StatusSkipped", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + dep := plan.TaskFunc("dep", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return nil, fmt.Errorf("deliberate error") + }) + + child := plan.TaskFunc("child", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: true}, nil + }) + child.DependsOn(dep) + + return plan + }, + taskName: "child", + wantStatus: StatusSkipped, + }, + { + name: "successful changed task stores StatusChanged", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + plan.TaskFunc("ok", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: true}, nil + }) + + return plan + }, + taskName: "ok", + wantStatus: StatusChanged, + }, + { + name: "successful unchanged task stores StatusUnchanged", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + plan.TaskFunc("ok", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: false}, nil + }) + + return plan + }, + taskName: "ok", + wantStatus: StatusUnchanged, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + plan := tt.setup() + runner := newRunner(plan) + + _, err := runner.run(context.Background()) + // Some plans produce errors (e.g. StopAll with a + // failing task); we don't assert on err here because + // we only care about the results map. + _ = err + + result := runner.results.Get(tt.taskName) + s.NotNil( + result, + "results map should contain entry for %q", + tt.taskName, + ) + s.Equal( + tt.wantStatus, + result.Status, + "result status for %q", + tt.taskName, + ) + }) + } +} + +func (s *RunnerTestSuite) TestDownstreamGuardInspectsSkippedStatus() { + tests := []struct { + name string + setup func() (*Plan, *bool) + observerName string + wantGuardCalled bool + wantTaskStatus Status + }{ + { + name: "guard can see guard-skipped task status", + setup: func() (*Plan, *bool) { + plan := NewPlan(nil, OnError(Continue)) + guardCalled := false + + // This task is skipped because its guard + // returns false. + guarded := plan.TaskFunc("guarded", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: true}, nil + }) + guarded.When(func(_ Results) bool { + return false + }) + + // Observer depends on guarded so it runs in a + // later level. Its guard inspects the skipped + // task's status. + observer := plan.TaskFunc("observer", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: false}, nil + }) + observer.DependsOn(guarded) + observer.When(func(r Results) bool { + guardCalled = true + res := r.Get("guarded") + + return res != nil && res.Status == StatusSkipped + }) + + return plan, &guardCalled + }, + observerName: "observer", + wantGuardCalled: true, + // Observer runs because the guard sees the skipped + // status and returns true. + wantTaskStatus: StatusUnchanged, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + plan, guardCalled := tt.setup() + runner := newRunner(plan) + + _, err := runner.run(context.Background()) + _ = err + + s.Equal( + tt.wantGuardCalled, + *guardCalled, + "guard should have been called", + ) + + result := runner.results.Get(tt.observerName) + s.NotNil( + result, + "observer should have a result entry", + ) + s.Equal( + tt.wantTaskStatus, + result.Status, + "observer task status", + ) + }) + } +} From 06c489d509161814315f384f673133ff3a5800e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 09:08:36 -0800 Subject: [PATCH 03/12] feat(orchestrator): add HostResult type to Result MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add HostResult struct for per-host responses in broadcast operations and a HostResults slice field to Result. This enables downstream tasks and guards to inspect individual host outcomes. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- pkg/orchestrator/result.go | 16 +++++- pkg/orchestrator/result_public_test.go | 76 ++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 3 deletions(-) diff --git a/pkg/orchestrator/result.go b/pkg/orchestrator/result.go index b293122..f8b07fa 100644 --- a/pkg/orchestrator/result.go +++ b/pkg/orchestrator/result.go @@ -23,11 +23,21 @@ const ( StatusFailed Status = "failed" ) +// HostResult represents a single host's response within a broadcast +// operation. +type HostResult struct { + Hostname string + Changed bool + Error string + Data map[string]any +} + // Result is the outcome of a single task execution. type Result struct { - Changed bool - Data map[string]any - Status Status + Changed bool + Data map[string]any + Status Status + HostResults []HostResult } // TaskResult records the full execution details of a task. diff --git a/pkg/orchestrator/result_public_test.go b/pkg/orchestrator/result_public_test.go index 82a4615..bc3052c 100644 --- a/pkg/orchestrator/result_public_test.go +++ b/pkg/orchestrator/result_public_test.go @@ -136,6 +136,82 @@ func (s *ResultPublicTestSuite) TestResultStatusField() { } } +func (s *ResultPublicTestSuite) TestResultHostResults() { + tests := []struct { + name string + result *orchestrator.Result + wantLen int + validateFn func(hrs []orchestrator.HostResult) + }{ + { + name: "result with multiple host results", + result: &orchestrator.Result{ + Changed: true, + Status: orchestrator.StatusChanged, + HostResults: []orchestrator.HostResult{ + { + Hostname: "web-01", + Changed: true, + Data: map[string]any{"stdout": "ok"}, + }, + { + Hostname: "web-02", + Changed: false, + Error: "connection timeout", + }, + }, + }, + wantLen: 2, + validateFn: func(hrs []orchestrator.HostResult) { + s.Equal("web-01", hrs[0].Hostname) + s.True(hrs[0].Changed) + s.Equal("web-02", hrs[1].Hostname) + s.Equal("connection timeout", hrs[1].Error) + }, + }, + { + name: "result with no host results", + result: &orchestrator.Result{ + Changed: false, + Status: orchestrator.StatusUnchanged, + }, + wantLen: 0, + }, + { + name: "host result with data map", + result: &orchestrator.Result{ + Changed: true, + Status: orchestrator.StatusChanged, + HostResults: []orchestrator.HostResult{ + { + Hostname: "db-01", + Changed: true, + Data: map[string]any{ + "stdout": "migrated", + "exit_code": float64(0), + }, + }, + }, + }, + wantLen: 1, + validateFn: func(hrs []orchestrator.HostResult) { + s.Equal("db-01", hrs[0].Hostname) + s.Equal("migrated", hrs[0].Data["stdout"]) + }, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + s.Len(tt.result.HostResults, tt.wantLen) + + if tt.validateFn != nil { + tt.validateFn(tt.result.HostResults) + } + }) + } +} + func (s *ResultPublicTestSuite) TestResultsGet() { tests := []struct { name string From a92775c1a143cec9ebc676b11eb31e2e6ef11d2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 09:16:33 -0800 Subject: [PATCH 04/12] feat(orchestrator): extract per-host results and handle non-zero exit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add IsBroadcastTarget helper to detect broadcast/label targets, extractHostResults to parse per-agent results from collection responses, and isCommandOp to identify command operations. Update executeOp to populate HostResults for broadcast targets and fail on non-zero exit codes for command operations. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- pkg/orchestrator/runner.go | 77 +++++++++- pkg/orchestrator/runner_broadcast_test.go | 162 ++++++++++++++++++++++ pkg/orchestrator/task.go | 9 ++ 3 files changed, 247 insertions(+), 1 deletion(-) create mode 100644 pkg/orchestrator/runner_broadcast_test.go diff --git a/pkg/orchestrator/runner.go b/pkg/orchestrator/runner.go index 8709eed..2919787 100644 --- a/pkg/orchestrator/runner.go +++ b/pkg/orchestrator/runner.go @@ -385,6 +385,59 @@ func (r *runner) runTask( // DefaultPollInterval is the interval between job status polls. var DefaultPollInterval = 500 * time.Millisecond +// isCommandOp returns true for command execution operations. +func isCommandOp( + operation string, +) bool { + return operation == "command.exec.execute" || + operation == "command.shell.execute" +} + +// extractHostResults parses per-agent results from a broadcast +// collection response. +func extractHostResults( + data map[string]any, +) []HostResult { + resultsRaw, ok := data["results"] + if !ok { + return nil + } + + items, ok := resultsRaw.([]any) + if !ok { + return nil + } + + hostResults := make([]HostResult, 0, len(items)) + + for _, item := range items { + m, ok := item.(map[string]any) + if !ok { + continue + } + + hr := HostResult{ + Data: m, + } + + if h, ok := m["hostname"].(string); ok { + hr.Hostname = h + } + + if c, ok := m["changed"].(bool); ok { + hr.Changed = c + } + + if e, ok := m["error"].(string); ok { + hr.Error = e + } + + hostResults = append(hostResults, hr) + } + + return hostResults +} + // executeOp submits a declarative Op as a job via the SDK and polls // for completion. func (r *runner) executeOp( @@ -427,7 +480,29 @@ func (r *runner) executeOp( jobID := createResp.JSON201.JobId.String() - return r.pollJob(ctx, jobID) + result, err := r.pollJob(ctx, jobID) + if err != nil { + return nil, err + } + + // Extract per-host results for broadcast targets. + if IsBroadcastTarget(op.Target) { + result.HostResults = extractHostResults(result.Data) + } + + // Non-zero exit for command operations = failure. + if isCommandOp(op.Operation) { + if exitCode, ok := result.Data["exit_code"].(float64); ok && exitCode != 0 { + result.Status = StatusFailed + + return result, fmt.Errorf( + "command exited with code %d", + int(exitCode), + ) + } + } + + return result, nil } // pollJob polls a job until it reaches a terminal state. diff --git a/pkg/orchestrator/runner_broadcast_test.go b/pkg/orchestrator/runner_broadcast_test.go new file mode 100644 index 0000000..5be2496 --- /dev/null +++ b/pkg/orchestrator/runner_broadcast_test.go @@ -0,0 +1,162 @@ +package orchestrator + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +type RunnerBroadcastTestSuite struct { + suite.Suite +} + +func TestRunnerBroadcastTestSuite(t *testing.T) { + suite.Run(t, new(RunnerBroadcastTestSuite)) +} + +func (s *RunnerBroadcastTestSuite) TestIsBroadcastTarget() { + tests := []struct { + name string + target string + want bool + }{ + { + name: "all agents is broadcast", + target: "_all", + want: true, + }, + { + name: "label selector is broadcast", + target: "role:web", + want: true, + }, + { + name: "single agent is not broadcast", + target: "agent-001", + want: false, + }, + { + name: "empty string is not broadcast", + target: "", + want: false, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + got := IsBroadcastTarget(tt.target) + s.Equal(tt.want, got) + }) + } +} + +func (s *RunnerBroadcastTestSuite) TestExtractHostResults() { + tests := []struct { + name string + data map[string]any + want []HostResult + }{ + { + name: "extracts host results from results array", + data: map[string]any{ + "results": []any{ + map[string]any{ + "hostname": "host-1", + "changed": true, + "data": "something", + }, + map[string]any{ + "hostname": "host-2", + "changed": false, + "error": "connection refused", + }, + }, + }, + want: []HostResult{ + { + Hostname: "host-1", + Changed: true, + Data: map[string]any{ + "hostname": "host-1", + "changed": true, + "data": "something", + }, + }, + { + Hostname: "host-2", + Changed: false, + Error: "connection refused", + Data: map[string]any{ + "hostname": "host-2", + "changed": false, + "error": "connection refused", + }, + }, + }, + }, + { + name: "no results key returns nil", + data: map[string]any{ + "other": "value", + }, + want: nil, + }, + { + name: "results not an array returns nil", + data: map[string]any{ + "results": "not-an-array", + }, + want: nil, + }, + { + name: "empty results array returns empty slice", + data: map[string]any{ + "results": []any{}, + }, + want: []HostResult{}, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + got := extractHostResults(tt.data) + s.Equal(tt.want, got) + }) + } +} + +func (s *RunnerBroadcastTestSuite) TestIsCommandOp() { + tests := []struct { + name string + operation string + want bool + }{ + { + name: "command.exec.execute is a command op", + operation: "command.exec.execute", + want: true, + }, + { + name: "command.shell.execute is a command op", + operation: "command.shell.execute", + want: true, + }, + { + name: "node.hostname.get is not a command op", + operation: "node.hostname.get", + want: false, + }, + { + name: "empty string is not a command op", + operation: "", + want: false, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + got := isCommandOp(tt.operation) + s.Equal(tt.want, got) + }) + } +} diff --git a/pkg/orchestrator/task.go b/pkg/orchestrator/task.go index cc818e1..8e17115 100644 --- a/pkg/orchestrator/task.go +++ b/pkg/orchestrator/task.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "strings" "github.com/osapi-io/osapi-sdk/pkg/osapi" ) @@ -128,3 +129,11 @@ func (t *Task) OnError( func (t *Task) ErrorStrategy() *ErrorStrategy { return t.errorStrategy } + +// IsBroadcastTarget returns true if the target addresses multiple +// agents (broadcast or label selector). +func IsBroadcastTarget( + target string, +) bool { + return target == "_all" || strings.Contains(target, ":") +} From 8a6a689f7f56f3f7ed57a36bcb9347c904b46cff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 09:48:34 -0800 Subject: [PATCH 05/12] feat(orchestrator): add TaskFnWithResults and Data on TaskResult MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add inter-task data access mechanism: TaskFnWithResults lets a task closure receive the results map from prior tasks, enabling data flow between dependent tasks. Add Data field to TaskResult so post-execution result inspection can access task output data. - Add TaskFnWithResults type and fnr field on Task - Add NewTaskFuncWithResults constructor - Update IsFunc() to account for fnr - Add Data map[string]any field to TaskResult - Update runner to handle fnr execution with lock-protected results - Populate Data on TaskResult in success path - Add TaskFuncWithResults method on Plan - Fix Explain() to use IsFunc() for fnr task detection 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- pkg/orchestrator/plan.go | 15 ++++- pkg/orchestrator/result.go | 1 + pkg/orchestrator/runner.go | 9 ++- pkg/orchestrator/runner_test.go | 108 ++++++++++++++++++++++++++++++++ pkg/orchestrator/task.go | 23 ++++++- 5 files changed, 153 insertions(+), 3 deletions(-) diff --git a/pkg/orchestrator/plan.go b/pkg/orchestrator/plan.go index 8c92737..2a10233 100644 --- a/pkg/orchestrator/plan.go +++ b/pkg/orchestrator/plan.go @@ -67,6 +67,19 @@ func (p *Plan) TaskFunc( return t } +// TaskFuncWithResults creates a functional task that receives +// completed results from prior tasks, adds it to the plan, and +// returns it. +func (p *Plan) TaskFuncWithResults( + name string, + fn TaskFnWithResults, +) *Task { + t := NewTaskFuncWithResults(name, fn) + p.tasks = append(p.tasks, t) + + return t +} + // Tasks returns all tasks in the plan. func (p *Plan) Tasks() []*Task { return p.tasks @@ -93,7 +106,7 @@ func (p *Plan) Explain() string { for _, t := range level { kind := "op" - if t.fn != nil { + if t.IsFunc() { kind = "fn" } diff --git a/pkg/orchestrator/result.go b/pkg/orchestrator/result.go index f8b07fa..4318495 100644 --- a/pkg/orchestrator/result.go +++ b/pkg/orchestrator/result.go @@ -47,6 +47,7 @@ type TaskResult struct { Changed bool Duration time.Duration Error error + Data map[string]any } // Results is a map of task name to Result, used for conditional logic. diff --git a/pkg/orchestrator/runner.go b/pkg/orchestrator/runner.go index 2919787..e8f8a29 100644 --- a/pkg/orchestrator/runner.go +++ b/pkg/orchestrator/runner.go @@ -324,7 +324,13 @@ func (r *runner) runTask( client := r.plan.client for attempt := range maxAttempts { - if t.fn != nil { + if t.fnr != nil { + r.mu.Lock() + results := r.results + r.mu.Unlock() + + result, err = t.fnr(ctx, client, results) + } else if t.fn != nil { result, err = t.fn(ctx, client) } else { result, err = r.executeOp(ctx, t.op) @@ -375,6 +381,7 @@ func (r *runner) runTask( Status: status, Changed: result.Changed, Duration: elapsed, + Data: result.Data, } r.callAfterTask(t, tr) diff --git a/pkg/orchestrator/runner_test.go b/pkg/orchestrator/runner_test.go index c00c80b..d202158 100644 --- a/pkg/orchestrator/runner_test.go +++ b/pkg/orchestrator/runner_test.go @@ -312,3 +312,111 @@ func (s *RunnerTestSuite) TestDownstreamGuardInspectsSkippedStatus() { }) } } + +func (s *RunnerTestSuite) TestTaskFuncWithResultsReceivesResults() { + tests := []struct { + name string + setup func() (*Plan, *string) + wantCapture string + }{ + { + name: "receives upstream result data", + setup: func() (*Plan, *string) { + plan := NewPlan(nil, OnError(StopAll)) + var captured string + + a := plan.TaskFunc("a", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{ + Changed: true, + Data: map[string]any{"hostname": "web-01"}, + }, nil + }) + + b := plan.TaskFuncWithResults("b", func( + _ context.Context, + _ *osapi.Client, + results Results, + ) (*Result, error) { + r := results.Get("a") + if r != nil { + if h, ok := r.Data["hostname"].(string); ok { + captured = h + } + } + + return &Result{Changed: false}, nil + }) + b.DependsOn(a) + + return plan, &captured + }, + wantCapture: "web-01", + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + plan, captured := tt.setup() + + _, err := plan.Run(context.Background()) + + s.Require().NoError(err) + s.Equal(tt.wantCapture, *captured) + }) + } +} + +func (s *RunnerTestSuite) TestTaskResultCarriesData() { + tests := []struct { + name string + setup func() *Plan + taskName string + wantKey string + wantVal any + }{ + { + name: "success result includes data", + setup: func() *Plan { + plan := NewPlan(nil, OnError(StopAll)) + + plan.TaskFunc("a", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{ + Changed: true, + Data: map[string]any{"stdout": "hello"}, + }, nil + }) + + return plan + }, + taskName: "a", + wantKey: "stdout", + wantVal: "hello", + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + plan := tt.setup() + + report, err := plan.Run(context.Background()) + + s.Require().NoError(err) + + var found bool + for _, tr := range report.Tasks { + if tr.Name == tt.taskName { + found = true + s.Equal(tt.wantVal, tr.Data[tt.wantKey]) + } + } + + s.True(found, "task %q should be in report", tt.taskName) + }) + } +} diff --git a/pkg/orchestrator/task.go b/pkg/orchestrator/task.go index 8e17115..2c79e0f 100644 --- a/pkg/orchestrator/task.go +++ b/pkg/orchestrator/task.go @@ -21,6 +21,14 @@ type TaskFn func( client *osapi.Client, ) (*Result, error) +// TaskFnWithResults is like TaskFn but receives completed task results +// for inter-task data access. +type TaskFnWithResults func( + ctx context.Context, + client *osapi.Client, + results Results, +) (*Result, error) + // GuardFn is a predicate that determines if a task should run. type GuardFn func(results Results) bool @@ -29,6 +37,7 @@ type Task struct { name string op *Op fn TaskFn + fnr TaskFnWithResults deps []*Task guard GuardFn requiresChange bool @@ -57,6 +66,18 @@ func NewTaskFunc( } } +// NewTaskFuncWithResults creates a functional task that receives +// completed results from prior tasks. +func NewTaskFuncWithResults( + name string, + fn TaskFnWithResults, +) *Task { + return &Task{ + name: name, + fnr: fn, + } +} + // Name returns the task name. func (t *Task) Name() string { return t.name @@ -64,7 +85,7 @@ func (t *Task) Name() string { // IsFunc returns true if this is a functional task. func (t *Task) IsFunc() bool { - return t.fn != nil + return t.fn != nil || t.fnr != nil } // Operation returns the declarative operation, or nil for functional From 924a1a32c3f0869b886534bc9b01eca045879ab5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 12:41:59 -0800 Subject: [PATCH 06/12] docs: update README and example for new orchestrator features MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add TaskFuncWithResults, Result types (Status, HostResult, Data on TaskResult), and broadcast results to docs. Update example to use TaskFuncWithResults with Results access and show Data on TaskResult. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- README.md | 74 ++++++++++++++++++------------------ docs/orchestration/README.md | 60 +++++++++++++++++++++++++++++ examples/all/main.go | 29 +++++++++++--- 3 files changed, 121 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index c3eba98..2e30751 100644 --- a/README.md +++ b/README.md @@ -24,16 +24,16 @@ Typed Go client for every OSAPI endpoint. See the [client docs](docs/osapi/README.md) for targeting, options, and quick start. -| Service | Operations | Docs | Source | -| ------- | ---------- | ---- | ------ | -| Node | Hostname, disk, memory, load, uptime, OS info, status | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | -| Network | DNS get/update, ping | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | -| Command | exec, shell | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | -| Job | Create, get, list, delete, retry, stats | [docs](docs/osapi/job.md) | [`job.go`](pkg/osapi/job.go) | -| Agent | List, get (discovery + heartbeat data) | [docs](docs/osapi/agent.md) | [`agent.go`](pkg/osapi/agent.go) | -| Health | Liveness, readiness, status | [docs](docs/osapi/health.md) | [`health.go`](pkg/osapi/health.go) | -| Audit | List, get, export | [docs](docs/osapi/audit.md) | [`audit.go`](pkg/osapi/audit.go) | -| Metrics | Prometheus text | [docs](docs/osapi/metrics.md) | [`metrics.go`](pkg/osapi/metrics.go) | +| Service | Operations | Docs | Source | +| ------- | ----------------------------------------------------- | ----------------------------- | ------------------------------------ | +| Node | Hostname, disk, memory, load, uptime, OS info, status | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | +| Network | DNS get/update, ping | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | +| Command | exec, shell | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | +| Job | Create, get, list, delete, retry, stats | [docs](docs/osapi/job.md) | [`job.go`](pkg/osapi/job.go) | +| Agent | List, get (discovery + heartbeat data) | [docs](docs/osapi/agent.md) | [`agent.go`](pkg/osapi/agent.go) | +| Health | Liveness, readiness, status | [docs](docs/osapi/health.md) | [`health.go`](pkg/osapi/health.go) | +| Audit | List, get, export | [docs](docs/osapi/audit.md) | [`audit.go`](pkg/osapi/audit.go) | +| Metrics | Prometheus text | [docs](docs/osapi/metrics.md) | [`metrics.go`](pkg/osapi/metrics.go) | ## Orchestration @@ -41,40 +41,42 @@ DAG-based task execution on top of the client. See the [orchestration docs](docs/orchestration/README.md) for hooks, error strategies, and adding new operations. -| Feature | Description | Source | -| ------- | ----------- | ------ | -| DAG execution | Dependency-based ordering with automatic parallelism | [`plan.go`](pkg/orchestrator/plan.go) | -| Op tasks | Declarative OSAPI operations with target routing and params | [`task.go`](pkg/orchestrator/task.go) | -| TaskFunc | Custom Go functions with SDK client access | [`task.go`](pkg/orchestrator/task.go) | -| Hooks | 8 lifecycle callbacks — consumer-controlled logging | [`options.go`](pkg/orchestrator/options.go) | -| Error strategies | StopAll, Continue (skip dependents), Retry(n) | [`options.go`](pkg/orchestrator/options.go) | -| Guards | `When` predicates, `OnlyIfChanged` conditional execution | [`task.go`](pkg/orchestrator/task.go) | -| Changed detection | Read-only ops return false, mutators reflect actual state | [`runner.go`](pkg/orchestrator/runner.go) | -| Result access | `Results.Get()` for cross-task data flow | [`result.go`](pkg/orchestrator/result.go) | +| Feature | Description | Source | +| ------------------- | ------------------------------------------------------------------------- | ------------------------------------------- | +| DAG execution | Dependency-based ordering with automatic parallelism | [`plan.go`](pkg/orchestrator/plan.go) | +| Op tasks | Declarative OSAPI operations with target routing and params | [`task.go`](pkg/orchestrator/task.go) | +| TaskFunc | Custom Go functions with SDK client access | [`task.go`](pkg/orchestrator/task.go) | +| TaskFuncWithResults | Custom functions that receive completed results from prior tasks | [`task.go`](pkg/orchestrator/task.go) | +| Hooks | 8 lifecycle callbacks — consumer-controlled logging | [`options.go`](pkg/orchestrator/options.go) | +| Error strategies | StopAll, Continue (skip dependents), Retry(n) | [`options.go`](pkg/orchestrator/options.go) | +| Guards | `When` predicates, `OnlyIfChanged` conditional execution | [`task.go`](pkg/orchestrator/task.go) | +| Changed detection | Read-only ops return false, mutators reflect actual state | [`runner.go`](pkg/orchestrator/runner.go) | +| Result access | `Results.Get()` for cross-task data flow, `Status` for outcome inspection | [`result.go`](pkg/orchestrator/result.go) | +| Broadcast results | Per-host `HostResult` data for multi-target operations | [`result.go`](pkg/orchestrator/result.go) | ### Operations -| Operation | Description | Idempotent | Docs | -| --------- | ----------- | ---------- | ---- | -| `node.hostname.get` | Get system hostname | Read-only | [docs](docs/orchestration/node-hostname.md) | -| `node.status.get` | Get node status | Read-only | [docs](docs/orchestration/node-status.md) | -| `node.disk.get` | Get disk usage | Read-only | [docs](docs/orchestration/node-disk.md) | -| `node.memory.get` | Get memory stats | Read-only | [docs](docs/orchestration/node-memory.md) | -| `node.load.get` | Get load averages | Read-only | [docs](docs/orchestration/node-load.md) | -| `network.dns.get` | Get DNS configuration | Read-only | [docs](docs/orchestration/network-dns-get.md) | -| `network.dns.update` | Update DNS servers | Yes | [docs](docs/orchestration/network-dns-update.md) | -| `network.ping.do` | Ping a host | Read-only | [docs](docs/orchestration/network-ping.md) | -| `command.exec.execute` | Execute a command | No | [docs](docs/orchestration/command-exec.md) | -| `command.shell.execute` | Execute a shell string | No | [docs](docs/orchestration/command-shell.md) | +| Operation | Description | Idempotent | Docs | +| ----------------------- | ---------------------- | ---------- | ------------------------------------------------ | +| `node.hostname.get` | Get system hostname | Read-only | [docs](docs/orchestration/node-hostname.md) | +| `node.status.get` | Get node status | Read-only | [docs](docs/orchestration/node-status.md) | +| `node.disk.get` | Get disk usage | Read-only | [docs](docs/orchestration/node-disk.md) | +| `node.memory.get` | Get memory stats | Read-only | [docs](docs/orchestration/node-memory.md) | +| `node.load.get` | Get load averages | Read-only | [docs](docs/orchestration/node-load.md) | +| `network.dns.get` | Get DNS configuration | Read-only | [docs](docs/orchestration/network-dns-get.md) | +| `network.dns.update` | Update DNS servers | Yes | [docs](docs/orchestration/network-dns-update.md) | +| `network.ping.do` | Ping a host | Read-only | [docs](docs/orchestration/network-ping.md) | +| `command.exec.execute` | Execute a command | No | [docs](docs/orchestration/command-exec.md) | +| `command.shell.execute` | Execute a shell string | No | [docs](docs/orchestration/command-shell.md) | ## Examples Each example is a standalone Go program you can read and run. -| Example | What it shows | -| ------- | ------------- | -| [discovery](examples/discovery/main.go) | Runnable DAG that discovers fleet info: health check, agent listing, and status in parallel | -| [all](examples/all/main.go) | Every feature: hooks, Op tasks, TaskFunc, dependencies, guards, Levels(), error strategies, reporting | +| Example | What it shows | +| --------------------------------------- | ----------------------------------------------------------------------------------------------------- | +| [discovery](examples/discovery/main.go) | Runnable DAG that discovers fleet info: health check, agent listing, and status in parallel | +| [all](examples/all/main.go) | Every feature: hooks, Op tasks, TaskFunc, dependencies, guards, Levels(), error strategies, reporting | ```bash cd examples/discovery diff --git a/docs/orchestration/README.md b/docs/orchestration/README.md index 83deb87..2bbe214 100644 --- a/docs/orchestration/README.md +++ b/docs/orchestration/README.md @@ -69,6 +69,66 @@ plan := orchestrator.NewPlan(client, orchestrator.OnError(orchestrator.Continue) task.OnError(orchestrator.Retry(3)) // override for this task ``` +## Result Types + +### Result + +The `Result` struct returned by task functions: + +| Field | Type | Description | +| ------------- | ---------------- | ---------------------------------------------- | +| `Changed` | `bool` | Whether the operation modified state | +| `Data` | `map[string]any` | Operation-specific response data | +| `Status` | `Status` | Terminal status (`changed`, `unchanged`, etc.) | +| `HostResults` | `[]HostResult` | Per-host results for broadcast operations | + +### TaskResult + +The `TaskResult` struct provided to `AfterTask` hooks and in `Report.Tasks`: + +| Field | Type | Description | +| ---------- | ---------------- | ------------------------------------------- | +| `Name` | `string` | Task name | +| `Status` | `Status` | Terminal status | +| `Changed` | `bool` | Whether the operation reported changes | +| `Duration` | `time.Duration` | Execution time | +| `Error` | `error` | Error if task failed; nil on success | +| `Data` | `map[string]any` | Operation response data for post-run access | + +### HostResult + +Per-host data for broadcast operations (targeting `_all` or label selectors): + +| Field | Type | Description | +| ---------- | ---------------- | ---------------------------------- | +| `Hostname` | `string` | Agent hostname | +| `Changed` | `bool` | Whether this host reported changes | +| `Error` | `string` | Error message; empty on success | +| `Data` | `map[string]any` | Host-specific response data | + +## TaskFuncWithResults + +Use `TaskFuncWithResults` when a task needs to read results from prior tasks: + +```go +summarize := plan.TaskFuncWithResults( + "summarize", + func(ctx context.Context, client *osapi.Client, results orchestrator.Results) (*orchestrator.Result, error) { + r := results.Get("get-hostname") + hostname := r.Data["hostname"].(string) + + return &orchestrator.Result{ + Changed: true, + Data: map[string]any{"summary": hostname}, + }, nil + }, +) +summarize.DependsOn(getHostname) +``` + +Unlike `TaskFunc`, the function receives the `Results` map containing completed +dependency outputs. + ## Adding a New Operation When a new operation is added to OSAPI: diff --git a/examples/all/main.go b/examples/all/main.go index ce46d84..3dbe72f 100644 --- a/examples/all/main.go +++ b/examples/all/main.go @@ -32,7 +32,7 @@ // check-health // ├── get-hostname ────────────┐ // ├── get-disk │ -// ├── get-memory ├── print-summary (only-if-changed, when) +// ├── get-memory ├── print-summary (TaskFuncWithResults, only-if-changed, when) // ├── get-load [retry:2] ──────┘ // └── run-uptime [params] ─────┘ // optional-fail [continue] (independent, no deps on summary) @@ -255,17 +255,27 @@ func main() { optionalFail.OnError(orchestrator.Continue) // Level 2: summary (depends on all queries, guard + OnlyIfChanged). - // Uses Results.Get() to read data from previous tasks. - summary := plan.TaskFunc( + // Uses TaskFuncWithResults to access completed results from prior tasks. + summary := plan.TaskFuncWithResults( "print-summary", func( _ context.Context, _ *osapi.Client, + results orchestrator.Results, ) (*orchestrator.Result, error) { fmt.Println("\n --- Fleet Summary ---") - fmt.Println(" All node queries completed.") - return &orchestrator.Result{Changed: false}, nil + r := results.Get("get-hostname") + if r != nil { + if h, ok := r.Data["hostname"].(string); ok { + fmt.Printf(" Hostname: %s\n", h) + } + } + + return &orchestrator.Result{ + Changed: false, + Data: map[string]any{"completed": true}, + }, nil }, ) summary.DependsOn(getHostname, getDisk, getMemory, getLoad, runUptime) @@ -273,7 +283,7 @@ func main() { summary.When(func(results orchestrator.Results) bool { r := results.Get("get-hostname") - return r != nil && r.Data["hostname"] != nil + return r != nil && r.Status == orchestrator.StatusChanged }) // --- Structured DAG access --- @@ -324,5 +334,12 @@ func main() { r.Changed, r.Duration, ) + + // TaskResult.Data carries operation response data for post-run access. + if r.Data != nil { + if stdout, ok := r.Data["stdout"].(string); ok && stdout != "" { + fmt.Printf(" %-20s stdout=%q\n", "", stdout) + } + } } } From e1d1e298b98e90146d219c885d5d0f781f61804f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 12:46:49 -0800 Subject: [PATCH 07/12] docs: enhance examples/all with inter-task data passing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewrite examples/all to showcase low-level SDK patterns: TaskFuncWithResults reading data from prior tasks, Status inspection in When guards, alert-on-failure triggered by StatusFailed, and Data extraction in post-run reporting. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- examples/all/main.go | 58 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/examples/all/main.go b/examples/all/main.go index 3dbe72f..6432a45 100644 --- a/examples/all/main.go +++ b/examples/all/main.go @@ -20,9 +20,10 @@ // Package main demonstrates every orchestrator feature: hooks for consumer- // controlled logging at every lifecycle point, Op and TaskFunc tasks, -// dependencies, guards, Levels() for DAG inspection, error strategies +// TaskFuncWithResults for inter-task data passing, dependencies, guards +// with Status inspection, Levels() for DAG inspection, error strategies // (plan-level Continue + per-task Retry), parameterized operations, -// result data access, and detailed result reporting. +// result data access, and detailed result reporting with Data. // // This example serves as a reference for building tools like Terraform // or Ansible that consume the SDK. @@ -32,10 +33,11 @@ // check-health // ├── get-hostname ────────────┐ // ├── get-disk │ -// ├── get-memory ├── print-summary (TaskFuncWithResults, only-if-changed, when) +// ├── get-memory ├── print-summary (TaskFuncWithResults, reads prior data) // ├── get-load [retry:2] ──────┘ // └── run-uptime [params] ─────┘ -// optional-fail [continue] (independent, no deps on summary) +// optional-fail [continue] (independent) +// └── alert-on-failure (When: checks Status == StatusFailed) // // Run with: OSAPI_TOKEN="" go run main.go package main @@ -254,8 +256,8 @@ func main() { ) optionalFail.OnError(orchestrator.Continue) - // Level 2: summary (depends on all queries, guard + OnlyIfChanged). - // Uses TaskFuncWithResults to access completed results from prior tasks. + // Level 2: summary — uses TaskFuncWithResults to read data from prior + // tasks and aggregate it. This is the key inter-task data passing pattern. summary := plan.TaskFuncWithResults( "print-summary", func( @@ -265,13 +267,22 @@ func main() { ) (*orchestrator.Result, error) { fmt.Println("\n --- Fleet Summary ---") - r := results.Get("get-hostname") - if r != nil { + // Read hostname from a prior task via Results.Get(). + if r := results.Get("get-hostname"); r != nil { if h, ok := r.Data["hostname"].(string); ok { fmt.Printf(" Hostname: %s\n", h) } } + // Read uptime stdout from a prior command task. + if r := results.Get("run-uptime"); r != nil { + if stdout, ok := r.Data["stdout"].(string); ok { + fmt.Printf(" Uptime: %s\n", stdout) + } + } + + // Return aggregated data — available in Report.Tasks[].Data + // after plan execution completes. return &orchestrator.Result{ Changed: false, Data: map[string]any{"completed": true}, @@ -280,12 +291,37 @@ func main() { ) summary.DependsOn(getHostname, getDisk, getMemory, getLoad, runUptime) summary.OnlyIfChanged() // skip if no dependency reported changes + + // Guard using Status inspection — only run if hostname succeeded. summary.When(func(results orchestrator.Results) bool { r := results.Get("get-hostname") return r != nil && r.Status == orchestrator.StatusChanged }) + // Level 2: alert task — runs only if optional-fail has StatusFailed. + // Demonstrates using Status in a When guard for failure-triggered recovery. + alertOnFailure := plan.TaskFunc( + "alert-on-failure", + func( + _ context.Context, + _ *osapi.Client, + ) (*orchestrator.Result, error) { + fmt.Println("\n [alert] optional-fail task failed — sending alert") + + return &orchestrator.Result{ + Changed: true, + Data: map[string]any{"alerted": true}, + }, nil + }, + ) + alertOnFailure.DependsOn(optionalFail) + alertOnFailure.When(func(results orchestrator.Results) bool { + r := results.Get("optional-fail") + + return r != nil && r.Status == orchestrator.StatusFailed + }) + // --- Structured DAG access --- levels, err := plan.Levels() @@ -318,6 +354,7 @@ func main() { } // --- Detailed result inspection --- + // TaskResult.Data carries operation response data for post-run access. fmt.Println("\nDetailed results:") @@ -335,11 +372,14 @@ func main() { r.Duration, ) - // TaskResult.Data carries operation response data for post-run access. if r.Data != nil { if stdout, ok := r.Data["stdout"].(string); ok && stdout != "" { fmt.Printf(" %-20s stdout=%q\n", "", stdout) } + + if hostname, ok := r.Data["hostname"].(string); ok { + fmt.Printf(" %-20s hostname=%q\n", "", hostname) + } } } } From 1835e014e8e1ef02e29bf568491cf3bb432384ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 12:54:43 -0800 Subject: [PATCH 08/12] fix(orchestrator): allow When guards on tasks with failed deps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tasks with a When guard were being auto-skipped when a dependency failed, preventing failure-triggered patterns like alert-on-failure. Now tasks with guards bypass the dependency-failed skip and let the guard decide whether to run. Also print all Data keys in the example instead of only stdout/hostname. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- examples/all/main.go | 10 ++---- pkg/orchestrator/plan_public_test.go | 54 ++++++++++++++++++++++++++++ pkg/orchestrator/runner.go | 37 ++++++++++--------- 3 files changed, 77 insertions(+), 24 deletions(-) diff --git a/examples/all/main.go b/examples/all/main.go index 6432a45..fd0cf89 100644 --- a/examples/all/main.go +++ b/examples/all/main.go @@ -372,14 +372,8 @@ func main() { r.Duration, ) - if r.Data != nil { - if stdout, ok := r.Data["stdout"].(string); ok && stdout != "" { - fmt.Printf(" %-20s stdout=%q\n", "", stdout) - } - - if hostname, ok := r.Data["hostname"].(string); ok { - fmt.Printf(" %-20s hostname=%q\n", "", hostname) - } + for k, v := range r.Data { + fmt.Printf(" %-20s %s=%v\n", "", k, v) } } } diff --git a/pkg/orchestrator/plan_public_test.go b/pkg/orchestrator/plan_public_test.go index c1410f1..920620c 100644 --- a/pkg/orchestrator/plan_public_test.go +++ b/pkg/orchestrator/plan_public_test.go @@ -311,6 +311,60 @@ func (s *PlanPublicTestSuite) TestRunGuard() { } } +func (s *PlanPublicTestSuite) TestRunGuardWithFailedDependency() { + tests := []struct { + name string + guard func(orchestrator.Results) bool + expectRan bool + expectStatus orchestrator.Status + }{ + { + name: "guard runs and returns true when dependency failed", + guard: func(r orchestrator.Results) bool { + res := r.Get("fail") + + return res != nil && res.Status == orchestrator.StatusFailed + }, + expectRan: true, + expectStatus: orchestrator.StatusChanged, + }, + { + name: "guard runs and returns false when dependency failed", + guard: func(r orchestrator.Results) bool { + res := r.Get("fail") + + return res != nil && res.Status == orchestrator.StatusChanged + }, + expectRan: false, + expectStatus: orchestrator.StatusSkipped, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + plan := orchestrator.NewPlan( + nil, + orchestrator.OnError(orchestrator.Continue), + ) + ran := false + + fail := plan.TaskFunc("fail", failFunc("boom")) + alert := plan.TaskFunc("alert", taskFunc(true, func() { + ran = true + })) + alert.DependsOn(fail) + alert.When(tt.guard) + + report, err := plan.Run(context.Background()) + s.Require().NoError(err) + s.Equal(tt.expectRan, ran) + + sm := statusMap(report) + s.Equal(tt.expectStatus, sm["alert"]) + }) + } +} + func (s *PlanPublicTestSuite) TestRunErrorStrategy() { s.Run("stop all on error", func() { plan := orchestrator.NewPlan(nil) diff --git a/pkg/orchestrator/runner.go b/pkg/orchestrator/runner.go index e8f8a29..240b074 100644 --- a/pkg/orchestrator/runner.go +++ b/pkg/orchestrator/runner.go @@ -232,26 +232,31 @@ func (r *runner) runTask( ) TaskResult { start := time.Now() - // Skip if any dependency failed (Continue strategy). - r.mu.Lock() - for _, dep := range t.deps { - if r.failed[dep.name] { - r.failed[t.name] = true - r.results[t.name] = &Result{Status: StatusSkipped} - r.mu.Unlock() + // Skip if any dependency failed — unless the task has a When guard, + // which may intentionally inspect failure status (e.g. alert-on-failure). + if t.guard == nil { + r.mu.Lock() - tr := TaskResult{ - Name: t.name, - Status: StatusSkipped, - Duration: time.Since(start), - } - r.callOnSkip(t, "dependency failed") - r.callAfterTask(t, tr) + for _, dep := range t.deps { + if r.failed[dep.name] { + r.failed[t.name] = true + r.results[t.name] = &Result{Status: StatusSkipped} + r.mu.Unlock() + + tr := TaskResult{ + Name: t.name, + Status: StatusSkipped, + Duration: time.Since(start), + } + r.callOnSkip(t, "dependency failed") + r.callAfterTask(t, tr) - return tr + return tr + } } + + r.mu.Unlock() } - r.mu.Unlock() if t.requiresChange { anyChanged := false From eb889951758a6e2f2ffa7459ccb863ce67b2693e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 12:58:12 -0800 Subject: [PATCH 09/12] fix(orchestrator): remove changed key from result Data map MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The runner extracted the changed boolean from the API response data but left it in the Data map, causing it to appear alongside real fields in result output. Also switch example detailed results to JSON formatting for readability. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- examples/all/main.go | 6 ++++-- pkg/orchestrator/runner.go | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/all/main.go b/examples/all/main.go index fd0cf89..9c2fabe 100644 --- a/examples/all/main.go +++ b/examples/all/main.go @@ -44,6 +44,7 @@ package main import ( "context" + "encoding/json" "fmt" "log" "os" @@ -372,8 +373,9 @@ func main() { r.Duration, ) - for k, v := range r.Data { - fmt.Printf(" %-20s %s=%v\n", "", k, v) + if len(r.Data) > 0 { + b, _ := json.MarshalIndent(r.Data, " "+strings.Repeat(" ", 20), " ") + fmt.Printf(" %-20s data=%s\n", "", b) } } } diff --git a/pkg/orchestrator/runner.go b/pkg/orchestrator/runner.go index 240b074..6aa7f00 100644 --- a/pkg/orchestrator/runner.go +++ b/pkg/orchestrator/runner.go @@ -565,6 +565,7 @@ func (r *runner) pollJob( } changed, _ := data["changed"].(bool) + delete(data, "changed") return &Result{Changed: changed, Data: data}, nil case "failed": From a2d355fb13f9d8542e68425fefbc33ccb2a88279 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 13:17:54 -0800 Subject: [PATCH 10/12] feat(orchestrator): add HostResults to TaskResult and SetName MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Propagate HostResults from Result to TaskResult so AfterTask hooks can display per-host data for broadcast operations. Add SetName method to Task for renaming after creation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- pkg/orchestrator/result.go | 13 +++++++------ pkg/orchestrator/runner.go | 11 ++++++----- pkg/orchestrator/task.go | 7 +++++++ 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/pkg/orchestrator/result.go b/pkg/orchestrator/result.go index 4318495..1ad5c7d 100644 --- a/pkg/orchestrator/result.go +++ b/pkg/orchestrator/result.go @@ -42,12 +42,13 @@ type Result struct { // TaskResult records the full execution details of a task. type TaskResult struct { - Name string - Status Status - Changed bool - Duration time.Duration - Error error - Data map[string]any + Name string + Status Status + Changed bool + Duration time.Duration + Error error + Data map[string]any + HostResults []HostResult } // Results is a map of task name to Result, used for conditional logic. diff --git a/pkg/orchestrator/runner.go b/pkg/orchestrator/runner.go index 6aa7f00..c9e16aa 100644 --- a/pkg/orchestrator/runner.go +++ b/pkg/orchestrator/runner.go @@ -382,11 +382,12 @@ func (r *runner) runTask( r.mu.Unlock() tr := TaskResult{ - Name: t.name, - Status: status, - Changed: result.Changed, - Duration: elapsed, - Data: result.Data, + Name: t.name, + Status: status, + Changed: result.Changed, + Duration: elapsed, + Data: result.Data, + HostResults: result.HostResults, } r.callAfterTask(t, tr) diff --git a/pkg/orchestrator/task.go b/pkg/orchestrator/task.go index 2c79e0f..044454b 100644 --- a/pkg/orchestrator/task.go +++ b/pkg/orchestrator/task.go @@ -83,6 +83,13 @@ func (t *Task) Name() string { return t.name } +// SetName changes the task name. +func (t *Task) SetName( + name string, +) { + t.name = name +} + // IsFunc returns true if this is a functional task. func (t *Task) IsFunc() bool { return t.fn != nil || t.fnr != nil From 475b953fded2bea8997ab4a3d61f3783d7703f83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 15:04:52 -0800 Subject: [PATCH 11/12] feat(orchestrator): add WhenWithReason for custom guard skip messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add WhenWithReason method to Task that allows setting a custom reason string shown when a guard returns false, improving skip hook output. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- docs/gen/orchestrator.md | 159 ++++++++++++++++++++++++++++--------- pkg/orchestrator/runner.go | 6 +- pkg/orchestrator/task.go | 11 +++ 3 files changed, 137 insertions(+), 39 deletions(-) diff --git a/docs/gen/orchestrator.md b/docs/gen/orchestrator.md index e54b998..6cc68ce 100644 --- a/docs/gen/orchestrator.md +++ b/docs/gen/orchestrator.md @@ -11,12 +11,14 @@ Package orchestrator provides DAG\-based task orchestration primitives. ## Index - [Variables](<#variables>) +- [func IsBroadcastTarget\(target string\) bool](<#IsBroadcastTarget>) - [type ErrorStrategy](<#ErrorStrategy>) - [func Retry\(n int\) ErrorStrategy](<#Retry>) - [func \(e ErrorStrategy\) RetryCount\(\) int](<#ErrorStrategy.RetryCount>) - [func \(e ErrorStrategy\) String\(\) string](<#ErrorStrategy.String>) - [type GuardFn](<#GuardFn>) - [type Hooks](<#Hooks>) +- [type HostResult](<#HostResult>) - [type Op](<#Op>) - [type Plan](<#Plan>) - [func NewPlan\(client \*osapi.Client, opts ...PlanOption\) \*Plan](<#NewPlan>) @@ -27,6 +29,7 @@ Package orchestrator provides DAG\-based task orchestration primitives. - [func \(p \*Plan\) Run\(ctx context.Context\) \(\*Report, error\)](<#Plan.Run>) - [func \(p \*Plan\) Task\(name string, op \*Op\) \*Task](<#Plan.Task>) - [func \(p \*Plan\) TaskFunc\(name string, fn TaskFn\) \*Task](<#Plan.TaskFunc>) + - [func \(p \*Plan\) TaskFuncWithResults\(name string, fn TaskFnWithResults\) \*Task](<#Plan.TaskFuncWithResults>) - [func \(p \*Plan\) Tasks\(\) \[\]\*Task](<#Plan.Tasks>) - [func \(p \*Plan\) Validate\(\) error](<#Plan.Validate>) - [type PlanConfig](<#PlanConfig>) @@ -44,6 +47,7 @@ Package orchestrator provides DAG\-based task orchestration primitives. - [type Task](<#Task>) - [func NewTask\(name string, op \*Op\) \*Task](<#NewTask>) - [func NewTaskFunc\(name string, fn TaskFn\) \*Task](<#NewTaskFunc>) + - [func NewTaskFuncWithResults\(name string, fn TaskFnWithResults\) \*Task](<#NewTaskFuncWithResults>) - [func \(t \*Task\) Dependencies\(\) \[\]\*Task](<#Task.Dependencies>) - [func \(t \*Task\) DependsOn\(deps ...\*Task\) \*Task](<#Task.DependsOn>) - [func \(t \*Task\) ErrorStrategy\(\) \*ErrorStrategy](<#Task.ErrorStrategy>) @@ -55,8 +59,11 @@ Package orchestrator provides DAG\-based task orchestration primitives. - [func \(t \*Task\) OnlyIfChanged\(\)](<#Task.OnlyIfChanged>) - [func \(t \*Task\) Operation\(\) \*Op](<#Task.Operation>) - [func \(t \*Task\) RequiresChange\(\) bool](<#Task.RequiresChange>) + - [func \(t \*Task\) SetName\(name string\)](<#Task.SetName>) - [func \(t \*Task\) When\(fn GuardFn\)](<#Task.When>) + - [func \(t \*Task\) WhenWithReason\(fn GuardFn, reason string\)](<#Task.WhenWithReason>) - [type TaskFn](<#TaskFn>) +- [type TaskFnWithResults](<#TaskFnWithResults>) - [type TaskResult](<#TaskResult>) @@ -80,6 +87,15 @@ var DefaultPollInterval = 500 * time.Millisecond var StopAll = ErrorStrategy{/* contains filtered or unexported fields */} ``` + +## func [IsBroadcastTarget]() + +```go +func IsBroadcastTarget(target string) bool +``` + +IsBroadcastTarget returns true if the target addresses multiple agents \(broadcast or label selector\). + ## type [ErrorStrategy]() @@ -119,7 +135,7 @@ func (e ErrorStrategy) String() string String returns a human\-readable representation of the strategy. -## type [GuardFn]() +## type [GuardFn]() GuardFn is a predicate that determines if a task should run. @@ -145,8 +161,22 @@ type Hooks struct { } ``` + +## type [HostResult]() + +HostResult represents a single host's response within a broadcast operation. + +```go +type HostResult struct { + Hostname string + Changed bool + Error string + Data map[string]any +} +``` + -## type [Op]() +## type [Op]() Op represents a declarative SDK operation. @@ -197,7 +227,7 @@ func (p *Plan) Config() PlanConfig Config returns the plan configuration. -### func \(\*Plan\) [Explain]() +### func \(\*Plan\) [Explain]() ```go func (p *Plan) Explain() string @@ -206,7 +236,7 @@ func (p *Plan) Explain() string Explain returns a human\-readable representation of the execution plan showing levels, parallelism, dependencies, and guards. -### func \(\*Plan\) [Levels]() +### func \(\*Plan\) [Levels]() ```go func (p *Plan) Levels() ([][]*Task, error) @@ -215,7 +245,7 @@ func (p *Plan) Levels() ([][]*Task, error) Levels returns the levelized DAG \-\- tasks grouped into execution levels where all tasks in a level can run concurrently. Returns an error if the plan fails validation. -### func \(\*Plan\) [Run]() +### func \(\*Plan\) [Run]() ```go func (p *Plan) Run(ctx context.Context) (*Report, error) @@ -241,8 +271,17 @@ func (p *Plan) TaskFunc(name string, fn TaskFn) *Task TaskFunc creates a functional task, adds it to the plan, and returns it. + +### func \(\*Plan\) [TaskFuncWithResults]() + +```go +func (p *Plan) TaskFuncWithResults(name string, fn TaskFnWithResults) *Task +``` + +TaskFuncWithResults creates a functional task that receives completed results from prior tasks, adds it to the plan, and returns it. + -### func \(\*Plan\) [Tasks]() +### func \(\*Plan\) [Tasks]() ```go func (p *Plan) Tasks() []*Task @@ -251,7 +290,7 @@ func (p *Plan) Tasks() []*Task Tasks returns all tasks in the plan. -### func \(\*Plan\) [Validate]() +### func \(\*Plan\) [Validate]() ```go func (p *Plan) Validate() error @@ -299,7 +338,7 @@ func WithHooks(hooks Hooks) PlanOption WithHooks attaches lifecycle callbacks to plan execution. -## type [PlanSummary]() +## type [PlanSummary]() PlanSummary describes the execution plan before it runs. @@ -311,7 +350,7 @@ type PlanSummary struct { ``` -## type [Report]() +## type [Report]() Report is the aggregate output of a plan execution. @@ -323,7 +362,7 @@ type Report struct { ``` -### func \(\*Report\) [Summary]() +### func \(\*Report\) [Summary]() ```go func (r *Report) Summary() string @@ -332,19 +371,21 @@ func (r *Report) Summary() string Summary returns a human\-readable summary of the report. -## type [Result]() +## type [Result]() Result is the outcome of a single task execution. ```go type Result struct { - Changed bool - Data map[string]any + Changed bool + Data map[string]any + Status Status + HostResults []HostResult } ``` -## type [Results]() +## type [Results]() Results is a map of task name to Result, used for conditional logic. @@ -353,7 +394,7 @@ type Results map[string]*Result ``` -### func \(Results\) [Get]() +### func \(Results\) [Get]() ```go func (r Results) Get(name string) *Result @@ -387,7 +428,7 @@ const ( ``` -## type [StepSummary]() +## type [StepSummary]() StepSummary describes a single execution step \(DAG level\). @@ -399,7 +440,7 @@ type StepSummary struct { ``` -## type [Task]() +## type [Task]() Task is a unit of work in an orchestration plan. @@ -410,7 +451,7 @@ type Task struct { ``` -### func [NewTask]() +### func [NewTask]() ```go func NewTask(name string, op *Op) *Task @@ -419,7 +460,7 @@ func NewTask(name string, op *Op) *Task NewTask creates a declarative task wrapping an SDK operation. -### func [NewTaskFunc]() +### func [NewTaskFunc]() ```go func NewTaskFunc(name string, fn TaskFn) *Task @@ -427,8 +468,17 @@ func NewTaskFunc(name string, fn TaskFn) *Task NewTaskFunc creates a functional task with custom logic. + +### func [NewTaskFuncWithResults]() + +```go +func NewTaskFuncWithResults(name string, fn TaskFnWithResults) *Task +``` + +NewTaskFuncWithResults creates a functional task that receives completed results from prior tasks. + -### func \(\*Task\) [Dependencies]() +### func \(\*Task\) [Dependencies]() ```go func (t *Task) Dependencies() []*Task @@ -437,7 +487,7 @@ func (t *Task) Dependencies() []*Task Dependencies returns the task's dependencies. -### func \(\*Task\) [DependsOn]() +### func \(\*Task\) [DependsOn]() ```go func (t *Task) DependsOn(deps ...*Task) *Task @@ -446,7 +496,7 @@ func (t *Task) DependsOn(deps ...*Task) *Task DependsOn sets this task's dependencies. Returns the task for chaining. -### func \(\*Task\) [ErrorStrategy]() +### func \(\*Task\) [ErrorStrategy]() ```go func (t *Task) ErrorStrategy() *ErrorStrategy @@ -455,7 +505,7 @@ func (t *Task) ErrorStrategy() *ErrorStrategy ErrorStrategy returns the per\-task error strategy, or nil to use the plan default. -### func \(\*Task\) [Fn]() +### func \(\*Task\) [Fn]() ```go func (t *Task) Fn() TaskFn @@ -464,7 +514,7 @@ func (t *Task) Fn() TaskFn Fn returns the task function, or nil for declarative tasks. -### func \(\*Task\) [Guard]() +### func \(\*Task\) [Guard]() ```go func (t *Task) Guard() GuardFn @@ -473,7 +523,7 @@ func (t *Task) Guard() GuardFn Guard returns the guard function, or nil if none is set. -### func \(\*Task\) [IsFunc]() +### func \(\*Task\) [IsFunc]() ```go func (t *Task) IsFunc() bool @@ -482,7 +532,7 @@ func (t *Task) IsFunc() bool IsFunc returns true if this is a functional task. -### func \(\*Task\) [Name]() +### func \(\*Task\) [Name]() ```go func (t *Task) Name() string @@ -491,7 +541,7 @@ func (t *Task) Name() string Name returns the task name. -### func \(\*Task\) [OnError]() +### func \(\*Task\) [OnError]() ```go func (t *Task) OnError(strategy ErrorStrategy) @@ -500,7 +550,7 @@ func (t *Task) OnError(strategy ErrorStrategy) OnError sets a per\-task error strategy override. -### func \(\*Task\) [OnlyIfChanged]() +### func \(\*Task\) [OnlyIfChanged]() ```go func (t *Task) OnlyIfChanged() @@ -509,7 +559,7 @@ func (t *Task) OnlyIfChanged() OnlyIfChanged marks this task to only run if at least one dependency reported Changed=true. -### func \(\*Task\) [Operation]() +### func \(\*Task\) [Operation]() ```go func (t *Task) Operation() *Op @@ -518,7 +568,7 @@ func (t *Task) Operation() *Op Operation returns the declarative operation, or nil for functional tasks. -### func \(\*Task\) [RequiresChange]() +### func \(\*Task\) [RequiresChange]() ```go func (t *Task) RequiresChange() bool @@ -526,8 +576,17 @@ func (t *Task) RequiresChange() bool RequiresChange returns true if OnlyIfChanged was set. + +### func \(\*Task\) [SetName]() + +```go +func (t *Task) SetName(name string) +``` + +SetName changes the task name. + -### func \(\*Task\) [When]() +### func \(\*Task\) [When]() ```go func (t *Task) When(fn GuardFn) @@ -535,8 +594,17 @@ func (t *Task) When(fn GuardFn) When sets a custom guard function that determines whether this task should execute. + +### func \(\*Task\) [WhenWithReason]() + +```go +func (t *Task) WhenWithReason(fn GuardFn, reason string) +``` + +WhenWithReason sets a guard with a custom skip reason shown when the guard returns false. + -## type [TaskFn]() +## type [TaskFn]() TaskFn is the signature for functional tasks. The client parameter provides access to the OSAPI SDK for making API calls. @@ -547,18 +615,33 @@ type TaskFn func( ) (*Result, error) ``` + +## type [TaskFnWithResults]() + +TaskFnWithResults is like TaskFn but receives completed task results for inter\-task data access. + +```go +type TaskFnWithResults func( + ctx context.Context, + client *osapi.Client, + results Results, +) (*Result, error) +``` + -## type [TaskResult]() +## type [TaskResult]() TaskResult records the full execution details of a task. ```go type TaskResult struct { - Name string - Status Status - Changed bool - Duration time.Duration - Error error + Name string + Status Status + Changed bool + Duration time.Duration + Error error + Data map[string]any + HostResults []HostResult } ``` diff --git a/pkg/orchestrator/runner.go b/pkg/orchestrator/runner.go index c9e16aa..f030851 100644 --- a/pkg/orchestrator/runner.go +++ b/pkg/orchestrator/runner.go @@ -307,7 +307,11 @@ func (r *runner) runTask( Duration: time.Since(start), } - r.callOnSkip(t, "guard returned false") + reason := "guard returned false" + if t.guardReason != "" { + reason = t.guardReason + } + r.callOnSkip(t, reason) r.callAfterTask(t, tr) return tr diff --git a/pkg/orchestrator/task.go b/pkg/orchestrator/task.go index 044454b..d593193 100644 --- a/pkg/orchestrator/task.go +++ b/pkg/orchestrator/task.go @@ -40,6 +40,7 @@ type Task struct { fnr TaskFnWithResults deps []*Task guard GuardFn + guardReason string requiresChange bool errorStrategy *ErrorStrategy } @@ -140,6 +141,16 @@ func (t *Task) When( t.guard = fn } +// WhenWithReason sets a guard with a custom skip reason shown when +// the guard returns false. +func (t *Task) WhenWithReason( + fn GuardFn, + reason string, +) { + t.guard = fn + t.guardReason = reason +} + // Guard returns the guard function, or nil if none is set. func (t *Task) Guard() GuardFn { return t.guard From b71fdfbfa8f67c6c3f70ca443306832285aebe05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Mon, 2 Mar 2026 15:16:53 -0800 Subject: [PATCH 12/12] test(orchestrator): add tests for SetName and WhenWithReason MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover SetName and WhenWithReason with table-driven unit tests and an integration test verifying custom skip reasons flow through OnSkip. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- pkg/orchestrator/plan_public_test.go | 22 ++++++++++ pkg/orchestrator/task_public_test.go | 62 ++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/pkg/orchestrator/plan_public_test.go b/pkg/orchestrator/plan_public_test.go index 920620c..f3e155a 100644 --- a/pkg/orchestrator/plan_public_test.go +++ b/pkg/orchestrator/plan_public_test.go @@ -1033,6 +1033,28 @@ func (s *PlanPublicTestSuite) TestRunHooks() { s.Contains(skips[0], "guard returned false") }) + s.Run("skip hook for guard with custom reason", func() { + var events []string + + hooks := allHooks(&events) + plan := orchestrator.NewPlan(nil, orchestrator.WithHooks(hooks)) + + a := plan.TaskFunc("a", taskFunc(false, nil)) + b := plan.TaskFunc("b", taskFunc(true, nil)) + b.DependsOn(a) + b.WhenWithReason( + func(_ orchestrator.Results) bool { return false }, + "host is unreachable", + ) + + _, err := plan.Run(context.Background()) + s.NoError(err) + + skips := filterPrefix(events, "skip-") + s.Len(skips, 1) + s.Contains(skips[0], "host is unreachable") + }) + s.Run("skip hook for only-if-changed", func() { var events []string diff --git a/pkg/orchestrator/task_public_test.go b/pkg/orchestrator/task_public_test.go index 7319d51..01327c6 100644 --- a/pkg/orchestrator/task_public_test.go +++ b/pkg/orchestrator/task_public_test.go @@ -92,6 +92,68 @@ func (s *TaskPublicTestSuite) TestTaskFunc() { s.True(task.IsFunc()) } +func (s *TaskPublicTestSuite) TestSetName() { + tests := []struct { + name string + initial string + renamed string + wantName string + }{ + { + name: "changes task name", + initial: "original", + renamed: "renamed", + wantName: "renamed", + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + task := orchestrator.NewTask( + tt.initial, + &orchestrator.Op{Operation: "noop"}, + ) + task.SetName(tt.renamed) + s.Equal(tt.wantName, task.Name()) + }) + } +} + +func (s *TaskPublicTestSuite) TestWhenWithReason() { + tests := []struct { + name string + guardResult bool + reason string + }{ + { + name: "sets guard and reason when guard returns false", + guardResult: false, + reason: "host is unreachable", + }, + { + name: "sets guard and reason when guard returns true", + guardResult: true, + reason: "custom reason", + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + task := orchestrator.NewTask( + "t", + &orchestrator.Op{Operation: "noop"}, + ) + task.WhenWithReason(func(_ orchestrator.Results) bool { + return tt.guardResult + }, tt.reason) + + guard := task.Guard() + s.NotNil(guard) + s.Equal(tt.guardResult, guard(orchestrator.Results{})) + }) + } +} + func (s *TaskPublicTestSuite) TestOnErrorOverride() { task := orchestrator.NewTask("t", &orchestrator.Op{Operation: "noop"}) task.OnError(orchestrator.Continue)