diff --git a/pkg/apiserver/controllers/v1/decisions.go b/pkg/apiserver/controllers/v1/decisions.go index 86dd9845071..2ace800b520 100644 --- a/pkg/apiserver/controllers/v1/decisions.go +++ b/pkg/apiserver/controllers/v1/decisions.go @@ -11,7 +11,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/crowdsecurity/crowdsec/pkg/database/ent" - "github.com/crowdsecurity/crowdsec/pkg/fflag" "github.com/crowdsecurity/crowdsec/pkg/models" ) @@ -37,6 +36,21 @@ func FormatDecisions(decisions []*ent.Decision) []*models.Decision { return results } +func formatOneDecision(dbDecision *ent.Decision) *models.Decision { + duration := dbDecision.Until.Sub(time.Now().UTC()).Round(time.Second).String() + + return &models.Decision{ + ID: int64(dbDecision.ID), + Duration: &duration, + Scenario: &dbDecision.Scenario, + Scope: &dbDecision.Scope, + Value: &dbDecision.Value, + Type: &dbDecision.Type, + Origin: &dbDecision.Origin, + UUID: dbDecision.UUID, + } +} + func (c *Controller) GetDecision(gctx *gin.Context) { var ( results []*models.Decision @@ -140,13 +154,13 @@ func (c *Controller) DeleteDecisions(gctx *gin.Context) { gctx.JSON(http.StatusOK, deleteDecisionResp) } -func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFunc func(context.Context, map[string][]string) ([]*ent.Decision, error)) error { - // respBuffer := bytes.NewBuffer([]byte{}) +func writeStartupDecisions(gctx *gin.Context, now time.Time, filters map[string][]string, dbFunc func(context.Context, time.Time, map[string][]string) ([]*ent.Decision, error)) error { limit := 30000 // FIXME : make it configurable needComma := false lastId := 0 ctx := gctx.Request.Context() + enc := json.NewEncoder(gctx.Writer) limitStr := strconv.Itoa(limit) filters["limit"] = []string{limitStr} @@ -157,36 +171,29 @@ func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFun filters["id_gt"] = []string{lastIdStr} } - data, err := dbFunc(ctx, filters) + data, err := dbFunc(ctx, now, filters) if err != nil { return err } - if len(data) > 0 { - lastId = data[len(data)-1].ID + for _, d := range data { + if needComma { + gctx.Writer.WriteString(",") + } else { + needComma = true + } - results := FormatDecisions(data) - for _, decision := range results { - decisionJSON, _ := json.Marshal(decision) - - if needComma { - // respBuffer.Write([]byte(",")) - gctx.Writer.WriteString(",") - } else { - needComma = true - } - //respBuffer.Write(decisionJSON) - //_, err := gctx.Writer.Write(respBuffer.Bytes()) - _, err := gctx.Writer.Write(decisionJSON) - if err != nil { - gctx.Writer.Flush() - - return err - } - // respBuffer.Reset() + if err := enc.Encode(formatOneDecision(d)); err != nil { + gctx.Writer.Flush() + + return err } } + if len(data) > 0 { + lastId = data[len(data)-1].ID + } + log.Debugf("startup: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId) if len(data) < limit { @@ -199,13 +206,13 @@ func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFun return nil } -func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPull *time.Time, dbFunc func(context.Context, *time.Time, map[string][]string) ([]*ent.Decision, error)) error { - // respBuffer := bytes.NewBuffer([]byte{}) +func writeDeltaDecisions(gctx *gin.Context, now time.Time, filters map[string][]string, lastPull *time.Time, dbFunc func(context.Context, time.Time, *time.Time, map[string][]string) ([]*ent.Decision, error)) error { limit := 30000 // FIXME : make it configurable needComma := false lastId := 0 ctx := gctx.Request.Context() + enc := json.NewEncoder(gctx.Writer) limitStr := strconv.Itoa(limit) filters["limit"] = []string{limitStr} @@ -216,37 +223,30 @@ func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPul filters["id_gt"] = []string{lastIdStr} } - data, err := dbFunc(ctx, lastPull, filters) + data, err := dbFunc(ctx, now, lastPull, filters) if err != nil { return err } - if len(data) > 0 { - lastId = data[len(data)-1].ID + for _, d := range data { + if needComma { + gctx.Writer.WriteString(",") + } else { + needComma = true + } + + if err := enc.Encode(formatOneDecision(d)); err != nil { + gctx.Writer.Flush() - results := FormatDecisions(data) - for _, decision := range results { - decisionJSON, _ := json.Marshal(decision) - - if needComma { - // respBuffer.Write([]byte(",")) - gctx.Writer.WriteString(",") - } else { - needComma = true - } - //respBuffer.Write(decisionJSON) - //_, err := gctx.Writer.Write(respBuffer.Bytes()) - _, err := gctx.Writer.Write(decisionJSON) - if err != nil { - gctx.Writer.Flush() - - return err - } - // respBuffer.Reset() + return err } } - log.Debugf("startup: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId) + if len(data) > 0 { + lastId = data[len(data)-1].ID + } + + log.Debugf("delta: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId) if len(data) < limit { gctx.Writer.Flush() @@ -258,9 +258,11 @@ func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPul return nil } -func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.Bouncer, streamStartTime time.Time, filters map[string][]string) error { +func (c *Controller) streamDecisions(gctx *gin.Context, bouncerInfo *ent.Bouncer, filters map[string][]string) error { var err error + now := time.Now().UTC() + gctx.Writer.Header().Set("Content-Type", "application/json") gctx.Writer.Header().Set("Transfer-Encoding", "chunked") gctx.Writer.WriteHeader(http.StatusOK) @@ -269,7 +271,7 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B // if the blocker just started, return all decisions if val, ok := gctx.Request.URL.Query()["startup"]; ok && val[0] == "true" { // Active decisions - err := writeStartupDecisions(gctx, filters, c.DBClient.QueryAllDecisionsWithFilters) + err := writeStartupDecisions(gctx, now, filters, c.DBClient.QueryAllDecisionsWithFilters) if err != nil { log.Errorf("failed sending new decisions for startup: %v", err) gctx.Writer.WriteString(`], "deleted": []}`) @@ -280,7 +282,7 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B gctx.Writer.WriteString(`], "deleted": [`) // Expired decisions - err = writeStartupDecisions(gctx, filters, c.DBClient.QueryExpiredDecisionsWithFilters) + err = writeStartupDecisions(gctx, now, filters, c.DBClient.QueryExpiredDecisionsWithFilters) if err != nil { log.Errorf("failed sending expired decisions for startup: %v", err) gctx.Writer.WriteString(`]}`) @@ -292,7 +294,7 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B gctx.Writer.WriteString(`]}`) gctx.Writer.Flush() } else { - err = writeDeltaDecisions(gctx, filters, bouncerInfo.LastPull, c.DBClient.QueryNewDecisionsSinceWithFilters) + err = writeDeltaDecisions(gctx, now, filters, bouncerInfo.LastPull, c.DBClient.QueryNewDecisionsSinceWithFilters) if err != nil { log.Errorf("failed sending new decisions for delta: %v", err) gctx.Writer.WriteString(`], "deleted": []}`) @@ -303,7 +305,14 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B gctx.Writer.WriteString(`], "deleted": [`) - err = writeDeltaDecisions(gctx, filters, bouncerInfo.LastPull, c.DBClient.QueryExpiredDecisionsSinceWithFilters) + // Use a 2-second overlap to avoid missing decisions that expired around the last pull time + var expiredSince *time.Time + if bouncerInfo.LastPull != nil { + since := bouncerInfo.LastPull.Add(-2 * time.Second) + expiredSince = &since + } + + err = writeDeltaDecisions(gctx, now, filters, expiredSince, c.DBClient.QueryExpiredDecisionsSinceWithFilters) if err != nil { log.Errorf("failed sending expired decisions for delta: %v", err) gctx.Writer.WriteString("]}") @@ -319,81 +328,7 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B return nil } -func (c *Controller) StreamDecisionNonChunked(gctx *gin.Context, bouncerInfo *ent.Bouncer, streamStartTime time.Time, filters map[string][]string) error { - var ( - data []*ent.Decision - err error - ) - - ctx := gctx.Request.Context() - - ret := make(map[string][]*models.Decision, 0) - ret["new"] = []*models.Decision{} - ret["deleted"] = []*models.Decision{} - - if val, ok := gctx.Request.URL.Query()["startup"]; ok { - if val[0] == "true" { - data, err = c.DBClient.QueryAllDecisionsWithFilters(ctx, filters) - if err != nil { - log.Errorf("failed querying decisions: %v", err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - - return err - } - // data = KeepLongestDecision(data) - ret["new"] = FormatDecisions(data) - - // getting expired decisions - data, err = c.DBClient.QueryExpiredDecisionsWithFilters(ctx, filters) - if err != nil { - log.Errorf("unable to query expired decision for '%s' : %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - - return err - } - - ret["deleted"] = FormatDecisions(data) - - gctx.JSON(http.StatusOK, ret) - - return nil - } - } - - // getting new decisions - data, err = c.DBClient.QueryNewDecisionsSinceWithFilters(ctx, bouncerInfo.LastPull, filters) - if err != nil { - log.Errorf("unable to query new decision for '%s' : %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - - return err - } - // data = KeepLongestDecision(data) - ret["new"] = FormatDecisions(data) - - since := time.Time{} - if bouncerInfo.LastPull != nil { - since = bouncerInfo.LastPull.Add(-2 * time.Second) - } - - // getting expired decisions - data, err = c.DBClient.QueryExpiredDecisionsSinceWithFilters(ctx, &since, filters) // do we want to give exactly lastPull time ? - if err != nil { - log.Errorf("unable to query expired decision for '%s' : %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - - return err - } - - ret["deleted"] = FormatDecisions(data) - gctx.JSON(http.StatusOK, ret) - - return nil -} - func (c *Controller) StreamDecision(gctx *gin.Context) { - var err error - streamStartTime := time.Now().UTC() bouncerInfo, err := getBouncerFromContext(gctx) @@ -416,11 +351,7 @@ func (c *Controller) StreamDecision(gctx *gin.Context) { filters["scopes"] = []string{"ip,range"} } - if fflag.ChunkedDecisionsStream.IsEnabled() { - err = c.StreamDecisionChunked(gctx, bouncerInfo, streamStartTime, filters) - } else { - err = c.StreamDecisionNonChunked(gctx, bouncerInfo, streamStartTime, filters) - } + err = c.streamDecisions(gctx, bouncerInfo, filters) if err == nil { // Only update the last pull time if no error occurred when sending the decisions to avoid missing decisions diff --git a/pkg/database/decisions.go b/pkg/database/decisions.go index 1ec7a2b1784..2f2534ed235 100644 --- a/pkg/database/decisions.go +++ b/pkg/database/decisions.go @@ -24,13 +24,13 @@ type DecisionsByScenario struct { Type string } -func (c *Client) QueryAllDecisionsWithFilters(ctx context.Context, filter map[string][]string) ([]*ent.Decision, error) { +func (c *Client) QueryAllDecisionsWithFilters(ctx context.Context, now time.Time, filter map[string][]string) ([]*ent.Decision, error) { // Do not select all fields. // This can get pretty expensive network-wise if there are a lot of decisions and you are using a remote database query := c.Ent.Decision.Query(). Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID). Where( - decision.UntilGT(time.Now().UTC()), + decision.UntilGT(now), ) // Allow a bouncer to ask for non-deduplicated results if v, ok := filter["dedup"]; !ok || v[0] != "false" { @@ -54,11 +54,11 @@ func (c *Client) QueryAllDecisionsWithFilters(ctx context.Context, filter map[st return data, nil } -func (c *Client) QueryExpiredDecisionsWithFilters(ctx context.Context, filter map[string][]string) ([]*ent.Decision, error) { +func (c *Client) QueryExpiredDecisionsWithFilters(ctx context.Context, now time.Time, filter map[string][]string) ([]*ent.Decision, error) { query := c.Ent.Decision.Query(). Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID). Where( - decision.UntilLT(time.Now().UTC()), + decision.UntilLT(now), ) // Allow a bouncer to ask for non-deduplicated results if v, ok := filter["dedup"]; !ok || v[0] != "false" { @@ -66,14 +66,13 @@ func (c *Client) QueryExpiredDecisionsWithFilters(ctx context.Context, filter ma } query, err := applyDecisionFilter(query, filter) - - query = query.Order(ent.Asc(decision.FieldID)) - if err != nil { c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err) return []*ent.Decision{}, fmt.Errorf("get expired decisions with filters: %w", QueryFail) } + query = query.Order(ent.Asc(decision.FieldID)) + data, err := query.All(ctx) if err != nil { c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err) @@ -166,10 +165,12 @@ func longestDecisionForScopeTypeValue(s *sql.Selector) { ) } -func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) { - query := c.Ent.Decision.Query().Where( - decision.UntilLT(time.Now().UTC()), - ) +func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, now time.Time, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) { + query := c.Ent.Decision.Query(). + Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID). + Where( + decision.UntilLT(now), + ) if since != nil { query = query.Where(decision.UntilGT(*since)) @@ -197,10 +198,12 @@ func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, sinc return data, nil } -func (c *Client) QueryNewDecisionsSinceWithFilters(ctx context.Context, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) { - query := c.Ent.Decision.Query().Where( - decision.UntilGT(time.Now().UTC()), - ) +func (c *Client) QueryNewDecisionsSinceWithFilters(ctx context.Context, now time.Time, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) { + query := c.Ent.Decision.Query(). + Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID). + Where( + decision.UntilGT(now), + ) errorMsg := "new decisions" diff --git a/pkg/fflag/crowdsec.go b/pkg/fflag/crowdsec.go index c8b71a97dc2..8a5788c7478 100644 --- a/pkg/fflag/crowdsec.go +++ b/pkg/fflag/crowdsec.go @@ -6,7 +6,7 @@ var Crowdsec = FeatureRegister{EnvPrefix: "CROWDSEC_FEATURE_"} var ( DisableHttpRetryBackoff = &Feature{Name: "disable_http_retry_backoff", Description: "Disable http retry backoff"} - ChunkedDecisionsStream = &Feature{Name: "chunked_decisions_stream", Description: "Enable chunked decisions stream"} + ChunkedDecisionsStream = &Feature{Name: "chunked_decisions_stream", Description: "Enable chunked decisions stream", State: DeprecatedState} PapiClient = &Feature{Name: "papi_client", Description: "Enable Polling API client", State: DeprecatedState} // The state will be set to deprecated for linux only. Re2GrokSupport = &Feature{Name: "re2_grok_support", Description: "Enable RE2 support for GROK patterns"}