From b7797e0ecb928e29b01beb46692b66051b808f75 Mon Sep 17 00:00:00 2001 From: Sebastien Blot Date: Tue, 7 Apr 2026 16:56:40 +0200 Subject: [PATCH 1/3] decisions stream: always use chunked transfer + avoid useless copies --- pkg/apiserver/controllers/v1/decisions.go | 182 +++++++--------------- pkg/database/decisions.go | 16 +- pkg/fflag/crowdsec.go | 2 +- 3 files changed, 66 insertions(+), 134 deletions(-) diff --git a/pkg/apiserver/controllers/v1/decisions.go b/pkg/apiserver/controllers/v1/decisions.go index 86dd9845071..f3f8810277a 100644 --- a/pkg/apiserver/controllers/v1/decisions.go +++ b/pkg/apiserver/controllers/v1/decisions.go @@ -11,7 +11,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/crowdsecurity/crowdsec/pkg/database/ent" - "github.com/crowdsecurity/crowdsec/pkg/fflag" "github.com/crowdsecurity/crowdsec/pkg/models" ) @@ -37,6 +36,21 @@ func FormatDecisions(decisions []*ent.Decision) []*models.Decision { return results } +func formatOneDecision(dbDecision *ent.Decision) *models.Decision { + duration := dbDecision.Until.Sub(time.Now().UTC()).Round(time.Second).String() + + return &models.Decision{ + ID: int64(dbDecision.ID), + Duration: &duration, + Scenario: &dbDecision.Scenario, + Scope: &dbDecision.Scope, + Value: &dbDecision.Value, + Type: &dbDecision.Type, + Origin: &dbDecision.Origin, + UUID: dbDecision.UUID, + } +} + func (c *Controller) GetDecision(gctx *gin.Context) { var ( results []*models.Decision @@ -141,12 +155,12 @@ func (c *Controller) DeleteDecisions(gctx *gin.Context) { } func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFunc func(context.Context, map[string][]string) ([]*ent.Decision, error)) error { - // respBuffer := bytes.NewBuffer([]byte{}) limit := 30000 // FIXME : make it configurable needComma := false lastId := 0 ctx := gctx.Request.Context() + enc := json.NewEncoder(gctx.Writer) limitStr := strconv.Itoa(limit) filters["limit"] = []string{limitStr} @@ -162,31 +176,24 @@ func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFun return err } - if len(data) > 0 { - lastId = data[len(data)-1].ID + for _, d := range data { + if needComma { + gctx.Writer.WriteString(",") + } else { + needComma = true + } - results := FormatDecisions(data) - for _, decision := range results { - decisionJSON, _ := json.Marshal(decision) - - if needComma { - // respBuffer.Write([]byte(",")) - gctx.Writer.WriteString(",") - } else { - needComma = true - } - //respBuffer.Write(decisionJSON) - //_, err := gctx.Writer.Write(respBuffer.Bytes()) - _, err := gctx.Writer.Write(decisionJSON) - if err != nil { - gctx.Writer.Flush() - - return err - } - // respBuffer.Reset() + if err := enc.Encode(formatOneDecision(d)); err != nil { + gctx.Writer.Flush() + + return err } } + if len(data) > 0 { + lastId = data[len(data)-1].ID + } + log.Debugf("startup: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId) if len(data) < limit { @@ -200,12 +207,12 @@ func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFun } func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPull *time.Time, dbFunc func(context.Context, *time.Time, map[string][]string) ([]*ent.Decision, error)) error { - // respBuffer := bytes.NewBuffer([]byte{}) limit := 30000 // FIXME : make it configurable needComma := false lastId := 0 ctx := gctx.Request.Context() + enc := json.NewEncoder(gctx.Writer) limitStr := strconv.Itoa(limit) filters["limit"] = []string{limitStr} @@ -221,32 +228,25 @@ func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPul return err } - if len(data) > 0 { - lastId = data[len(data)-1].ID + for _, d := range data { + if needComma { + gctx.Writer.WriteString(",") + } else { + needComma = true + } + + if err := enc.Encode(formatOneDecision(d)); err != nil { + gctx.Writer.Flush() - results := FormatDecisions(data) - for _, decision := range results { - decisionJSON, _ := json.Marshal(decision) - - if needComma { - // respBuffer.Write([]byte(",")) - gctx.Writer.WriteString(",") - } else { - needComma = true - } - //respBuffer.Write(decisionJSON) - //_, err := gctx.Writer.Write(respBuffer.Bytes()) - _, err := gctx.Writer.Write(decisionJSON) - if err != nil { - gctx.Writer.Flush() - - return err - } - // respBuffer.Reset() + return err } } - log.Debugf("startup: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId) + if len(data) > 0 { + lastId = data[len(data)-1].ID + } + + log.Debugf("delta: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId) if len(data) < limit { gctx.Writer.Flush() @@ -258,7 +258,7 @@ func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPul return nil } -func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.Bouncer, streamStartTime time.Time, filters map[string][]string) error { +func (c *Controller) streamDecisions(gctx *gin.Context, bouncerInfo *ent.Bouncer, filters map[string][]string) error { var err error gctx.Writer.Header().Set("Content-Type", "application/json") @@ -303,7 +303,13 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B gctx.Writer.WriteString(`], "deleted": [`) - err = writeDeltaDecisions(gctx, filters, bouncerInfo.LastPull, c.DBClient.QueryExpiredDecisionsSinceWithFilters) + var expiredSince *time.Time + if bouncerInfo.LastPull != nil { + since := bouncerInfo.LastPull.Add(-2 * time.Second) + expiredSince = &since + } + + err = writeDeltaDecisions(gctx, filters, expiredSince, c.DBClient.QueryExpiredDecisionsSinceWithFilters) if err != nil { log.Errorf("failed sending expired decisions for delta: %v", err) gctx.Writer.WriteString("]}") @@ -319,81 +325,7 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B return nil } -func (c *Controller) StreamDecisionNonChunked(gctx *gin.Context, bouncerInfo *ent.Bouncer, streamStartTime time.Time, filters map[string][]string) error { - var ( - data []*ent.Decision - err error - ) - - ctx := gctx.Request.Context() - - ret := make(map[string][]*models.Decision, 0) - ret["new"] = []*models.Decision{} - ret["deleted"] = []*models.Decision{} - - if val, ok := gctx.Request.URL.Query()["startup"]; ok { - if val[0] == "true" { - data, err = c.DBClient.QueryAllDecisionsWithFilters(ctx, filters) - if err != nil { - log.Errorf("failed querying decisions: %v", err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - - return err - } - // data = KeepLongestDecision(data) - ret["new"] = FormatDecisions(data) - - // getting expired decisions - data, err = c.DBClient.QueryExpiredDecisionsWithFilters(ctx, filters) - if err != nil { - log.Errorf("unable to query expired decision for '%s' : %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - - return err - } - - ret["deleted"] = FormatDecisions(data) - - gctx.JSON(http.StatusOK, ret) - - return nil - } - } - - // getting new decisions - data, err = c.DBClient.QueryNewDecisionsSinceWithFilters(ctx, bouncerInfo.LastPull, filters) - if err != nil { - log.Errorf("unable to query new decision for '%s' : %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - - return err - } - // data = KeepLongestDecision(data) - ret["new"] = FormatDecisions(data) - - since := time.Time{} - if bouncerInfo.LastPull != nil { - since = bouncerInfo.LastPull.Add(-2 * time.Second) - } - - // getting expired decisions - data, err = c.DBClient.QueryExpiredDecisionsSinceWithFilters(ctx, &since, filters) // do we want to give exactly lastPull time ? - if err != nil { - log.Errorf("unable to query expired decision for '%s' : %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - - return err - } - - ret["deleted"] = FormatDecisions(data) - gctx.JSON(http.StatusOK, ret) - - return nil -} - func (c *Controller) StreamDecision(gctx *gin.Context) { - var err error - streamStartTime := time.Now().UTC() bouncerInfo, err := getBouncerFromContext(gctx) @@ -416,11 +348,7 @@ func (c *Controller) StreamDecision(gctx *gin.Context) { filters["scopes"] = []string{"ip,range"} } - if fflag.ChunkedDecisionsStream.IsEnabled() { - err = c.StreamDecisionChunked(gctx, bouncerInfo, streamStartTime, filters) - } else { - err = c.StreamDecisionNonChunked(gctx, bouncerInfo, streamStartTime, filters) - } + err = c.streamDecisions(gctx, bouncerInfo, filters) if err == nil { // Only update the last pull time if no error occurred when sending the decisions to avoid missing decisions diff --git a/pkg/database/decisions.go b/pkg/database/decisions.go index 1ec7a2b1784..301172cc39e 100644 --- a/pkg/database/decisions.go +++ b/pkg/database/decisions.go @@ -167,9 +167,11 @@ func longestDecisionForScopeTypeValue(s *sql.Selector) { } func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) { - query := c.Ent.Decision.Query().Where( - decision.UntilLT(time.Now().UTC()), - ) + query := c.Ent.Decision.Query(). + Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID). + Where( + decision.UntilLT(time.Now().UTC()), + ) if since != nil { query = query.Where(decision.UntilGT(*since)) @@ -198,9 +200,11 @@ func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, sinc } func (c *Client) QueryNewDecisionsSinceWithFilters(ctx context.Context, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) { - query := c.Ent.Decision.Query().Where( - decision.UntilGT(time.Now().UTC()), - ) + query := c.Ent.Decision.Query(). + Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID). + Where( + decision.UntilGT(time.Now().UTC()), + ) errorMsg := "new decisions" diff --git a/pkg/fflag/crowdsec.go b/pkg/fflag/crowdsec.go index c8b71a97dc2..8a5788c7478 100644 --- a/pkg/fflag/crowdsec.go +++ b/pkg/fflag/crowdsec.go @@ -6,7 +6,7 @@ var Crowdsec = FeatureRegister{EnvPrefix: "CROWDSEC_FEATURE_"} var ( DisableHttpRetryBackoff = &Feature{Name: "disable_http_retry_backoff", Description: "Disable http retry backoff"} - ChunkedDecisionsStream = &Feature{Name: "chunked_decisions_stream", Description: "Enable chunked decisions stream"} + ChunkedDecisionsStream = &Feature{Name: "chunked_decisions_stream", Description: "Enable chunked decisions stream", State: DeprecatedState} PapiClient = &Feature{Name: "papi_client", Description: "Enable Polling API client", State: DeprecatedState} // The state will be set to deprecated for linux only. Re2GrokSupport = &Feature{Name: "re2_grok_support", Description: "Enable RE2 support for GROK patterns"} From 0ec860e7dd3aedd5d0c6fbc39e4e99102f09994e Mon Sep 17 00:00:00 2001 From: Sebastien Blot Date: Tue, 7 Apr 2026 17:15:48 +0200 Subject: [PATCH 2/3] check error before order by --- pkg/database/decisions.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/database/decisions.go b/pkg/database/decisions.go index 301172cc39e..8eede6efdbf 100644 --- a/pkg/database/decisions.go +++ b/pkg/database/decisions.go @@ -66,14 +66,13 @@ func (c *Client) QueryExpiredDecisionsWithFilters(ctx context.Context, filter ma } query, err := applyDecisionFilter(query, filter) - - query = query.Order(ent.Asc(decision.FieldID)) - if err != nil { c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err) return []*ent.Decision{}, fmt.Errorf("get expired decisions with filters: %w", QueryFail) } + query = query.Order(ent.Asc(decision.FieldID)) + data, err := query.All(ctx) if err != nil { c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err) From 4c67090a45721a2d985787ec0b97730207a1e08a Mon Sep 17 00:00:00 2001 From: Sebastien Blot Date: Tue, 7 Apr 2026 17:17:04 +0200 Subject: [PATCH 3/3] use same time.Now() for entire decision stream --- pkg/apiserver/controllers/v1/decisions.go | 19 +++++++++++-------- pkg/database/decisions.go | 16 ++++++++-------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/pkg/apiserver/controllers/v1/decisions.go b/pkg/apiserver/controllers/v1/decisions.go index f3f8810277a..2ace800b520 100644 --- a/pkg/apiserver/controllers/v1/decisions.go +++ b/pkg/apiserver/controllers/v1/decisions.go @@ -154,7 +154,7 @@ func (c *Controller) DeleteDecisions(gctx *gin.Context) { gctx.JSON(http.StatusOK, deleteDecisionResp) } -func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFunc func(context.Context, map[string][]string) ([]*ent.Decision, error)) error { +func writeStartupDecisions(gctx *gin.Context, now time.Time, filters map[string][]string, dbFunc func(context.Context, time.Time, map[string][]string) ([]*ent.Decision, error)) error { limit := 30000 // FIXME : make it configurable needComma := false lastId := 0 @@ -171,7 +171,7 @@ func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFun filters["id_gt"] = []string{lastIdStr} } - data, err := dbFunc(ctx, filters) + data, err := dbFunc(ctx, now, filters) if err != nil { return err } @@ -206,7 +206,7 @@ func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFun return nil } -func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPull *time.Time, dbFunc func(context.Context, *time.Time, map[string][]string) ([]*ent.Decision, error)) error { +func writeDeltaDecisions(gctx *gin.Context, now time.Time, filters map[string][]string, lastPull *time.Time, dbFunc func(context.Context, time.Time, *time.Time, map[string][]string) ([]*ent.Decision, error)) error { limit := 30000 // FIXME : make it configurable needComma := false lastId := 0 @@ -223,7 +223,7 @@ func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPul filters["id_gt"] = []string{lastIdStr} } - data, err := dbFunc(ctx, lastPull, filters) + data, err := dbFunc(ctx, now, lastPull, filters) if err != nil { return err } @@ -261,6 +261,8 @@ func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPul func (c *Controller) streamDecisions(gctx *gin.Context, bouncerInfo *ent.Bouncer, filters map[string][]string) error { var err error + now := time.Now().UTC() + gctx.Writer.Header().Set("Content-Type", "application/json") gctx.Writer.Header().Set("Transfer-Encoding", "chunked") gctx.Writer.WriteHeader(http.StatusOK) @@ -269,7 +271,7 @@ func (c *Controller) streamDecisions(gctx *gin.Context, bouncerInfo *ent.Bouncer // if the blocker just started, return all decisions if val, ok := gctx.Request.URL.Query()["startup"]; ok && val[0] == "true" { // Active decisions - err := writeStartupDecisions(gctx, filters, c.DBClient.QueryAllDecisionsWithFilters) + err := writeStartupDecisions(gctx, now, filters, c.DBClient.QueryAllDecisionsWithFilters) if err != nil { log.Errorf("failed sending new decisions for startup: %v", err) gctx.Writer.WriteString(`], "deleted": []}`) @@ -280,7 +282,7 @@ func (c *Controller) streamDecisions(gctx *gin.Context, bouncerInfo *ent.Bouncer gctx.Writer.WriteString(`], "deleted": [`) // Expired decisions - err = writeStartupDecisions(gctx, filters, c.DBClient.QueryExpiredDecisionsWithFilters) + err = writeStartupDecisions(gctx, now, filters, c.DBClient.QueryExpiredDecisionsWithFilters) if err != nil { log.Errorf("failed sending expired decisions for startup: %v", err) gctx.Writer.WriteString(`]}`) @@ -292,7 +294,7 @@ func (c *Controller) streamDecisions(gctx *gin.Context, bouncerInfo *ent.Bouncer gctx.Writer.WriteString(`]}`) gctx.Writer.Flush() } else { - err = writeDeltaDecisions(gctx, filters, bouncerInfo.LastPull, c.DBClient.QueryNewDecisionsSinceWithFilters) + err = writeDeltaDecisions(gctx, now, filters, bouncerInfo.LastPull, c.DBClient.QueryNewDecisionsSinceWithFilters) if err != nil { log.Errorf("failed sending new decisions for delta: %v", err) gctx.Writer.WriteString(`], "deleted": []}`) @@ -303,13 +305,14 @@ func (c *Controller) streamDecisions(gctx *gin.Context, bouncerInfo *ent.Bouncer gctx.Writer.WriteString(`], "deleted": [`) + // Use a 2-second overlap to avoid missing decisions that expired around the last pull time var expiredSince *time.Time if bouncerInfo.LastPull != nil { since := bouncerInfo.LastPull.Add(-2 * time.Second) expiredSince = &since } - err = writeDeltaDecisions(gctx, filters, expiredSince, c.DBClient.QueryExpiredDecisionsSinceWithFilters) + err = writeDeltaDecisions(gctx, now, filters, expiredSince, c.DBClient.QueryExpiredDecisionsSinceWithFilters) if err != nil { log.Errorf("failed sending expired decisions for delta: %v", err) gctx.Writer.WriteString("]}") diff --git a/pkg/database/decisions.go b/pkg/database/decisions.go index 8eede6efdbf..2f2534ed235 100644 --- a/pkg/database/decisions.go +++ b/pkg/database/decisions.go @@ -24,13 +24,13 @@ type DecisionsByScenario struct { Type string } -func (c *Client) QueryAllDecisionsWithFilters(ctx context.Context, filter map[string][]string) ([]*ent.Decision, error) { +func (c *Client) QueryAllDecisionsWithFilters(ctx context.Context, now time.Time, filter map[string][]string) ([]*ent.Decision, error) { // Do not select all fields. // This can get pretty expensive network-wise if there are a lot of decisions and you are using a remote database query := c.Ent.Decision.Query(). Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID). Where( - decision.UntilGT(time.Now().UTC()), + decision.UntilGT(now), ) // Allow a bouncer to ask for non-deduplicated results if v, ok := filter["dedup"]; !ok || v[0] != "false" { @@ -54,11 +54,11 @@ func (c *Client) QueryAllDecisionsWithFilters(ctx context.Context, filter map[st return data, nil } -func (c *Client) QueryExpiredDecisionsWithFilters(ctx context.Context, filter map[string][]string) ([]*ent.Decision, error) { +func (c *Client) QueryExpiredDecisionsWithFilters(ctx context.Context, now time.Time, filter map[string][]string) ([]*ent.Decision, error) { query := c.Ent.Decision.Query(). Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID). Where( - decision.UntilLT(time.Now().UTC()), + decision.UntilLT(now), ) // Allow a bouncer to ask for non-deduplicated results if v, ok := filter["dedup"]; !ok || v[0] != "false" { @@ -165,11 +165,11 @@ func longestDecisionForScopeTypeValue(s *sql.Selector) { ) } -func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) { +func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, now time.Time, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) { query := c.Ent.Decision.Query(). Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID). Where( - decision.UntilLT(time.Now().UTC()), + decision.UntilLT(now), ) if since != nil { @@ -198,11 +198,11 @@ func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, sinc return data, nil } -func (c *Client) QueryNewDecisionsSinceWithFilters(ctx context.Context, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) { +func (c *Client) QueryNewDecisionsSinceWithFilters(ctx context.Context, now time.Time, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) { query := c.Ent.Decision.Query(). Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID). Where( - decision.UntilGT(time.Now().UTC()), + decision.UntilGT(now), ) errorMsg := "new decisions"