diff --git a/CHANGELOG.md b/CHANGELOG.md index 11e3f03b89e..31b9f7d6d28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * [BUGFIX] Fix memory leak in `ReuseWriteRequestV2` by explicitly clearing the `Symbols` backing array string pointers before returning the object to `sync.Pool`. #7373 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 * [BUGFIX] KV store: Fix false-positive `status_code="500"` metrics for HA tracker CAS operations when using memberlist. #7408 +* [BUGFIX] Memberlist: Skip nil values delivered by `WatchPrefix` when a key is deleted, preventing a panic in the HA tracker caused by a failed type assertion on a nil interface value. #7429 ## 1.21.0 in progress diff --git a/pkg/ha/ha_tracker_test.go b/pkg/ha/ha_tracker_test.go index 552c582d2f9..5376bb91ee8 100644 --- a/pkg/ha/ha_tracker_test.go +++ b/pkg/ha/ha_tracker_test.go @@ -223,6 +223,66 @@ func TestHATracker_CleanupDeletesArePropagatedWithMemberlist(t *testing.T) { require.NotEmpty(t, broadcasts, "Cleanup Delete should generate a broadcast for tombstone propagation") } +// TestWatchPrefixNilPanicWithMemberlist is a regression test for the panic at ha_tracker.go:437: +// with memberlist, deleting a key used to make WatchPrefix deliver a nil value +// (memberlist KV.get() returns nil for deleted/tombstone keys). 
+func TestWatchPrefixNilPanicWithMemberlist(t *testing.T) { + ctx := t.Context() + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + + var kvCfg memberlist.KVConfig + flagext.DefaultValues(&kvCfg) + replicaDescCodec := GetReplicaDescCodec() + kvCfg.Codecs = []codec.Codec{replicaDescCodec} + + mkv := memberlist.NewKV(kvCfg, logger, &dnsProviderMock{}, reg) + require.NoError(t, services.StartAndAwaitRunning(ctx, mkv)) + defer services.StopAndAwaitTerminated(ctx, mkv) //nolint:errcheck + + client, err := memberlist.NewClient(mkv, replicaDescCodec) + require.NoError(t, err) + + trackerCfg := HATrackerConfig{ + EnableHATracker: false, // disabled for construction so the test's memberlist client can be injected; re-enabled below + UpdateTimeout: time.Second, + UpdateTimeoutJitterMax: 0, + FailoverTimeout: 2 * time.Second, + KVStore: kv.Config{Store: "memberlist"}, + } + + tracker, err := NewHATracker(trackerCfg, nil, HATrackerStatusConfig{}, reg, "test", logger) + require.NoError(t, err) + tracker.cfg.EnableHATracker = true + tracker.client = client + + // Start the tracker — this starts the WatchPrefix loop in loop(). 
+ require.NoError(t, services.StartAndAwaitRunning(ctx, tracker)) + defer services.StopAndAwaitTerminated(ctx, tracker) //nolint:errcheck + + userID := "user1" + cluster := "cluster1" + replica := "replica0" + key := userID + "/" + cluster + + now := time.Now() + require.NoError(t, tracker.CheckReplica(ctx, userID, cluster, replica, now)) + + test.Poll(t, 3*time.Second, nil, func() any { + tracker.electedLock.RLock() + defer tracker.electedLock.RUnlock() + if _, ok := tracker.elected[key]; !ok { + return fmt.Errorf("waiting for key to appear in elected cache") + } + return nil + }) + + require.NoError(t, client.Delete(ctx, key)) + + time.Sleep(500 * time.Millisecond) + require.Equal(t, services.Running, tracker.State(), "HATracker should still be running after receiving nil value from memberlist WatchPrefix") +} + // Test that values are set in the HATracker after WatchPrefix has found it in the KVStore. func TestWatchPrefixAssignment(t *testing.T) { t.Parallel() diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 3a61817dcc9..27005a5ed7d 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -853,6 +853,11 @@ func (m *KV) WatchPrefix(ctx context.Context, prefix string, codec codec.Codec, continue } + if val == nil { + // A deleted key can yield a nil value here; skip it so f is never called with nil. + continue + } + if !f(key, val) { return }