From 9abc62e091b6e8a269e126b5f37f9c55acfdb000 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Wed, 15 Apr 2026 15:02:06 +0900 Subject: [PATCH 1/2] fix regex resolver match 0 or 1 tenant bug Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + integration/querier_tenant_federation_test.go | 140 ++++++++++++++++++ .../tenantfederation/merge_queryable.go | 6 +- .../metadata_merge_querier.go | 2 +- 4 files changed, 145 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index be9b56339ef..f9f8ef81d81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [BUGFIX] Fix memory leak in `ReuseWriteRequestV2` by explicitly clearing the `Symbols` backing array string pointers before returning the object to `sync.Pool`. #7373 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 * [BUGFIX] KV store: Fix false-positive `status_code="500"` metrics for HA tracker CAS operations when using memberlist. #7408 +* [BUGFIX] Tenant Federation: Fix `unsupported character` error when `tenant-federation.regex-matcher-enabled` is enabled and the input regex matches 0 or 1 existing tenant. #7424 ## 1.21.0 in progress diff --git a/integration/querier_tenant_federation_test.go b/integration/querier_tenant_federation_test.go index 46a33933ee2..4378d295383 100644 --- a/integration/querier_tenant_federation_test.go +++ b/integration/querier_tenant_federation_test.go @@ -126,6 +126,146 @@ func TestRegexResolver_NewlyCreatedTenant(t *testing.T) { require.Equal(t, expectedVector, result.(model.Vector)) } +// Test that queries, label lookups and metadata requests succeed when the regex resolver is enabled and the regex matches 0 or 1 tenants. 
+// See issue 7413, https://github.com/cortexproject/cortex/issues/7413 +func Test_TenantFederationRegexResolver_WhenSingleTenantMatched(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + memcached := e2ecache.NewMemcached() + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-querier.cache-results": "true", + "-querier.split-queries-by-interval": "24h", + "-limits.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range + "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-tenant-federation.enabled": "true", + "-tenant-federation.regex-matcher-enabled": "true", + "-tenant-federation.user-sync-interval": "5s", + + // to upload blocks quickly + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + + // store gateway + "-blocks-storage.bucket-store.sync-interval": blockRangePeriod.String(), + "-querier.max-fetched-series-per-query": "1", + }) + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Start ingester and distributor. + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(ingester, distributor)) + + // Wait until the distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Start the query-frontend. 
+ queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + // Start the querier and store-gateway + flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint() + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + + // Start queriers. + require.NoError(t, s.StartAndWaitReady(querier, storeGateway)) + require.NoError(t, s.WaitReady(queryFrontend)) + + // Wait until the querier and store-gateway have updated the ring + require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) + + clientForMatchOneTenant, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") + require.NoError(t, err) + + var series []prompb.TimeSeries + now := time.Now() + series, expectedResult := generateSeries("series_1", now) + // To ship series_1 block + series2, _ := generateSeries("series_2", now.Add(blockRangePeriod*2)) + metadata := []prompb.MetricMetadata{ + { + MetricFamilyName: "series_1", + Help: "help", + Unit: "total", + }, + } + + res, err := clientForMatchOneTenant.Push(series, metadata...) 
+ require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = clientForMatchOneTenant.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // wait to upload blocks + require.NoError(t, ingester.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_ingester_shipper_uploads_total"}, e2e.WaitMissingMetrics)) + + // wait to update knownUsers + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics) + + clientForMatchOneTenant, err = e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-.+") + require.NoError(t, err) + + // query + result, err := clientForMatchOneTenant.Query("series_1", now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + require.Equal(t, expectedResult, result.(model.Vector)) + + // label names + start := now.Add(-time.Minute * 5) + end := now + labelNames, err := clientForMatchOneTenant.LabelNames(start, end, "series_1") + require.NoError(t, err) + require.Len(t, labelNames, 1) + + // label value + labelValues, err := clientForMatchOneTenant.LabelValues("__name__", start, end, []string{"series_1"}) + require.NoError(t, err) + require.Len(t, labelValues, 1) + + // metadata + metadataResult, err := clientForMatchOneTenant.Metadata("series_1", "") + require.NoError(t, err) + require.Len(t, metadataResult, 1) + + clientForMatchZeroTenant, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-11111.+") + require.NoError(t, err) + + // query + result, err = clientForMatchZeroTenant.Query("series_1", now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + + // label names + labelNames, err = clientForMatchZeroTenant.LabelNames(start, end, "series_1") + require.NoError(t, err) + require.Len(t, labelNames, 0) + + // label value + labelValues, err = 
clientForMatchZeroTenant.LabelValues("__name__", start, end, []string{"series_1"}) + require.NoError(t, err) + require.Len(t, labelValues, 0) + + // metadata + metadataResult, err = clientForMatchZeroTenant.Metadata("series_1", "") + require.NoError(t, err) + require.Len(t, metadataResult, 0) +} + func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTenantFederationConfig) { const numUsers = 10 const blockRangePeriod = 5 * time.Second diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 111627a7478..e752d9dea74 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -161,7 +161,7 @@ func (m *mergeQuerier) LabelValues(ctx context.Context, name string, hints *stor // by pass when only single querier is returned if m.byPassWithSingleQuerier && len(queriers) == 1 { - return queriers[0].LabelValues(ctx, name, hints, matchers...) + return queriers[0].LabelValues(user.InjectOrgID(ctx, ids[0]), name, hints, matchers...) } log, _ := spanlogger.New(ctx, "mergeQuerier.LabelValues") defer log.Finish() @@ -202,7 +202,7 @@ func (m *mergeQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints // by pass when only single querier is returned if m.byPassWithSingleQuerier && len(queriers) == 1 { - return queriers[0].LabelNames(ctx, hints, matchers...) + return queriers[0].LabelNames(user.InjectOrgID(ctx, ids[0]), hints, matchers...) } log, _ := spanlogger.New(ctx, "mergeQuerier.LabelNames") defer log.Finish() @@ -349,7 +349,7 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora // by pass when only single querier is returned if m.byPassWithSingleQuerier && len(queriers) == 1 { - return queriers[0].Select(ctx, sortSeries, hints, matchers...) + return queriers[0].Select(user.InjectOrgID(ctx, ids[0]), sortSeries, hints, matchers...) 
} log, ctx := spanlogger.New(ctx, "mergeQuerier.Select") diff --git a/pkg/querier/tenantfederation/metadata_merge_querier.go b/pkg/querier/tenantfederation/metadata_merge_querier.go index b42cafac7bb..5b474ea174e 100644 --- a/pkg/querier/tenantfederation/metadata_merge_querier.go +++ b/pkg/querier/tenantfederation/metadata_merge_querier.go @@ -61,7 +61,7 @@ func (m *mergeMetadataQuerier) MetricsMetadata(ctx context.Context, req *client. m.tenantsPerMetadataQuery.Observe(float64(len(tenantIds))) if len(tenantIds) == 1 { - return m.upstream.MetricsMetadata(ctx, req) + return m.upstream.MetricsMetadata(user.InjectOrgID(ctx, tenantIds[0]), req) } jobs := make([]any, len(tenantIds)) From 55e0fadb49ebc1c655e6cff3ae962db1823e289e Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 16 Apr 2026 14:30:52 +0900 Subject: [PATCH 2/2] fix test Signed-off-by: SungJin1212 --- integration/querier_tenant_federation_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration/querier_tenant_federation_test.go b/integration/querier_tenant_federation_test.go index 4378d295383..d1e6d8c5012 100644 --- a/integration/querier_tenant_federation_test.go +++ b/integration/querier_tenant_federation_test.go @@ -214,7 +214,7 @@ func Test_TenantFederationRegexResolver_WhenSingleTenantMatched(t *testing.T) { require.NoError(t, ingester.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_ingester_shipper_uploads_total"}, e2e.WaitMissingMetrics)) // wait to update knownUsers - require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}, e2e.WaitMissingMetrics)) clientForMatchOneTenant, err = e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-.+") require.NoError(t, err) @@ -381,9 
+381,9 @@ func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTe require.NoError(t, ingester.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_ingester_shipper_uploads_total"}, e2e.WaitMissingMetrics)) // wait to update knownUsers - require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}, e2e.WaitMissingMetrics)) if cfg.shuffleShardingEnabled { - require.NoError(t, querier2.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics) + require.NoError(t, querier2.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}, e2e.WaitMissingMetrics)) } // query all tenants