Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions pkg/alert_rule/alert_rule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package alertrule

import (
"crypto/sha256"
"encoding/base64"
"fmt"
"regexp"
"sort"
"strings"
"unicode/utf8"

"github.com/openshift/monitoring-plugin/pkg/managementlabels"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
)

// promLabelNameRegexp matches syntactically valid Prometheus label names:
// an ASCII letter or underscore followed by letters, digits, or underscores.
var promLabelNameRegexp = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// GetAlertingRuleId derives a stable, content-addressed identifier for an
// alerting or recording rule. The id is a SHA-256 over a canonical payload
// built from the rule's identity (kind/name) and spec (expr/for/labels);
// annotations and openshift_io_* provenance labels are excluded so that
// cosmetic or system-managed changes do not churn ids. Returns "" when the
// rule names neither an Alert nor a Record.
func GetAlertingRuleId(alertRule *monitoringv1.Rule) string {
	var ruleKind, ruleName string
	switch {
	case alertRule.Alert != "":
		ruleKind, ruleName = "alert", alertRule.Alert
	case alertRule.Record != "":
		ruleKind, ruleName = "record", alertRule.Record
	default:
		return ""
	}

	normalizedExpr := normalizeExpr(alertRule.Expr.String())

	holdDuration := ""
	if alertRule.For != nil {
		holdDuration = strings.TrimSpace(string(*alertRule.For))
	}

	// Canonical payload is intentionally derived from rule spec (expr/for/labels)
	// and identity (kind/name), and excludes annotations and openshift_io_*
	// provenance/system labels.
	payload := strings.Join(
		[]string{ruleKind, ruleName, normalizedExpr, holdDuration, normalizedBusinessLabelsBlock(alertRule.Labels)},
		"\n---\n",
	)

	digest := sha256.Sum256([]byte(payload))
	return "rid_" + base64.RawURLEncoding.EncodeToString(digest[:])
}

// normalizeExpr collapses every run of whitespace (spaces, tabs, newlines) in
// expr into a single space so cosmetic formatting changes do not churn ids.
// strings.Fields already ignores leading and trailing whitespace, so no
// explicit TrimSpace is needed.
func normalizeExpr(expr string) string {
	return strings.Join(strings.Fields(expr), " ")
}

// normalizedBusinessLabelsBlock renders the user-meaningful ("business")
// labels as a deterministic newline-separated "key=value" block suitable for
// hashing. It drops: system/provenance labels (openshift_io_* prefixed keys
// and the managed alert-name label), keys that are empty or not valid
// Prometheus label names, empty values, and non-UTF-8 values. Returns ""
// when no labels remain.
func normalizedBusinessLabelsBlock(in map[string]string) string {
	if len(in) == 0 {
		return ""
	}

	lines := make([]string, 0, len(in))
	for k, v := range in {
		key := strings.TrimSpace(k)
		if key == "" {
			continue
		}
		if strings.HasPrefix(key, "openshift_io_") || key == managementlabels.AlertNameLabel {
			// Skip system labels
			continue
		}
		// key was already trimmed above; matching it directly avoids the
		// redundant second TrimSpace the original performed here.
		if !promLabelNameRegexp.MatchString(key) {
			continue
		}
		if v == "" {
			// Align with specHash behavior: drop empty values
			continue
		}
		if !utf8.ValidString(v) {
			continue
		}

		lines = append(lines, fmt.Sprintf("%s=%s", key, v))
	}

	// Sort so the block is independent of map iteration order.
	sort.Strings(lines)
	return strings.Join(lines, "\n")
}
102 changes: 102 additions & 0 deletions pkg/k8s/alert_relabel_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package k8s

import (
"context"
"fmt"

osmv1 "github.com/openshift/api/monitoring/v1"
osmv1client "github.com/openshift/client-go/monitoring/clientset/versioned"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
)

// alertRelabelConfigManager serves AlertRelabelConfig resources: List reads
// from a cluster-wide informer cache, Get reads live from the API server, and
// Create/Update/Delete write directly through the clientset.
type alertRelabelConfigManager struct {
	// clientset is used for live reads (Get) and all mutations.
	clientset *osmv1client.Clientset
	// arcInformer caches AlertRelabelConfigs across all namespaces for List.
	arcInformer cache.SharedIndexInformer
}

