Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
* [ENHANCEMENT] Querier: Add active API requests tracker logging to help with OOMKill troubleshooting. #7216
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
Expand Down
51 changes: 39 additions & 12 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/request_tracker"
)

const (
Expand Down Expand Up @@ -285,35 +286,61 @@ func NewQuerierHandler(

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)

requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger))
var apiHandler http.Handler
var instantQueryHandler http.Handler
var rangedQueryHandler http.Handler
var legacyAPIHandler http.Handler
if requestTracker != nil {
apiHandler = request_tracker.NewRequestWrapper(promRouter, requestTracker, &request_tracker.ApiExtractor{})
legacyAPIHandler = request_tracker.NewRequestWrapper(legacyPromRouter, requestTracker, &request_tracker.ApiExtractor{})
instantQueryHandler = request_tracker.NewRequestWrapper(queryAPI.Wrap(queryAPI.InstantQueryHandler), requestTracker, &request_tracker.InstantQueryExtractor{})
rangedQueryHandler = request_tracker.NewRequestWrapper(queryAPI.Wrap(queryAPI.RangeQueryHandler), requestTracker, &request_tracker.RangedQueryExtractor{})

httpHeaderMiddleware := &HTTPHeaderMiddleware{
TargetHeaders: cfg.HTTPRequestHeadersToLog,
RequestIdHeader: cfg.RequestIdHeader,
}
apiHandler = httpHeaderMiddleware.Wrap(apiHandler)
legacyAPIHandler = httpHeaderMiddleware.Wrap(legacyAPIHandler)
instantQueryHandler = httpHeaderMiddleware.Wrap(instantQueryHandler)
rangedQueryHandler = httpHeaderMiddleware.Wrap(rangedQueryHandler)
} else {
apiHandler = promRouter
legacyAPIHandler = legacyPromRouter
instantQueryHandler = queryAPI.Wrap(queryAPI.InstantQueryHandler)
rangedQueryHandler = queryAPI.Wrap(queryAPI.RangeQueryHandler)
}

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(instantQueryHandler)
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(rangedQueryHandler)
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/parse_query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(apiHandler)

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(instantQueryHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(rangedQueryHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/parse_query")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Methods("GET").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Methods("GET").Handler(legacyAPIHandler)

if cfg.buildInfoEnabled {
router.Path(path.Join(prefix, "/api/v1/status/buildinfo")).Methods("GET").Handler(promRouter)
Expand Down
123 changes: 123 additions & 0 deletions pkg/util/request_tracker/request_extractor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package request_tracker

import (
"encoding/json"
"net/http"
"strings"
"time"
"unicode/utf8"

"github.com/cortexproject/cortex/pkg/util/requestmeta"
"github.com/cortexproject/cortex/pkg/util/users"
)

type Extractor interface {
Extract(r *http.Request) []byte
}

type DefaultExtractor struct{}

type ApiExtractor struct{}

type InstantQueryExtractor struct{}

type RangedQueryExtractor struct{}

func generateCommonMap(r *http.Request) map[string]interface{} {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add more headers including user agent, dashboard ID and panel ID

ctx := r.Context()
entryMap := make(map[string]interface{})
entryMap["timestamp-sec"] = time.Now().Unix()
entryMap["Path"] = r.URL.Path
entryMap["Method"] = r.Method
entryMap["TenantID"], _ = users.TenantID(ctx)
entryMap["RequestID"] = requestmeta.RequestIdFromContext(ctx)
entryMap["UserAgent"] = r.Header.Get("User-Agent")
entryMap["DashboardUID"] = r.Header.Get("X-Dashboard-UID")
entryMap["PanelId"] = r.Header.Get("X-Panel-Id")

return entryMap
}

func (e *DefaultExtractor) Extract(r *http.Request) []byte {
entryMap := generateCommonMap(r)

return generateJSONEntry(entryMap)
}

func (e *ApiExtractor) Extract(r *http.Request) []byte {
entryMap := generateCommonMap(r)
entryMap["limit"] = r.URL.Query().Get("limit")
entryMap["start"] = r.URL.Query().Get("start")
entryMap["end"] = r.URL.Query().Get("end")

matches := r.URL.Query()["match[]"]
entryMap["number-of-matches"] = len(matches)
matchesStr := strings.Join(matches, ",")

return generateJSONEntryWithTruncatedField(entryMap, "matches", matchesStr)
}

func (e *InstantQueryExtractor) Extract(r *http.Request) []byte {
entryMap := generateCommonMap(r)
entryMap["time"] = r.URL.Query().Get("time")
return generateJSONEntryWithTruncatedField(entryMap, "query", r.URL.Query().Get("query"))
}

func (e *RangedQueryExtractor) Extract(r *http.Request) []byte {
entryMap := generateCommonMap(r)
entryMap["start"] = r.URL.Query().Get("start")
entryMap["end"] = r.URL.Query().Get("end")
entryMap["step"] = r.URL.Query().Get("step")
return generateJSONEntryWithTruncatedField(entryMap, "query", r.URL.Query().Get("query"))
}

func generateJSONEntry(entryMap map[string]interface{}) []byte {
jsonEntry, err := json.Marshal(entryMap)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make sure that the generated byte entries we always log queries and matchers at the end? If they are too long other parameters might be truncated

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were always trimming the query/matchers to fit the rest of the space using trimStringByBytes()

if err != nil {
return []byte{}
}

return jsonEntry
}

func generateJSONEntryWithTruncatedField(entryMap map[string]interface{}, fieldName, fieldValue string) []byte {
entryMap[fieldName] = ""
minEntryJSON := generateJSONEntry(entryMap)
entryMap[fieldName] = trimForJsonMarshal(fieldValue, maxEntrySize-(len(minEntryJSON)+1))
return generateJSONEntry(entryMap)
}

func trimStringByBytes(bytesStr []byte, size int) string {
trimIndex := len(bytesStr)
if size < len(bytesStr) {
for !utf8.RuneStart(bytesStr[size]) {
size--
}
trimIndex = size
}

return string(bytesStr[:trimIndex])
}

func trimForJsonMarshal(field string, size int) string {
fieldValueEncoded, err := json.Marshal(field)
if err != nil {
return ""
}
fieldValueEncoded = fieldValueEncoded[1 : len(fieldValueEncoded)-1]
fieldValueEncodedTrimmed := trimStringByBytes(fieldValueEncoded, size)
fieldValueEncodedTrimmed = "\"" + removeHalfCutEscapeChar(fieldValueEncodedTrimmed) + "\""

Check failure

Code scanning / CodeQL

Potentially unsafe quoting Critical

If this
JSON value
contains a double quote, it could break out of the enclosing quotes.
var fieldValue string
print(fieldValueEncodedTrimmed)
json.Unmarshal([]byte(fieldValueEncodedTrimmed), &fieldValue)

return fieldValue
}

func removeHalfCutEscapeChar(str string) string {
trailingBashslashCount := len(str) - len(strings.TrimRight(str, "\\"))
if trailingBashslashCount%2 == 1 {
str = str[0 : len(str)-1]
}
return str
}
Loading
Loading