diff --git a/README.md b/README.md index c3eba98..2e30751 100644 --- a/README.md +++ b/README.md @@ -24,16 +24,16 @@ Typed Go client for every OSAPI endpoint. See the [client docs](docs/osapi/README.md) for targeting, options, and quick start. -| Service | Operations | Docs | Source | -| ------- | ---------- | ---- | ------ | -| Node | Hostname, disk, memory, load, uptime, OS info, status | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | -| Network | DNS get/update, ping | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | -| Command | exec, shell | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | -| Job | Create, get, list, delete, retry, stats | [docs](docs/osapi/job.md) | [`job.go`](pkg/osapi/job.go) | -| Agent | List, get (discovery + heartbeat data) | [docs](docs/osapi/agent.md) | [`agent.go`](pkg/osapi/agent.go) | -| Health | Liveness, readiness, status | [docs](docs/osapi/health.md) | [`health.go`](pkg/osapi/health.go) | -| Audit | List, get, export | [docs](docs/osapi/audit.md) | [`audit.go`](pkg/osapi/audit.go) | -| Metrics | Prometheus text | [docs](docs/osapi/metrics.md) | [`metrics.go`](pkg/osapi/metrics.go) | +| Service | Operations | Docs | Source | +| ------- | ----------------------------------------------------- | ----------------------------- | ------------------------------------ | +| Node | Hostname, disk, memory, load, uptime, OS info, status | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | +| Network | DNS get/update, ping | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | +| Command | exec, shell | [docs](docs/osapi/node.md) | [`node.go`](pkg/osapi/node.go) | +| Job | Create, get, list, delete, retry, stats | [docs](docs/osapi/job.md) | [`job.go`](pkg/osapi/job.go) | +| Agent | List, get (discovery + heartbeat data) | [docs](docs/osapi/agent.md) | [`agent.go`](pkg/osapi/agent.go) | +| Health | Liveness, readiness, status | [docs](docs/osapi/health.md) | 
[`health.go`](pkg/osapi/health.go) | +| Audit | List, get, export | [docs](docs/osapi/audit.md) | [`audit.go`](pkg/osapi/audit.go) | +| Metrics | Prometheus text | [docs](docs/osapi/metrics.md) | [`metrics.go`](pkg/osapi/metrics.go) | ## Orchestration @@ -41,40 +41,42 @@ DAG-based task execution on top of the client. See the [orchestration docs](docs/orchestration/README.md) for hooks, error strategies, and adding new operations. -| Feature | Description | Source | -| ------- | ----------- | ------ | -| DAG execution | Dependency-based ordering with automatic parallelism | [`plan.go`](pkg/orchestrator/plan.go) | -| Op tasks | Declarative OSAPI operations with target routing and params | [`task.go`](pkg/orchestrator/task.go) | -| TaskFunc | Custom Go functions with SDK client access | [`task.go`](pkg/orchestrator/task.go) | -| Hooks | 8 lifecycle callbacks — consumer-controlled logging | [`options.go`](pkg/orchestrator/options.go) | -| Error strategies | StopAll, Continue (skip dependents), Retry(n) | [`options.go`](pkg/orchestrator/options.go) | -| Guards | `When` predicates, `OnlyIfChanged` conditional execution | [`task.go`](pkg/orchestrator/task.go) | -| Changed detection | Read-only ops return false, mutators reflect actual state | [`runner.go`](pkg/orchestrator/runner.go) | -| Result access | `Results.Get()` for cross-task data flow | [`result.go`](pkg/orchestrator/result.go) | +| Feature | Description | Source | +| ------------------- | ------------------------------------------------------------------------- | ------------------------------------------- | +| DAG execution | Dependency-based ordering with automatic parallelism | [`plan.go`](pkg/orchestrator/plan.go) | +| Op tasks | Declarative OSAPI operations with target routing and params | [`task.go`](pkg/orchestrator/task.go) | +| TaskFunc | Custom Go functions with SDK client access | [`task.go`](pkg/orchestrator/task.go) | +| TaskFuncWithResults | Custom functions that receive completed results from 
prior tasks | [`task.go`](pkg/orchestrator/task.go) | +| Hooks | 8 lifecycle callbacks — consumer-controlled logging | [`options.go`](pkg/orchestrator/options.go) | +| Error strategies | StopAll, Continue (skip dependents), Retry(n) | [`options.go`](pkg/orchestrator/options.go) | +| Guards | `When` predicates, `OnlyIfChanged` conditional execution | [`task.go`](pkg/orchestrator/task.go) | +| Changed detection | Read-only ops return false, mutators reflect actual state | [`runner.go`](pkg/orchestrator/runner.go) | +| Result access | `Results.Get()` for cross-task data flow, `Status` for outcome inspection | [`result.go`](pkg/orchestrator/result.go) | +| Broadcast results | Per-host `HostResult` data for multi-target operations | [`result.go`](pkg/orchestrator/result.go) | ### Operations -| Operation | Description | Idempotent | Docs | -| --------- | ----------- | ---------- | ---- | -| `node.hostname.get` | Get system hostname | Read-only | [docs](docs/orchestration/node-hostname.md) | -| `node.status.get` | Get node status | Read-only | [docs](docs/orchestration/node-status.md) | -| `node.disk.get` | Get disk usage | Read-only | [docs](docs/orchestration/node-disk.md) | -| `node.memory.get` | Get memory stats | Read-only | [docs](docs/orchestration/node-memory.md) | -| `node.load.get` | Get load averages | Read-only | [docs](docs/orchestration/node-load.md) | -| `network.dns.get` | Get DNS configuration | Read-only | [docs](docs/orchestration/network-dns-get.md) | -| `network.dns.update` | Update DNS servers | Yes | [docs](docs/orchestration/network-dns-update.md) | -| `network.ping.do` | Ping a host | Read-only | [docs](docs/orchestration/network-ping.md) | -| `command.exec.execute` | Execute a command | No | [docs](docs/orchestration/command-exec.md) | -| `command.shell.execute` | Execute a shell string | No | [docs](docs/orchestration/command-shell.md) | +| Operation | Description | Idempotent | Docs | +| ----------------------- | ---------------------- | 
---------- | ------------------------------------------------ | +| `node.hostname.get` | Get system hostname | Read-only | [docs](docs/orchestration/node-hostname.md) | +| `node.status.get` | Get node status | Read-only | [docs](docs/orchestration/node-status.md) | +| `node.disk.get` | Get disk usage | Read-only | [docs](docs/orchestration/node-disk.md) | +| `node.memory.get` | Get memory stats | Read-only | [docs](docs/orchestration/node-memory.md) | +| `node.load.get` | Get load averages | Read-only | [docs](docs/orchestration/node-load.md) | +| `network.dns.get` | Get DNS configuration | Read-only | [docs](docs/orchestration/network-dns-get.md) | +| `network.dns.update` | Update DNS servers | Yes | [docs](docs/orchestration/network-dns-update.md) | +| `network.ping.do` | Ping a host | Read-only | [docs](docs/orchestration/network-ping.md) | +| `command.exec.execute` | Execute a command | No | [docs](docs/orchestration/command-exec.md) | +| `command.shell.execute` | Execute a shell string | No | [docs](docs/orchestration/command-shell.md) | ## Examples Each example is a standalone Go program you can read and run. 
-| Example | What it shows | -| ------- | ------------- | -| [discovery](examples/discovery/main.go) | Runnable DAG that discovers fleet info: health check, agent listing, and status in parallel | -| [all](examples/all/main.go) | Every feature: hooks, Op tasks, TaskFunc, dependencies, guards, Levels(), error strategies, reporting | +| Example | What it shows | +| --------------------------------------- | ----------------------------------------------------------------------------------------------------- | +| [discovery](examples/discovery/main.go) | Runnable DAG that discovers fleet info: health check, agent listing, and status in parallel | +| [all](examples/all/main.go) | Every feature: hooks, Op tasks, TaskFunc, dependencies, guards, Levels(), error strategies, reporting | ```bash cd examples/discovery diff --git a/docs/gen/orchestrator.md b/docs/gen/orchestrator.md index e54b998..6cc68ce 100644 --- a/docs/gen/orchestrator.md +++ b/docs/gen/orchestrator.md @@ -11,12 +11,14 @@ Package orchestrator provides DAG\-based task orchestration primitives. ## Index - [Variables](<#variables>) +- [func IsBroadcastTarget\(target string\) bool](<#IsBroadcastTarget>) - [type ErrorStrategy](<#ErrorStrategy>) - [func Retry\(n int\) ErrorStrategy](<#Retry>) - [func \(e ErrorStrategy\) RetryCount\(\) int](<#ErrorStrategy.RetryCount>) - [func \(e ErrorStrategy\) String\(\) string](<#ErrorStrategy.String>) - [type GuardFn](<#GuardFn>) - [type Hooks](<#Hooks>) +- [type HostResult](<#HostResult>) - [type Op](<#Op>) - [type Plan](<#Plan>) - [func NewPlan\(client \*osapi.Client, opts ...PlanOption\) \*Plan](<#NewPlan>) @@ -27,6 +29,7 @@ Package orchestrator provides DAG\-based task orchestration primitives. 
- [func \(p \*Plan\) Run\(ctx context.Context\) \(\*Report, error\)](<#Plan.Run>) - [func \(p \*Plan\) Task\(name string, op \*Op\) \*Task](<#Plan.Task>) - [func \(p \*Plan\) TaskFunc\(name string, fn TaskFn\) \*Task](<#Plan.TaskFunc>) + - [func \(p \*Plan\) TaskFuncWithResults\(name string, fn TaskFnWithResults\) \*Task](<#Plan.TaskFuncWithResults>) - [func \(p \*Plan\) Tasks\(\) \[\]\*Task](<#Plan.Tasks>) - [func \(p \*Plan\) Validate\(\) error](<#Plan.Validate>) - [type PlanConfig](<#PlanConfig>) @@ -44,6 +47,7 @@ Package orchestrator provides DAG\-based task orchestration primitives. - [type Task](<#Task>) - [func NewTask\(name string, op \*Op\) \*Task](<#NewTask>) - [func NewTaskFunc\(name string, fn TaskFn\) \*Task](<#NewTaskFunc>) + - [func NewTaskFuncWithResults\(name string, fn TaskFnWithResults\) \*Task](<#NewTaskFuncWithResults>) - [func \(t \*Task\) Dependencies\(\) \[\]\*Task](<#Task.Dependencies>) - [func \(t \*Task\) DependsOn\(deps ...\*Task\) \*Task](<#Task.DependsOn>) - [func \(t \*Task\) ErrorStrategy\(\) \*ErrorStrategy](<#Task.ErrorStrategy>) @@ -55,8 +59,11 @@ Package orchestrator provides DAG\-based task orchestration primitives. 
- [func \(t \*Task\) OnlyIfChanged\(\)](<#Task.OnlyIfChanged>) - [func \(t \*Task\) Operation\(\) \*Op](<#Task.Operation>) - [func \(t \*Task\) RequiresChange\(\) bool](<#Task.RequiresChange>) + - [func \(t \*Task\) SetName\(name string\)](<#Task.SetName>) - [func \(t \*Task\) When\(fn GuardFn\)](<#Task.When>) + - [func \(t \*Task\) WhenWithReason\(fn GuardFn, reason string\)](<#Task.WhenWithReason>) - [type TaskFn](<#TaskFn>) +- [type TaskFnWithResults](<#TaskFnWithResults>) - [type TaskResult](<#TaskResult>) @@ -80,6 +87,15 @@ var DefaultPollInterval = 500 * time.Millisecond var StopAll = ErrorStrategy{/* contains filtered or unexported fields */} ``` + +## func [IsBroadcastTarget]() + +```go +func IsBroadcastTarget(target string) bool +``` + +IsBroadcastTarget returns true if the target addresses multiple agents \(broadcast or label selector\). + ## type [ErrorStrategy]() @@ -119,7 +135,7 @@ func (e ErrorStrategy) String() string String returns a human\-readable representation of the strategy. -## type [GuardFn]() +## type [GuardFn]() GuardFn is a predicate that determines if a task should run. @@ -145,8 +161,22 @@ type Hooks struct { } ``` + +## type [HostResult]() + +HostResult represents a single host's response within a broadcast operation. + +```go +type HostResult struct { + Hostname string + Changed bool + Error string + Data map[string]any +} +``` + -## type [Op]() +## type [Op]() Op represents a declarative SDK operation. @@ -197,7 +227,7 @@ func (p *Plan) Config() PlanConfig Config returns the plan configuration. -### func \(\*Plan\) [Explain]() +### func \(\*Plan\) [Explain]() ```go func (p *Plan) Explain() string @@ -206,7 +236,7 @@ func (p *Plan) Explain() string Explain returns a human\-readable representation of the execution plan showing levels, parallelism, dependencies, and guards. 
-### func \(\*Plan\) [Levels]() +### func \(\*Plan\) [Levels]() ```go func (p *Plan) Levels() ([][]*Task, error) @@ -215,7 +245,7 @@ func (p *Plan) Levels() ([][]*Task, error) Levels returns the levelized DAG \-\- tasks grouped into execution levels where all tasks in a level can run concurrently. Returns an error if the plan fails validation. -### func \(\*Plan\) [Run]() +### func \(\*Plan\) [Run]() ```go func (p *Plan) Run(ctx context.Context) (*Report, error) @@ -241,8 +271,17 @@ func (p *Plan) TaskFunc(name string, fn TaskFn) *Task TaskFunc creates a functional task, adds it to the plan, and returns it. + +### func \(\*Plan\) [TaskFuncWithResults]() + +```go +func (p *Plan) TaskFuncWithResults(name string, fn TaskFnWithResults) *Task +``` + +TaskFuncWithResults creates a functional task that receives completed results from prior tasks, adds it to the plan, and returns it. + -### func \(\*Plan\) [Tasks]() +### func \(\*Plan\) [Tasks]() ```go func (p *Plan) Tasks() []*Task @@ -251,7 +290,7 @@ func (p *Plan) Tasks() []*Task Tasks returns all tasks in the plan. -### func \(\*Plan\) [Validate]() +### func \(\*Plan\) [Validate]() ```go func (p *Plan) Validate() error @@ -299,7 +338,7 @@ func WithHooks(hooks Hooks) PlanOption WithHooks attaches lifecycle callbacks to plan execution. -## type [PlanSummary]() +## type [PlanSummary]() PlanSummary describes the execution plan before it runs. @@ -311,7 +350,7 @@ type PlanSummary struct { ``` -## type [Report]() +## type [Report]() Report is the aggregate output of a plan execution. @@ -323,7 +362,7 @@ type Report struct { ``` -### func \(\*Report\) [Summary]() +### func \(\*Report\) [Summary]() ```go func (r *Report) Summary() string @@ -332,19 +371,21 @@ func (r *Report) Summary() string Summary returns a human\-readable summary of the report. -## type [Result]() +## type [Result]() Result is the outcome of a single task execution. 
```go type Result struct { - Changed bool - Data map[string]any + Changed bool + Data map[string]any + Status Status + HostResults []HostResult } ``` -## type [Results]() +## type [Results]() Results is a map of task name to Result, used for conditional logic. @@ -353,7 +394,7 @@ type Results map[string]*Result ``` -### func \(Results\) [Get]() +### func \(Results\) [Get]() ```go func (r Results) Get(name string) *Result @@ -387,7 +428,7 @@ const ( ``` -## type [StepSummary]() +## type [StepSummary]() StepSummary describes a single execution step \(DAG level\). @@ -399,7 +440,7 @@ type StepSummary struct { ``` -## type [Task]() +## type [Task]() Task is a unit of work in an orchestration plan. @@ -410,7 +451,7 @@ type Task struct { ``` -### func [NewTask]() +### func [NewTask]() ```go func NewTask(name string, op *Op) *Task @@ -419,7 +460,7 @@ func NewTask(name string, op *Op) *Task NewTask creates a declarative task wrapping an SDK operation. -### func [NewTaskFunc]() +### func [NewTaskFunc]() ```go func NewTaskFunc(name string, fn TaskFn) *Task @@ -427,8 +468,17 @@ func NewTaskFunc(name string, fn TaskFn) *Task NewTaskFunc creates a functional task with custom logic. + +### func [NewTaskFuncWithResults]() + +```go +func NewTaskFuncWithResults(name string, fn TaskFnWithResults) *Task +``` + +NewTaskFuncWithResults creates a functional task that receives completed results from prior tasks. + -### func \(\*Task\) [Dependencies]() +### func \(\*Task\) [Dependencies]() ```go func (t *Task) Dependencies() []*Task @@ -437,7 +487,7 @@ func (t *Task) Dependencies() []*Task Dependencies returns the task's dependencies. -### func \(\*Task\) [DependsOn]() +### func \(\*Task\) [DependsOn]() ```go func (t *Task) DependsOn(deps ...*Task) *Task @@ -446,7 +496,7 @@ func (t *Task) DependsOn(deps ...*Task) *Task DependsOn sets this task's dependencies. Returns the task for chaining. 
-### func \(\*Task\) [ErrorStrategy]() +### func \(\*Task\) [ErrorStrategy]() ```go func (t *Task) ErrorStrategy() *ErrorStrategy @@ -455,7 +505,7 @@ func (t *Task) ErrorStrategy() *ErrorStrategy ErrorStrategy returns the per\-task error strategy, or nil to use the plan default. -### func \(\*Task\) [Fn]() +### func \(\*Task\) [Fn]() ```go func (t *Task) Fn() TaskFn @@ -464,7 +514,7 @@ func (t *Task) Fn() TaskFn Fn returns the task function, or nil for declarative tasks. -### func \(\*Task\) [Guard]() +### func \(\*Task\) [Guard]() ```go func (t *Task) Guard() GuardFn @@ -473,7 +523,7 @@ func (t *Task) Guard() GuardFn Guard returns the guard function, or nil if none is set. -### func \(\*Task\) [IsFunc]() +### func \(\*Task\) [IsFunc]() ```go func (t *Task) IsFunc() bool @@ -482,7 +532,7 @@ func (t *Task) IsFunc() bool IsFunc returns true if this is a functional task. -### func \(\*Task\) [Name]() +### func \(\*Task\) [Name]() ```go func (t *Task) Name() string @@ -491,7 +541,7 @@ func (t *Task) Name() string Name returns the task name. -### func \(\*Task\) [OnError]() +### func \(\*Task\) [OnError]() ```go func (t *Task) OnError(strategy ErrorStrategy) @@ -500,7 +550,7 @@ func (t *Task) OnError(strategy ErrorStrategy) OnError sets a per\-task error strategy override. -### func \(\*Task\) [OnlyIfChanged]() +### func \(\*Task\) [OnlyIfChanged]() ```go func (t *Task) OnlyIfChanged() @@ -509,7 +559,7 @@ func (t *Task) OnlyIfChanged() OnlyIfChanged marks this task to only run if at least one dependency reported Changed=true. -### func \(\*Task\) [Operation]() +### func \(\*Task\) [Operation]() ```go func (t *Task) Operation() *Op @@ -518,7 +568,7 @@ func (t *Task) Operation() *Op Operation returns the declarative operation, or nil for functional tasks. 
-### func \(\*Task\) [RequiresChange]() +### func \(\*Task\) [RequiresChange]() ```go func (t *Task) RequiresChange() bool @@ -526,8 +576,17 @@ func (t *Task) RequiresChange() bool RequiresChange returns true if OnlyIfChanged was set. + +### func \(\*Task\) [SetName]() + +```go +func (t *Task) SetName(name string) +``` + +SetName changes the task name. + -### func \(\*Task\) [When]() +### func \(\*Task\) [When]() ```go func (t *Task) When(fn GuardFn) @@ -535,8 +594,17 @@ func (t *Task) When(fn GuardFn) When sets a custom guard function that determines whether this task should execute. + +### func \(\*Task\) [WhenWithReason]() + +```go +func (t *Task) WhenWithReason(fn GuardFn, reason string) +``` + +WhenWithReason sets a guard with a custom skip reason shown when the guard returns false. + -## type [TaskFn]() +## type [TaskFn]() TaskFn is the signature for functional tasks. The client parameter provides access to the OSAPI SDK for making API calls. @@ -547,18 +615,33 @@ type TaskFn func( ) (*Result, error) ``` + +## type [TaskFnWithResults]() + +TaskFnWithResults is like TaskFn but receives completed task results for inter\-task data access. + +```go +type TaskFnWithResults func( + ctx context.Context, + client *osapi.Client, + results Results, +) (*Result, error) +``` + -## type [TaskResult]() +## type [TaskResult]() TaskResult records the full execution details of a task. 
```go type TaskResult struct { - Name string - Status Status - Changed bool - Duration time.Duration - Error error + Name string + Status Status + Changed bool + Duration time.Duration + Error error + Data map[string]any + HostResults []HostResult } ``` diff --git a/docs/orchestration/README.md b/docs/orchestration/README.md index 83deb87..2bbe214 100644 --- a/docs/orchestration/README.md +++ b/docs/orchestration/README.md @@ -69,6 +69,71 @@ plan := orchestrator.NewPlan(client, orchestrator.OnError(orchestrator.Continue) task.OnError(orchestrator.Retry(3)) // override for this task ``` +## Result Types + +### Result + +The `Result` struct returned by task functions: + +| Field | Type | Description | +| ------------- | ---------------- | ---------------------------------------------- | +| `Changed` | `bool` | Whether the operation modified state | +| `Data` | `map[string]any` | Operation-specific response data | +| `Status` | `Status` | Terminal status (`changed`, `unchanged`, etc.) | +| `HostResults` | `[]HostResult` | Per-host results for broadcast operations | + +### TaskResult + +The `TaskResult` struct provided to `AfterTask` hooks and in `Report.Tasks`: + +| Field | Type | Description | +| ------------- | ---------------- | ------------------------------------------- | +| `Name` | `string` | Task name | +| `Status` | `Status` | Terminal status | +| `Changed` | `bool` | Whether the operation reported changes | +| `Duration` | `time.Duration` | Execution time | +| `Error` | `error` | Error if task failed; nil on success | +| `Data` | `map[string]any` | Operation response data for post-run access | +| `HostResults` | `[]HostResult` | Per-host results for broadcast operations | + +### HostResult + +Per-host data for broadcast operations (targeting `_all` or label selectors): + +| Field | Type | Description | +| ---------- | ---------------- | ---------------------------------- | +| `Hostname` | `string` | Agent hostname | +| `Changed` | `bool` | Whether this host reported changes | +| `Error` | `string` | Error message; empty on success | +| 
`Data` | `map[string]any` | Host-specific response data | + +## TaskFuncWithResults + +Use `TaskFuncWithResults` when a task needs to read results from prior tasks: + +```go +summarize := plan.TaskFuncWithResults( + "summarize", + func(ctx context.Context, client *osapi.Client, results orchestrator.Results) (*orchestrator.Result, error) { + hostname := "unknown" + if r := results.Get("get-hostname"); r != nil { + if h, ok := r.Data["hostname"].(string); ok { + hostname = h + } + } + + return &orchestrator.Result{ + Changed: true, + Data: map[string]any{"summary": hostname}, + }, nil + }, +) +summarize.DependsOn(getHostname) +``` + +Unlike `TaskFunc`, the function receives the `Results` map containing completed +dependency outputs. + ## Adding a New Operation When a new operation is added to OSAPI: diff --git a/examples/all/main.go b/examples/all/main.go index ce46d84..9c2fabe 100644 --- a/examples/all/main.go +++ b/examples/all/main.go @@ -20,9 +20,10 @@ // Package main demonstrates every orchestrator feature: hooks for consumer- // controlled logging at every lifecycle point, Op and TaskFunc tasks, -// dependencies, guards, Levels() for DAG inspection, error strategies +// TaskFuncWithResults for inter-task data passing, dependencies, guards +// with Status inspection, Levels() for DAG inspection, error strategies // (plan-level Continue + per-task Retry), parameterized operations, -// result data access, and detailed result reporting. +// result data access, and detailed result reporting with Data. // // This example serves as a reference for building tools like Terraform // or Ansible that consume the SDK. 
@@ -32,16 +33,18 @@ // check-health // ├── get-hostname ────────────┐ // ├── get-disk │ -// ├── get-memory ├── print-summary (only-if-changed, when) +// ├── get-memory ├── print-summary (TaskFuncWithResults, reads prior data) // ├── get-load [retry:2] ──────┘ // └── run-uptime [params] ─────┘ -// optional-fail [continue] (independent, no deps on summary) +// optional-fail [continue] (independent) +// └── alert-on-failure (When: checks Status == StatusFailed) // // Run with: OSAPI_TOKEN="" go run main.go package main import ( "context" + "encoding/json" "fmt" "log" "os" @@ -254,26 +257,70 @@ func main() { ) optionalFail.OnError(orchestrator.Continue) - // Level 2: summary (depends on all queries, guard + OnlyIfChanged). - // Uses Results.Get() to read data from previous tasks. - summary := plan.TaskFunc( + // Level 2: summary — uses TaskFuncWithResults to read data from prior + // tasks and aggregate it. This is the key inter-task data passing pattern. + summary := plan.TaskFuncWithResults( "print-summary", func( _ context.Context, _ *osapi.Client, + results orchestrator.Results, ) (*orchestrator.Result, error) { fmt.Println("\n --- Fleet Summary ---") - fmt.Println(" All node queries completed.") - return &orchestrator.Result{Changed: false}, nil + // Read hostname from a prior task via Results.Get(). + if r := results.Get("get-hostname"); r != nil { + if h, ok := r.Data["hostname"].(string); ok { + fmt.Printf(" Hostname: %s\n", h) + } + } + + // Read uptime stdout from a prior command task. + if r := results.Get("run-uptime"); r != nil { + if stdout, ok := r.Data["stdout"].(string); ok { + fmt.Printf(" Uptime: %s\n", stdout) + } + } + + // Return aggregated data — available in Report.Tasks[].Data + // after plan execution completes. 
+ return &orchestrator.Result{ + Changed: false, + Data: map[string]any{"completed": true}, + }, nil }, ) summary.DependsOn(getHostname, getDisk, getMemory, getLoad, runUptime) summary.OnlyIfChanged() // skip if no dependency reported changes + + // Guard using Status inspection — only run if hostname succeeded. summary.When(func(results orchestrator.Results) bool { r := results.Get("get-hostname") - return r != nil && r.Data["hostname"] != nil + return r != nil && r.Status == orchestrator.StatusChanged + }) + + // Level 2: alert task — runs only if optional-fail has StatusFailed. + // Demonstrates using Status in a When guard for failure-triggered recovery. + alertOnFailure := plan.TaskFunc( + "alert-on-failure", + func( + _ context.Context, + _ *osapi.Client, + ) (*orchestrator.Result, error) { + fmt.Println("\n [alert] optional-fail task failed — sending alert") + + return &orchestrator.Result{ + Changed: true, + Data: map[string]any{"alerted": true}, + }, nil + }, + ) + alertOnFailure.DependsOn(optionalFail) + alertOnFailure.When(func(results orchestrator.Results) bool { + r := results.Get("optional-fail") + + return r != nil && r.Status == orchestrator.StatusFailed }) // --- Structured DAG access --- @@ -308,6 +355,7 @@ func main() { } // --- Detailed result inspection --- + // TaskResult.Data carries operation response data for post-run access. fmt.Println("\nDetailed results:") @@ -324,5 +372,10 @@ func main() { r.Changed, r.Duration, ) + + if len(r.Data) > 0 { + b, _ := json.MarshalIndent(r.Data, " "+strings.Repeat(" ", 20), " ") + fmt.Printf(" %-20s data=%s\n", "", b) + } } } diff --git a/pkg/orchestrator/plan.go b/pkg/orchestrator/plan.go index 8c92737..2a10233 100644 --- a/pkg/orchestrator/plan.go +++ b/pkg/orchestrator/plan.go @@ -67,6 +67,19 @@ func (p *Plan) TaskFunc( return t } +// TaskFuncWithResults creates a functional task that receives +// completed results from prior tasks, adds it to the plan, and +// returns it. 
+func (p *Plan) TaskFuncWithResults( + name string, + fn TaskFnWithResults, +) *Task { + t := NewTaskFuncWithResults(name, fn) + p.tasks = append(p.tasks, t) + + return t +} + // Tasks returns all tasks in the plan. func (p *Plan) Tasks() []*Task { return p.tasks @@ -93,7 +106,7 @@ func (p *Plan) Explain() string { for _, t := range level { kind := "op" - if t.fn != nil { + if t.IsFunc() { kind = "fn" } diff --git a/pkg/orchestrator/plan_public_test.go b/pkg/orchestrator/plan_public_test.go index c1410f1..f3e155a 100644 --- a/pkg/orchestrator/plan_public_test.go +++ b/pkg/orchestrator/plan_public_test.go @@ -311,6 +311,60 @@ func (s *PlanPublicTestSuite) TestRunGuard() { } } +func (s *PlanPublicTestSuite) TestRunGuardWithFailedDependency() { + tests := []struct { + name string + guard func(orchestrator.Results) bool + expectRan bool + expectStatus orchestrator.Status + }{ + { + name: "guard runs and returns true when dependency failed", + guard: func(r orchestrator.Results) bool { + res := r.Get("fail") + + return res != nil && res.Status == orchestrator.StatusFailed + }, + expectRan: true, + expectStatus: orchestrator.StatusChanged, + }, + { + name: "guard runs and returns false when dependency failed", + guard: func(r orchestrator.Results) bool { + res := r.Get("fail") + + return res != nil && res.Status == orchestrator.StatusChanged + }, + expectRan: false, + expectStatus: orchestrator.StatusSkipped, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + plan := orchestrator.NewPlan( + nil, + orchestrator.OnError(orchestrator.Continue), + ) + ran := false + + fail := plan.TaskFunc("fail", failFunc("boom")) + alert := plan.TaskFunc("alert", taskFunc(true, func() { + ran = true + })) + alert.DependsOn(fail) + alert.When(tt.guard) + + report, err := plan.Run(context.Background()) + s.Require().NoError(err) + s.Equal(tt.expectRan, ran) + + sm := statusMap(report) + s.Equal(tt.expectStatus, sm["alert"]) + }) + } +} + func (s *PlanPublicTestSuite) 
TestRunErrorStrategy() { s.Run("stop all on error", func() { plan := orchestrator.NewPlan(nil) @@ -979,6 +1033,28 @@ func (s *PlanPublicTestSuite) TestRunHooks() { s.Contains(skips[0], "guard returned false") }) + s.Run("skip hook for guard with custom reason", func() { + var events []string + + hooks := allHooks(&events) + plan := orchestrator.NewPlan(nil, orchestrator.WithHooks(hooks)) + + a := plan.TaskFunc("a", taskFunc(false, nil)) + b := plan.TaskFunc("b", taskFunc(true, nil)) + b.DependsOn(a) + b.WhenWithReason( + func(_ orchestrator.Results) bool { return false }, + "host is unreachable", + ) + + _, err := plan.Run(context.Background()) + s.NoError(err) + + skips := filterPrefix(events, "skip-") + s.Len(skips, 1) + s.Contains(skips[0], "host is unreachable") + }) + s.Run("skip hook for only-if-changed", func() { var events []string diff --git a/pkg/orchestrator/result.go b/pkg/orchestrator/result.go index fbb4841..1ad5c7d 100644 --- a/pkg/orchestrator/result.go +++ b/pkg/orchestrator/result.go @@ -23,19 +23,32 @@ const ( StatusFailed Status = "failed" ) +// HostResult represents a single host's response within a broadcast +// operation. +type HostResult struct { + Hostname string + Changed bool + Error string + Data map[string]any +} + // Result is the outcome of a single task execution. type Result struct { - Changed bool - Data map[string]any + Changed bool + Data map[string]any + Status Status + HostResults []HostResult } // TaskResult records the full execution details of a task. type TaskResult struct { - Name string - Status Status - Changed bool - Duration time.Duration - Error error + Name string + Status Status + Changed bool + Duration time.Duration + Error error + Data map[string]any + HostResults []HostResult } // Results is a map of task name to Result, used for conditional logic. 
diff --git a/pkg/orchestrator/result_public_test.go b/pkg/orchestrator/result_public_test.go index 59ba13f..bc3052c 100644 --- a/pkg/orchestrator/result_public_test.go +++ b/pkg/orchestrator/result_public_test.go @@ -76,6 +76,142 @@ func (s *ResultPublicTestSuite) TestReportSummary() { } } +func (s *ResultPublicTestSuite) TestResultStatusField() { + tests := []struct { + name string + result *orchestrator.Result + wantStatus orchestrator.Status + wantChange bool + }{ + { + name: "changed result carries status", + result: &orchestrator.Result{ + Changed: true, + Data: map[string]any{"hostname": "web-01"}, + Status: orchestrator.StatusChanged, + }, + wantStatus: orchestrator.StatusChanged, + wantChange: true, + }, + { + name: "unchanged result carries status", + result: &orchestrator.Result{ + Changed: false, + Status: orchestrator.StatusUnchanged, + }, + wantStatus: orchestrator.StatusUnchanged, + wantChange: false, + }, + { + name: "failed result carries status", + result: &orchestrator.Result{ + Changed: false, + Status: orchestrator.StatusFailed, + }, + wantStatus: orchestrator.StatusFailed, + wantChange: false, + }, + { + name: "skipped result carries status", + result: &orchestrator.Result{ + Changed: false, + Status: orchestrator.StatusSkipped, + }, + wantStatus: orchestrator.StatusSkipped, + wantChange: false, + }, + { + name: "zero value has empty status", + result: &orchestrator.Result{}, + wantStatus: "", + wantChange: false, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + s.Equal(tt.wantStatus, tt.result.Status) + s.Equal(tt.wantChange, tt.result.Changed) + }) + } +} + +func (s *ResultPublicTestSuite) TestResultHostResults() { + tests := []struct { + name string + result *orchestrator.Result + wantLen int + validateFn func(hrs []orchestrator.HostResult) + }{ + { + name: "result with multiple host results", + result: &orchestrator.Result{ + Changed: true, + Status: orchestrator.StatusChanged, + HostResults: []orchestrator.HostResult{ + 
{ + Hostname: "web-01", + Changed: true, + Data: map[string]any{"stdout": "ok"}, + }, + { + Hostname: "web-02", + Changed: false, + Error: "connection timeout", + }, + }, + }, + wantLen: 2, + validateFn: func(hrs []orchestrator.HostResult) { + s.Equal("web-01", hrs[0].Hostname) + s.True(hrs[0].Changed) + s.Equal("web-02", hrs[1].Hostname) + s.Equal("connection timeout", hrs[1].Error) + }, + }, + { + name: "result with no host results", + result: &orchestrator.Result{ + Changed: false, + Status: orchestrator.StatusUnchanged, + }, + wantLen: 0, + }, + { + name: "host result with data map", + result: &orchestrator.Result{ + Changed: true, + Status: orchestrator.StatusChanged, + HostResults: []orchestrator.HostResult{ + { + Hostname: "db-01", + Changed: true, + Data: map[string]any{ + "stdout": "migrated", + "exit_code": float64(0), + }, + }, + }, + }, + wantLen: 1, + validateFn: func(hrs []orchestrator.HostResult) { + s.Equal("db-01", hrs[0].Hostname) + s.Equal("migrated", hrs[0].Data["stdout"]) + }, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + s.Len(tt.result.HostResults, tt.wantLen) + + if tt.validateFn != nil { + tt.validateFn(tt.result.HostResults) + } + }) + } +} + func (s *ResultPublicTestSuite) TestResultsGet() { tests := []struct { name string diff --git a/pkg/orchestrator/runner.go b/pkg/orchestrator/runner.go index afce9e5..f030851 100644 --- a/pkg/orchestrator/runner.go +++ b/pkg/orchestrator/runner.go @@ -232,25 +232,31 @@ func (r *runner) runTask( ) TaskResult { start := time.Now() - // Skip if any dependency failed (Continue strategy). - r.mu.Lock() - for _, dep := range t.deps { - if r.failed[dep.name] { - r.failed[t.name] = true - r.mu.Unlock() + // Skip if any dependency failed — unless the task has a When guard, + // which may intentionally inspect failure status (e.g. alert-on-failure). 
+ if t.guard == nil { + r.mu.Lock() - tr := TaskResult{ - Name: t.name, - Status: StatusSkipped, - Duration: time.Since(start), - } - r.callOnSkip(t, "dependency failed") - r.callAfterTask(t, tr) + for _, dep := range t.deps { + if r.failed[dep.name] { + r.failed[t.name] = true + r.results[t.name] = &Result{Status: StatusSkipped} + r.mu.Unlock() + + tr := TaskResult{ + Name: t.name, + Status: StatusSkipped, + Duration: time.Since(start), + } + r.callOnSkip(t, "dependency failed") + r.callAfterTask(t, tr) - return tr + return tr + } } + + r.mu.Unlock() } - r.mu.Unlock() if t.requiresChange { anyChanged := false @@ -268,6 +274,10 @@ func (r *runner) runTask( r.mu.Unlock() if !anyChanged { + r.mu.Lock() + r.results[t.name] = &Result{Status: StatusSkipped} + r.mu.Unlock() + tr := TaskResult{ Name: t.name, Status: StatusSkipped, @@ -287,13 +297,21 @@ func (r *runner) runTask( r.mu.Unlock() if !shouldRun { + r.mu.Lock() + r.results[t.name] = &Result{Status: StatusSkipped} + r.mu.Unlock() + tr := TaskResult{ Name: t.name, Status: StatusSkipped, Duration: time.Since(start), } - r.callOnSkip(t, "guard returned false") + reason := "guard returned false" + if t.guardReason != "" { + reason = t.guardReason + } + r.callOnSkip(t, reason) r.callAfterTask(t, tr) return tr @@ -315,7 +333,13 @@ func (r *runner) runTask( client := r.plan.client for attempt := range maxAttempts { - if t.fn != nil { + if t.fnr != nil { + r.mu.Lock() + results := r.results + r.mu.Unlock() + + result, err = t.fnr(ctx, client, results) + } else if t.fn != nil { result, err = t.fn(ctx, client) } else { result, err = r.executeOp(ctx, t.op) @@ -335,6 +359,7 @@ func (r *runner) runTask( if err != nil { r.mu.Lock() r.failed[t.name] = true + r.results[t.name] = &Result{Status: StatusFailed} r.mu.Unlock() tr := TaskResult{ @@ -349,20 +374,24 @@ func (r *runner) runTask( return tr } - r.mu.Lock() - r.results[t.name] = result - r.mu.Unlock() - status := StatusUnchanged if result.Changed { status = StatusChanged } 
+ result.Status = status + + r.mu.Lock() + r.results[t.name] = result + r.mu.Unlock() + tr := TaskResult{ - Name: t.name, - Status: status, - Changed: result.Changed, - Duration: elapsed, + Name: t.name, + Status: status, + Changed: result.Changed, + Duration: elapsed, + Data: result.Data, + HostResults: result.HostResults, } r.callAfterTask(t, tr) @@ -373,6 +402,59 @@ func (r *runner) runTask( // DefaultPollInterval is the interval between job status polls. var DefaultPollInterval = 500 * time.Millisecond +// isCommandOp returns true for command execution operations. +func isCommandOp( + operation string, +) bool { + return operation == "command.exec.execute" || + operation == "command.shell.execute" +} + +// extractHostResults parses per-agent results from a broadcast +// collection response. +func extractHostResults( + data map[string]any, +) []HostResult { + resultsRaw, ok := data["results"] + if !ok { + return nil + } + + items, ok := resultsRaw.([]any) + if !ok { + return nil + } + + hostResults := make([]HostResult, 0, len(items)) + + for _, item := range items { + m, ok := item.(map[string]any) + if !ok { + continue + } + + hr := HostResult{ + Data: m, + } + + if h, ok := m["hostname"].(string); ok { + hr.Hostname = h + } + + if c, ok := m["changed"].(bool); ok { + hr.Changed = c + } + + if e, ok := m["error"].(string); ok { + hr.Error = e + } + + hostResults = append(hostResults, hr) + } + + return hostResults +} + // executeOp submits a declarative Op as a job via the SDK and polls // for completion. func (r *runner) executeOp( @@ -415,7 +497,29 @@ func (r *runner) executeOp( jobID := createResp.JSON201.JobId.String() - return r.pollJob(ctx, jobID) + result, err := r.pollJob(ctx, jobID) + if err != nil { + return nil, err + } + + // Extract per-host results for broadcast targets. + if IsBroadcastTarget(op.Target) { + result.HostResults = extractHostResults(result.Data) + } + + // Non-zero exit for command operations = failure. 
+ if isCommandOp(op.Operation) { + if exitCode, ok := result.Data["exit_code"].(float64); ok && exitCode != 0 { + result.Status = StatusFailed + + return result, fmt.Errorf( + "command exited with code %d", + int(exitCode), + ) + } + } + + return result, nil } // pollJob polls a job until it reaches a terminal state. @@ -466,6 +570,7 @@ func (r *runner) pollJob( } changed, _ := data["changed"].(bool) + delete(data, "changed") return &Result{Changed: changed, Data: data}, nil case "failed": diff --git a/pkg/orchestrator/runner_broadcast_test.go b/pkg/orchestrator/runner_broadcast_test.go new file mode 100644 index 0000000..5be2496 --- /dev/null +++ b/pkg/orchestrator/runner_broadcast_test.go @@ -0,0 +1,162 @@ +package orchestrator + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +type RunnerBroadcastTestSuite struct { + suite.Suite +} + +func TestRunnerBroadcastTestSuite(t *testing.T) { + suite.Run(t, new(RunnerBroadcastTestSuite)) +} + +func (s *RunnerBroadcastTestSuite) TestIsBroadcastTarget() { + tests := []struct { + name string + target string + want bool + }{ + { + name: "all agents is broadcast", + target: "_all", + want: true, + }, + { + name: "label selector is broadcast", + target: "role:web", + want: true, + }, + { + name: "single agent is not broadcast", + target: "agent-001", + want: false, + }, + { + name: "empty string is not broadcast", + target: "", + want: false, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + got := IsBroadcastTarget(tt.target) + s.Equal(tt.want, got) + }) + } +} + +func (s *RunnerBroadcastTestSuite) TestExtractHostResults() { + tests := []struct { + name string + data map[string]any + want []HostResult + }{ + { + name: "extracts host results from results array", + data: map[string]any{ + "results": []any{ + map[string]any{ + "hostname": "host-1", + "changed": true, + "data": "something", + }, + map[string]any{ + "hostname": "host-2", + "changed": false, + "error": "connection refused", + }, 
+ }, + }, + want: []HostResult{ + { + Hostname: "host-1", + Changed: true, + Data: map[string]any{ + "hostname": "host-1", + "changed": true, + "data": "something", + }, + }, + { + Hostname: "host-2", + Changed: false, + Error: "connection refused", + Data: map[string]any{ + "hostname": "host-2", + "changed": false, + "error": "connection refused", + }, + }, + }, + }, + { + name: "no results key returns nil", + data: map[string]any{ + "other": "value", + }, + want: nil, + }, + { + name: "results not an array returns nil", + data: map[string]any{ + "results": "not-an-array", + }, + want: nil, + }, + { + name: "empty results array returns empty slice", + data: map[string]any{ + "results": []any{}, + }, + want: []HostResult{}, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + got := extractHostResults(tt.data) + s.Equal(tt.want, got) + }) + } +} + +func (s *RunnerBroadcastTestSuite) TestIsCommandOp() { + tests := []struct { + name string + operation string + want bool + }{ + { + name: "command.exec.execute is a command op", + operation: "command.exec.execute", + want: true, + }, + { + name: "command.shell.execute is a command op", + operation: "command.shell.execute", + want: true, + }, + { + name: "node.hostname.get is not a command op", + operation: "node.hostname.get", + want: false, + }, + { + name: "empty string is not a command op", + operation: "", + want: false, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + got := isCommandOp(tt.operation) + s.Equal(tt.want, got) + }) + } +} diff --git a/pkg/orchestrator/runner_test.go b/pkg/orchestrator/runner_test.go index ccf7549..d202158 100644 --- a/pkg/orchestrator/runner_test.go +++ b/pkg/orchestrator/runner_test.go @@ -1,9 +1,13 @@ package orchestrator import ( + "context" + "fmt" "testing" "github.com/stretchr/testify/suite" + + "github.com/osapi-io/osapi-sdk/pkg/osapi" ) type RunnerTestSuite struct { @@ -68,3 +72,351 @@ func (s *RunnerTestSuite) TestLevelize() { }) } } + +func 
(s *RunnerTestSuite) TestRunTaskStoresResultForAllPaths() { + tests := []struct { + name string + setup func() *Plan + taskName string + wantStatus Status + }{ + { + name: "OnlyIfChanged skip stores StatusSkipped", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + // dep returns Changed=false, so child with + // OnlyIfChanged should be skipped. + dep := plan.TaskFunc("dep", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: false}, nil + }) + + child := plan.TaskFunc("child", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: true}, nil + }) + child.DependsOn(dep) + child.OnlyIfChanged() + + return plan + }, + taskName: "child", + wantStatus: StatusSkipped, + }, + { + name: "failed task stores StatusFailed", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + plan.TaskFunc("failing", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return nil, fmt.Errorf("deliberate error") + }) + + return plan + }, + taskName: "failing", + wantStatus: StatusFailed, + }, + { + name: "guard-false skip stores StatusSkipped", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + plan.TaskFunc("guarded", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: true}, nil + }).When(func(_ Results) bool { + return false + }) + + return plan + }, + taskName: "guarded", + wantStatus: StatusSkipped, + }, + { + name: "dependency-failed skip stores StatusSkipped", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + dep := plan.TaskFunc("dep", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return nil, fmt.Errorf("deliberate error") + }) + + child := plan.TaskFunc("child", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: true}, nil + }) + child.DependsOn(dep) + + return plan + }, + taskName: "child", + 
wantStatus: StatusSkipped, + }, + { + name: "successful changed task stores StatusChanged", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + plan.TaskFunc("ok", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: true}, nil + }) + + return plan + }, + taskName: "ok", + wantStatus: StatusChanged, + }, + { + name: "successful unchanged task stores StatusUnchanged", + setup: func() *Plan { + plan := NewPlan(nil, OnError(Continue)) + + plan.TaskFunc("ok", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: false}, nil + }) + + return plan + }, + taskName: "ok", + wantStatus: StatusUnchanged, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + plan := tt.setup() + runner := newRunner(plan) + + _, err := runner.run(context.Background()) + // Some plans produce errors (e.g. StopAll with a + // failing task); we don't assert on err here because + // we only care about the results map. + _ = err + + result := runner.results.Get(tt.taskName) + s.NotNil( + result, + "results map should contain entry for %q", + tt.taskName, + ) + s.Equal( + tt.wantStatus, + result.Status, + "result status for %q", + tt.taskName, + ) + }) + } +} + +func (s *RunnerTestSuite) TestDownstreamGuardInspectsSkippedStatus() { + tests := []struct { + name string + setup func() (*Plan, *bool) + observerName string + wantGuardCalled bool + wantTaskStatus Status + }{ + { + name: "guard can see guard-skipped task status", + setup: func() (*Plan, *bool) { + plan := NewPlan(nil, OnError(Continue)) + guardCalled := false + + // This task is skipped because its guard + // returns false. + guarded := plan.TaskFunc("guarded", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: true}, nil + }) + guarded.When(func(_ Results) bool { + return false + }) + + // Observer depends on guarded so it runs in a + // later level. 
Its guard inspects the skipped + // task's status. + observer := plan.TaskFunc("observer", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{Changed: false}, nil + }) + observer.DependsOn(guarded) + observer.When(func(r Results) bool { + guardCalled = true + res := r.Get("guarded") + + return res != nil && res.Status == StatusSkipped + }) + + return plan, &guardCalled + }, + observerName: "observer", + wantGuardCalled: true, + // Observer runs because the guard sees the skipped + // status and returns true. + wantTaskStatus: StatusUnchanged, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + plan, guardCalled := tt.setup() + runner := newRunner(plan) + + _, err := runner.run(context.Background()) + _ = err + + s.Equal( + tt.wantGuardCalled, + *guardCalled, + "guard should have been called", + ) + + result := runner.results.Get(tt.observerName) + s.NotNil( + result, + "observer should have a result entry", + ) + s.Equal( + tt.wantTaskStatus, + result.Status, + "observer task status", + ) + }) + } +} + +func (s *RunnerTestSuite) TestTaskFuncWithResultsReceivesResults() { + tests := []struct { + name string + setup func() (*Plan, *string) + wantCapture string + }{ + { + name: "receives upstream result data", + setup: func() (*Plan, *string) { + plan := NewPlan(nil, OnError(StopAll)) + var captured string + + a := plan.TaskFunc("a", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{ + Changed: true, + Data: map[string]any{"hostname": "web-01"}, + }, nil + }) + + b := plan.TaskFuncWithResults("b", func( + _ context.Context, + _ *osapi.Client, + results Results, + ) (*Result, error) { + r := results.Get("a") + if r != nil { + if h, ok := r.Data["hostname"].(string); ok { + captured = h + } + } + + return &Result{Changed: false}, nil + }) + b.DependsOn(a) + + return plan, &captured + }, + wantCapture: "web-01", + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + plan, 
captured := tt.setup() + + _, err := plan.Run(context.Background()) + + s.Require().NoError(err) + s.Equal(tt.wantCapture, *captured) + }) + } +} + +func (s *RunnerTestSuite) TestTaskResultCarriesData() { + tests := []struct { + name string + setup func() *Plan + taskName string + wantKey string + wantVal any + }{ + { + name: "success result includes data", + setup: func() *Plan { + plan := NewPlan(nil, OnError(StopAll)) + + plan.TaskFunc("a", func( + _ context.Context, + _ *osapi.Client, + ) (*Result, error) { + return &Result{ + Changed: true, + Data: map[string]any{"stdout": "hello"}, + }, nil + }) + + return plan + }, + taskName: "a", + wantKey: "stdout", + wantVal: "hello", + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + plan := tt.setup() + + report, err := plan.Run(context.Background()) + + s.Require().NoError(err) + + var found bool + for _, tr := range report.Tasks { + if tr.Name == tt.taskName { + found = true + s.Equal(tt.wantVal, tr.Data[tt.wantKey]) + } + } + + s.True(found, "task %q should be in report", tt.taskName) + }) + } +} diff --git a/pkg/orchestrator/task.go b/pkg/orchestrator/task.go index cc818e1..d593193 100644 --- a/pkg/orchestrator/task.go +++ b/pkg/orchestrator/task.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "strings" "github.com/osapi-io/osapi-sdk/pkg/osapi" ) @@ -20,6 +21,14 @@ type TaskFn func( client *osapi.Client, ) (*Result, error) +// TaskFnWithResults is like TaskFn but receives completed task results +// for inter-task data access. +type TaskFnWithResults func( + ctx context.Context, + client *osapi.Client, + results Results, +) (*Result, error) + // GuardFn is a predicate that determines if a task should run. 
type GuardFn func(results Results) bool @@ -28,8 +37,10 @@ type Task struct { name string op *Op fn TaskFn + fnr TaskFnWithResults deps []*Task guard GuardFn + guardReason string requiresChange bool errorStrategy *ErrorStrategy } @@ -56,14 +67,33 @@ func NewTaskFunc( } } +// NewTaskFuncWithResults creates a functional task that receives +// completed results from prior tasks. +func NewTaskFuncWithResults( + name string, + fn TaskFnWithResults, +) *Task { + return &Task{ + name: name, + fnr: fn, + } +} + // Name returns the task name. func (t *Task) Name() string { return t.name } +// SetName changes the task name. +func (t *Task) SetName( + name string, +) { + t.name = name +} + // IsFunc returns true if this is a functional task. func (t *Task) IsFunc() bool { - return t.fn != nil + return t.fn != nil || t.fnr != nil } // Operation returns the declarative operation, or nil for functional @@ -111,6 +141,16 @@ func (t *Task) When( t.guard = fn } +// WhenWithReason sets a guard with a custom skip reason shown when +// the guard returns false. +func (t *Task) WhenWithReason( + fn GuardFn, + reason string, +) { + t.guard = fn + t.guardReason = reason +} + // Guard returns the guard function, or nil if none is set. func (t *Task) Guard() GuardFn { return t.guard @@ -128,3 +168,11 @@ func (t *Task) OnError( func (t *Task) ErrorStrategy() *ErrorStrategy { return t.errorStrategy } + +// IsBroadcastTarget returns true if the target addresses multiple +// agents (broadcast or label selector). 
+func IsBroadcastTarget( + target string, +) bool { + return target == "_all" || strings.Contains(target, ":") +} diff --git a/pkg/orchestrator/task_public_test.go b/pkg/orchestrator/task_public_test.go index 7319d51..01327c6 100644 --- a/pkg/orchestrator/task_public_test.go +++ b/pkg/orchestrator/task_public_test.go @@ -92,6 +92,68 @@ func (s *TaskPublicTestSuite) TestTaskFunc() { s.True(task.IsFunc()) } +func (s *TaskPublicTestSuite) TestSetName() { + tests := []struct { + name string + initial string + renamed string + wantName string + }{ + { + name: "changes task name", + initial: "original", + renamed: "renamed", + wantName: "renamed", + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + task := orchestrator.NewTask( + tt.initial, + &orchestrator.Op{Operation: "noop"}, + ) + task.SetName(tt.renamed) + s.Equal(tt.wantName, task.Name()) + }) + } +} + +func (s *TaskPublicTestSuite) TestWhenWithReason() { + tests := []struct { + name string + guardResult bool + reason string + }{ + { + name: "sets guard and reason when guard returns false", + guardResult: false, + reason: "host is unreachable", + }, + { + name: "sets guard and reason when guard returns true", + guardResult: true, + reason: "custom reason", + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + task := orchestrator.NewTask( + "t", + &orchestrator.Op{Operation: "noop"}, + ) + task.WhenWithReason(func(_ orchestrator.Results) bool { + return tt.guardResult + }, tt.reason) + + guard := task.Guard() + s.NotNil(guard) + s.Equal(tt.guardResult, guard(orchestrator.Results{})) + }) + } +} + func (s *TaskPublicTestSuite) TestOnErrorOverride() { task := orchestrator.NewTask("t", &orchestrator.Op{Operation: "noop"}) task.OnError(orchestrator.Continue)