1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
## master / unreleased
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
* [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
14 changes: 14 additions & 0 deletions docs/blocks-storage/querier.md
@@ -299,6 +299,20 @@ querier:
# mixed block types (parquet and non-parquet) and not querying ingesters.
# CLI flag: -querier.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

# If true, classify query timeouts as 4XX (user error) or 5XX (system error)
# based on phase timing.
# CLI flag: -querier.timeout-classification-enabled
[timeout_classification_enabled: <boolean> | default = false]

# The total time before the querier proactively cancels a query for timeout
# classification. Set this to a few seconds less than the querier timeout.
# CLI flag: -querier.timeout-classification-deadline
[timeout_classification_deadline: <duration> | default = 1m59s]

# Eval time threshold above which a timeout is classified as user error (4XX).
# CLI flag: -querier.timeout-classification-eval-threshold
[timeout_classification_eval_threshold: <duration> | default = 1m30s]
```

### `blocks_storage_config`
14 changes: 14 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4975,6 +4975,20 @@ thanos_engine:
# types (parquet and non-parquet) and not querying ingesters.
# CLI flag: -querier.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

# If true, classify query timeouts as 4XX (user error) or 5XX (system error)
# based on phase timing.
# CLI flag: -querier.timeout-classification-enabled
[timeout_classification_enabled: <boolean> | default = false]

# The total time before the querier proactively cancels a query for timeout
# classification. Set this to a few seconds less than the querier timeout.
# CLI flag: -querier.timeout-classification-deadline
[timeout_classification_deadline: <duration> | default = 1m59s]

# Eval time threshold above which a timeout is classified as user error (4XX).
# CLI flag: -querier.timeout-classification-eval-threshold
[timeout_classification_eval_threshold: <duration> | default = 1m30s]
```

### `query_frontend_config`
57 changes: 57 additions & 0 deletions docs/guides/timeout-classification.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Timeout Classification

Timeout classification lets Cortex distinguish between query timeouts caused by expensive user queries (4XX) and those caused by system issues (5XX). When enabled, queries that spend most of their time in PromQL evaluation are returned as `422 Unprocessable Entity` instead of `503 Service Unavailable`, giving callers a clear signal to simplify the query rather than retry.

## How It Works

When an instant or range query arrives at the querier (other APIs are unchanged), the feature:

1. Subtracts any time the query spent waiting in the scheduler queue from the configured deadline.
2. Sets a proactive context timeout using the remaining budget, so the querier cancels the query slightly before the PromQL engine's own timeout fires.
3. On timeout, inspects phase timings (storage fetch time vs. total time) to compute eval time.
4. If eval time exceeds the configured threshold, the timeout is classified as a user error (4XX). Otherwise it remains a system error (5XX).

This means expensive queries that burn their budget in PromQL evaluation return a `422`, while other timed-out queries still return a `5XX`.

Note that because different query shards do not return at the same time, the first timed-out shard to return decides whether the query is converted to a 4XX.

## Configuration

Enable the feature and set the three related flags:

```yaml
querier:
timeout_classification_enabled: true
timeout_classification_deadline: 1m59s
timeout_classification_eval_threshold: 1m30s
```

| Flag | Default | Description |
|---|---|---|
| `timeout_classification_enabled` | `false` | Enable 5XX-to-4XX conversion based on phase timing. |
| `timeout_classification_deadline` | `1m59s` | Proactive cancellation deadline. Set this a few seconds less than the querier timeout. |
| `timeout_classification_eval_threshold` | `1m30s` | Eval time above which a timeout is classified as user error (4XX). Must be ≤ the deadline. |

### Constraints

- `timeout_classification_deadline` must be positive and strictly less than `querier.timeout`.
- `timeout_classification_eval_threshold` must be positive and ≤ `timeout_classification_deadline`.
- Query stats must be enabled (`query_stats_enabled: true` on the frontend handler) for classification to work.

## Tuning

- The deadline should be close to but below the querier timeout so the proactive cancellation fires first. A gap of 1–2 seconds is typical.
- The eval threshold controls sensitivity. A lower threshold classifies more timeouts as user errors; a higher threshold is more conservative. Start with the default and adjust based on your workload.
- Monitor the `decision` field in the timeout classification log line (`query shard timed out with classification`) to see how queries are being classified before enabling the conversion.

## Observability

When a query times out and query stats are enabled, the querier emits a warning-level log line containing:

- `queue_wait_time` — time spent in the scheduler queue
- `query_storage_wall_time` — time spent fetching data from storage
- `eval_time` — computed as `total_time - query_storage_wall_time`
- `decision` — `0` for 5XX (system), `1` for 4XX (user)
- `conversion_enabled` — whether the status code conversion is active

These fields are logged regardless of whether conversion is enabled, so you can observe classification behavior in dry-run mode by setting `timeout_classification_enabled: false` and reviewing the logs.
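For illustration only (a hypothetical rendering with made-up values; the exact formatting depends on your logging configuration), a timeout classified as a user error might be logged as:

```
level=warn msg="query shard timed out with classification" queue_wait_time=1.2s query_storage_wall_time=8.4s eval_time=110.6s total_time=119s decision=1 conversion_enabled=true
```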
6 changes: 5 additions & 1 deletion pkg/api/handlers.go
@@ -286,7 +286,11 @@ func NewQuerierHandler(
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
api.Register(legacyPromRouter)

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, stats.PhaseTrackerConfig{
TotalTimeout: querierCfg.TimeoutClassificationDeadline,
EvalTimeThreshold: querierCfg.TimeoutClassificationEvalThreshold,
Enabled: querierCfg.TimeoutClassificationEnabled,
})

requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger))
var apiHandler http.Handler
158 changes: 144 additions & 14 deletions pkg/api/queryapi/query_api.go
@@ -21,18 +21,21 @@ import (
"github.com/cortexproject/cortex/pkg/distributed_execution"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/api"
"github.com/cortexproject/cortex/pkg/util/requestmeta"
)

type QueryAPI struct {
queryable storage.SampleAndChunkQueryable
queryEngine engine.QueryEngine
now func() time.Time
statsRenderer v1.StatsRenderer
logger log.Logger
codecs []v1.Codec
CORSOrigin *regexp.Regexp
queryable storage.SampleAndChunkQueryable
queryEngine engine.QueryEngine
now func() time.Time
statsRenderer v1.StatsRenderer
logger log.Logger
codecs []v1.Codec
CORSOrigin *regexp.Regexp
timeoutClassification stats.PhaseTrackerConfig
}

func NewQueryAPI(
@@ -42,15 +45,17 @@ func NewQueryAPI(
logger log.Logger,
codecs []v1.Codec,
CORSOrigin *regexp.Regexp,
timeoutClassification stats.PhaseTrackerConfig,
) *QueryAPI {
return &QueryAPI{
queryEngine: qe,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
queryEngine: qe,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
timeoutClassification: timeoutClassification,
}
}

@@ -84,6 +89,11 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
}

ctx := r.Context()

// Always record query start time for phase tracking, regardless of feature flag.
queryStats := stats.FromContext(ctx)
queryStats.SetQueryStart(time.Now())

if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
@@ -95,6 +105,15 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
defer cancel()
}

cfg := q.timeoutClassification
ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg)
if cancel != nil {
defer cancel()
}
if earlyResult != nil {
return *earlyResult
}

opts, err := extractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
@@ -138,6 +157,13 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {

res := qry.Exec(ctx)
if res.Err != nil {
// If the context was cancelled/timed out, apply timeout classification.
if ctx.Err() != nil {
if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil {
return *classified
}
}

return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}

@@ -159,6 +185,11 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
}

ctx := r.Context()

// Always record query start time for phase tracking, regardless of feature flag.
queryStats := stats.FromContext(ctx)
queryStats.SetQueryStart(time.Now())

if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
@@ -170,6 +201,15 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
defer cancel()
}

cfg := q.timeoutClassification
ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg)
if cancel != nil {
defer cancel()
}
if earlyResult != nil {
return *earlyResult
}

opts, err := extractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
@@ -211,6 +251,13 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {

res := qry.Exec(ctx)
if res.Err != nil {
// If the context was cancelled/timed out, apply timeout classification.
if ctx.Err() != nil {
if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil {
return *classified
}
}

return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}

@@ -281,6 +328,89 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data any, w
}
}

// applyTimeoutClassification creates a proactive context timeout that fires before
// the PromQL engine's own timeout, adjusted for queue wait time. Returns the
// (possibly wrapped) context, an optional cancel func, and an optional early-exit
// result when the entire timeout budget was already consumed in the queue.
func applyTimeoutClassification(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig) (context.Context, context.CancelFunc, *apiFuncResult) {
if !cfg.Enabled {
return ctx, nil, nil
}
var queueWaitTime time.Duration
queueJoin := queryStats.LoadQueueJoinTime()
queueLeave := queryStats.LoadQueueLeaveTime()
if !queueJoin.IsZero() && !queueLeave.IsZero() {
queueWaitTime = queueLeave.Sub(queueJoin)
}
effectiveTimeout := cfg.TotalTimeout - queueWaitTime
if effectiveTimeout <= 0 {
return ctx, nil, &apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusServiceUnavailable,
"query timed out: query spent too long in scheduler queue")}, nil, nil}
}
ctx, cancel := context.WithTimeout(ctx, effectiveTimeout)
return ctx, cancel, nil
}

// classifyTimeout inspects phase timings after a context cancellation/timeout
// and returns an apiFuncResult if the timeout should be converted to a 4XX user error.
// Returns nil if no conversion applies and the caller should use the default error path.
func (q *QueryAPI) classifyTimeout(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig, warnings annotations.Annotations, closer func()) *apiFuncResult {
if !stats.IsEnabled(ctx) {
return nil
}

queryStats.SetQueryEnd(time.Now())

decision := stats.DecideTimeoutResponse(queryStats, cfg)

fetchTime := queryStats.LoadQueryStorageWallTime()
queryEnd := queryStats.LoadQueryEnd()
totalTime := queryEnd.Sub(queryStats.LoadQueryStart())
evalTime := totalTime - fetchTime
var queueWaitTime time.Duration
queueJoin := queryStats.LoadQueueJoinTime()
queueLeave := queryStats.LoadQueueLeaveTime()
if !queueJoin.IsZero() && !queueLeave.IsZero() {
queueWaitTime = queueLeave.Sub(queueJoin)
}
level.Warn(q.logger).Log(
"msg", "query shard timed out with classification",
"request_id", requestmeta.RequestIdFromContext(ctx),
"query_start", queryStats.LoadQueryStart(),
"query_end", queryEnd,
"queue_wait_time", queueWaitTime,
"query_storage_wall_time", fetchTime,
"eval_time", evalTime,
"total_time", totalTime,
"wall_time", queryStats.LoadWallTime(),
"response_series", queryStats.LoadResponseSeries(),
"fetched_series_count", queryStats.LoadFetchedSeries(),
"fetched_chunk_bytes", queryStats.LoadFetchedChunkBytes(),
"fetched_data_bytes", queryStats.LoadFetchedDataBytes(),
"fetched_samples_count", queryStats.LoadFetchedSamples(),
"fetched_chunks_count", queryStats.LoadFetchedChunks(),
"split_queries", queryStats.LoadSplitQueries(),
"store_gateway_touched_postings_count", queryStats.LoadStoreGatewayTouchedPostings(),
"store_gateway_touched_posting_bytes", queryStats.LoadStoreGatewayTouchedPostingBytes(),
"scanned_samples", queryStats.LoadScannedSamples(),
"peak_samples", queryStats.LoadPeakSamples(),
"decision", decision,
"conversion_enabled", cfg.Enabled,
)

if cfg.Enabled && decision == stats.UserError4XX {
return &apiFuncResult{nil, &apiError{errorExec, httpgrpc.Errorf(http.StatusUnprocessableEntity,
"query timed out: query spent too long in evaluation - consider simplifying your query")}, warnings, closer}
}

if cfg.Enabled {
return &apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusGatewayTimeout,
"%s", ErrUpstreamRequestTimeout)}, warnings, closer}
}

return nil
}

func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
for _, codec := range q.codecs {
8 changes: 4 additions & 4 deletions pkg/api/queryapi/query_api_test.go
@@ -183,7 +183,7 @@ func Test_CustomAPI(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})

router := mux.NewRouter()
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
@@ -244,7 +244,7 @@ func Test_InvalidCodec(t *testing.T) {
},
}

queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"))
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})
router := mux.NewRouter()
router.Path("/api/v1/query").Methods("POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))

@@ -285,7 +285,7 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) {
},
}

queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})

router := mux.NewRouter()
router.Path("/api/v1/query_range").Methods("POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
@@ -441,7 +441,7 @@ func Test_Logicalplan_Requests(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})
router := mux.NewRouter()
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler))
7 changes: 4 additions & 3 deletions pkg/api/queryapi/util.go
@@ -16,9 +16,10 @@ import (
)

var (
ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time")
ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer")
ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time")
ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer")
ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
ErrUpstreamRequestTimeout = "upstream request timeout"
)

func extractQueryOpts(r *http.Request) (promql.QueryOpts, error) {