Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions configs/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ commands:
version: 0.0.1
store_result_sync: false
description: Test ping command
health_check: true
tags:
- type:ping
cluster_tags:
Expand Down
210 changes: 210 additions & 0 deletions internal/pkg/heimdall/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package heimdall

import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"sync"
"time"

"github.com/google/uuid"
"github.com/gorilla/mux"

"github.com/patterninc/heimdall/pkg/object/cluster"
"github.com/patterninc/heimdall/pkg/object/command"
"github.com/patterninc/heimdall/pkg/object/job"
"github.com/patterninc/heimdall/pkg/object/status"
"github.com/patterninc/heimdall/pkg/plugin"
)

// Settings and status strings used by the health-check endpoints in this file.
const (
// healthCheckTimeout bounds each health request; both HTTP handlers derive
// their context from the request context with this timeout.
healthCheckTimeout = 30 * time.Second
// healthCheckUser is set as the User of the synthetic job built by pluginProbe.
healthCheckUser = `heimdall-health`
// healthStatusOK is reported for a check that returned no error.
healthStatusOK = `ok`
// healthStatusError is reported for a check that returned an error.
healthStatusError = `error`
// healthCheckConcurrency is the capacity of the semaphore channel used by
// runHealthChecks to limit checks running at the same time.
healthCheckConcurrency = 10
)

// healthCheckResult is the JSON-serialized outcome of one health check for a
// single command/cluster pair.
type healthCheckResult struct {
// CommandID and ClusterID identify the pair that was checked.
CommandID string `json:"command_id"`
ClusterID string `json:"cluster_id"`
// Status is healthStatusOK or healthStatusError.
Status string `json:"status"`
// LatencyMs is the elapsed time of the check in milliseconds.
LatencyMs int64 `json:"latency_ms"`
// Error holds the error text of a failed check; omitted from JSON when empty.
Error string `json:"error,omitempty"`
}

// healthChecksResponse is the JSON body written by writeHealthResponse.
type healthChecksResponse struct {
// Healthy is false when at least one entry in Checks has the error status.
Healthy bool `json:"healthy"`
// Checks lists the individual per-pair results.
Checks []healthCheckResult `json:"checks"`
}

// healthPair groups a command, one cluster selected for it, and the handler
// used to run the health check against that cluster.
type healthPair struct {
cmd *command.Command
cluster *cluster.Cluster
// handler is looked up from h.commandHandlers by command ID when the pair is
// built; NOTE(review): that lookup is unchecked, so this may be nil — confirm.
handler plugin.Handler
}

// healthHandler serves the aggregate health endpoint. It bounds the request
// with healthCheckTimeout and writes the health response without a command
// filter (nil command ID).
func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) {
	timeoutCtx, release := context.WithTimeout(r.Context(), healthCheckTimeout)
	defer release()

	// A nil command ID means "no filter".
	var noFilter *string
	h.writeHealthResponse(w, timeoutCtx, noFilter)
}

// commandHealthHandler serves the health endpoint for a single command. The
// command ID is read from the route variables; unknown commands and commands
// that have not enabled HealthCheck are rejected through writeAPIError before
// any check is run.
func (h *Heimdall) commandHealthHandler(w http.ResponseWriter, r *http.Request) {
	timeoutCtx, release := context.WithTimeout(r.Context(), healthCheckTimeout)
	defer release()

	id := mux.Vars(r)[idKey]
	target, known := h.Commands[id]
	switch {
	case !known:
		writeAPIError(w, ErrUnknownCommandID, nil)
		return
	case !target.HealthCheck:
		writeAPIError(w, fmt.Errorf("command %s has not opted into health checks", id), nil)
		return
	}

	h.writeHealthResponse(w, timeoutCtx, &id)
}