// newAlertRelabelConfigManager builds a manager backed by a cluster-wide
// AlertRelabelConfig informer, starts that informer on the given context, and
// blocks until its cache has synced (or ctx is cancelled, which fails the
// sync and returns an error).
func newAlertRelabelConfigManager(ctx context.Context, clientset *osmv1client.Clientset) (*alertRelabelConfigManager, error) {
	informer := cache.NewSharedIndexInformer(
		alertRelabelConfigListWatchForAllNamespaces(clientset),
		&osmv1.AlertRelabelConfig{},
		0,
		cache.Indexers{},
	)

	manager := &alertRelabelConfigManager{
		clientset:   clientset,
		arcInformer: informer,
	}

	// The informer goroutine's lifetime is bound to ctx.
	go informer.Run(ctx.Done())

	synced := cache.WaitForNamedCacheSync("AlertRelabelConfig informer", ctx.Done(), informer.HasSynced)
	if !synced {
		return nil, fmt.Errorf("failed to sync AlertRelabelConfig informer")
	}

	return manager, nil
}

// alertRelabelConfigListWatchForAllNamespaces returns a ListWatch over
// alertrelabelconfigs in every namespace with no field filtering.
func alertRelabelConfigListWatchForAllNamespaces(clientset *osmv1client.Clientset) *cache.ListWatch {
	restClient := clientset.MonitoringV1().RESTClient()
	// An empty namespace argument selects all namespaces.
	return cache.NewListWatchFromClient(restClient, "alertrelabelconfigs", "", fields.Everything())
}

// List returns the cached AlertRelabelConfigs, optionally filtered to one
// namespace (an empty namespace returns all). It never performs a live API
// call; the error result is always nil and exists for call-site symmetry.
func (arcm *alertRelabelConfigManager) List(ctx context.Context, namespace string) ([]osmv1.AlertRelabelConfig, error) {
	cached := arcm.arcInformer.GetStore().List()

	result := make([]osmv1.AlertRelabelConfig, 0, len(cached))
	for _, obj := range cached {
		arc, ok := obj.(*osmv1.AlertRelabelConfig)
		if !ok {
			// Skip anything unexpected in the store.
			continue
		}
		if namespace != "" && arc.Namespace != namespace {
			continue
		}
		result = append(result, *arc)
	}

	return result, nil
}

// Get fetches one AlertRelabelConfig directly from the API server (bypassing
// the informer cache). The boolean reports existence: a NotFound response is
// returned as (nil, false, nil) rather than as an error.
func (arcm *alertRelabelConfigManager) Get(ctx context.Context, namespace string, name string) (*osmv1.AlertRelabelConfig, bool, error) {
	arc, err := arcm.clientset.MonitoringV1().AlertRelabelConfigs(namespace).Get(ctx, name, metav1.GetOptions{})
	switch {
	case errors.IsNotFound(err):
		return nil, false, nil
	case err != nil:
		return nil, false, err
	}
	return arc, true, nil
}

// Create persists a new AlertRelabelConfig in its own namespace and returns
// the server-side representation.
func (arcm *alertRelabelConfigManager) Create(ctx context.Context, arc osmv1.AlertRelabelConfig) (*osmv1.AlertRelabelConfig, error) {
	arcClient := arcm.clientset.MonitoringV1().AlertRelabelConfigs(arc.Namespace)
	created, err := arcClient.Create(ctx, &arc, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create AlertRelabelConfig %s/%s: %w", arc.Namespace, arc.Name, err)
	}
	return created, nil
}

// Update replaces an existing AlertRelabelConfig; the server-side result is
// discarded.
func (arcm *alertRelabelConfigManager) Update(ctx context.Context, arc osmv1.AlertRelabelConfig) error {
	arcClient := arcm.clientset.MonitoringV1().AlertRelabelConfigs(arc.Namespace)
	if _, err := arcClient.Update(ctx, &arc, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("failed to update AlertRelabelConfig %s/%s: %w", arc.Namespace, arc.Name, err)
	}
	return nil
}

// Delete removes the named AlertRelabelConfig from the given namespace.
//
// The error message includes both namespace and name, consistent with the
// Create and Update error messages (the original omitted the namespace).
func (arcm *alertRelabelConfigManager) Delete(ctx context.Context, namespace string, name string) error {
	err := arcm.clientset.MonitoringV1().AlertRelabelConfigs(namespace).Delete(ctx, name, metav1.DeleteOptions{})
	if err != nil {
		return fmt.Errorf("failed to delete AlertRelabelConfig %s/%s: %w", namespace, name, err)
	}

	return nil
}
127 changes: 127 additions & 0 deletions pkg/k8s/alerting_health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package k8s

