diff --git a/README.md b/README.md
index c3eba98..2e30751 100644
--- a/README.md
+++ b/README.md
@@ -24,16 +24,16 @@ Typed Go client for every OSAPI endpoint. See the
[client docs](docs/osapi/README.md) for targeting, options, and quick
start.
-| Service | Operations | Docs | Source |
-| ------- | ---------- | ---- | ------ |
-| Node | Hostname, disk, memory, load, uptime, OS info, status | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) |
-| Network | DNS get/update, ping | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) |
-| Command | exec, shell | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) |
-| Job | Create, get, list, delete, retry, stats | [docs](docs/osapi/job.md) | [`job.go`](pkg/osapi/job.go) |
-| Agent | List, get (discovery + heartbeat data) | [docs](docs/osapi/agent.md) | [`agent.go`](pkg/osapi/agent.go) |
-| Health | Liveness, readiness, status | [docs](docs/osapi/health.md) | [`health.go`](pkg/osapi/health.go) |
-| Audit | List, get, export | [docs](docs/osapi/audit.md) | [`audit.go`](pkg/osapi/audit.go) |
-| Metrics | Prometheus text | [docs](docs/osapi/metrics.md) | [`metrics.go`](pkg/osapi/metrics.go) |
+| Service | Operations | Docs | Source |
+| ------- | ----------------------------------------------------- | ----------------------------- | ------------------------------------ |
+| Node | Hostname, disk, memory, load, uptime, OS info, status | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) |
+| Network | DNS get/update, ping | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) |
+| Command | exec, shell | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) |
+| Job | Create, get, list, delete, retry, stats | [docs](docs/osapi/job.md) | [`job.go`](pkg/osapi/job.go) |
+| Agent | List, get (discovery + heartbeat data) | [docs](docs/osapi/agent.md) | [`agent.go`](pkg/osapi/agent.go) |
+| Health | Liveness, readiness, status | [docs](docs/osapi/health.md) | [`health.go`](pkg/osapi/health.go) |
+| Audit | List, get, export | [docs](docs/osapi/audit.md) | [`audit.go`](pkg/osapi/audit.go) |
+| Metrics | Prometheus text | [docs](docs/osapi/metrics.md) | [`metrics.go`](pkg/osapi/metrics.go) |
## Orchestration
@@ -41,40 +41,42 @@ DAG-based task execution on top of the client. See the
[orchestration docs](docs/orchestration/README.md) for hooks, error
strategies, and adding new operations.
-| Feature | Description | Source |
-| ------- | ----------- | ------ |
-| DAG execution | Dependency-based ordering with automatic parallelism | [`plan.go`](pkg/orchestrator/plan.go) |
-| Op tasks | Declarative OSAPI operations with target routing and params | [`task.go`](pkg/orchestrator/task.go) |
-| TaskFunc | Custom Go functions with SDK client access | [`task.go`](pkg/orchestrator/task.go) |
-| Hooks | 8 lifecycle callbacks — consumer-controlled logging | [`options.go`](pkg/orchestrator/options.go) |
-| Error strategies | StopAll, Continue (skip dependents), Retry(n) | [`options.go`](pkg/orchestrator/options.go) |
-| Guards | `When` predicates, `OnlyIfChanged` conditional execution | [`task.go`](pkg/orchestrator/task.go) |
-| Changed detection | Read-only ops return false, mutators reflect actual state | [`runner.go`](pkg/orchestrator/runner.go) |
-| Result access | `Results.Get()` for cross-task data flow | [`result.go`](pkg/orchestrator/result.go) |
+| Feature | Description | Source |
+| ------------------- | ------------------------------------------------------------------------- | ------------------------------------------- |
+| DAG execution | Dependency-based ordering with automatic parallelism | [`plan.go`](pkg/orchestrator/plan.go) |
+| Op tasks | Declarative OSAPI operations with target routing and params | [`task.go`](pkg/orchestrator/task.go) |
+| TaskFunc | Custom Go functions with SDK client access | [`task.go`](pkg/orchestrator/task.go) |
+| TaskFuncWithResults | Custom functions that receive completed results from prior tasks | [`task.go`](pkg/orchestrator/task.go) |
+| Hooks | 8 lifecycle callbacks — consumer-controlled logging | [`options.go`](pkg/orchestrator/options.go) |
+| Error strategies | StopAll, Continue (skip dependents), Retry(n) | [`options.go`](pkg/orchestrator/options.go) |
+| Guards | `When` predicates, `OnlyIfChanged` conditional execution | [`task.go`](pkg/orchestrator/task.go) |
+| Changed detection | Read-only ops return false, mutators reflect actual state | [`runner.go`](pkg/orchestrator/runner.go) |
+| Result access | `Results.Get()` for cross-task data flow, `Status` for outcome inspection | [`result.go`](pkg/orchestrator/result.go) |
+| Broadcast results | Per-host `HostResult` data for multi-target operations | [`result.go`](pkg/orchestrator/result.go) |
### Operations
-| Operation | Description | Idempotent | Docs |
-| --------- | ----------- | ---------- | ---- |
-| `node.hostname.get` | Get system hostname | Read-only | [docs](docs/orchestration/node-hostname.md) |
-| `node.status.get` | Get node status | Read-only | [docs](docs/orchestration/node-status.md) |
-| `node.disk.get` | Get disk usage | Read-only | [docs](docs/orchestration/node-disk.md) |
-| `node.memory.get` | Get memory stats | Read-only | [docs](docs/orchestration/node-memory.md) |
-| `node.load.get` | Get load averages | Read-only | [docs](docs/orchestration/node-load.md) |
-| `network.dns.get` | Get DNS configuration | Read-only | [docs](docs/orchestration/network-dns-get.md) |
-| `network.dns.update` | Update DNS servers | Yes | [docs](docs/orchestration/network-dns-update.md) |
-| `network.ping.do` | Ping a host | Read-only | [docs](docs/orchestration/network-ping.md) |
-| `command.exec.execute` | Execute a command | No | [docs](docs/orchestration/command-exec.md) |
-| `command.shell.execute` | Execute a shell string | No | [docs](docs/orchestration/command-shell.md) |
+| Operation | Description | Idempotent | Docs |
+| ----------------------- | ---------------------- | ---------- | ------------------------------------------------ |
+| `node.hostname.get` | Get system hostname | Read-only | [docs](docs/orchestration/node-hostname.md) |
+| `node.status.get` | Get node status | Read-only | [docs](docs/orchestration/node-status.md) |
+| `node.disk.get` | Get disk usage | Read-only | [docs](docs/orchestration/node-disk.md) |
+| `node.memory.get` | Get memory stats | Read-only | [docs](docs/orchestration/node-memory.md) |
+| `node.load.get` | Get load averages | Read-only | [docs](docs/orchestration/node-load.md) |
+| `network.dns.get` | Get DNS configuration | Read-only | [docs](docs/orchestration/network-dns-get.md) |
+| `network.dns.update` | Update DNS servers | Yes | [docs](docs/orchestration/network-dns-update.md) |
+| `network.ping.do` | Ping a host | Read-only | [docs](docs/orchestration/network-ping.md) |
+| `command.exec.execute` | Execute a command | No | [docs](docs/orchestration/command-exec.md) |
+| `command.shell.execute` | Execute a shell string | No | [docs](docs/orchestration/command-shell.md) |
## Examples
Each example is a standalone Go program you can read and run.
-| Example | What it shows |
-| ------- | ------------- |
-| [discovery](examples/discovery/main.go) | Runnable DAG that discovers fleet info: health check, agent listing, and status in parallel |
-| [all](examples/all/main.go) | Every feature: hooks, Op tasks, TaskFunc, dependencies, guards, Levels(), error strategies, reporting |
+| Example | What it shows |
+| --------------------------------------- | ----------------------------------------------------------------------------------------------------- |
+| [discovery](examples/discovery/main.go) | Runnable DAG that discovers fleet info: health check, agent listing, and status in parallel |
+| [all](examples/all/main.go) | Every feature: hooks, Op tasks, TaskFunc, dependencies, guards, Levels(), error strategies, reporting |
```bash
cd examples/discovery
diff --git a/docs/gen/orchestrator.md b/docs/gen/orchestrator.md
index e54b998..6cc68ce 100644
--- a/docs/gen/orchestrator.md
+++ b/docs/gen/orchestrator.md
@@ -11,12 +11,14 @@ Package orchestrator provides DAG\-based task orchestration primitives.
## Index
- [Variables](<#variables>)
+- [func IsBroadcastTarget\(target string\) bool](<#IsBroadcastTarget>)
- [type ErrorStrategy](<#ErrorStrategy>)
- [func Retry\(n int\) ErrorStrategy](<#Retry>)
- [func \(e ErrorStrategy\) RetryCount\(\) int](<#ErrorStrategy.RetryCount>)
- [func \(e ErrorStrategy\) String\(\) string](<#ErrorStrategy.String>)
- [type GuardFn](<#GuardFn>)
- [type Hooks](<#Hooks>)
+- [type HostResult](<#HostResult>)
- [type Op](<#Op>)
- [type Plan](<#Plan>)
- [func NewPlan\(client \*osapi.Client, opts ...PlanOption\) \*Plan](<#NewPlan>)
@@ -27,6 +29,7 @@ Package orchestrator provides DAG\-based task orchestration primitives.
- [func \(p \*Plan\) Run\(ctx context.Context\) \(\*Report, error\)](<#Plan.Run>)
- [func \(p \*Plan\) Task\(name string, op \*Op\) \*Task](<#Plan.Task>)
- [func \(p \*Plan\) TaskFunc\(name string, fn TaskFn\) \*Task](<#Plan.TaskFunc>)
+ - [func \(p \*Plan\) TaskFuncWithResults\(name string, fn TaskFnWithResults\) \*Task](<#Plan.TaskFuncWithResults>)
- [func \(p \*Plan\) Tasks\(\) \[\]\*Task](<#Plan.Tasks>)
- [func \(p \*Plan\) Validate\(\) error](<#Plan.Validate>)
- [type PlanConfig](<#PlanConfig>)
@@ -44,6 +47,7 @@ Package orchestrator provides DAG\-based task orchestration primitives.
- [type Task](<#Task>)
- [func NewTask\(name string, op \*Op\) \*Task](<#NewTask>)
- [func NewTaskFunc\(name string, fn TaskFn\) \*Task](<#NewTaskFunc>)
+ - [func NewTaskFuncWithResults\(name string, fn TaskFnWithResults\) \*Task](<#NewTaskFuncWithResults>)
- [func \(t \*Task\) Dependencies\(\) \[\]\*Task](<#Task.Dependencies>)
- [func \(t \*Task\) DependsOn\(deps ...\*Task\) \*Task](<#Task.DependsOn>)
- [func \(t \*Task\) ErrorStrategy\(\) \*ErrorStrategy](<#Task.ErrorStrategy>)
@@ -55,8 +59,11 @@ Package orchestrator provides DAG\-based task orchestration primitives.
- [func \(t \*Task\) OnlyIfChanged\(\)](<#Task.OnlyIfChanged>)
- [func \(t \*Task\) Operation\(\) \*Op](<#Task.Operation>)
- [func \(t \*Task\) RequiresChange\(\) bool](<#Task.RequiresChange>)
+ - [func \(t \*Task\) SetName\(name string\)](<#Task.SetName>)
- [func \(t \*Task\) When\(fn GuardFn\)](<#Task.When>)
+ - [func \(t \*Task\) WhenWithReason\(fn GuardFn, reason string\)](<#Task.WhenWithReason>)
- [type TaskFn](<#TaskFn>)
+- [type TaskFnWithResults](<#TaskFnWithResults>)
- [type TaskResult](<#TaskResult>)
@@ -80,6 +87,15 @@ var DefaultPollInterval = 500 * time.Millisecond
var StopAll = ErrorStrategy{/* contains filtered or unexported fields */}
```
+
+## func [IsBroadcastTarget]()
+
+```go
+func IsBroadcastTarget(target string) bool
+```
+
+IsBroadcastTarget returns true if the target addresses multiple agents \(broadcast or label selector\).
+
## type [ErrorStrategy]()
@@ -119,7 +135,7 @@ func (e ErrorStrategy) String() string
String returns a human\-readable representation of the strategy.
-## type [GuardFn]()
+## type [GuardFn]()
GuardFn is a predicate that determines if a task should run.
@@ -145,8 +161,22 @@ type Hooks struct {
}
```
+
+## type [HostResult]()
+
+HostResult represents a single host's response within a broadcast operation.
+
+```go
+type HostResult struct {
+ Hostname string
+ Changed bool
+ Error string
+ Data map[string]any
+}
+```
+
-## type [Op]()
+## type [Op]()
Op represents a declarative SDK operation.
@@ -197,7 +227,7 @@ func (p *Plan) Config() PlanConfig
Config returns the plan configuration.
-### func \(\*Plan\) [Explain]()
+### func \(\*Plan\) [Explain]()
```go
func (p *Plan) Explain() string
@@ -206,7 +236,7 @@ func (p *Plan) Explain() string
Explain returns a human\-readable representation of the execution plan showing levels, parallelism, dependencies, and guards.
-### func \(\*Plan\) [Levels]()
+### func \(\*Plan\) [Levels]()
```go
func (p *Plan) Levels() ([][]*Task, error)
@@ -215,7 +245,7 @@ func (p *Plan) Levels() ([][]*Task, error)
Levels returns the levelized DAG \-\- tasks grouped into execution levels where all tasks in a level can run concurrently. Returns an error if the plan fails validation.
-### func \(\*Plan\) [Run]()
+### func \(\*Plan\) [Run]()
```go
func (p *Plan) Run(ctx context.Context) (*Report, error)
@@ -241,8 +271,17 @@ func (p *Plan) TaskFunc(name string, fn TaskFn) *Task
TaskFunc creates a functional task, adds it to the plan, and returns it.
+
+### func \(\*Plan\) [TaskFuncWithResults]()
+
+```go
+func (p *Plan) TaskFuncWithResults(name string, fn TaskFnWithResults) *Task
+```
+
+TaskFuncWithResults creates a functional task that receives completed results from prior tasks, adds it to the plan, and returns it.
+
-### func \(\*Plan\) [Tasks]()
+### func \(\*Plan\) [Tasks]()
```go
func (p *Plan) Tasks() []*Task
@@ -251,7 +290,7 @@ func (p *Plan) Tasks() []*Task
Tasks returns all tasks in the plan.
-### func \(\*Plan\) [Validate]()
+### func \(\*Plan\) [Validate]()
```go
func (p *Plan) Validate() error
@@ -299,7 +338,7 @@ func WithHooks(hooks Hooks) PlanOption
WithHooks attaches lifecycle callbacks to plan execution.
-## type [PlanSummary]()
+## type [PlanSummary]()
PlanSummary describes the execution plan before it runs.
@@ -311,7 +350,7 @@ type PlanSummary struct {
```
-## type [Report]()
+## type [Report]()
Report is the aggregate output of a plan execution.
@@ -323,7 +362,7 @@ type Report struct {
```
-### func \(\*Report\) [Summary]()
+### func \(\*Report\) [Summary]()
```go
func (r *Report) Summary() string
@@ -332,19 +371,21 @@ func (r *Report) Summary() string
Summary returns a human\-readable summary of the report.
-## type [Result]()
+## type [Result]()
Result is the outcome of a single task execution.
```go
type Result struct {
- Changed bool
- Data map[string]any
+ Changed bool
+ Data map[string]any
+ Status Status
+ HostResults []HostResult
}
```
-## type [Results]()
+## type [Results]()
Results is a map of task name to Result, used for conditional logic.
@@ -353,7 +394,7 @@ type Results map[string]*Result
```
-### func \(Results\) [Get]()
+### func \(Results\) [Get]()
```go
func (r Results) Get(name string) *Result
@@ -387,7 +428,7 @@ const (
```
-## type [StepSummary]()
+## type [StepSummary]()
StepSummary describes a single execution step \(DAG level\).
@@ -399,7 +440,7 @@ type StepSummary struct {
```
-## type [Task]()
+## type [Task]()
Task is a unit of work in an orchestration plan.
@@ -410,7 +451,7 @@ type Task struct {
```
-### func [NewTask]()
+### func [NewTask]()
```go
func NewTask(name string, op *Op) *Task
@@ -419,7 +460,7 @@ func NewTask(name string, op *Op) *Task
NewTask creates a declarative task wrapping an SDK operation.
-### func [NewTaskFunc]()
+### func [NewTaskFunc]()
```go
func NewTaskFunc(name string, fn TaskFn) *Task
@@ -427,8 +468,17 @@ func NewTaskFunc(name string, fn TaskFn) *Task
NewTaskFunc creates a functional task with custom logic.
+
+### func [NewTaskFuncWithResults]()
+
+```go
+func NewTaskFuncWithResults(name string, fn TaskFnWithResults) *Task
+```
+
+NewTaskFuncWithResults creates a functional task that receives completed results from prior tasks.
+
-### func \(\*Task\) [Dependencies]()
+### func \(\*Task\) [Dependencies]()
```go
func (t *Task) Dependencies() []*Task
@@ -437,7 +487,7 @@ func (t *Task) Dependencies() []*Task
Dependencies returns the task's dependencies.
-### func \(\*Task\) [DependsOn]()
+### func \(\*Task\) [DependsOn]()
```go
func (t *Task) DependsOn(deps ...*Task) *Task
@@ -446,7 +496,7 @@ func (t *Task) DependsOn(deps ...*Task) *Task
DependsOn sets this task's dependencies. Returns the task for chaining.
-### func \(\*Task\) [ErrorStrategy]()
+### func \(\*Task\) [ErrorStrategy]()
```go
func (t *Task) ErrorStrategy() *ErrorStrategy
@@ -455,7 +505,7 @@ func (t *Task) ErrorStrategy() *ErrorStrategy
ErrorStrategy returns the per\-task error strategy, or nil to use the plan default.
-### func \(\*Task\) [Fn]()
+### func \(\*Task\) [Fn]()
```go
func (t *Task) Fn() TaskFn
@@ -464,7 +514,7 @@ func (t *Task) Fn() TaskFn
Fn returns the task function, or nil for declarative tasks.
-### func \(\*Task\) [Guard]()
+### func \(\*Task\) [Guard]()
```go
func (t *Task) Guard() GuardFn
@@ -473,7 +523,7 @@ func (t *Task) Guard() GuardFn
Guard returns the guard function, or nil if none is set.
-### func \(\*Task\) [IsFunc]()
+### func \(\*Task\) [IsFunc]()
```go
func (t *Task) IsFunc() bool
@@ -482,7 +532,7 @@ func (t *Task) IsFunc() bool
IsFunc returns true if this is a functional task.
-### func \(\*Task\) [Name]()
+### func \(\*Task\) [Name]()
```go
func (t *Task) Name() string
@@ -491,7 +541,7 @@ func (t *Task) Name() string
Name returns the task name.
-### func \(\*Task\) [OnError]()
+### func \(\*Task\) [OnError]()
```go
func (t *Task) OnError(strategy ErrorStrategy)
@@ -500,7 +550,7 @@ func (t *Task) OnError(strategy ErrorStrategy)
OnError sets a per\-task error strategy override.
-### func \(\*Task\) [OnlyIfChanged]()
+### func \(\*Task\) [OnlyIfChanged]()
```go
func (t *Task) OnlyIfChanged()
@@ -509,7 +559,7 @@ func (t *Task) OnlyIfChanged()
OnlyIfChanged marks this task to only run if at least one dependency reported Changed=true.
-### func \(\*Task\) [Operation]()
+### func \(\*Task\) [Operation]()
```go
func (t *Task) Operation() *Op
@@ -518,7 +568,7 @@ func (t *Task) Operation() *Op
Operation returns the declarative operation, or nil for functional tasks.
-### func \(\*Task\) [RequiresChange]()
+### func \(\*Task\) [RequiresChange]()
```go
func (t *Task) RequiresChange() bool
@@ -526,8 +576,17 @@ func (t *Task) RequiresChange() bool
RequiresChange returns true if OnlyIfChanged was set.
+
+### func \(\*Task\) [SetName]()
+
+```go
+func (t *Task) SetName(name string)
+```
+
+SetName changes the task name.
+
-### func \(\*Task\) [When]()
+### func \(\*Task\) [When]()
```go
func (t *Task) When(fn GuardFn)
@@ -535,8 +594,17 @@ func (t *Task) When(fn GuardFn)
When sets a custom guard function that determines whether this task should execute.
+
+### func \(\*Task\) [WhenWithReason]()
+
+```go
+func (t *Task) WhenWithReason(fn GuardFn, reason string)
+```
+
+WhenWithReason sets a guard with a custom skip reason shown when the guard returns false.
+
-## type [TaskFn]()
+## type [TaskFn]()
TaskFn is the signature for functional tasks. The client parameter provides access to the OSAPI SDK for making API calls.
@@ -547,18 +615,33 @@ type TaskFn func(
) (*Result, error)
```
+
+## type [TaskFnWithResults]()
+
+TaskFnWithResults is like TaskFn but receives completed task results for inter\-task data access.
+
+```go
+type TaskFnWithResults func(
+ ctx context.Context,
+ client *osapi.Client,
+ results Results,
+) (*Result, error)
+```
+
-## type [TaskResult]()
+## type [TaskResult]()
TaskResult records the full execution details of a task.
```go
type TaskResult struct {
- Name string
- Status Status
- Changed bool
- Duration time.Duration
- Error error
+ Name string
+ Status Status
+ Changed bool
+ Duration time.Duration
+ Error error
+ Data map[string]any
+ HostResults []HostResult
}
```
diff --git a/docs/orchestration/README.md b/docs/orchestration/README.md
index 83deb87..2bbe214 100644
--- a/docs/orchestration/README.md
+++ b/docs/orchestration/README.md
@@ -69,6 +69,66 @@ plan := orchestrator.NewPlan(client, orchestrator.OnError(orchestrator.Continue)
task.OnError(orchestrator.Retry(3)) // override for this task
```
+## Result Types
+
+### Result
+
+The `Result` struct returned by task functions:
+
+| Field | Type | Description |
+| ------------- | ---------------- | ---------------------------------------------- |
+| `Changed` | `bool` | Whether the operation modified state |
+| `Data` | `map[string]any` | Operation-specific response data |
+| `Status` | `Status` | Terminal status (`changed`, `unchanged`, etc.) |
+| `HostResults` | `[]HostResult` | Per-host results for broadcast operations |
+
+### TaskResult
+
+The `TaskResult` struct provided to `AfterTask` hooks and in `Report.Tasks`:
+
+| Field | Type | Description |
+| ---------- | ---------------- | ------------------------------------------- |
+| `Name` | `string` | Task name |
+| `Status` | `Status` | Terminal status |
+| `Changed` | `bool` | Whether the operation reported changes |
+| `Duration` | `time.Duration` | Execution time |
+| `Error` | `error` | Error if task failed; nil on success |
+| `Data` | `map[string]any` | Operation response data for post-run access |
+
+### HostResult
+
+Per-host data for broadcast operations (targeting `_all` or label selectors):
+
+| Field | Type | Description |
+| ---------- | ---------------- | ---------------------------------- |
+| `Hostname` | `string` | Agent hostname |
+| `Changed` | `bool` | Whether this host reported changes |
+| `Error` | `string` | Error message; empty on success |
+| `Data` | `map[string]any` | Host-specific response data |
+
+## TaskFuncWithResults
+
+Use `TaskFuncWithResults` when a task needs to read results from prior tasks:
+
+```go
+summarize := plan.TaskFuncWithResults(
+ "summarize",
+ func(ctx context.Context, client *osapi.Client, results orchestrator.Results) (*orchestrator.Result, error) {
+ r := results.Get("get-hostname")
+ hostname := r.Data["hostname"].(string)
+
+ return &orchestrator.Result{
+ Changed: true,
+ Data: map[string]any{"summary": hostname},
+ }, nil
+ },
+)
+summarize.DependsOn(getHostname)
+```
+
+Unlike `TaskFunc`, the function receives the `Results` map containing completed
+dependency outputs.
+
## Adding a New Operation
When a new operation is added to OSAPI:
diff --git a/examples/all/main.go b/examples/all/main.go
index ce46d84..9c2fabe 100644
--- a/examples/all/main.go
+++ b/examples/all/main.go
@@ -20,9 +20,10 @@
// Package main demonstrates every orchestrator feature: hooks for consumer-
// controlled logging at every lifecycle point, Op and TaskFunc tasks,
-// dependencies, guards, Levels() for DAG inspection, error strategies
+// TaskFuncWithResults for inter-task data passing, dependencies, guards
+// with Status inspection, Levels() for DAG inspection, error strategies
// (plan-level Continue + per-task Retry), parameterized operations,
-// result data access, and detailed result reporting.
+// result data access, and detailed result reporting with Data.
//
// This example serves as a reference for building tools like Terraform
// or Ansible that consume the SDK.
@@ -32,16 +33,18 @@
// check-health
// ├── get-hostname ────────────┐
// ├── get-disk │
-// ├── get-memory ├── print-summary (only-if-changed, when)
+// ├── get-memory ├── print-summary (TaskFuncWithResults, reads prior data)
// ├── get-load [retry:2] ──────┘
// └── run-uptime [params] ─────┘
-// optional-fail [continue] (independent, no deps on summary)
+// optional-fail [continue] (independent)
+// └── alert-on-failure (When: checks Status == StatusFailed)
//
// Run with: OSAPI_TOKEN="" go run main.go
package main
import (
"context"
+ "encoding/json"
"fmt"
"log"
"os"
@@ -254,26 +257,70 @@ func main() {
)
optionalFail.OnError(orchestrator.Continue)
- // Level 2: summary (depends on all queries, guard + OnlyIfChanged).
- // Uses Results.Get() to read data from previous tasks.
- summary := plan.TaskFunc(
+ // Level 2: summary — uses TaskFuncWithResults to read data from prior
+ // tasks and aggregate it. This is the key inter-task data passing pattern.
+ summary := plan.TaskFuncWithResults(
"print-summary",
func(
_ context.Context,
_ *osapi.Client,
+ results orchestrator.Results,
) (*orchestrator.Result, error) {
fmt.Println("\n --- Fleet Summary ---")
- fmt.Println(" All node queries completed.")
- return &orchestrator.Result{Changed: false}, nil
+ // Read hostname from a prior task via Results.Get().
+ if r := results.Get("get-hostname"); r != nil {
+ if h, ok := r.Data["hostname"].(string); ok {
+ fmt.Printf(" Hostname: %s\n", h)
+ }
+ }
+
+ // Read uptime stdout from a prior command task.
+ if r := results.Get("run-uptime"); r != nil {
+ if stdout, ok := r.Data["stdout"].(string); ok {
+ fmt.Printf(" Uptime: %s\n", stdout)
+ }
+ }
+
+ // Return aggregated data — available in Report.Tasks[].Data
+ // after plan execution completes.
+ return &orchestrator.Result{
+ Changed: false,
+ Data: map[string]any{"completed": true},
+ }, nil
},
)
summary.DependsOn(getHostname, getDisk, getMemory, getLoad, runUptime)
summary.OnlyIfChanged() // skip if no dependency reported changes
+
+ // Guard using Status inspection — only run if hostname succeeded.
summary.When(func(results orchestrator.Results) bool {
r := results.Get("get-hostname")
- return r != nil && r.Data["hostname"] != nil
+ return r != nil && r.Status == orchestrator.StatusChanged
+ })
+
+ // Level 2: alert task — runs only if optional-fail has StatusFailed.
+ // Demonstrates using Status in a When guard for failure-triggered recovery.
+ alertOnFailure := plan.TaskFunc(
+ "alert-on-failure",
+ func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*orchestrator.Result, error) {
+ fmt.Println("\n [alert] optional-fail task failed — sending alert")
+
+ return &orchestrator.Result{
+ Changed: true,
+ Data: map[string]any{"alerted": true},
+ }, nil
+ },
+ )
+ alertOnFailure.DependsOn(optionalFail)
+ alertOnFailure.When(func(results orchestrator.Results) bool {
+ r := results.Get("optional-fail")
+
+ return r != nil && r.Status == orchestrator.StatusFailed
})
// --- Structured DAG access ---
@@ -308,6 +355,7 @@ func main() {
}
// --- Detailed result inspection ---
+ // TaskResult.Data carries operation response data for post-run access.
fmt.Println("\nDetailed results:")
@@ -324,5 +372,10 @@ func main() {
r.Changed,
r.Duration,
)
+
+ if len(r.Data) > 0 {
+ b, _ := json.MarshalIndent(r.Data, " "+strings.Repeat(" ", 20), " ")
+ fmt.Printf(" %-20s data=%s\n", "", b)
+ }
}
}
diff --git a/pkg/orchestrator/plan.go b/pkg/orchestrator/plan.go
index 8c92737..2a10233 100644
--- a/pkg/orchestrator/plan.go
+++ b/pkg/orchestrator/plan.go
@@ -67,6 +67,19 @@ func (p *Plan) TaskFunc(
return t
}
+// TaskFuncWithResults creates a functional task that receives
+// completed results from prior tasks, adds it to the plan, and
+// returns it.
+func (p *Plan) TaskFuncWithResults(
+ name string,
+ fn TaskFnWithResults,
+) *Task {
+ t := NewTaskFuncWithResults(name, fn)
+ p.tasks = append(p.tasks, t)
+
+ return t
+}
+
// Tasks returns all tasks in the plan.
func (p *Plan) Tasks() []*Task {
return p.tasks
@@ -93,7 +106,7 @@ func (p *Plan) Explain() string {
for _, t := range level {
kind := "op"
- if t.fn != nil {
+ if t.IsFunc() {
kind = "fn"
}
diff --git a/pkg/orchestrator/plan_public_test.go b/pkg/orchestrator/plan_public_test.go
index c1410f1..f3e155a 100644
--- a/pkg/orchestrator/plan_public_test.go
+++ b/pkg/orchestrator/plan_public_test.go
@@ -311,6 +311,60 @@ func (s *PlanPublicTestSuite) TestRunGuard() {
}
}
+func (s *PlanPublicTestSuite) TestRunGuardWithFailedDependency() {
+ tests := []struct {
+ name string
+ guard func(orchestrator.Results) bool
+ expectRan bool
+ expectStatus orchestrator.Status
+ }{
+ {
+ name: "guard runs and returns true when dependency failed",
+ guard: func(r orchestrator.Results) bool {
+ res := r.Get("fail")
+
+ return res != nil && res.Status == orchestrator.StatusFailed
+ },
+ expectRan: true,
+ expectStatus: orchestrator.StatusChanged,
+ },
+ {
+ name: "guard runs and returns false when dependency failed",
+ guard: func(r orchestrator.Results) bool {
+ res := r.Get("fail")
+
+ return res != nil && res.Status == orchestrator.StatusChanged
+ },
+ expectRan: false,
+ expectStatus: orchestrator.StatusSkipped,
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ plan := orchestrator.NewPlan(
+ nil,
+ orchestrator.OnError(orchestrator.Continue),
+ )
+ ran := false
+
+ fail := plan.TaskFunc("fail", failFunc("boom"))
+ alert := plan.TaskFunc("alert", taskFunc(true, func() {
+ ran = true
+ }))
+ alert.DependsOn(fail)
+ alert.When(tt.guard)
+
+ report, err := plan.Run(context.Background())
+ s.Require().NoError(err)
+ s.Equal(tt.expectRan, ran)
+
+ sm := statusMap(report)
+ s.Equal(tt.expectStatus, sm["alert"])
+ })
+ }
+}
+
func (s *PlanPublicTestSuite) TestRunErrorStrategy() {
s.Run("stop all on error", func() {
plan := orchestrator.NewPlan(nil)
@@ -979,6 +1033,28 @@ func (s *PlanPublicTestSuite) TestRunHooks() {
s.Contains(skips[0], "guard returned false")
})
+ s.Run("skip hook for guard with custom reason", func() {
+ var events []string
+
+ hooks := allHooks(&events)
+ plan := orchestrator.NewPlan(nil, orchestrator.WithHooks(hooks))
+
+ a := plan.TaskFunc("a", taskFunc(false, nil))
+ b := plan.TaskFunc("b", taskFunc(true, nil))
+ b.DependsOn(a)
+ b.WhenWithReason(
+ func(_ orchestrator.Results) bool { return false },
+ "host is unreachable",
+ )
+
+ _, err := plan.Run(context.Background())
+ s.NoError(err)
+
+ skips := filterPrefix(events, "skip-")
+ s.Len(skips, 1)
+ s.Contains(skips[0], "host is unreachable")
+ })
+
s.Run("skip hook for only-if-changed", func() {
var events []string
diff --git a/pkg/orchestrator/result.go b/pkg/orchestrator/result.go
index fbb4841..1ad5c7d 100644
--- a/pkg/orchestrator/result.go
+++ b/pkg/orchestrator/result.go
@@ -23,19 +23,32 @@ const (
StatusFailed Status = "failed"
)
+// HostResult represents a single host's response within a broadcast
+// operation.
+type HostResult struct {
+ Hostname string
+ Changed bool
+ Error string
+ Data map[string]any
+}
+
// Result is the outcome of a single task execution.
type Result struct {
- Changed bool
- Data map[string]any
+ Changed bool
+ Data map[string]any
+ Status Status
+ HostResults []HostResult
}
// TaskResult records the full execution details of a task.
type TaskResult struct {
- Name string
- Status Status
- Changed bool
- Duration time.Duration
- Error error
+ Name string
+ Status Status
+ Changed bool
+ Duration time.Duration
+ Error error
+ Data map[string]any
+ HostResults []HostResult
}
// Results is a map of task name to Result, used for conditional logic.
diff --git a/pkg/orchestrator/result_public_test.go b/pkg/orchestrator/result_public_test.go
index 59ba13f..bc3052c 100644
--- a/pkg/orchestrator/result_public_test.go
+++ b/pkg/orchestrator/result_public_test.go
@@ -76,6 +76,142 @@ func (s *ResultPublicTestSuite) TestReportSummary() {
}
}
+func (s *ResultPublicTestSuite) TestResultStatusField() {
+ tests := []struct {
+ name string
+ result *orchestrator.Result
+ wantStatus orchestrator.Status
+ wantChange bool
+ }{
+ {
+ name: "changed result carries status",
+ result: &orchestrator.Result{
+ Changed: true,
+ Data: map[string]any{"hostname": "web-01"},
+ Status: orchestrator.StatusChanged,
+ },
+ wantStatus: orchestrator.StatusChanged,
+ wantChange: true,
+ },
+ {
+ name: "unchanged result carries status",
+ result: &orchestrator.Result{
+ Changed: false,
+ Status: orchestrator.StatusUnchanged,
+ },
+ wantStatus: orchestrator.StatusUnchanged,
+ wantChange: false,
+ },
+ {
+ name: "failed result carries status",
+ result: &orchestrator.Result{
+ Changed: false,
+ Status: orchestrator.StatusFailed,
+ },
+ wantStatus: orchestrator.StatusFailed,
+ wantChange: false,
+ },
+ {
+ name: "skipped result carries status",
+ result: &orchestrator.Result{
+ Changed: false,
+ Status: orchestrator.StatusSkipped,
+ },
+ wantStatus: orchestrator.StatusSkipped,
+ wantChange: false,
+ },
+ {
+ name: "zero value has empty status",
+ result: &orchestrator.Result{},
+ wantStatus: "",
+ wantChange: false,
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ s.Equal(tt.wantStatus, tt.result.Status)
+ s.Equal(tt.wantChange, tt.result.Changed)
+ })
+ }
+}
+
+func (s *ResultPublicTestSuite) TestResultHostResults() {
+ tests := []struct {
+ name string
+ result *orchestrator.Result
+ wantLen int
+ validateFn func(hrs []orchestrator.HostResult)
+ }{
+ {
+ name: "result with multiple host results",
+ result: &orchestrator.Result{
+ Changed: true,
+ Status: orchestrator.StatusChanged,
+ HostResults: []orchestrator.HostResult{
+ {
+ Hostname: "web-01",
+ Changed: true,
+ Data: map[string]any{"stdout": "ok"},
+ },
+ {
+ Hostname: "web-02",
+ Changed: false,
+ Error: "connection timeout",
+ },
+ },
+ },
+ wantLen: 2,
+ validateFn: func(hrs []orchestrator.HostResult) {
+ s.Equal("web-01", hrs[0].Hostname)
+ s.True(hrs[0].Changed)
+ s.Equal("web-02", hrs[1].Hostname)
+ s.Equal("connection timeout", hrs[1].Error)
+ },
+ },
+ {
+ name: "result with no host results",
+ result: &orchestrator.Result{
+ Changed: false,
+ Status: orchestrator.StatusUnchanged,
+ },
+ wantLen: 0,
+ },
+ {
+ name: "host result with data map",
+ result: &orchestrator.Result{
+ Changed: true,
+ Status: orchestrator.StatusChanged,
+ HostResults: []orchestrator.HostResult{
+ {
+ Hostname: "db-01",
+ Changed: true,
+ Data: map[string]any{
+ "stdout": "migrated",
+ "exit_code": float64(0),
+ },
+ },
+ },
+ },
+ wantLen: 1,
+ validateFn: func(hrs []orchestrator.HostResult) {
+ s.Equal("db-01", hrs[0].Hostname)
+ s.Equal("migrated", hrs[0].Data["stdout"])
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ s.Len(tt.result.HostResults, tt.wantLen)
+
+ if tt.validateFn != nil {
+ tt.validateFn(tt.result.HostResults)
+ }
+ })
+ }
+}
+
func (s *ResultPublicTestSuite) TestResultsGet() {
tests := []struct {
name string
diff --git a/pkg/orchestrator/runner.go b/pkg/orchestrator/runner.go
index afce9e5..f030851 100644
--- a/pkg/orchestrator/runner.go
+++ b/pkg/orchestrator/runner.go
@@ -232,25 +232,31 @@ func (r *runner) runTask(
) TaskResult {
start := time.Now()
- // Skip if any dependency failed (Continue strategy).
- r.mu.Lock()
- for _, dep := range t.deps {
- if r.failed[dep.name] {
- r.failed[t.name] = true
- r.mu.Unlock()
+ // Skip if any dependency failed — unless the task has a When guard,
+ // which may intentionally inspect failure status (e.g. alert-on-failure).
+ if t.guard == nil {
+ r.mu.Lock()
- tr := TaskResult{
- Name: t.name,
- Status: StatusSkipped,
- Duration: time.Since(start),
- }
- r.callOnSkip(t, "dependency failed")
- r.callAfterTask(t, tr)
+ for _, dep := range t.deps {
+ if r.failed[dep.name] {
+ r.failed[t.name] = true
+ r.results[t.name] = &Result{Status: StatusSkipped}
+ r.mu.Unlock()
+
+ tr := TaskResult{
+ Name: t.name,
+ Status: StatusSkipped,
+ Duration: time.Since(start),
+ }
+ r.callOnSkip(t, "dependency failed")
+ r.callAfterTask(t, tr)
- return tr
+ return tr
+ }
}
+
+ r.mu.Unlock()
}
- r.mu.Unlock()
if t.requiresChange {
anyChanged := false
@@ -268,6 +274,10 @@ func (r *runner) runTask(
r.mu.Unlock()
if !anyChanged {
+ r.mu.Lock()
+ r.results[t.name] = &Result{Status: StatusSkipped}
+ r.mu.Unlock()
+
tr := TaskResult{
Name: t.name,
Status: StatusSkipped,
@@ -287,13 +297,21 @@ func (r *runner) runTask(
r.mu.Unlock()
if !shouldRun {
+ r.mu.Lock()
+ r.results[t.name] = &Result{Status: StatusSkipped}
+ r.mu.Unlock()
+
tr := TaskResult{
Name: t.name,
Status: StatusSkipped,
Duration: time.Since(start),
}
- r.callOnSkip(t, "guard returned false")
+ reason := "guard returned false"
+ if t.guardReason != "" {
+ reason = t.guardReason
+ }
+ r.callOnSkip(t, reason)
r.callAfterTask(t, tr)
return tr
@@ -315,7 +333,13 @@ func (r *runner) runTask(
client := r.plan.client
for attempt := range maxAttempts {
- if t.fn != nil {
+ if t.fnr != nil {
+ r.mu.Lock()
+ results := r.results
+ r.mu.Unlock()
+
+ result, err = t.fnr(ctx, client, results)
+ } else if t.fn != nil {
result, err = t.fn(ctx, client)
} else {
result, err = r.executeOp(ctx, t.op)
@@ -335,6 +359,7 @@ func (r *runner) runTask(
if err != nil {
r.mu.Lock()
r.failed[t.name] = true
+ r.results[t.name] = &Result{Status: StatusFailed}
r.mu.Unlock()
tr := TaskResult{
@@ -349,20 +374,24 @@ func (r *runner) runTask(
return tr
}
- r.mu.Lock()
- r.results[t.name] = result
- r.mu.Unlock()
-
status := StatusUnchanged
if result.Changed {
status = StatusChanged
}
+ result.Status = status
+
+ r.mu.Lock()
+ r.results[t.name] = result
+ r.mu.Unlock()
+
tr := TaskResult{
- Name: t.name,
- Status: status,
- Changed: result.Changed,
- Duration: elapsed,
+ Name: t.name,
+ Status: status,
+ Changed: result.Changed,
+ Duration: elapsed,
+ Data: result.Data,
+ HostResults: result.HostResults,
}
r.callAfterTask(t, tr)
@@ -373,6 +402,59 @@ func (r *runner) runTask(
// DefaultPollInterval is the interval between job status polls.
var DefaultPollInterval = 500 * time.Millisecond
+// isCommandOp returns true for command execution operations.
+func isCommandOp(
+ operation string,
+) bool {
+ return operation == "command.exec.execute" ||
+ operation == "command.shell.execute"
+}
+
+// extractHostResults parses per-agent results from a broadcast
+// collection response.
+func extractHostResults(
+ data map[string]any,
+) []HostResult {
+ resultsRaw, ok := data["results"]
+ if !ok {
+ return nil
+ }
+
+ items, ok := resultsRaw.([]any)
+ if !ok {
+ return nil
+ }
+
+ hostResults := make([]HostResult, 0, len(items))
+
+ for _, item := range items {
+ m, ok := item.(map[string]any)
+ if !ok {
+ continue
+ }
+
+ hr := HostResult{
+ Data: m,
+ }
+
+ if h, ok := m["hostname"].(string); ok {
+ hr.Hostname = h
+ }
+
+ if c, ok := m["changed"].(bool); ok {
+ hr.Changed = c
+ }
+
+ if e, ok := m["error"].(string); ok {
+ hr.Error = e
+ }
+
+ hostResults = append(hostResults, hr)
+ }
+
+ return hostResults
+}
+
// executeOp submits a declarative Op as a job via the SDK and polls
// for completion.
func (r *runner) executeOp(
@@ -415,7 +497,29 @@ func (r *runner) executeOp(
jobID := createResp.JSON201.JobId.String()
- return r.pollJob(ctx, jobID)
+ result, err := r.pollJob(ctx, jobID)
+ if err != nil {
+ return nil, err
+ }
+
+ // Extract per-host results for broadcast targets.
+ if IsBroadcastTarget(op.Target) {
+ result.HostResults = extractHostResults(result.Data)
+ }
+
+ // Non-zero exit for command operations = failure.
+ if isCommandOp(op.Operation) {
+ if exitCode, ok := result.Data["exit_code"].(float64); ok && exitCode != 0 {
+ result.Status = StatusFailed
+
+ return result, fmt.Errorf(
+ "command exited with code %d",
+ int(exitCode),
+ )
+ }
+ }
+
+ return result, nil
}
// pollJob polls a job until it reaches a terminal state.
@@ -466,6 +570,7 @@ func (r *runner) pollJob(
}
changed, _ := data["changed"].(bool)
+ delete(data, "changed")
return &Result{Changed: changed, Data: data}, nil
case "failed":
diff --git a/pkg/orchestrator/runner_broadcast_test.go b/pkg/orchestrator/runner_broadcast_test.go
new file mode 100644
index 0000000..5be2496
--- /dev/null
+++ b/pkg/orchestrator/runner_broadcast_test.go
@@ -0,0 +1,162 @@
+package orchestrator
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/suite"
+)
+
+type RunnerBroadcastTestSuite struct {
+ suite.Suite
+}
+
+func TestRunnerBroadcastTestSuite(t *testing.T) {
+ suite.Run(t, new(RunnerBroadcastTestSuite))
+}
+
+func (s *RunnerBroadcastTestSuite) TestIsBroadcastTarget() {
+ tests := []struct {
+ name string
+ target string
+ want bool
+ }{
+ {
+ name: "all agents is broadcast",
+ target: "_all",
+ want: true,
+ },
+ {
+ name: "label selector is broadcast",
+ target: "role:web",
+ want: true,
+ },
+ {
+ name: "single agent is not broadcast",
+ target: "agent-001",
+ want: false,
+ },
+ {
+ name: "empty string is not broadcast",
+ target: "",
+ want: false,
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ got := IsBroadcastTarget(tt.target)
+ s.Equal(tt.want, got)
+ })
+ }
+}
+
+func (s *RunnerBroadcastTestSuite) TestExtractHostResults() {
+ tests := []struct {
+ name string
+ data map[string]any
+ want []HostResult
+ }{
+ {
+ name: "extracts host results from results array",
+ data: map[string]any{
+ "results": []any{
+ map[string]any{
+ "hostname": "host-1",
+ "changed": true,
+ "data": "something",
+ },
+ map[string]any{
+ "hostname": "host-2",
+ "changed": false,
+ "error": "connection refused",
+ },
+ },
+ },
+ want: []HostResult{
+ {
+ Hostname: "host-1",
+ Changed: true,
+ Data: map[string]any{
+ "hostname": "host-1",
+ "changed": true,
+ "data": "something",
+ },
+ },
+ {
+ Hostname: "host-2",
+ Changed: false,
+ Error: "connection refused",
+ Data: map[string]any{
+ "hostname": "host-2",
+ "changed": false,
+ "error": "connection refused",
+ },
+ },
+ },
+ },
+ {
+ name: "no results key returns nil",
+ data: map[string]any{
+ "other": "value",
+ },
+ want: nil,
+ },
+ {
+ name: "results not an array returns nil",
+ data: map[string]any{
+ "results": "not-an-array",
+ },
+ want: nil,
+ },
+ {
+ name: "empty results array returns empty slice",
+ data: map[string]any{
+ "results": []any{},
+ },
+ want: []HostResult{},
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ got := extractHostResults(tt.data)
+ s.Equal(tt.want, got)
+ })
+ }
+}
+
+func (s *RunnerBroadcastTestSuite) TestIsCommandOp() {
+ tests := []struct {
+ name string
+ operation string
+ want bool
+ }{
+ {
+ name: "command.exec.execute is a command op",
+ operation: "command.exec.execute",
+ want: true,
+ },
+ {
+ name: "command.shell.execute is a command op",
+ operation: "command.shell.execute",
+ want: true,
+ },
+ {
+ name: "node.hostname.get is not a command op",
+ operation: "node.hostname.get",
+ want: false,
+ },
+ {
+ name: "empty string is not a command op",
+ operation: "",
+ want: false,
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ got := isCommandOp(tt.operation)
+ s.Equal(tt.want, got)
+ })
+ }
+}
diff --git a/pkg/orchestrator/runner_test.go b/pkg/orchestrator/runner_test.go
index ccf7549..d202158 100644
--- a/pkg/orchestrator/runner_test.go
+++ b/pkg/orchestrator/runner_test.go
@@ -1,9 +1,13 @@
package orchestrator
import (
+ "context"
+ "fmt"
"testing"
"github.com/stretchr/testify/suite"
+
+ "github.com/osapi-io/osapi-sdk/pkg/osapi"
)
type RunnerTestSuite struct {
@@ -68,3 +72,351 @@ func (s *RunnerTestSuite) TestLevelize() {
})
}
}
+
+func (s *RunnerTestSuite) TestRunTaskStoresResultForAllPaths() {
+ tests := []struct {
+ name string
+ setup func() *Plan
+ taskName string
+ wantStatus Status
+ }{
+ {
+ name: "OnlyIfChanged skip stores StatusSkipped",
+ setup: func() *Plan {
+ plan := NewPlan(nil, OnError(Continue))
+
+ // dep returns Changed=false, so child with
+ // OnlyIfChanged should be skipped.
+ dep := plan.TaskFunc("dep", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return &Result{Changed: false}, nil
+ })
+
+ child := plan.TaskFunc("child", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return &Result{Changed: true}, nil
+ })
+ child.DependsOn(dep)
+ child.OnlyIfChanged()
+
+ return plan
+ },
+ taskName: "child",
+ wantStatus: StatusSkipped,
+ },
+ {
+ name: "failed task stores StatusFailed",
+ setup: func() *Plan {
+ plan := NewPlan(nil, OnError(Continue))
+
+ plan.TaskFunc("failing", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return nil, fmt.Errorf("deliberate error")
+ })
+
+ return plan
+ },
+ taskName: "failing",
+ wantStatus: StatusFailed,
+ },
+ {
+ name: "guard-false skip stores StatusSkipped",
+ setup: func() *Plan {
+ plan := NewPlan(nil, OnError(Continue))
+
+ plan.TaskFunc("guarded", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return &Result{Changed: true}, nil
+ }).When(func(_ Results) bool {
+ return false
+ })
+
+ return plan
+ },
+ taskName: "guarded",
+ wantStatus: StatusSkipped,
+ },
+ {
+ name: "dependency-failed skip stores StatusSkipped",
+ setup: func() *Plan {
+ plan := NewPlan(nil, OnError(Continue))
+
+ dep := plan.TaskFunc("dep", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return nil, fmt.Errorf("deliberate error")
+ })
+
+ child := plan.TaskFunc("child", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return &Result{Changed: true}, nil
+ })
+ child.DependsOn(dep)
+
+ return plan
+ },
+ taskName: "child",
+ wantStatus: StatusSkipped,
+ },
+ {
+ name: "successful changed task stores StatusChanged",
+ setup: func() *Plan {
+ plan := NewPlan(nil, OnError(Continue))
+
+ plan.TaskFunc("ok", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return &Result{Changed: true}, nil
+ })
+
+ return plan
+ },
+ taskName: "ok",
+ wantStatus: StatusChanged,
+ },
+ {
+ name: "successful unchanged task stores StatusUnchanged",
+ setup: func() *Plan {
+ plan := NewPlan(nil, OnError(Continue))
+
+ plan.TaskFunc("ok", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return &Result{Changed: false}, nil
+ })
+
+ return plan
+ },
+ taskName: "ok",
+ wantStatus: StatusUnchanged,
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ plan := tt.setup()
+ runner := newRunner(plan)
+
+ _, err := runner.run(context.Background())
+ // Some plans produce errors (e.g. StopAll with a
+ // failing task); we don't assert on err here because
+ // we only care about the results map.
+ _ = err
+
+ result := runner.results.Get(tt.taskName)
+ s.NotNil(
+ result,
+ "results map should contain entry for %q",
+ tt.taskName,
+ )
+ s.Equal(
+ tt.wantStatus,
+ result.Status,
+ "result status for %q",
+ tt.taskName,
+ )
+ })
+ }
+}
+
+func (s *RunnerTestSuite) TestDownstreamGuardInspectsSkippedStatus() {
+ tests := []struct {
+ name string
+ setup func() (*Plan, *bool)
+ observerName string
+ wantGuardCalled bool
+ wantTaskStatus Status
+ }{
+ {
+ name: "guard can see guard-skipped task status",
+ setup: func() (*Plan, *bool) {
+ plan := NewPlan(nil, OnError(Continue))
+ guardCalled := false
+
+ // This task is skipped because its guard
+ // returns false.
+ guarded := plan.TaskFunc("guarded", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return &Result{Changed: true}, nil
+ })
+ guarded.When(func(_ Results) bool {
+ return false
+ })
+
+ // Observer depends on guarded so it runs in a
+ // later level. Its guard inspects the skipped
+ // task's status.
+ observer := plan.TaskFunc("observer", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return &Result{Changed: false}, nil
+ })
+ observer.DependsOn(guarded)
+ observer.When(func(r Results) bool {
+ guardCalled = true
+ res := r.Get("guarded")
+
+ return res != nil && res.Status == StatusSkipped
+ })
+
+ return plan, &guardCalled
+ },
+ observerName: "observer",
+ wantGuardCalled: true,
+ // Observer runs because the guard sees the skipped
+ // status and returns true.
+ wantTaskStatus: StatusUnchanged,
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ plan, guardCalled := tt.setup()
+ runner := newRunner(plan)
+
+ _, err := runner.run(context.Background())
+ _ = err
+
+ s.Equal(
+ tt.wantGuardCalled,
+ *guardCalled,
+ "guard should have been called",
+ )
+
+ result := runner.results.Get(tt.observerName)
+ s.NotNil(
+ result,
+ "observer should have a result entry",
+ )
+ s.Equal(
+ tt.wantTaskStatus,
+ result.Status,
+ "observer task status",
+ )
+ })
+ }
+}
+
+func (s *RunnerTestSuite) TestTaskFuncWithResultsReceivesResults() {
+ tests := []struct {
+ name string
+ setup func() (*Plan, *string)
+ wantCapture string
+ }{
+ {
+ name: "receives upstream result data",
+ setup: func() (*Plan, *string) {
+ plan := NewPlan(nil, OnError(StopAll))
+ var captured string
+
+ a := plan.TaskFunc("a", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return &Result{
+ Changed: true,
+ Data: map[string]any{"hostname": "web-01"},
+ }, nil
+ })
+
+ b := plan.TaskFuncWithResults("b", func(
+ _ context.Context,
+ _ *osapi.Client,
+ results Results,
+ ) (*Result, error) {
+ r := results.Get("a")
+ if r != nil {
+ if h, ok := r.Data["hostname"].(string); ok {
+ captured = h
+ }
+ }
+
+ return &Result{Changed: false}, nil
+ })
+ b.DependsOn(a)
+
+ return plan, &captured
+ },
+ wantCapture: "web-01",
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ plan, captured := tt.setup()
+
+ _, err := plan.Run(context.Background())
+
+ s.Require().NoError(err)
+ s.Equal(tt.wantCapture, *captured)
+ })
+ }
+}
+
+func (s *RunnerTestSuite) TestTaskResultCarriesData() {
+ tests := []struct {
+ name string
+ setup func() *Plan
+ taskName string
+ wantKey string
+ wantVal any
+ }{
+ {
+ name: "success result includes data",
+ setup: func() *Plan {
+ plan := NewPlan(nil, OnError(StopAll))
+
+ plan.TaskFunc("a", func(
+ _ context.Context,
+ _ *osapi.Client,
+ ) (*Result, error) {
+ return &Result{
+ Changed: true,
+ Data: map[string]any{"stdout": "hello"},
+ }, nil
+ })
+
+ return plan
+ },
+ taskName: "a",
+ wantKey: "stdout",
+ wantVal: "hello",
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ plan := tt.setup()
+
+ report, err := plan.Run(context.Background())
+
+ s.Require().NoError(err)
+
+ var found bool
+ for _, tr := range report.Tasks {
+ if tr.Name == tt.taskName {
+ found = true
+ s.Equal(tt.wantVal, tr.Data[tt.wantKey])
+ }
+ }
+
+ s.True(found, "task %q should be in report", tt.taskName)
+ })
+ }
+}
diff --git a/pkg/orchestrator/task.go b/pkg/orchestrator/task.go
index cc818e1..d593193 100644
--- a/pkg/orchestrator/task.go
+++ b/pkg/orchestrator/task.go
@@ -2,6 +2,7 @@ package orchestrator
import (
"context"
+ "strings"
"github.com/osapi-io/osapi-sdk/pkg/osapi"
)
@@ -20,6 +21,14 @@ type TaskFn func(
client *osapi.Client,
) (*Result, error)
+// TaskFnWithResults is like TaskFn but receives completed task results
+// for inter-task data access.
+type TaskFnWithResults func(
+ ctx context.Context,
+ client *osapi.Client,
+ results Results,
+) (*Result, error)
+
// GuardFn is a predicate that determines if a task should run.
type GuardFn func(results Results) bool
@@ -28,8 +37,10 @@ type Task struct {
name string
op *Op
fn TaskFn
+ fnr TaskFnWithResults
deps []*Task
guard GuardFn
+ guardReason string
requiresChange bool
errorStrategy *ErrorStrategy
}
@@ -56,14 +67,33 @@ func NewTaskFunc(
}
}
+// NewTaskFuncWithResults creates a functional task that receives
+// completed results from prior tasks.
+func NewTaskFuncWithResults(
+ name string,
+ fn TaskFnWithResults,
+) *Task {
+ return &Task{
+ name: name,
+ fnr: fn,
+ }
+}
+
// Name returns the task name.
func (t *Task) Name() string {
return t.name
}
+// SetName changes the task name.
+func (t *Task) SetName(
+ name string,
+) {
+ t.name = name
+}
+
// IsFunc returns true if this is a functional task.
func (t *Task) IsFunc() bool {
- return t.fn != nil
+ return t.fn != nil || t.fnr != nil
}
// Operation returns the declarative operation, or nil for functional
@@ -111,6 +141,16 @@ func (t *Task) When(
t.guard = fn
}
+// WhenWithReason sets a guard with a custom skip reason shown when
+// the guard returns false.
+func (t *Task) WhenWithReason(
+ fn GuardFn,
+ reason string,
+) {
+ t.guard = fn
+ t.guardReason = reason
+}
+
// Guard returns the guard function, or nil if none is set.
func (t *Task) Guard() GuardFn {
return t.guard
@@ -128,3 +168,11 @@ func (t *Task) OnError(
func (t *Task) ErrorStrategy() *ErrorStrategy {
return t.errorStrategy
}
+
+// IsBroadcastTarget returns true if the target addresses multiple
+// agents (broadcast or label selector).
+func IsBroadcastTarget(
+ target string,
+) bool {
+ return target == "_all" || strings.Contains(target, ":")
+}
diff --git a/pkg/orchestrator/task_public_test.go b/pkg/orchestrator/task_public_test.go
index 7319d51..01327c6 100644
--- a/pkg/orchestrator/task_public_test.go
+++ b/pkg/orchestrator/task_public_test.go
@@ -92,6 +92,68 @@ func (s *TaskPublicTestSuite) TestTaskFunc() {
s.True(task.IsFunc())
}
+func (s *TaskPublicTestSuite) TestSetName() {
+ tests := []struct {
+ name string
+ initial string
+ renamed string
+ wantName string
+ }{
+ {
+ name: "changes task name",
+ initial: "original",
+ renamed: "renamed",
+ wantName: "renamed",
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ task := orchestrator.NewTask(
+ tt.initial,
+ &orchestrator.Op{Operation: "noop"},
+ )
+ task.SetName(tt.renamed)
+ s.Equal(tt.wantName, task.Name())
+ })
+ }
+}
+
+func (s *TaskPublicTestSuite) TestWhenWithReason() {
+ tests := []struct {
+ name string
+ guardResult bool
+ reason string
+ }{
+ {
+ name: "sets guard and reason when guard returns false",
+ guardResult: false,
+ reason: "host is unreachable",
+ },
+ {
+ name: "sets guard and reason when guard returns true",
+ guardResult: true,
+ reason: "custom reason",
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ task := orchestrator.NewTask(
+ "t",
+ &orchestrator.Op{Operation: "noop"},
+ )
+ task.WhenWithReason(func(_ orchestrator.Results) bool {
+ return tt.guardResult
+ }, tt.reason)
+
+ guard := task.Guard()
+ s.NotNil(guard)
+ s.Equal(tt.guardResult, guard(orchestrator.Results{}))
+ })
+ }
+}
+
func (s *TaskPublicTestSuite) TestOnErrorOverride() {
task := orchestrator.NewTask("t", &orchestrator.Op{Operation: "noop"})
task.OnError(orchestrator.Continue)