// writeHealthResponse resolves the command/cluster pairs to check (all
// commands when commandID is nil, otherwise only the given command), runs the
// checks, and writes the JSON summary to w.
//
// The response status is 200 when every check succeeded and 503 when at least
// one check reported an error. Failures to resolve pairs or to encode the
// response are reported through writeAPIError.
func (h *Heimdall) writeHealthResponse(w http.ResponseWriter, ctx context.Context, commandID *string) {
	pairs, err := h.resolveHealthPairs(ctx, commandID)
	if err != nil {
		writeAPIError(w, fmt.Errorf("error resolving health check pairs: %w", err), nil)
		return
	}
	results := h.runHealthChecks(ctx, pairs)

	healthy := true
	for _, res := range results {
		if res.Status == healthStatusError {
			healthy = false
			break
		}
	}

	// Encode before touching the response so an encoding failure can still be
	// reported as an API error instead of an empty body with a 200/503 status.
	data, err := json.Marshal(healthChecksResponse{Healthy: healthy, Checks: results})
	if err != nil {
		writeAPIError(w, fmt.Errorf("error encoding health check response: %w", err), nil)
		return
	}

	statusCode := http.StatusOK
	if !healthy {
		statusCode = http.StatusServiceUnavailable
	}

	w.Header().Set(contentTypeKey, contentTypeJSON)
	w.WriteHeader(statusCode)
	// The status line is already sent; a failed write (e.g. the client went
	// away) cannot be reported back, so the error is deliberately ignored.
	_, _ = w.Write(data)
}

// resolveHealthPairs returns the command/cluster pairs that should be health
// checked. With a non-nil commandID only that command is considered (an
// unknown ID yields no pairs and no error); with a nil commandID every entry
// of h.Commands is considered.
//
// On error the returned slice is nil: partial results are never handed back
// together with an error.
func (h *Heimdall) resolveHealthPairs(ctx context.Context, commandID *string) ([]*healthPair, error) {
	if commandID != nil {
		cmd, found := h.Commands[*commandID]
		if !found {
			return nil, nil
		}
		return h.resolveHealthPairsForCommand(ctx, cmd)
	}

	var pairs []*healthPair
	for _, cmd := range h.Commands {
		cmdPairs, err := h.resolveHealthPairsForCommand(ctx, cmd)
		if err != nil {
			return nil, err
		}
		pairs = append(pairs, cmdPairs...)
	}
	return pairs, nil
}

// resolveHealthPairsForCommand builds one healthPair per active cluster whose
// tags contain the command's cluster tags. It returns no pairs when cmd is
// nil, when the command has not enabled HealthCheck, or when the command's
// stored status is not active.
//
// An error is returned when the command status lookup fails or yields a value
// that is not a non-nil *command.Command.
func (h *Heimdall) resolveHealthPairsForCommand(ctx context.Context, cmd *command.Command) ([]*healthPair, error) {
	if cmd == nil || !cmd.HealthCheck {
		return nil, nil
	}

	// check DB for command status to avoid unnecessary health checks for inactive commands
	raw, err := h.getCommandStatus(ctx, cmd)
	if err != nil {
		return nil, err
	}
	// Use a checked assertion: an unexpected or nil payload must surface as an
	// error on the health endpoint rather than as a panic.
	dbCmd, ok := raw.(*command.Command)
	if !ok || dbCmd == nil {
		return nil, fmt.Errorf("unexpected command status payload of type %T for command %s", raw, cmd.ID)
	}
	if dbCmd.Status != status.Active {
		return nil, nil
	}

	var pairs []*healthPair
	for _, cl := range h.Clusters {
		if cl.Status != status.Active {
			continue
		}
		if cl.Tags.Contains(cmd.ClusterTags) {
			pairs = append(pairs, &healthPair{
				cmd:     cmd,
				cluster: cl,
				handler: h.commandHandlers[cmd.ID],
			})
		}
	}
	return pairs, nil
}