import (
"context"
"fmt"
"strings"
"sync"

"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

const (
	// clusterMonitoringConfigMap is the name of the ConfigMap (in
	// ClusterMonitoringNamespace) holding the cluster monitoring configuration.
	clusterMonitoringConfigMap = "cluster-monitoring-config"
	// clusterMonitoringConfigKey is the ConfigMap data key whose value is the
	// YAML configuration document.
	clusterMonitoringConfigKey = "config.yaml"
)

// clusterMonitoringConfig models the subset of the cluster monitoring
// config.yaml document this package cares about: the user-workload
// monitoring toggle.
type clusterMonitoringConfig struct {
	EnableUserWorkload bool `yaml:"enableUserWorkload"`
}

// clusterMonitoringConfigManager watches the cluster-monitoring-config ConfigMap
// via an informer and caches the parsed enableUserWorkload value so that
// AlertingHealth never needs a live API call.
type clusterMonitoringConfigManager struct {
	informer cache.SharedIndexInformer

	// mu guards enabled and err, which are written by informer event
	// handlers and read by userWorkloadEnabled.
	mu sync.RWMutex
	// enabled is the last parsed enableUserWorkload value (false when the
	// ConfigMap is absent or its config.yaml key is blank).
	enabled bool
	// err is the last YAML parse error, if any; nil otherwise.
	err error
}

// newClusterMonitoringConfigManager constructs the manager, registers informer
// event handlers for the cluster-monitoring-config ConfigMap, starts the
// informer on ctx, and blocks until the cache has synced (or ctx is
// cancelled, which fails the sync).
func newClusterMonitoringConfigManager(ctx context.Context, clientset *kubernetes.Clientset) (*clusterMonitoringConfigManager, error) {
	// Watch only the single ConfigMap of interest, selected by name.
	listWatch := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(),
		"configmaps",
		ClusterMonitoringNamespace,
		fields.OneTermEqualSelector("metadata.name", clusterMonitoringConfigMap),
	)
	informer := cache.NewSharedIndexInformer(listWatch, &corev1.ConfigMap{}, 0, cache.Indexers{})

	mgr := &clusterMonitoringConfigManager{informer: informer}

	// Adds and updates share the same re-parse path.
	onChange := func(obj interface{}) {
		if cm, ok := obj.(*corev1.ConfigMap); ok {
			mgr.handleUpdate(cm)
		}
	}

	_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: onChange,
		UpdateFunc: func(_, newObj interface{}) {
			onChange(newObj)
		},
		DeleteFunc: func(_ interface{}) {
			// A deleted ConfigMap means user-workload monitoring is off and
			// there is no parse error to report.
			mgr.mu.Lock()
			mgr.enabled = false
			mgr.err = nil
			mgr.mu.Unlock()
		},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to add event handler to cluster-monitoring-config informer: %w", err)
	}

	go informer.Run(ctx.Done())

	if !cache.WaitForNamedCacheSync("ClusterMonitoringConfig informer", ctx.Done(), informer.HasSynced) {
		return nil, fmt.Errorf("failed to sync ClusterMonitoringConfig informer")
	}

	return mgr, nil
}

// handleUpdate recomputes the cached user-workload state from the given
// ConfigMap. A missing or blank config.yaml key means user-workload
// monitoring is disabled; an unparseable document records an error that
// userWorkloadEnabled later surfaces.
func (m *clusterMonitoringConfigManager) handleUpdate(cm *corev1.ConfigMap) {
	// Compute the new state before taking the lock so YAML parsing never
	// blocks readers holding the RLock (the original parsed under the
	// write lock).
	var (
		enabled  bool
		parseErr error
	)
	raw, ok := cm.Data[clusterMonitoringConfigKey]
	if ok && strings.TrimSpace(raw) != "" {
		var cfg clusterMonitoringConfig
		if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
			parseErr = fmt.Errorf("parse cluster monitoring config.yaml: %w", err)
		} else {
			enabled = cfg.EnableUserWorkload
		}
	}

	m.mu.Lock()
	m.enabled = enabled
	m.err = parseErr
	m.mu.Unlock()
}

// userWorkloadEnabled reports the last parsed enableUserWorkload value along
// with any parse error recorded by the informer handlers.
func (m *clusterMonitoringConfigManager) userWorkloadEnabled() (bool, error) {
	m.mu.RLock()
	enabled, err := m.enabled, m.err
	m.mu.RUnlock()
	return enabled, err
}

// AlertingHealth returns alerting route health and UWM enablement status.
func (c *client) AlertingHealth(ctx context.Context) (AlertingHealth, error) {
health := c.prometheusAlerts.alertingHealth(ctx)

enabled, err := c.clusterMonitoringConfig.userWorkloadEnabled()
if err != nil {
return health, fmt.Errorf("failed to determine user workload enablement: %w", err)
}
health.UserWorkloadEnabled = enabled

return health, nil
}
Loading