Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions pkg/leakybucket/manager_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import (
"context"
"fmt"
"runtime"
"testing"
"time"

log "github.com/sirupsen/logrus"

"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
)

Expand Down Expand Up @@ -106,6 +108,92 @@
}
}

// TestRaceSetMetaAndDistinct demonstrates a known data race between the pour goroutine
// writing to evt.Meta (via SetMeta in a filter) and a bucket goroutine reading
// from evt.Meta (via a distinct expression).
//
// The race occurs because events are sent to bucket goroutines by pointer without
// cloning the underlying maps. When the pour goroutine continues evaluating filters
// for subsequent holders (which may call SetMeta), it writes to the same Meta map
// that a previously-matched bucket's goroutine is concurrently reading.
//
// To verify the race still exists, run:
//
// go test -race -run TestRaceSetMetaAndDistinct ./pkg/leakybucket/ -count=1 -failfast
//
// See also: pkg/leakybucket/manager_run.go PourItemToBucket() line ~136
func TestRaceSetMetaAndDistinct(t *testing.T) {
t.Skip("Known race condition: concurrent Meta map access between pour and bucket goroutines (see comment above)")

if err := exprhelpers.Init(nil); err != nil {
t.Fatal(err)
}

bucketStore := NewBucketStore()
ctx := t.Context()

// holders[0]: leaky with distinct that READS evt.Meta
// holders[1]: leaky with filter that WRITES to evt.Meta via SetMeta
//
// The pour loop processes holders sequentially. After sending the event
// to bucket[0] (unbuffered channel), the pour goroutine continues to
// evaluate holders[1]'s filter (which calls SetMeta). Meanwhile,
// bucket[0]'s goroutine processes OnBucketPour and evaluates the
// distinct expression (which reads evt.Meta). Both access the same
// underlying map concurrently.
holders := []BucketFactory{
{
Spec: BucketSpec{
Name: "test_read_meta",
Description: "bucket with distinct that reads Meta",
Type: "leaky",
Capacity: 100,
LeakSpeed: "1m",
Filter: "true",
Distinct: `evt.Meta.target_techno`,
},
},
{
Spec: BucketSpec{
Name: "test_write_meta",
Description: "bucket with filter that writes Meta",
Type: "leaky",
Capacity: 100,
LeakSpeed: "1m",
Filter: `evt.SetMeta("target_techno", "test_value")`,
},
},
}

for idx := range holders {
if err := holders[idx].LoadBucket(); err != nil {
t.Fatalf("while loading (%d/%d): %s", idx, len(holders), err)
}
if err := holders[idx].Validate(); err != nil {
t.Fatalf("while validating (%d/%d): %s", idx, len(holders), err)
}
}

// Pour many events to trigger the race window.
// The race occurs between the pour goroutine (SetMeta write) and
// the bucket goroutine (distinct expression read) on the shared Meta map.
for i := 0; i < 1000; i++ {

Check failure on line 180 in pkg/leakybucket/manager_run_test.go

View workflow job for this annotation

GitHub Actions / Build + tests

for loop can be changed to use an integer range (Go 1.22+) (intrange)

Check failure on line 180 in pkg/leakybucket/manager_run_test.go

View workflow job for this annotation

GitHub Actions / Build + tests

for loop can be changed to use an integer range (Go 1.22+) (intrange)
in := pipeline.Event{
Parsed: map[string]string{"something": "something"},
Meta: map[string]string{"source_ip": "1.2.3.4"},
}
_, err := PourItemToHolders(ctx, in, holders, bucketStore, nil)
if err != nil {
t.Fatalf("while pouring item %d: %s", i, err)
}
// Yield to give bucket goroutines a chance to run concurrently
runtime.Gosched()
}

// Give bucket goroutines time to finish processing
time.Sleep(500 * time.Millisecond)
}

func TestShutdownBuckets(t *testing.T) {
var (
bucketStore = NewBucketStore()
Expand Down
Loading