// runHealthChecks executes checkPair for every pair, running at most
// healthCheckConcurrency checks at a time, and returns the results in the same
// order as pairs.
//
// A concurrency slot is acquired before a goroutine is started, so the number
// of goroutines is bounded as well. If ctx is done while waiting for a slot,
// the pair is not checked and is reported with the error status and the
// context error as its message.
func (h *Heimdall) runHealthChecks(ctx context.Context, pairs []*healthPair) []healthCheckResult {
	results := make([]healthCheckResult, len(pairs))
	sem := make(chan struct{}, healthCheckConcurrency)
	var wg sync.WaitGroup
	for i, pair := range pairs {
		select {
		case sem <- struct{}{}:
		case <-ctx.Done():
			// No slot became free before the deadline/cancellation; do not
			// start further checks against an already-expired context.
			results[i] = healthCheckResult{
				CommandID: pair.cmd.ID,
				ClusterID: pair.cluster.ID,
				Status:    healthStatusError,
				Error:     ctx.Err().Error(),
			}
			continue
		}

		wg.Add(1)
		go func(i int, pair *healthPair) {
			defer wg.Done()
			defer func() { <-sem }()
			// Each goroutine writes only its own index, so no locking is needed.
			results[i] = h.checkPair(ctx, pair)
		}(i, pair)
	}
	wg.Wait()
	return results
}

// checkPair runs one health check for a command/cluster pair and returns its
// result, including the elapsed time in milliseconds. A handler that
// implements plugin.HealthChecker is asked directly; any other handler is
// exercised through pluginProbe. A non-nil error marks the result with the
// error status and its message; otherwise the result has the ok status.
func (h *Heimdall) checkPair(ctx context.Context, pair *healthPair) healthCheckResult {
start := time.Now()
res := healthCheckResult{CommandID: pair.cmd.ID, ClusterID: pair.cluster.ID}

var err error
// Prefer the handler's own HealthCheck when it provides one.
if hc, ok := pair.handler.(plugin.HealthChecker); ok {
err = hc.HealthCheck(ctx, pair.cluster)
} else {
// Fallback: probe by calling the handler's Execute with a synthetic job.
err = h.pluginProbe(ctx, pair.cluster, pair.handler)
}
Comment on lines +168 to +173
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fallback path for handlers that don’t implement plugin.HealthChecker calls handler.Execute(...) as a probe. This is risky because Execute may be stateful/side-effecting and may assume a fully-populated job.Job (some built-in commands call j.Context.Unmarshal(...) without a nil check, which would panic here). Safer options are to require HealthChecker for commands that opt into health checks, or make the fallback return a clear "health check not implemented" error instead of executing the command.

Copilot uses AI. Check for mistakes.

// Latency covers the whole check, whichever path was taken.
res.LatencyMs = time.Since(start).Milliseconds()
if err != nil {
res.Status = healthStatusError
res.Error = err.Error()
} else {
res.Status = healthStatusOK
}
return res
}

