From 419973aa84c62d64d9ce506839315833aa566573 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 15:23:19 +0530 Subject: [PATCH 1/6] add new health check endpoint with plugin interface --- internal/pkg/heimdall/health.go | 151 ++++++++++++++++++++++++++++++ internal/pkg/heimdall/heimdall.go | 1 + pkg/object/command/command.go | 1 + pkg/plugin/plugin.go | 4 + 4 files changed, 157 insertions(+) create mode 100644 internal/pkg/heimdall/health.go diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go new file mode 100644 index 0000000..83d31c1 --- /dev/null +++ b/internal/pkg/heimdall/health.go @@ -0,0 +1,151 @@ +package heimdall + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "sync" + "time" + + "github.com/google/uuid" + + "github.com/patterninc/heimdall/pkg/object/cluster" + "github.com/patterninc/heimdall/pkg/object/command" + "github.com/patterninc/heimdall/pkg/object/job" + "github.com/patterninc/heimdall/pkg/object/status" + "github.com/patterninc/heimdall/pkg/plugin" +) + +const ( + healthCheckTimeout = 30 * time.Second + healthCheckUser = `heimdall-health` + healthStatusOK = `ok` + healthStatusError = `error` +) + +type healthCheckResult struct { + CommandID string `json:"command_id"` + ClusterID string `json:"cluster_id"` + Status string `json:"status"` + LatencyMs int64 `json:"latency_ms"` + Error string `json:"error,omitempty"` +} + +type healthChecksResponse struct { + Healthy bool `json:"healthy"` + Checks []healthCheckResult `json:"checks"` +} + +type healthPair struct { + cmd *command.Command + cluster *cluster.Cluster + handler plugin.Handler +} + +func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) + defer cancel() + + results := h.runHealthChecks(ctx, h.resolveHealthPairs()) + + healthy := true + for _, res := range results { + if res.Status == healthStatusError { + healthy = false + break + } + } + + resp := healthChecksResponse{Healthy: healthy, Checks: results} + data, _ := json.Marshal(resp) + + w.Header().Set(contentTypeKey, contentTypeJSON) + if healthy { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + } + w.Write(data) +} + +func (h *Heimdall) resolveHealthPairs() []*healthPair { + var pairs []*healthPair + for _, cmd := range h.Commands { + if !cmd.HealthCheck || cmd.Status != status.Active { + continue + } + for _, cl := range h.Clusters { + if cl.Status != status.Active { + continue + } + if cl.Tags.Contains(cmd.ClusterTags) { + pairs = append(pairs, &healthPair{cmd, cl, h.commandHandlers[cmd.ID]}) + } + } + } + return pairs +} + +func (h *Heimdall) runHealthChecks(ctx context.Context, pairs []*healthPair) []healthCheckResult { + results := make([]healthCheckResult, len(pairs)) + var wg sync.WaitGroup + for i, pair := range pairs { + wg.Add(1) + go func(i int, pair *healthPair) { + defer wg.Done() + results[i] = h.checkPair(ctx, pair) + }(i, pair) + } + wg.Wait() + return results +} + +func (h *Heimdall) checkPair(ctx context.Context, pair *healthPair) healthCheckResult { + start := time.Now() + res := healthCheckResult{CommandID: pair.cmd.ID, ClusterID: pair.cluster.ID} + + var err error + if hc, ok := pair.handler.(plugin.HealthChecker); ok { + err = hc.HealthCheck(ctx, pair.cluster) + } else { + err = h.pluginProbe(ctx, pair.cluster, pair.handler) + } + + res.LatencyMs = time.Since(start).Milliseconds() + if err != nil { + res.Status = healthStatusError + res.Error = err.Error() + } else { + res.Status = healthStatusOK + } + return res +} + +func (h *Heimdall) pluginProbe(ctx context.Context, cl *cluster.Cluster, handler plugin.Handler) error { + j := &job.Job{} + j.ID = uuid.NewString() + j.User = healthCheckUser + + tmpDir, err := os.MkdirTemp("", "heimdall-health-*") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + + runtime := &plugin.Runtime{ + WorkingDirectory: tmpDir, + ResultDirectory: tmpDir + separator + "result", + Version: h.Version, + UserAgent: fmt.Sprintf(formatUserAgent, h.Version), + } + + if err := runtime.Set(); err != nil { + return err + } + defer runtime.Stdout.Close() + defer runtime.Stderr.Close() + + return handler.Execute(ctx, runtime, j, cl) +} diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index 10ac68a..36c303b 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -172,6 +172,7 @@ func (h *Heimdall) Start() error { apiRouter := router.PathPrefix(defaultAPIPrefix).Subrouter() // job(s) endpoints... + apiRouter.Methods(methodGET).Path(`/health`).HandlerFunc(h.healthHandler) apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus)) apiRouter.Methods(methodPOST).PathPrefix(`/job/{id}/cancel`).HandlerFunc(payloadHandler(h.cancelJob)) diff --git a/pkg/object/command/command.go b/pkg/object/command/command.go index d31fadd..aef7de8 100644 --- a/pkg/object/command/command.go +++ b/pkg/object/command/command.go @@ -20,6 +20,7 @@ type Command struct { Plugin string `yaml:"plugin,omitempty" json:"plugin,omitempty"` IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"` ClusterTags *set.Set[string] `yaml:"cluster_tags,omitempty" json:"cluster_tags,omitempty"` + HealthCheck bool `yaml:"health_check,omitempty" json:"health_check,omitempty"` Handler plugin.Handler `yaml:"-" json:"-"` } diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index 2f72af2..ef7c691 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -11,3 +11,7 @@ type Handler interface { Execute(context.Context, *Runtime, *job.Job, *cluster.Cluster) error Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error } + +type HealthChecker interface { + HealthCheck(ctx context.Context, c *cluster.Cluster) error +} From b93cbd64e89b6410051573465091e63d7014018a Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 15:26:59 +0530 Subject: [PATCH 2/6] change endpoint --- internal/pkg/heimdall/heimdall.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index 36c303b..be23c38 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -172,7 +172,7 @@ func (h *Heimdall) Start() error { apiRouter := router.PathPrefix(defaultAPIPrefix).Subrouter() // job(s) endpoints... - apiRouter.Methods(methodGET).Path(`/health`).HandlerFunc(h.healthHandler) + apiRouter.Methods(methodGET).Path(`/command/health`).HandlerFunc(h.healthHandler) apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus)) apiRouter.Methods(methodPOST).PathPrefix(`/job/{id}/cancel`).HandlerFunc(payloadHandler(h.cancelJob)) From 2132855a303733ddf7b9dd391a6a9fba39fcc035 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 16:56:02 +0530 Subject: [PATCH 3/6] formatting and in memory changes --- configs/local.yaml | 1 + internal/pkg/heimdall/command_dal.go | 5 ++ internal/pkg/heimdall/heimdall.go | 2 +- internal/pkg/janitor/janitor.go | 6 +-- .../object/command/clickhouse/column_types.go | 49 +++++++++---------- .../pkg/object/command/postgres/postgres.go | 4 +- 6 files changed, 36 insertions(+), 31 deletions(-) diff --git a/configs/local.yaml b/configs/local.yaml index a61c8da..f2a49be 100644 --- a/configs/local.yaml +++ b/configs/local.yaml @@ -28,6 +28,7 @@ commands: version: 0.0.1 store_result_sync: false description: Test ping command + health_check: true tags: - type:ping cluster_tags: diff --git a/internal/pkg/heimdall/command_dal.go b/internal/pkg/heimdall/command_dal.go index 2274801..6c5a7be 100644 --- a/internal/pkg/heimdall/command_dal.go +++ b/internal/pkg/heimdall/command_dal.go @@ -269,6 +269,11 @@ func (h *Heimdall) updateCommandStatus(ctx context.Context, c *command.Command) return nil, ErrUnknownCommandID } + // keep in-memory state in sync with the DB + if cmd, ok := h.Commands[c.ID]; ok { + cmd.Status = c.Status + } + updateCommandStatusMethod.CountSuccess() return h.getCommandStatus(ctx, c) diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index be23c38..f5adcf1 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -172,7 +172,7 @@ func (h *Heimdall) Start() error { apiRouter := router.PathPrefix(defaultAPIPrefix).Subrouter() // job(s) endpoints... - apiRouter.Methods(methodGET).Path(`/command/health`).HandlerFunc(h.healthHandler) + apiRouter.Methods(methodGET).PathPrefix(`/command/health`).HandlerFunc(h.healthHandler) apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus)) apiRouter.Methods(methodPOST).PathPrefix(`/job/{id}/cancel`).HandlerFunc(payloadHandler(h.cancelJob)) diff --git a/internal/pkg/janitor/janitor.go b/internal/pkg/janitor/janitor.go index a269888..543deef 100644 --- a/internal/pkg/janitor/janitor.go +++ b/internal/pkg/janitor/janitor.go @@ -21,10 +21,10 @@ type Janitor struct { Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"` StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"` FinishedJobRetentionDays int `yaml:"finished_job_retention_days,omitempty" json:"finished_job_retention_days,omitempty"` - CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"` + CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"` db *database.Database - commandHandlers map[string]plugin.Handler - clusters cluster.Clusters + commandHandlers map[string]plugin.Handler + clusters cluster.Clusters } func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin.Handler, clusters cluster.Clusters) error { diff --git a/internal/pkg/object/command/clickhouse/column_types.go b/internal/pkg/object/command/clickhouse/column_types.go index 79b05a9..aafd5c7 100644 --- a/internal/pkg/object/command/clickhouse/column_types.go +++ b/internal/pkg/object/command/clickhouse/column_types.go @@ -148,32 +148,31 @@ func handleDecimal(nullable bool) (any, func() any) { } } func handleTuple(nullable bool) (any, func() any) { - if nullable { - var p *any - return &p, func() any { - if p == nil || *p == nil { - return nil - } - return *p - } - } - var v any - return &v, func() any { return v } + if nullable { + var p *any + return &p, func() any { + if p == nil || *p == nil { + return nil + } + return *p + } + } + var v any + return &v, func() any { return v } } - func handleArray(nullable bool) (any, func() any) { - if nullable { - var p *any - return &p, func() any { - if p == nil || *p == nil { - return nil - } - return *p - } - } - var v any - return &v, func() any { return v } + if nullable { + var p *any + return &p, func() any { + if p == nil || *p == nil { + return nil + } + return *p + } + } + var v any + return &v, func() any { return v } } func handleDefault(nullable bool) (any, func() any) { @@ -207,8 +206,8 @@ func unwrapCHType(t string) (base string, nullable bool) { return "Array", nullable } if strings.HasPrefix(s, "Tuple(") { - return "Tuple", nullable - } + return "Tuple", nullable + } // Decimal(N,S) normalize to "Decimal" if isDecimal(s) { diff --git a/internal/pkg/object/command/postgres/postgres.go b/internal/pkg/object/command/postgres/postgres.go index 56dbd87..034a13c 100644 --- a/internal/pkg/object/command/postgres/postgres.go +++ b/internal/pkg/object/command/postgres/postgres.go @@ -156,7 +156,7 @@ func splitAndTrimQueries(query string) []string { return queries } -func (p *postgresCommandContext)Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error{ +func (p *postgresCommandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // implement me return nil -} \ No newline at end of file +} From cb3438cd7cabf622320354e18c75560b444ecb36 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 17:07:47 +0530 Subject: [PATCH 4/6] remove in memory changes and use DB for status --- internal/pkg/heimdall/command_dal.go | 5 ----- internal/pkg/heimdall/health.go | 23 ++++++++++++++++------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/internal/pkg/heimdall/command_dal.go b/internal/pkg/heimdall/command_dal.go index 6c5a7be..2274801 100644 --- a/internal/pkg/heimdall/command_dal.go +++ b/internal/pkg/heimdall/command_dal.go @@ -269,11 +269,6 @@ func (h *Heimdall) updateCommandStatus(ctx context.Context, c *command.Command) return nil, ErrUnknownCommandID } - // keep in-memory state in sync with the DB - if cmd, ok := h.Commands[c.ID]; ok { - cmd.Status = c.Status - } - updateCommandStatusMethod.CountSuccess() return h.getCommandStatus(ctx, c) diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index 83d31c1..ca888b5 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -48,7 +48,7 @@ func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) defer cancel() - results := h.runHealthChecks(ctx, h.resolveHealthPairs()) + results := h.runHealthChecks(ctx, h.resolveHealthPairs(ctx)) healthy := true for _, res := range results { @@ -70,18 +70,27 @@ func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { w.Write(data) } -func (h *Heimdall) resolveHealthPairs() []*healthPair { +func (h *Heimdall) resolveHealthPairs(ctx context.Context) []*healthPair { var pairs []*healthPair for _, cmd := range h.Commands { - if !cmd.HealthCheck || cmd.Status != status.Active { + if !cmd.HealthCheck { continue } - for _, cl := range h.Clusters { - if cl.Status != status.Active { + // check DB for command status to avoid unnecessary health checks for inactive commands + dbCmd, err := h.getCommandStatus(ctx, cmd) + if err != nil { + continue + } + if dbCmd.(*command.Command).Status != status.Active { + continue + } + // find active clusters matching command's cluster tags + for _, cluster := range h.Clusters { + if cluster.Status != status.Active { continue } - if cl.Tags.Contains(cmd.ClusterTags) { - pairs = append(pairs, &healthPair{cmd, cl, h.commandHandlers[cmd.ID]}) + if cluster.Tags.Contains(cmd.ClusterTags) { + pairs = append(pairs, &healthPair{cmd, cluster, h.commandHandlers[cmd.ID]}) } } } From a550c567401606a1ca50071456fcdefd8f6250ef Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 18:33:10 +0530 Subject: [PATCH 5/6] add per command endpoint --- internal/pkg/heimdall/health.go | 75 +++++++++++++++++++++++-------- internal/pkg/heimdall/heimdall.go | 3 +- 2 files changed, 58 insertions(+), 20 deletions(-) diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index ca888b5..c9372fb 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "github.com/gorilla/mux" "github.com/patterninc/heimdall/pkg/object/cluster" "github.com/patterninc/heimdall/pkg/object/command" @@ -47,8 +48,29 @@ type healthPair struct { func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) defer cancel() + h.writeHealthResponse(w, ctx, nil) +} - results := h.runHealthChecks(ctx, h.resolveHealthPairs(ctx)) +func (h *Heimdall) commandHealthHandler(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) + defer cancel() + + commandID := mux.Vars(r)[idKey] + cmd, found := h.Commands[commandID] + if !found { + writeAPIError(w, ErrUnknownCommandID, nil) + return + } + if !cmd.HealthCheck { + writeAPIError(w, fmt.Errorf("command %s has not opted into health checks", commandID), nil) + return + } + + h.writeHealthResponse(w, ctx, &commandID) +} + +func (h *Heimdall) writeHealthResponse(w http.ResponseWriter, ctx context.Context, commandID *string) { + results := h.runHealthChecks(ctx, h.resolveHealthPairs(ctx, commandID)) healthy := true for _, res := range results { @@ -70,28 +92,43 @@ func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { w.Write(data) } -func (h *Heimdall) resolveHealthPairs(ctx context.Context) []*healthPair { +func (h *Heimdall) resolveHealthPairs(ctx context.Context, commandID *string) []*healthPair { var pairs []*healthPair - for _, cmd := range h.Commands { - if !cmd.HealthCheck { - continue - } - // check DB for command status to avoid unnecessary health checks for inactive commands - dbCmd, err := h.getCommandStatus(ctx, cmd) - if err != nil { - continue + if commandID != nil { + cmd, found := h.Commands[*commandID] + if !found { + return pairs } - if dbCmd.(*command.Command).Status != status.Active { + return h.resolveHealthPairsForCommand(ctx, cmd) + } + for _, cmd := range h.Commands { + pairs = append(pairs, h.resolveHealthPairsForCommand(ctx, cmd)...) + } + return pairs +} + +func (h *Heimdall) resolveHealthPairsForCommand(ctx context.Context, cmd *command.Command) []*healthPair { + var pairs []*healthPair + if cmd == nil { + return pairs + } + if !cmd.HealthCheck { + return pairs + } + // check DB for command status to avoid unnecessary health checks for inactive commands + dbCmd, err := h.getCommandStatus(ctx, cmd) + if err != nil { + return pairs + } + if dbCmd.(*command.Command).Status != status.Active { + return pairs + } + for _, cl := range h.Clusters { + if cl.Status != status.Active { continue } - // find active clusters matching command's cluster tags - for _, cluster := range h.Clusters { - if cluster.Status != status.Active { - continue - } - if cluster.Tags.Contains(cmd.ClusterTags) { - pairs = append(pairs, &healthPair{cmd, cluster, h.commandHandlers[cmd.ID]}) - } + if cl.Tags.Contains(cmd.ClusterTags) { + pairs = append(pairs, &healthPair{cmd, cl, h.commandHandlers[cmd.ID]}) } } return pairs diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index f5adcf1..e158960 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -172,7 +172,6 @@ func (h *Heimdall) Start() error { apiRouter := router.PathPrefix(defaultAPIPrefix).Subrouter() // job(s) endpoints... - apiRouter.Methods(methodGET).PathPrefix(`/command/health`).HandlerFunc(h.healthHandler) apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus)) apiRouter.Methods(methodPOST).PathPrefix(`/job/{id}/cancel`).HandlerFunc(payloadHandler(h.cancelJob)) @@ -183,6 +182,8 @@ func (h *Heimdall) Start() error { apiRouter.Methods(methodGET).PathPrefix(`/command/statuses`).HandlerFunc(payloadHandler(h.getCommandStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/command/{id}/status`).HandlerFunc(payloadHandler(h.getCommandStatus)) apiRouter.Methods(methodPUT).PathPrefix(`/command/{id}/status`).HandlerFunc(payloadHandler(h.updateCommandStatus)) + apiRouter.Methods(methodGET).PathPrefix(`/command/health`).HandlerFunc(h.healthHandler) + apiRouter.Methods(methodGET).PathPrefix(`/command/{id}/health`).HandlerFunc(h.commandHealthHandler) apiRouter.Methods(methodPUT).PathPrefix(`/command/{id}`).HandlerFunc(payloadHandler(h.submitCommand)) apiRouter.Methods(methodGET).PathPrefix(`/command/{id}`).HandlerFunc(payloadHandler(h.getCommand)) apiRouter.Methods(methodGET).PathPrefix(`/commands`).HandlerFunc(payloadHandler(h.getCommands)) From c1afd8635ff9c1652c8b7dd056c5795f5185b1fa Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 20:08:12 +0530 Subject: [PATCH 6/6] add semaphore --- internal/pkg/heimdall/health.go | 43 +++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index c9372fb..b3f37e8 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -20,10 +20,11 @@ import ( ) const ( - healthCheckTimeout = 30 * time.Second - healthCheckUser = `heimdall-health` - healthStatusOK = `ok` - healthStatusError = `error` + healthCheckTimeout = 30 * time.Second + healthCheckUser = `heimdall-health` + healthStatusOK = `ok` + healthStatusError = `error` + healthCheckConcurrency = 10 ) type healthCheckResult struct { @@ -70,7 +71,12 @@ func (h *Heimdall) commandHealthHandler(w http.ResponseWriter, r *http.Request) } func (h *Heimdall) writeHealthResponse(w http.ResponseWriter, ctx context.Context, commandID *string) { - results := h.runHealthChecks(ctx, h.resolveHealthPairs(ctx, commandID)) + pairs, err := h.resolveHealthPairs(ctx, commandID) + if err != nil { + writeAPIError(w, fmt.Errorf("error resolving health check pairs: %w", err), nil) + return + } + results := h.runHealthChecks(ctx, pairs) healthy := true for _, res := range results { @@ -92,36 +98,40 @@ func (h *Heimdall) writeHealthResponse(w http.ResponseWriter, ctx context.Contex w.Write(data) } -func (h *Heimdall) resolveHealthPairs(ctx context.Context, commandID *string) []*healthPair { +func (h *Heimdall) resolveHealthPairs(ctx context.Context, commandID *string) ([]*healthPair, error) { var pairs []*healthPair if commandID != nil { cmd, found := h.Commands[*commandID] if !found { - return pairs + return pairs, nil } return h.resolveHealthPairsForCommand(ctx, cmd) } for _, cmd := range h.Commands { - pairs = append(pairs, h.resolveHealthPairsForCommand(ctx, cmd)...) + cmdPairs, err := h.resolveHealthPairsForCommand(ctx, cmd) + if err != nil { + return pairs, err + } + pairs = append(pairs, cmdPairs...) } - return pairs + return pairs, nil } -func (h *Heimdall) resolveHealthPairsForCommand(ctx context.Context, cmd *command.Command) []*healthPair { +func (h *Heimdall) resolveHealthPairsForCommand(ctx context.Context, cmd *command.Command) ([]*healthPair, error) { var pairs []*healthPair if cmd == nil { - return pairs + return pairs, nil } if !cmd.HealthCheck { - return pairs + return pairs, nil } // check DB for command status to avoid unnecessary health checks for inactive commands dbCmd, err := h.getCommandStatus(ctx, cmd) if err != nil { - return pairs + return pairs, err } if dbCmd.(*command.Command).Status != status.Active { - return pairs + return pairs, nil } for _, cl := range h.Clusters { if cl.Status != status.Active { @@ -131,16 +141,19 @@ func (h *Heimdall) resolveHealthPairsForCommand(ctx context.Context, cmd *comman pairs = append(pairs, &healthPair{cmd, cl, h.commandHandlers[cmd.ID]}) } } - return pairs + return pairs, nil } func (h *Heimdall) runHealthChecks(ctx context.Context, pairs []*healthPair) []healthCheckResult { results := make([]healthCheckResult, len(pairs)) + sem := make(chan struct{}, healthCheckConcurrency) var wg sync.WaitGroup for i, pair := range pairs { wg.Add(1) go func(i int, pair *healthPair) { defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() results[i] = h.checkPair(ctx, pair) }(i, pair) }