Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [BUGFIX] Fix memory leak in `ReuseWriteRequestV2` by explicitly clearing the `Symbols` backing array string pointers before returning the object to `sync.Pool`. #7373
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
* [BUGFIX] KV store: Fix false-positive `status_code="500"` metrics for HA tracker CAS operations when using memberlist. #7408
* [BUGFIX] Tenant Federation: Fix `unsupported character` error when `tenant-federation.regex-matcher-enabled` is enabled and the input regex matches 0 or 1 existing tenant. #7424

## 1.21.0 in progress

Expand Down
140 changes: 140 additions & 0 deletions integration/querier_tenant_federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,146 @@ func TestRegexResolver_NewlyCreatedTenant(t *testing.T) {
require.Equal(t, expectedVector, result.(model.Vector))
}

// Test that when the regex resolver is enabled, and 0 or 1 tenants are matched.
// See issue 7413, https://github.com/cortexproject/cortex/issues/7413
func Test_TenantFederationRegexResolver_WhenSingleTenantMatched(t *testing.T) {
const blockRangePeriod = 5 * time.Second

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

memcached := e2ecache.NewMemcached()
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, memcached))

flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-querier.cache-results": "true",
"-querier.split-queries-by-interval": "24h",
"-limits.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-tenant-federation.enabled": "true",
"-tenant-federation.regex-matcher-enabled": "true",
"-tenant-federation.user-sync-interval": "5s",

// to upload block quickly
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),

// store gateway
"-blocks-storage.bucket-store.sync-interval": blockRangePeriod.String(),
"-querier.max-fetched-series-per-query": "1",
})

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

// Start ingester and distributor.
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(ingester, distributor))

// Wait until distributor have updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Start the query-frontend.
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
require.NoError(t, s.Start(queryFrontend))

// Start the querier and store-gateway
flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

// Start queriers.
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
require.NoError(t, s.WaitReady(queryFrontend))

// Wait until the querier and store-gateway have updated ring
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))

clientForMatchOneTenant, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)

var series []prompb.TimeSeries
now := time.Now()
series, expectedResult := generateSeries("series_1", now)
// To ship series_1 block
series2, _ := generateSeries("series_2", now.Add(blockRangePeriod*2))
metadata := []prompb.MetricMetadata{
{
MetricFamilyName: "series_1",
Help: "help",
Unit: "total",
},
}

res, err := clientForMatchOneTenant.Push(series, metadata...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = clientForMatchOneTenant.Push(series2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// wait to upload blocks
require.NoError(t, ingester.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_ingester_shipper_uploads_total"}, e2e.WaitMissingMetrics))

// wait to update knownUsers
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics)
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}, e2e.WaitMissingMetrics))


clientForMatchOneTenant, err = e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-.+")
require.NoError(t, err)

// query
result, err := clientForMatchOneTenant.Query("series_1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
require.Equal(t, expectedResult, result.(model.Vector))

// label names
start := now.Add(-time.Minute * 5)
end := now
labelNames, err := clientForMatchOneTenant.LabelNames(start, end, "series_1")
require.NoError(t, err)
require.Len(t, labelNames, 1)

// label value
labelValues, err := clientForMatchOneTenant.LabelValues("__name__", start, end, []string{"series_1"})
require.NoError(t, err)
require.Len(t, labelValues, 1)

// metadata
metadataResult, err := clientForMatchOneTenant.Metadata("series_1", "")
require.NoError(t, err)
require.Len(t, metadataResult, 1)

clientForMatchZeroTenant, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-11111.+")
require.NoError(t, err)

// query
result, err = clientForMatchZeroTenant.Query("series_1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())

// label names
labelNames, err = clientForMatchZeroTenant.LabelNames(start, end, "series_1")
require.NoError(t, err)
require.Len(t, labelNames, 0)

// label value
labelValues, err = clientForMatchZeroTenant.LabelValues("__name__", start, end, []string{"series_1"})
require.NoError(t, err)
require.Len(t, labelValues, 0)

// metadata
metadataResult, err = clientForMatchZeroTenant.Metadata("series_1", "")
require.NoError(t, err)
require.Len(t, metadataResult, 0)
}

func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTenantFederationConfig) {
const numUsers = 10
const blockRangePeriod = 5 * time.Second
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/tenantfederation/merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (m *mergeQuerier) LabelValues(ctx context.Context, name string, hints *stor

// by pass when only single querier is returned
if m.byPassWithSingleQuerier && len(queriers) == 1 {
return queriers[0].LabelValues(ctx, name, hints, matchers...)
return queriers[0].LabelValues(user.InjectOrgID(ctx, ids[0]), name, hints, matchers...)
}
log, _ := spanlogger.New(ctx, "mergeQuerier.LabelValues")
defer log.Finish()
Expand Down Expand Up @@ -202,7 +202,7 @@ func (m *mergeQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints

// by pass when only single querier is returned
if m.byPassWithSingleQuerier && len(queriers) == 1 {
return queriers[0].LabelNames(ctx, hints, matchers...)
return queriers[0].LabelNames(user.InjectOrgID(ctx, ids[0]), hints, matchers...)
}
log, _ := spanlogger.New(ctx, "mergeQuerier.LabelNames")
defer log.Finish()
Expand Down Expand Up @@ -349,7 +349,7 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora

// by pass when only single querier is returned
if m.byPassWithSingleQuerier && len(queriers) == 1 {
return queriers[0].Select(ctx, sortSeries, hints, matchers...)
return queriers[0].Select(user.InjectOrgID(ctx, ids[0]), sortSeries, hints, matchers...)
}

log, ctx := spanlogger.New(ctx, "mergeQuerier.Select")
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tenantfederation/metadata_merge_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (m *mergeMetadataQuerier) MetricsMetadata(ctx context.Context, req *client.
m.tenantsPerMetadataQuery.Observe(float64(len(tenantIds)))

if len(tenantIds) == 1 {
return m.upstream.MetricsMetadata(ctx, req)
return m.upstream.MetricsMetadata(user.InjectOrgID(ctx, tenantIds[0]), req)
}

jobs := make([]any, len(tenantIds))
Expand Down
Loading