// pluginProbe health-checks a handler by calling its Execute method with a
// synthetic job against cluster cl. The job carries only a fresh UUID and the
// healthCheckUser user; the runtime uses a temporary working directory that is
// removed when the probe returns.
//
// It returns an error when handler is nil, when the temporary directory or
// runtime cannot be set up, or when Execute itself fails.
//
// NOTE(review): Execute is invoked with an otherwise empty job; handlers that
// rely on other job fields may misbehave — confirm per plugin.
func (h *Heimdall) pluginProbe(ctx context.Context, cl *cluster.Cluster, handler plugin.Handler) error {
	// The handler may come from an unchecked map lookup; calling Execute on a
	// nil interface would panic, so report it as a failed check instead.
	if handler == nil {
		return fmt.Errorf("no command handler available for health probe")
	}

	j := &job.Job{}
	j.ID = uuid.NewString()
	j.User = healthCheckUser

	tmpDir, err := os.MkdirTemp("", "heimdall-health-*")
	if err != nil {
		return err
	}
	defer os.RemoveAll(tmpDir)

	runtime := &plugin.Runtime{
		WorkingDirectory: tmpDir,
		ResultDirectory:  tmpDir + separator + "result",
		Version:          h.Version,
		UserAgent:        fmt.Sprintf(formatUserAgent, h.Version),
	}

	if err := runtime.Set(); err != nil {
		return err
	}
	defer runtime.Stdout.Close()
	defer runtime.Stderr.Close()

	return handler.Execute(ctx, runtime, j, cl)
}
2 changes: 2 additions & 0 deletions internal/pkg/heimdall/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ func (h *Heimdall) Start() error {
apiRouter.Methods(methodGET).PathPrefix(`/command/statuses`).HandlerFunc(payloadHandler(h.getCommandStatuses))
apiRouter.Methods(methodGET).PathPrefix(`/command/{id}/status`).HandlerFunc(payloadHandler(h.getCommandStatus))
apiRouter.Methods(methodPUT).PathPrefix(`/command/{id}/status`).HandlerFunc(payloadHandler(h.updateCommandStatus))
apiRouter.Methods(methodGET).PathPrefix(`/command/health`).HandlerFunc(h.healthHandler)
apiRouter.Methods(methodGET).PathPrefix(`/command/{id}/health`).HandlerFunc(h.commandHealthHandler)
apiRouter.Methods(methodPUT).PathPrefix(`/command/{id}`).HandlerFunc(payloadHandler(h.submitCommand))
apiRouter.Methods(methodGET).PathPrefix(`/command/{id}`).HandlerFunc(payloadHandler(h.getCommand))
apiRouter.Methods(methodGET).PathPrefix(`/commands`).HandlerFunc(payloadHandler(h.getCommands))
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/janitor/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type Janitor struct {
Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
FinishedJobRetentionDays int `yaml:"finished_job_retention_days,omitempty" json:"finished_job_retention_days,omitempty"`
CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"`
CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"`
db *database.Database
commandHandlers map[string]plugin.Handler
clusters cluster.Clusters
commandHandlers map[string]plugin.Handler
clusters cluster.Clusters
}

func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin.Handler, clusters cluster.Clusters) error {
Expand Down
49 changes: 24 additions & 25 deletions internal/pkg/object/command/clickhouse/column_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,32 +148,31 @@ func handleDecimal(nullable bool) (any, func() any) {
}
}
// handleTuple returns a destination pointer and a getter for a Tuple column
// value (per its name; presumably the destination is handed to a row scanner —
// confirm against the caller).
//
// When nullable is true the destination is a **any and the getter yields nil
// if either the pointer or the pointed-to value is nil. Otherwise the
// destination is a *any and the getter returns the stored value as is.
func handleTuple(nullable bool) (any, func() any) {
	if nullable {
		var p *any
		return &p, func() any {
			if p == nil || *p == nil {
				return nil
			}
			return *p
		}
	}
	var v any
	return &v, func() any { return v }
}


// handleArray returns a destination pointer and a getter for an Array column
// value (per its name; presumably the destination is handed to a row scanner —
// confirm against the caller).
//
// When nullable is true the destination is a **any and the getter yields nil
// if either the pointer or the pointed-to value is nil. Otherwise the
// destination is a *any and the getter returns the stored value as is.
func handleArray(nullable bool) (any, func() any) {
	if nullable {
		var p *any
		return &p, func() any {
			if p == nil || *p == nil {
				return nil
			}
			return *p
		}
	}
	var v any
	return &v, func() any { return v }
}

func handleDefault(nullable bool) (any, func() any) {
Expand Down Expand Up @@ -207,8 +206,8 @@ func unwrapCHType(t string) (base string, nullable bool) {
return "Array", nullable
}
if strings.HasPrefix(s, "Tuple(") {
return "Tuple", nullable
}
return "Tuple", nullable
}

// Decimal(N,S) normalize to "Decimal"
if isDecimal(s) {
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/object/command/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func splitAndTrimQueries(query string) []string {
return queries
}

func (p *postgresCommandContext)Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error{
func (p *postgresCommandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
// implement me
return nil
}
}
1 change: 1 addition & 0 deletions pkg/object/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Command struct {
Plugin string `yaml:"plugin,omitempty" json:"plugin,omitempty"`
IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"`
ClusterTags *set.Set[string] `yaml:"cluster_tags,omitempty" json:"cluster_tags,omitempty"`
HealthCheck bool `yaml:"health_check,omitempty" json:"health_check,omitempty"`
Handler plugin.Handler `yaml:"-" json:"-"`
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ type Handler interface {
Execute(context.Context, *Runtime, *job.Job, *cluster.Cluster) error
Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error
}

// HealthChecker is an optional interface for handlers that provide their own
// health check for a cluster.
type HealthChecker interface {
// HealthCheck checks cluster c and returns a non-nil error when the check
// fails.
HealthCheck(ctx context.Context, c *cluster.Cluster) error
}
Loading