Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 65 additions & 134 deletions pkg/apiserver/controllers/v1/decisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
log "github.com/sirupsen/logrus"

"github.com/crowdsecurity/crowdsec/pkg/database/ent"
"github.com/crowdsecurity/crowdsec/pkg/fflag"
"github.com/crowdsecurity/crowdsec/pkg/models"
)

Expand All @@ -37,6 +36,21 @@ func FormatDecisions(decisions []*ent.Decision) []*models.Decision {
return results
}

func formatOneDecision(dbDecision *ent.Decision) *models.Decision {
duration := dbDecision.Until.Sub(time.Now().UTC()).Round(time.Second).String()

return &models.Decision{
ID: int64(dbDecision.ID),
Duration: &duration,
Scenario: &dbDecision.Scenario,
Scope: &dbDecision.Scope,
Value: &dbDecision.Value,
Type: &dbDecision.Type,
Origin: &dbDecision.Origin,
UUID: dbDecision.UUID,
}
}

func (c *Controller) GetDecision(gctx *gin.Context) {
var (
results []*models.Decision
Expand Down Expand Up @@ -140,13 +154,13 @@ func (c *Controller) DeleteDecisions(gctx *gin.Context) {
gctx.JSON(http.StatusOK, deleteDecisionResp)
}

func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFunc func(context.Context, map[string][]string) ([]*ent.Decision, error)) error {
// respBuffer := bytes.NewBuffer([]byte{})
func writeStartupDecisions(gctx *gin.Context, now time.Time, filters map[string][]string, dbFunc func(context.Context, time.Time, map[string][]string) ([]*ent.Decision, error)) error {
limit := 30000 // FIXME : make it configurable
needComma := false
lastId := 0

ctx := gctx.Request.Context()
enc := json.NewEncoder(gctx.Writer)

limitStr := strconv.Itoa(limit)
filters["limit"] = []string{limitStr}
Expand All @@ -157,36 +171,29 @@ func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFun
filters["id_gt"] = []string{lastIdStr}
}

data, err := dbFunc(ctx, filters)
data, err := dbFunc(ctx, now, filters)
if err != nil {
return err
}

if len(data) > 0 {
lastId = data[len(data)-1].ID
for _, d := range data {
if needComma {
gctx.Writer.WriteString(",")
} else {
needComma = true
}

results := FormatDecisions(data)
for _, decision := range results {
decisionJSON, _ := json.Marshal(decision)

if needComma {
// respBuffer.Write([]byte(","))
gctx.Writer.WriteString(",")
} else {
needComma = true
}
//respBuffer.Write(decisionJSON)
//_, err := gctx.Writer.Write(respBuffer.Bytes())
_, err := gctx.Writer.Write(decisionJSON)
if err != nil {
gctx.Writer.Flush()

return err
}
// respBuffer.Reset()
if err := enc.Encode(formatOneDecision(d)); err != nil {
gctx.Writer.Flush()

return err
}
}

if len(data) > 0 {
lastId = data[len(data)-1].ID
}

log.Debugf("startup: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId)

if len(data) < limit {
Expand All @@ -199,13 +206,13 @@ func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFun
return nil
}

func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPull *time.Time, dbFunc func(context.Context, *time.Time, map[string][]string) ([]*ent.Decision, error)) error {
// respBuffer := bytes.NewBuffer([]byte{})
func writeDeltaDecisions(gctx *gin.Context, now time.Time, filters map[string][]string, lastPull *time.Time, dbFunc func(context.Context, time.Time, *time.Time, map[string][]string) ([]*ent.Decision, error)) error {
limit := 30000 // FIXME : make it configurable
needComma := false
lastId := 0

ctx := gctx.Request.Context()
enc := json.NewEncoder(gctx.Writer)

limitStr := strconv.Itoa(limit)
filters["limit"] = []string{limitStr}
Expand All @@ -216,37 +223,30 @@ func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPul
filters["id_gt"] = []string{lastIdStr}
}

data, err := dbFunc(ctx, lastPull, filters)
data, err := dbFunc(ctx, now, lastPull, filters)
if err != nil {
return err
}

if len(data) > 0 {
lastId = data[len(data)-1].ID
for _, d := range data {
if needComma {
gctx.Writer.WriteString(",")
} else {
needComma = true
}

if err := enc.Encode(formatOneDecision(d)); err != nil {
gctx.Writer.Flush()

results := FormatDecisions(data)
for _, decision := range results {
decisionJSON, _ := json.Marshal(decision)

if needComma {
// respBuffer.Write([]byte(","))
gctx.Writer.WriteString(",")
} else {
needComma = true
}
//respBuffer.Write(decisionJSON)
//_, err := gctx.Writer.Write(respBuffer.Bytes())
_, err := gctx.Writer.Write(decisionJSON)
if err != nil {
gctx.Writer.Flush()

return err
}
// respBuffer.Reset()
return err
}
}

log.Debugf("startup: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId)
if len(data) > 0 {
lastId = data[len(data)-1].ID
}

log.Debugf("delta: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId)

if len(data) < limit {
gctx.Writer.Flush()
Expand All @@ -258,9 +258,11 @@ func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPul
return nil
}

func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.Bouncer, streamStartTime time.Time, filters map[string][]string) error {
func (c *Controller) streamDecisions(gctx *gin.Context, bouncerInfo *ent.Bouncer, filters map[string][]string) error {
var err error

now := time.Now().UTC()

gctx.Writer.Header().Set("Content-Type", "application/json")
gctx.Writer.Header().Set("Transfer-Encoding", "chunked")
gctx.Writer.WriteHeader(http.StatusOK)
Expand All @@ -269,7 +271,7 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B
// if the blocker just started, return all decisions
if val, ok := gctx.Request.URL.Query()["startup"]; ok && val[0] == "true" {
// Active decisions
err := writeStartupDecisions(gctx, filters, c.DBClient.QueryAllDecisionsWithFilters)
err := writeStartupDecisions(gctx, now, filters, c.DBClient.QueryAllDecisionsWithFilters)
if err != nil {
log.Errorf("failed sending new decisions for startup: %v", err)
gctx.Writer.WriteString(`], "deleted": []}`)
Expand All @@ -280,7 +282,7 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B

gctx.Writer.WriteString(`], "deleted": [`)
// Expired decisions
err = writeStartupDecisions(gctx, filters, c.DBClient.QueryExpiredDecisionsWithFilters)
err = writeStartupDecisions(gctx, now, filters, c.DBClient.QueryExpiredDecisionsWithFilters)
if err != nil {
log.Errorf("failed sending expired decisions for startup: %v", err)
gctx.Writer.WriteString(`]}`)
Expand All @@ -292,7 +294,7 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B
gctx.Writer.WriteString(`]}`)
gctx.Writer.Flush()
} else {
err = writeDeltaDecisions(gctx, filters, bouncerInfo.LastPull, c.DBClient.QueryNewDecisionsSinceWithFilters)
err = writeDeltaDecisions(gctx, now, filters, bouncerInfo.LastPull, c.DBClient.QueryNewDecisionsSinceWithFilters)
if err != nil {
log.Errorf("failed sending new decisions for delta: %v", err)
gctx.Writer.WriteString(`], "deleted": []}`)
Expand All @@ -303,7 +305,14 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B

gctx.Writer.WriteString(`], "deleted": [`)

err = writeDeltaDecisions(gctx, filters, bouncerInfo.LastPull, c.DBClient.QueryExpiredDecisionsSinceWithFilters)
// Use a 2-second overlap to avoid missing decisions that expired around the last pull time
var expiredSince *time.Time
if bouncerInfo.LastPull != nil {
since := bouncerInfo.LastPull.Add(-2 * time.Second)
expiredSince = &since
}

err = writeDeltaDecisions(gctx, now, filters, expiredSince, c.DBClient.QueryExpiredDecisionsSinceWithFilters)
if err != nil {
log.Errorf("failed sending expired decisions for delta: %v", err)
gctx.Writer.WriteString("]}")
Expand All @@ -319,81 +328,7 @@ func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.B
return nil
}

func (c *Controller) StreamDecisionNonChunked(gctx *gin.Context, bouncerInfo *ent.Bouncer, streamStartTime time.Time, filters map[string][]string) error {
var (
data []*ent.Decision
err error
)

ctx := gctx.Request.Context()

ret := make(map[string][]*models.Decision, 0)
ret["new"] = []*models.Decision{}
ret["deleted"] = []*models.Decision{}

if val, ok := gctx.Request.URL.Query()["startup"]; ok {
if val[0] == "true" {
data, err = c.DBClient.QueryAllDecisionsWithFilters(ctx, filters)
if err != nil {
log.Errorf("failed querying decisions: %v", err)
gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()})

return err
}
// data = KeepLongestDecision(data)
ret["new"] = FormatDecisions(data)

// getting expired decisions
data, err = c.DBClient.QueryExpiredDecisionsWithFilters(ctx, filters)
if err != nil {
log.Errorf("unable to query expired decision for '%s' : %v", bouncerInfo.Name, err)
gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()})

return err
}

ret["deleted"] = FormatDecisions(data)

gctx.JSON(http.StatusOK, ret)

return nil
}
}

// getting new decisions
data, err = c.DBClient.QueryNewDecisionsSinceWithFilters(ctx, bouncerInfo.LastPull, filters)
if err != nil {
log.Errorf("unable to query new decision for '%s' : %v", bouncerInfo.Name, err)
gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()})

return err
}
// data = KeepLongestDecision(data)
ret["new"] = FormatDecisions(data)

since := time.Time{}
if bouncerInfo.LastPull != nil {
since = bouncerInfo.LastPull.Add(-2 * time.Second)
}

// getting expired decisions
data, err = c.DBClient.QueryExpiredDecisionsSinceWithFilters(ctx, &since, filters) // do we want to give exactly lastPull time ?
if err != nil {
log.Errorf("unable to query expired decision for '%s' : %v", bouncerInfo.Name, err)
gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()})

return err
}

ret["deleted"] = FormatDecisions(data)
gctx.JSON(http.StatusOK, ret)

return nil
}

func (c *Controller) StreamDecision(gctx *gin.Context) {
var err error

streamStartTime := time.Now().UTC()

bouncerInfo, err := getBouncerFromContext(gctx)
Expand All @@ -416,11 +351,7 @@ func (c *Controller) StreamDecision(gctx *gin.Context) {
filters["scopes"] = []string{"ip,range"}
}

if fflag.ChunkedDecisionsStream.IsEnabled() {
err = c.StreamDecisionChunked(gctx, bouncerInfo, streamStartTime, filters)
} else {
err = c.StreamDecisionNonChunked(gctx, bouncerInfo, streamStartTime, filters)
}
err = c.streamDecisions(gctx, bouncerInfo, filters)

if err == nil {
// Only update the last pull time if no error occurred when sending the decisions to avoid missing decisions
Expand Down
33 changes: 18 additions & 15 deletions pkg/database/decisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ type DecisionsByScenario struct {
Type string
}

func (c *Client) QueryAllDecisionsWithFilters(ctx context.Context, filter map[string][]string) ([]*ent.Decision, error) {
func (c *Client) QueryAllDecisionsWithFilters(ctx context.Context, now time.Time, filter map[string][]string) ([]*ent.Decision, error) {
// Do not select all fields.
// This can get pretty expensive network-wise if there are a lot of decisions and you are using a remote database
query := c.Ent.Decision.Query().
Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID).
Where(
decision.UntilGT(time.Now().UTC()),
decision.UntilGT(now),
)
// Allow a bouncer to ask for non-deduplicated results
if v, ok := filter["dedup"]; !ok || v[0] != "false" {
Expand All @@ -54,26 +54,25 @@ func (c *Client) QueryAllDecisionsWithFilters(ctx context.Context, filter map[st
return data, nil
}

func (c *Client) QueryExpiredDecisionsWithFilters(ctx context.Context, filter map[string][]string) ([]*ent.Decision, error) {
func (c *Client) QueryExpiredDecisionsWithFilters(ctx context.Context, now time.Time, filter map[string][]string) ([]*ent.Decision, error) {
query := c.Ent.Decision.Query().
Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID).
Where(
decision.UntilLT(time.Now().UTC()),
decision.UntilLT(now),
)
// Allow a bouncer to ask for non-deduplicated results
if v, ok := filter["dedup"]; !ok || v[0] != "false" {
query = query.Where(longestDecisionForScopeTypeValue)
}

query, err := applyDecisionFilter(query, filter)

query = query.Order(ent.Asc(decision.FieldID))

if err != nil {
c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err)
return []*ent.Decision{}, fmt.Errorf("get expired decisions with filters: %w", QueryFail)
}

query = query.Order(ent.Asc(decision.FieldID))

data, err := query.All(ctx)
if err != nil {
c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err)
Expand Down Expand Up @@ -166,10 +165,12 @@ func longestDecisionForScopeTypeValue(s *sql.Selector) {
)
}

func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) {
query := c.Ent.Decision.Query().Where(
decision.UntilLT(time.Now().UTC()),
)
func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, now time.Time, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) {
query := c.Ent.Decision.Query().
Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID).
Where(
decision.UntilLT(now),
)

if since != nil {
query = query.Where(decision.UntilGT(*since))
Expand Down Expand Up @@ -197,10 +198,12 @@ func (c *Client) QueryExpiredDecisionsSinceWithFilters(ctx context.Context, sinc
return data, nil
}

func (c *Client) QueryNewDecisionsSinceWithFilters(ctx context.Context, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) {
query := c.Ent.Decision.Query().Where(
decision.UntilGT(time.Now().UTC()),
)
func (c *Client) QueryNewDecisionsSinceWithFilters(ctx context.Context, now time.Time, since *time.Time, filter map[string][]string) ([]*ent.Decision, error) {
query := c.Ent.Decision.Query().
Select(decision.FieldID, decision.FieldUntil, decision.FieldScenario, decision.FieldScope, decision.FieldValue, decision.FieldType, decision.FieldOrigin, decision.FieldUUID).
Where(
decision.UntilGT(now),
)

errorMsg := "new decisions"

Expand Down
Loading
Loading