diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index a52b03f..d9bebf9 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -11,12 +11,13 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: 1.22.7 + go-version: 1.25.5 - name: Build run: go build -v ./... - name: Run tests - run: | - go test -v ..\..\pkg\consumer\ - go test -v ..\..\pkg\producer\ + run: go test -v ./pkg/consumer/... ./pkg/producer/... + + - name: Run all tests + run: go test -v ./... diff --git a/README.md b/README.md index 43290db..de47817 100644 --- a/README.md +++ b/README.md @@ -9,136 +9,250 @@ ## Description -Redsumer is a GO library that provides a simple way to consume and produce messages from a Redis Stream. It is designed to be used in a microservices architecture, where a service needs to consume messages from a Redis Stream and process them. It is built on top of the [go-redis]("https://github.com/redis/go-redis") library. +Redsumer is a Go library that abstracts Redis Stream consumption. It provides horizontal scalability, adaptive protection against Redis overload when the queue is idle or stalled, priority for new messages when the PEL is not progressing, and a simple contract: the library handles infrastructure, the user handles business logic. -## Installation +Built on top of [valkey-go](https://github.com/valkey-io/valkey-go). + +## Features -Use the package manager [go get](https://golang.org/cmd/go/#hdr-Add_dependencies_to_current_module_and_install_them) to install Redsumer. +- **Adaptive ratio** — interleaves new messages and PEL processing at a configurable ratio; automatically reduces PEL attempt frequency when the PEL is stalled +- **Adaptive backoff** — exponential-style wait when the queue is completely empty, configurable via a slice of durations +- **PEL stall detection** — compares PEL size across full XAUTOCLAIM traversals and adjusts both ratio and wait accordingly +- **Blocking XREADGROUP** — efficient wait for new messages without busy-polling +- **Auto group creation and recreation** — creates the consumer group on startup; recreates it automatically if deleted at runtime +- **Context-aware sleeps** — all waits respect context cancellation for clean shutdown +- **Horizontal scaling** — multiple instances under the same consumer group, Redis guarantees each message is delivered to exactly one instance +- **Observability** — `Stats()` exposes current adaptive state indices + +## Installation ```bash -go get github.com/enerBit/redsumer +go get github.com/enerBit/redsumer/v4 ``` -## Usage - -### Consuming messages from a Redis Stream +## Consumer usage -```golang +```go package main import ( - "context" + "context" "fmt" - "time" + "log" - "github.com/enerBit/redsumer/v3/pkg/client" - "github.com/enerBit/redsumer/v3/pkg/consumer" + "github.com/enerBit/redsumer/v4/pkg/client" + "github.com/enerBit/redsumer/v4/pkg/consumer" ) func main() { - // Redis client configuration - redisArgs := client.RedisArgs{ - RedisHost: "localhost", - RedisPort: 6379, - Db: 0, - } + ctx := context.Background() + + c := &consumer.Consumer{ + // Redis connection + Client: &client.ClientArgs{ + Host: "localhost", + Port: "6379", + }, - var claimBatchSize int64 = 1 - var pendingBatchSize int64 = 1 - consumerArgs := consumer.ConsumerArgs{ - // stream, group and consumer names - StreamName: "stream_name", - GroupName: "group_name", - ConsumerName: "consumer_name", - // batch of messages to new messages - BatchSize: 1, - // batch of messages to claim, if is nil, 
it will dont claim messages - ClaimBatchSize: &claimBatchSize, - // batch of messages to pending, if is nil, it will dont pending messages - PendingBatchSize: &pendingBatchSize, - // time to block the connection - Block: time.Millisecond * 1, - // MinDurationToClaim is the minimum time that a message must be in the pending state to be claimed - MinDurationToClaim: time.Second * 1, - // IdleStillMine is the time that a message is still mine after the last ack - IdleStillMine: 0, - // MaxTries is the maximum number of tries to wait for the stream to be created - Tries: []int{1, 2, 3, 10, 15}, + // Stream identity + StreamName: "my-stream", + GroupName: "my-group", + ConsumerName: consumer.DefaultConsumerName(), // hostname-pid; must be unique per instance + + // How long to wait for new messages before triggering PEL (ms) + BlockMs: 2000, + + // XAUTOCLAIM settings + ClaimMinIdleMs: 30000, // claim messages idle for > 30s + ClaimBatch: 10, + + // XREADGROUP batch size + BatchSize: 10, + + // Idle threshold for StillMine check (ms) + IdleStillMine: 5000, + + // Retries while waiting for the stream to exist (seconds between each) + Tries: []int{1, 2, 5, 10}, + + // Adaptive slices — index 0 is the healthy-state value; + // index advances when the PEL is stalled, resets when it progresses. + RatioSlice: []int{5, 10, 20, 50}, // new messages per PEL batch + PelWaitSlice: []int{0, 1, 5, 30}, // seconds before each PEL attempt + BackoffSlice: []int{1, 2, 5, 10, 30}, // seconds when queue is completely empty } - ctx := context.Background() - // Create a new consumer - consumerClient, err := consumer.NewConsumer(ctx, redisArgs, consumerArgs) - if err != nil { - panic(err) - } + if err := c.InitConsumer(ctx); err != nil { + log.Fatal(err) + } for { - // Consume messages, get messages news, pending and claimed - messages, err := consumerClient.Consume(ctx) + messages, err := c.Consume(ctx) + if err != nil { + log.Println("consume error:", err) + continue + } + + for _, msg := range messages { + // Optional: verify the message is still assigned to this consumer + if ok, _ := c.StillMine(ctx, msg.ID); !ok { + fmt.Println("message reclaimed by another consumer:", msg.ID) + continue + } + + fmt.Println("processing:", msg.ID, msg.Values) + + // Acknowledge when processing is complete + if err := c.AcknowledgeMessage(ctx, msg.ID); err != nil { + log.Println("ack error:", err) + } + } + + // Optional: inspect adaptive state + s := c.Stats() + fmt.Printf("ratioIdx=%d pelWaitIdx=%d backoffIdx=%d prevPelSize=%d\n", + s.RatioIdx, s.PelWaitIdx, s.BackoffIdx, s.PrevPelSize) + } +} +``` + +### Graceful shutdown + +Pass a cancellable context. All sleeps (backoff, PEL wait) will unblock immediately when the context is cancelled. 
+
+```go
+package main
+
+import (
+	"context"
+	"errors"
+	"log"
+	"os"
+	"os/signal"
+	"syscall"
+
+	"github.com/enerBit/redsumer/v4/pkg/consumer"
+)
+
+func main() {
+	ctx, cancel := context.WithCancel(context.Background())
+	// cancel on SIGTERM / SIGINT
+	go func() {
+		sigCh := make(chan os.Signal, 1)
+		signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
+		<-sigCh
+		cancel()
+	}()
+	c := &consumer.Consumer{
+		// TODO: fill in configuration (same fields as in "Consumer usage" above)
+	}
+	if err := c.InitConsumer(ctx); err != nil {
+		log.Fatal(err)
+	}
+	for {
+		msgs, err := c.Consume(ctx)
+		if err != nil {
+			if errors.Is(err, context.Canceled) {
+				break
+			}
+			log.Println(err)
+			continue
+		}
+		_ = msgs
+		// handle msgs ...
+	}
+}
-		fmt.Println(err)
-	}
-	// Process messages
-	for _, message := range messages {
-		// Check if the message is still mine
-		if ok, _ := client.StillMine(ctx, message.ID); !ok {
-			fmt.Println("Message", message.ID, "is not mine anymore")
-			continue
-		}
-		fmt.Println(message.ID, message.Values)
-		// Acknowledge the message
-		err = consumerClient.Ack(ctx, message.ID)
-		if err != nil {
-			fmt.Println(err)
-		}
-	}
-}
-
 ```
 
-### Producing messages to a Redis Stream
+## Producer usage
 
-```golang
+```go
 package main
 
 import (
-	"context"
-	"time"
+	"context"
+	"log"
 
-	"github.com/enerBit/redsumer/v3/pkg/producer"
+	"github.com/enerBit/redsumer/v4/pkg/client"
+	"github.com/enerBit/redsumer/v4/pkg/producer"
 )
 
 func main() {
-	// Redis client configuration
-	redisArgs := producer.RedisArgs{
-		RedisHost: "localhost",
-		RedisPort: 6379,
-		Db:        0,
-	}
-	// Producer configuration
-	producerArgs := producer.ProducerArgs{StreamName: "stream_name"}
 	ctx := context.Background()
-	// Create a new producer
-	producer, err := producer.NewProducer(ctx, redisArgs, producerArgs)
-	if err != nil {
-		panic(err)
-	}
-	// Produce a message
-	err = producer.Produce(ctx, map[string]interface{}{
-		"key": "value",
-	})
+
+	p := &producer.Producer{
+		Client: &client.ClientArgs{
+			Host: "localhost",
+			Port: "6379",
+		},
+	}
+
+	if err := p.Client.InitClient(ctx); err != nil {
+		log.Fatal(err)
+	}
+
+	err := p.Produce(ctx, map[string]string{
+		"event": "order.created",
+		"id":    "42",
+	}, "my-stream")
 	if err != nil {
-		panic(err)
+		log.Fatal(err)
 	}
 }
 ```
 
+## Configuration reference
+
+| Field | Type | Description |
+|---|---|---|
+| `StreamName` | `string` | Redis stream name |
+| `GroupName` | `string` | Consumer group name |
+| `ConsumerName` | `string` | Unique consumer name per instance (use `DefaultConsumerName()`) |
+| `BlockMs` | `int64` | XREADGROUP blocking wait (ms) |
+| `ClaimMinIdleMs` | `int64` | Minimum idle time before XAUTOCLAIM reclaims a message (ms) |
+| `ClaimBatch` | `int64` | Messages per XAUTOCLAIM batch |
+| `BatchSize` | `int64` | Messages per XREADGROUP batch |
+| `IdleStillMine` | `int64` | Idle threshold for `StillMine` check (ms) |
+| `Tries` | `[]int` | Seconds between retries while waiting for stream existence |
+| `RatioSlice` | `[]int` | New messages per PEL batch. Advances when the PEL stalls. e.g. `[5, 10, 20, 50]` |
+| `PelWaitSlice` | `[]int` | Seconds before each PEL attempt. Advances when the PEL stalls. e.g. `[0, 1, 5, 30]` |
+| `BackoffSlice` | `[]int` | Seconds to sleep when the queue is completely empty. e.g. `[1, 2, 5, 10, 30]` |
+
+All slice fields must have at least one element, and `BatchSize` and `ClaimBatch` must be > 0. Validation runs in `InitConsumer` before any network connection is attempted.
+
+## Adaptive loop behaviour
+
+Each call to `Consume()` represents one iteration; the caller drives the outer `for` loop.
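+
+In outline, one `Consume()` call proceeds roughly like this (a simplified sketch, not the library's actual code; the helper names are illustrative, and error handling and automatic group recreation are omitted):
+
+```go
+// Sketch of one Consume() iteration (illustrative helpers, not the real API).
+msgs := readNew(BlockMs, BatchSize)        // Phase 1: XREADGROUP BLOCK
+if len(msgs) > 0 {
+	backoffIdx = 0                         // messages are flowing
+	newMsgCounter += len(msgs)
+	if newMsgCounter < RatioSlice[ratioIdx] {
+		return msgs                        // ratio not met: skip the PEL this call
+	}
+}
+newMsgCounter = 0
+sleep(PelWaitSlice[pelWaitIdx])            // Phase 2: protective wait, then reclaim
+pel := autoClaim(ClaimMinIdleMs, ClaimBatch)
+if pelTraversalComplete {
+	resetOrAdvanceIndices(pelSize())       // compare with the previous traversal
+}
+if len(msgs)+len(pel) > 0 {
+	backoffIdx = 0
+	return append(msgs, pel...)            // new messages first
+}
+sleep(BackoffSlice[backoffIdx])            // Phase 3: queue completely empty
+backoffIdx = advance(backoffIdx)
+return nil
+```
+
+The phase descriptions below spell out each step.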
+
+**Phase 1 — new messages (`XREADGROUP BLOCK`)**
+Reads up to `BatchSize` new messages, blocking for up to `BlockMs` ms. Each batch read adds to a counter. When the counter reaches `RatioSlice[ratioIdx]`, the PEL phase runs and the counter resets. If no new messages arrive, the PEL phase runs immediately.
+
+**Phase 2 — PEL (`XAUTOCLAIM`)**
+Claims one batch of up to `ClaimBatch` messages that have been idle for at least `ClaimMinIdleMs` ms. A wait of `PelWaitSlice[pelWaitIdx]` seconds is applied before each attempt.
+
+When the cursor wraps back to `0-0` (full PEL traversal complete), the current PEL size is compared with the previous traversal:
+- **PEL size changed** → `ratioIdx` and `pelWaitIdx` reset to 0 (PEL is progressing)
+- **PEL size unchanged** → both indices advance (PEL is stalled; reduce frequency)
+
+**Phase 3 — backoff**
+Applies only when both phases return empty. Sleeps for `BackoffSlice[backoffIdx]` seconds and advances the index. The index resets as soon as messages arrive again.
+
+## Horizontal scaling
+
+Run multiple instances under the same `GroupName`; Redis delivers each message to exactly one instance. The `ConsumerName` must be unique per instance; `DefaultConsumerName()` generates `hostname-pid`.
+
+## What the library does NOT do
+
+- No business logic on messages
+- No retry decision — unacked messages stay in the PEL and are reclaimed by XAUTOCLAIM after `ClaimMinIdleMs`
+- No internal concurrency — sequential batch processing; parallelism is the user's responsibility
+- No DLQ or dead-letter stream
+- No state persistence between restarts
+- No circuit breaker
+
 ## Contributing
 
-Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
-Please make sure to update tests as appropriate.
+Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change. Please make sure to update tests as appropriate.
## License + [MIT](https://choosealicense.com/licenses/mit/) diff --git a/go.mod b/go.mod index 2497265..607e6f2 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ -module github.com/enerBit/redsumer/v3 +module github.com/enerBit/redsumer/v4 -go 1.22.7 +go 1.25.5 require ( - github.com/valkey-io/valkey-go v1.0.45 - github.com/valkey-io/valkey-go/mock v1.0.45 - go.uber.org/mock v0.4.0 + github.com/valkey-io/valkey-go v1.0.73 + github.com/valkey-io/valkey-go/mock v1.0.73 + go.uber.org/mock v0.6.0 ) -require golang.org/x/sys v0.24.0 // indirect +require golang.org/x/sys v0.42.0 // indirect diff --git a/go.sum b/go.sum index b3e3799..28d4e67 100644 --- a/go.sum +++ b/go.sum @@ -1,20 +1,22 @@ -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k= -github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY= -github.com/valkey-io/valkey-go v1.0.45 h1:d2ksu+FvKEy9pU9CCMZ94ABTLm2kNHU0jxEJZRqpFA4= -github.com/valkey-io/valkey-go v1.0.45/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/onsi/gomega v1.38.3 h1:eTX+W6dobAYfFeGC2PV6RwXRu/MyT+cQguijutvkpSM= +github.com/onsi/gomega v1.38.3/go.mod h1:ZCU1pkQcXDO5Sl9/VVEGlDyp+zm0m1cmeG5TOzLgdh4= +github.com/valkey-io/valkey-go v1.0.73 h1:lztOPT0amtR6mwUkeNDcLepdYFdgVpJe/99EohfrmJ4= +github.com/valkey-io/valkey-go v1.0.73/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k= github.com/valkey-io/valkey-go/mock v1.0.45 h1:jHqf5ItZwIJQi2iX1hsTQlKOJ/WtEe59sipg7VGo8UU= github.com/valkey-io/valkey-go/mock v1.0.45/go.mod h1:v0H4l0bEIBy3FpMcYQR+a4gYViNJavRKJNRd0en/4lM= +github.com/valkey-io/valkey-go/mock v1.0.73 h1:mGwXV7m4uGGUxQ8yGVSGRDgrVQDd76IJGh/+1L5n+Y8= +github.com/valkey-io/valkey-go/mock v1.0.73/go.mod h1:/gPl8Rnb/aI05L/O1Ss0hliwQdn1tjmTGq19hodqz1E= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= -golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= -golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y= +go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= +golang.org/x/net v0.48.0/go.mod 
h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= +golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= diff --git a/pkg/client/client.go b/pkg/client/client.go index 3d676ac..4eaee76 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -23,6 +23,11 @@ var cacheMutex = sync.RWMutex{} // It then sends a PING command to the Valkey server to check the connection. // The function returns any error encountered during client creation or the PING command. func (r *ClientArgs) InitClient(ctx context.Context) error { + // Allow injecting a pre-built/mocked client (e.g. for unit tests). + if r.Instance != nil { + return nil + } + redisAddress := fmt.Sprintf("%s:%s", r.Host, r.Port) // Check if we already have a client for this address diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index a7fc409..fb5269a 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -3,18 +3,23 @@ package client import ( "context" "testing" + + "github.com/valkey-io/valkey-go/mock" + "go.uber.org/mock/gomock" ) func TestNewRedisClient(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx := context.Background() + mockClient := mock.NewClient(ctrl) c := ClientArgs{ - Host: "localhost", - Port: "6378", + Instance: mockClient, } - err := c.InitClient(ctx) - if err != nil { - t.Errorf("Error: %v", err) + if err := c.InitClient(ctx); err != nil { + t.Fatalf("InitClient failed: %v", err) } } diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index 1930a0c..3a9461e 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -3,12 +3,14 @@ package consumer import ( "context" "errors" + "fmt" + "os" "strconv" "strings" "time" - "github.com/enerBit/redsumer/v3/pkg/client" - errors_custom "github.com/enerBit/redsumer/v3/pkg/errors" + "github.com/enerBit/redsumer/v4/pkg/client" + errors_custom "github.com/enerBit/redsumer/v4/pkg/errors" "github.com/valkey-io/valkey-go" ) @@ -18,124 +20,198 @@ const ( consumer_NOGROUP = "NOGROUP No such key" ) +// Consumer holds configuration and internal adaptive state for consuming a Redis Stream. +// All slice fields must have at least one element; BatchSize and ClaimBatch must be > 0. type Consumer struct { - Client *client.ClientArgs - - Tries []int - + // --- User configuration --- + Client *client.ClientArgs StreamName string GroupName string ConsumerName string + Tries []int // seconds between retries while waiting for stream existence + + BlockMs int64 // XREADGROUP BLOCK timeout (ms) + ClaimMinIdleMs int64 // XAUTOCLAIM min idle time (ms) + ClaimBatch int64 // XAUTOCLAIM batch size + BatchSize int64 // XREADGROUP batch size + IdleStillMine int64 // XPENDING idle threshold used by StillMine + + // Adaptive slices (length >= 1 enforced in InitConsumer) + RatioSlice []int // new messages per PEL batch; index advances when PEL stalls. e.g. [5,10,20,50] + PelWaitSlice []int // seconds before each PEL attempt; advances when stalled. e.g. [0,1,5,30] + BackoffSlice []int // seconds when queue completely empty. e.g. 
[1,2,5,10,30] + + // --- Internal adaptive state --- + nextIdAutoClaim string + newMsgCounter int + ratioIdx int + pelWaitIdx int + backoffIdx int + prevPelSize int64 // -1 = no full traversal recorded yet + pelTraversalComplete bool +} - BatchSizeNewMessage *int64 - BatchSizePending *int64 - BatchSizeAutoClaim *int64 +// ConsumerStats is a snapshot of the consumer's internal adaptive state for observability. +type ConsumerStats struct { + RatioIdx int + PelWaitIdx int + BackoffIdx int + PrevPelSize int64 +} + +// Stats returns a snapshot of the current adaptive state indices. +func (c *Consumer) Stats() ConsumerStats { + return ConsumerStats{ + RatioIdx: c.ratioIdx, + PelWaitIdx: c.pelWaitIdx, + BackoffIdx: c.backoffIdx, + PrevPelSize: c.prevPelSize, + } +} - IdleStillMine int64 - MinIdleAutoClaim int64 +// DefaultConsumerName returns a unique consumer name built from hostname and PID. +// Recommended for horizontal scaling (each instance must have a unique name). +func DefaultConsumerName() string { + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown" + } + return hostname + "-" + strconv.Itoa(os.Getpid()) +} - latestPendingMessageId string - nextIdAutoClaim string +// validateConfig checks all required configuration fields before connecting. +func validateConfig(c *Consumer) error { + if c.Client == nil { + return fmt.Errorf("%w: client must not be nil", errors_custom.ErrInvalidConfig) + } + if strings.TrimSpace(c.StreamName) == "" { + return fmt.Errorf("%w: stream_name must not be empty", errors_custom.ErrInvalidConfig) + } + if strings.TrimSpace(c.GroupName) == "" { + return fmt.Errorf("%w: group_name must not be empty", errors_custom.ErrInvalidConfig) + } + if strings.TrimSpace(c.ConsumerName) == "" { + return fmt.Errorf("%w: consumer_name must not be empty", errors_custom.ErrInvalidConfig) + } + if len(c.RatioSlice) == 0 { + return fmt.Errorf("%w: ratio_slice must not be empty", errors_custom.ErrInvalidConfig) + } + if len(c.PelWaitSlice) == 0 { + return fmt.Errorf("%w: pel_wait_slice must not be empty", errors_custom.ErrInvalidConfig) + } + if len(c.BackoffSlice) == 0 { + return fmt.Errorf("%w: backoff_slice must not be empty", errors_custom.ErrInvalidConfig) + } + if c.BatchSize <= 0 { + return fmt.Errorf("%w: batch_size must be > 0", errors_custom.ErrInvalidConfig) + } + if c.ClaimBatch <= 0 { + return fmt.Errorf("%w: claim_batch must be > 0", errors_custom.ErrInvalidConfig) + } + return nil } -// InitConsumer creates a new Consumer instance. -// If any error occurs during the process, it returns nil and the error. -// Otherwise, it returns the created Consumer instance and nil error. +// InitConsumer validates configuration, initializes the Redis client, resets internal state, +// and creates the consumer group if it does not already exist. func (c *Consumer) InitConsumer(ctx context.Context) error { - err := c.Client.InitClient(ctx) - if err != nil { + if err := validateConfig(c); err != nil { return err } - c.latestPendingMessageId = consumer_INITIAL_STREAM_ID - c.nextIdAutoClaim = consumer_INITIAL_STREAM_ID - - err = c.initGroup(ctx) - if err != nil { + if err := c.Client.InitClient(ctx); err != nil { return err } - return nil + c.nextIdAutoClaim = consumer_INITIAL_STREAM_ID + c.newMsgCounter = 0 + c.ratioIdx = 0 + c.pelWaitIdx = 0 + c.backoffIdx = 0 + c.prevPelSize = -1 + c.pelTraversalComplete = false + + return c.initGroup(ctx) } -// exist checks if a key exists in the Valkey client. -// It returns an error if the key does not exist. 
+// exist checks whether a stream key exists in Redis. func (c *Consumer) exist(ctx context.Context, key string) error { cmd := c.Client.Instance.B().Exists().Key(key).Build() e, err := c.Client.Instance.Do(ctx, cmd).ToInt64() if err != nil { return err } - if e != 1 { return errors_custom.ErrKeyNotFound } - return nil } -// createGroup creates a consumer group for processing messages from a stream. -// It waits for the stream to be available and then creates the group using the provided arguments. -// If the group already exists, it returns without an error. -// If any error occurs during the process, it is returned. +// waitForStream retries existence checks using c.Tries delays (in seconds). +// Returns ErrStreamNotFound if the stream never appears. +func (c *Consumer) waitForStream(ctx context.Context) error { + for _, waitTime := range c.Tries { + err := c.exist(ctx, c.StreamName) + if err == nil { + return nil + } + // Only retry when the key truly does not exist. + if !errors.Is(err, errors_custom.ErrKeyNotFound) { + return err + } + wait := time.Second * time.Duration(waitTime) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(wait): + // continue to next retry + } + } + return errors_custom.ErrStreamNotFound +} + +// initGroup waits for the stream and creates the consumer group. +// A BUSYGROUP error (group already exists) is treated as success. func (c *Consumer) initGroup(ctx context.Context) error { - err := c.waitForStream(ctx) - if err != nil { + if err := c.waitForStream(ctx); err != nil { return err } cmd := c.Client.Instance.B().XgroupCreate().Key(c.StreamName).Group(c.GroupName).Id(consumer_INITIAL_STREAM_ID).Build() - err = c.Client.Instance.Do(ctx, cmd).Error() + err := c.Client.Instance.Do(ctx, cmd).Error() if err != nil { var errV *valkey.ValkeyError if errors.As(err, &errV) { if errV.IsBusyGroup() { return nil - } else { - return errV } + return errV } return err } - return nil } -// waitForStream waits for the stream to be ready by checking its existence in the Valkey client. -// It retries for the specified number of times with a delay between each attempt. -// If the stream is ready, it returns nil. Otherwise, it returns an error. -func (c *Consumer) waitForStream(ctx context.Context) error { - for _, waitTime := range c.Tries { - err := c.exist(ctx, c.StreamName) - if err == nil { - return nil - } - time.Sleep(time.Second * time.Duration(waitTime)) +// validateError recreates the consumer group if err signals NOGROUP (group was deleted at runtime). +// Returns nil so the caller can retry; returns the original error otherwise. +func (c *Consumer) validateError(ctx context.Context, err error) error { + if strings.Contains(err.Error(), consumer_NOGROUP) { + return c.initGroup(ctx) } - return errors_custom.ErrStreamNotFound + return err } -// StillMine checks if a message with the given messageId is still pending for consumption. -// It returns true if the message is still pending, otherwise false. -// If there is an error while checking, it returns the error. +// StillMine reports whether the message identified by messageID is still in the PEL +// for this consumer (idle for at least IdleStillMine ms). 
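+// Useful when a handler runs long enough that another instance may already
+// have reclaimed the message via XAUTOCLAIM.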
func (c *Consumer) StillMine(ctx context.Context, messageID string) (bool, error) { - var isMine bool - cmd := c.Client.Instance.B().Xpending().Key(c.StreamName).Group(c.GroupName).Idle(c.IdleStillMine).Start(messageID).End(messageID).Count(1).Consumer(c.ConsumerName).Build() v, err := c.Client.Instance.Do(ctx, cmd).ToArray() if err != nil { - return isMine, err - } - - if len(v) != 0 { - isMine = true + return false, err } - - return isMine, nil + return len(v) != 0, nil } -// Ack acknowledges a message with the given message ID in the consumer group. -// It returns an error if there was a problem acknowledging the message. +// AcknowledgeMessage sends XACK for a single message ID. func (c *Consumer) AcknowledgeMessage(ctx context.Context, messageID string) error { cmd := c.Client.Instance.B().Xack().Key(c.StreamName).Group(c.GroupName).Id(messageID).Build() v, err := c.Client.Instance.Do(ctx, cmd).AsBool() @@ -148,142 +224,185 @@ func (c *Consumer) AcknowledgeMessage(ctx context.Context, messageID string) err return nil } -// newMessages retrieves new messages from a Valkey stream for the consumer. -// It uses the AsXRead command to read messages from the specified stream, -// using the consumer group and name provided in the Consumer struct. -// The function returns a slice of Valkey.XRangeEntry, which contains the retrieved messages, -// and an error if any occurred during the retrieval process. -func (c *Consumer) NewMessages(ctx context.Context) ([]valkey.XRangeEntry, error) { - cmd := c.Client.Instance.B().Xreadgroup().Group(c.GroupName, c.ConsumerName).Count(*c.BatchSizeNewMessage).Streams().Key(c.StreamName).Id(consumer_NEVER_DELIVERED_TO_OTHER_CONSUMERS_SO_FAR).Build() +// newMessages fetches up to BatchSize new (never-delivered) messages from the stream, +// blocking for up to BlockMs milliseconds. +// Returns nil, nil when the block timeout expires with no messages. +func (c *Consumer) newMessages(ctx context.Context) ([]valkey.XRangeEntry, error) { + cmd := c.Client.Instance.B().Xreadgroup().Group(c.GroupName, c.ConsumerName).Count(c.BatchSize).Block(c.BlockMs).Streams().Key(c.StreamName).Id(consumer_NEVER_DELIVERED_TO_OTHER_CONSUMERS_SO_FAR).Build() v, err := c.Client.Instance.Do(ctx, cmd).AsXRead() if err != nil { var errV *valkey.ValkeyError if errors.As(err, &errV) { - if !errV.IsNil() { - return nil, errV + if errV.IsNil() { + return nil, nil // block timeout with no messages is not an error } - } else { - return nil, err + return nil, errV } + return nil, err } - - fields := v[c.StreamName] - return fields, nil + return v[c.StreamName], nil } -// pendingMessages retrieves pending messages from a Valkey stream. -// It returns a slice of Valkey.XRangeEntry representing the pending messages and an error if any. -func (c *Consumer) PendingMessages(ctx context.Context) ([]valkey.XRangeEntry, error) { - cmd := c.Client.Instance.B().Xreadgroup().Group(c.GroupName, c.ConsumerName).Count(*c.BatchSizePending).Streams().Key(c.StreamName).Id(c.latestPendingMessageId).Build() - v, err := c.Client.Instance.Do(ctx, cmd).AsXRead() - if err != nil { - var errV *valkey.ValkeyError - if errors.As(err, &errV) { - if !errV.IsNil() { - return nil, errV - } - } else { - return nil, err - } - } - - fields := v[c.StreamName] - if len(fields) != 0 { - c.latestPendingMessageId = fields[len(fields)-1].ID - } else { - c.latestPendingMessageId = consumer_INITIAL_STREAM_ID - } - return fields, nil -} +// autoClaimMessages runs one XAUTOCLAIM batch starting from the current cursor. 
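+// XAUTOCLAIM returns the cursor to pass to the next call; the cursor comes back
+// as "0-0" once the scan has walked past the end of the PEL.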
+// Sets pelTraversalComplete = true when the returned cursor resets to "0-0". +func (c *Consumer) autoClaimMessages(ctx context.Context) ([]valkey.XRangeEntry, error) { + c.pelTraversalComplete = false -// claimedMessages returns a slice of claimed messages from the Valkey stream. -// It uses the XAutoClaim method of the Valkey client to automatically claim messages -// from the specified stream and group. The minimum idle duration to claim a message -// is determined by the MinDurationToClaim field of the ConsumerArgs struct. -// The Start field specifies the ID of the first message to claim, and the Count field -// determines the number of messages to claim in a batch. The Consumer field specifies -// the name of the consumer. If an error occurs during the claiming process, it is returned. -// If the error is not equal to the Valkey_NIL error, it is returned as is. -// Otherwise, the claimed messages are returned along with a nil error. -func (c *Consumer) AutoClaimMessages(ctx context.Context) ([]valkey.XRangeEntry, error) { - cmd := c.Client.Instance.B().Xautoclaim().Key(c.StreamName).Group(c.GroupName).Consumer(c.ConsumerName).MinIdleTime(strconv.FormatInt(c.MinIdleAutoClaim, 10)).Start(c.nextIdAutoClaim).Count(*c.BatchSizeAutoClaim).Build() + cmd := c.Client.Instance.B().Xautoclaim().Key(c.StreamName).Group(c.GroupName).Consumer(c.ConsumerName).MinIdleTime(strconv.FormatInt(c.ClaimMinIdleMs, 10)).Start(c.nextIdAutoClaim).Count(c.ClaimBatch).Build() v, err := c.Client.Instance.Do(ctx, cmd).ToArray() if err != nil { return nil, err } - nextMessage, err := v[0].ToString() + nextID, err := v[0].ToString() if err != nil { return nil, err } + c.nextIdAutoClaim = nextID + if c.nextIdAutoClaim == consumer_INITIAL_STREAM_ID { + c.pelTraversalComplete = true + } - c.nextIdAutoClaim = nextMessage - - e, err := v[1].AsXRange() + entries, err := v[1].AsXRange() if err != nil { return nil, err } + return entries, nil +} - return e, nil +// pelSize returns the total number of pending messages in the group via XPENDING summary form. +func (c *Consumer) pelSize(ctx context.Context) (int64, error) { + cmd := c.Client.Instance.B().Xpending().Key(c.StreamName).Group(c.GroupName).Build() + v, err := c.Client.Instance.Do(ctx, cmd).ToArray() + if err != nil { + return 0, err + } + if len(v) == 0 { + return 0, nil + } + return v[0].ToInt64() } -// validateError checks if the given error contains a specific error message and performs an action accordingly. -// If the error message contains NOGROUP, it calls the createGroup method to create a group. -// Otherwise, it returns the original error. -func (c *Consumer) validateError(ctx context.Context, err error) error { - if strings.Contains(err.Error(), consumer_NOGROUP) { - return c.initGroup(ctx) +// contextSleep sleeps for d, returning ctx.Err() immediately if the context is cancelled. +func contextSleep(ctx context.Context, d time.Duration) error { + if d <= 0 { + return nil } - return err + timer := time.NewTimer(d) + defer func() { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + }() + select { + case <-timer.C: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// advanceIdx returns idx+1 capped at maxIdx (stays at maxIdx indefinitely). +func advanceIdx(idx, maxIdx int) int { + if idx < maxIdx { + return idx + 1 + } + return maxIdx } -// Consume consumes messages from Valkey. -// It first tries to fetch new messages, then pending messages, and finally claimed messages. 
-// If any messages are found, they are returned along with a nil error. -// If no messages are found, it returns nil and nil error. +// Consume runs one iteration of the adaptive consumption loop and returns a batch of messages. +// +// Phase 1 — new messages: XREADGROUP BLOCK up to BlockMs. +// Phase 2 — PEL (XAUTOCLAIM): triggered when the ratio counter is met or no new messages arrived. +// +// After a full PEL traversal the PEL size is compared with the previous traversal: +// - changed → reset ratioIdx and pelWaitIdx to 0 (PEL is progressing) +// - unchanged → advance both indices (PEL is stalled, reduce attempt frequency) +// +// Phase 3 — backoff: applied only when both phases return empty; index advances each call. +// +// New messages take priority when both phases return results in the same iteration. +// The caller drives the outer loop. func (c *Consumer) Consume(ctx context.Context) ([]valkey.XRangeEntry, error) { retry: - if c.BatchSizeNewMessage != nil { - messages, err := c.NewMessages(ctx) - if err != nil { - err = c.validateError(ctx, err) - if err == nil { - goto retry - } - return nil, err - } - if len(messages) != 0 { - return messages, nil + // ── Phase 1: new messages ───────────────────────────────────────────── + msgs, err := c.newMessages(ctx) + if err != nil { + if err = c.validateError(ctx, err); err == nil { + goto retry } + return nil, err } - if c.BatchSizePending != nil { - messages, err := c.PendingMessages(ctx) - if err != nil { - err = c.validateError(ctx, err) - if err == nil { - goto retry - } - return nil, err + if len(msgs) > 0 { + c.newMsgCounter += len(msgs) + c.backoffIdx = 0 // messages are flowing; reset backoff + + if c.newMsgCounter < c.RatioSlice[c.ratioIdx] { + // Ratio threshold not yet met; skip PEL this call. + return msgs, nil } - if len(messages) != 0 { - return messages, nil + } + + // ── Phase 2: PEL ────────────────────────────────────────────────────── + // Reached when: ratio met OR no new messages arrived. + c.newMsgCounter = 0 + + if err := contextSleep(ctx, time.Duration(c.PelWaitSlice[c.pelWaitIdx])*time.Second); err != nil { + return nil, err + } + + pelMsgs, err := c.autoClaimMessages(ctx) + if err != nil { + if err = c.validateError(ctx, err); err == nil { + goto retry } + return nil, err } - if c.BatchSizeAutoClaim != nil { - messages, err := c.AutoClaimMessages(ctx) - if err != nil { - err = c.validateError(ctx, err) - if err == nil { + if c.pelTraversalComplete { + currentSize, sizeErr := c.pelSize(ctx) + if sizeErr != nil { + if sizeErr = c.validateError(ctx, sizeErr); sizeErr == nil { goto retry } - return nil, err + return nil, sizeErr } - if len(messages) != 0 { - return messages, nil + if currentSize != c.prevPelSize { + // PEL size changed (progressed or grew) → reset protection indices. + c.ratioIdx = 0 + c.pelWaitIdx = 0 + } else { + // PEL unchanged → advance both indices to reduce attempt frequency. + c.ratioIdx = advanceIdx(c.ratioIdx, len(c.RatioSlice)-1) + c.pelWaitIdx = advanceIdx(c.pelWaitIdx, len(c.PelWaitSlice)-1) } + c.prevPelSize = currentSize } + if len(msgs) > 0 && len(pelMsgs) > 0 { + // Return new messages first, then reclaimed PEL messages. + combined := append(msgs, pelMsgs...) + c.backoffIdx = 0 + return combined, nil + } + + if len(msgs) > 0 { + // New messages take priority; PEL cursor advanced as a side effect. 
+ return msgs, nil + } + if len(pelMsgs) > 0 { + c.backoffIdx = 0 + return pelMsgs, nil + } + + // ── Phase 3: backoff (both phases empty) ────────────────────────────── + if err := contextSleep(ctx, time.Duration(c.BackoffSlice[c.backoffIdx])*time.Second); err != nil { + return nil, err + } + c.backoffIdx = advanceIdx(c.backoffIdx, len(c.BackoffSlice)-1) return nil, nil } diff --git a/pkg/consumer/consumer_test.go b/pkg/consumer/consumer_test.go index 1f3c5ff..34e527e 100644 --- a/pkg/consumer/consumer_test.go +++ b/pkg/consumer/consumer_test.go @@ -2,10 +2,13 @@ package consumer import ( "context" + "errors" "strconv" "testing" + "time" - "github.com/enerBit/redsumer/v3/pkg/client" + "github.com/enerBit/redsumer/v4/pkg/client" + errors_custom "github.com/enerBit/redsumer/v4/pkg/errors" "github.com/valkey-io/valkey-go" "github.com/valkey-io/valkey-go/mock" "go.uber.org/mock/gomock" @@ -17,91 +20,75 @@ const ( consumerName string = "consumer-test" ) +// baseConsumer returns a Consumer with minimal valid configuration and initialized +// internal state for tests (mirrors what InitConsumer sets up). +func baseConsumer(db valkey.Client) *Consumer { + return &Consumer{ + Client: &client.ClientArgs{Instance: db}, + StreamName: streamName, + GroupName: groupName, + ConsumerName: consumerName, + Tries: []int{0}, + BatchSize: 1, + ClaimBatch: 1, + BlockMs: 0, + ClaimMinIdleMs: 100, + RatioSlice: []int{5}, + PelWaitSlice: []int{0}, + BackoffSlice: []int{0}, + // internal state (mirrors InitConsumer initialization) + nextIdAutoClaim: consumer_INITIAL_STREAM_ID, + prevPelSize: -1, + } +} + +// ── Group / stream init ─────────────────────────────────────────────────────── + func TestCreateGroupSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) db.EXPECT().Do(ctx, mock.Match("EXISTS", streamName)).Return(mock.Result(mock.ValkeyInt64(1))) db.EXPECT().Do(ctx, mock.Match("XGROUP", "CREATE", streamName, groupName, consumer_INITIAL_STREAM_ID)).Return(mock.ErrorResult(nil)) - clientArg := &client.ClientArgs{ - Instance: db, - } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - Tries: []int{1}, - } - - err := c.initGroup(ctx) - if err != nil { - t.Fatalf("expected nil error, got %v", err) + if err := baseConsumer(db).initGroup(ctx); err != nil { + t.Fatalf("expected nil, got %v", err) } } func TestCreateGroupErrorBusyGroup(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) db.EXPECT().Do(ctx, mock.Match("EXISTS", streamName)).Return(mock.Result(mock.ValkeyInt64(1))) db.EXPECT().Do(ctx, mock.Match("XGROUP", "CREATE", streamName, groupName, consumer_INITIAL_STREAM_ID)).Return(mock.Result(mock.ValkeyError("BUSYGROUP Consumer Group name already exists"))) - clientArg := &client.ClientArgs{ - Instance: db, - } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - Tries: []int{1}, - } - - err := c.initGroup(ctx) - if err != nil { - t.Fatalf("expected nil error, got %v", err) + if err := baseConsumer(db).initGroup(ctx); err != nil { + t.Fatalf("expected nil for BUSYGROUP, got %v", err) } } func TestCreateGroupError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) db.EXPECT().Do(ctx, mock.Match("EXISTS", 
streamName)).Return(mock.Result(mock.ValkeyInt64(1))) db.EXPECT().Do(ctx, mock.Match("XGROUP", "CREATE", streamName, groupName, consumer_INITIAL_STREAM_ID)).Return(mock.Result(mock.ValkeyError("error"))) - clientArg := &client.ClientArgs{ - Instance: db, - } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - Tries: []int{1}, - } - - err := c.initGroup(ctx) - if err == nil { - t.Fatalf("expected error, got nil") + if err := baseConsumer(db).initGroup(ctx); err == nil { + t.Fatal("expected error, got nil") } } func TestWaitForStreamSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) @@ -110,388 +97,703 @@ func TestWaitForStreamSuccess(t *testing.T) { db.EXPECT().Do(ctx, mock.Match("EXISTS", streamName)).Return(mock.Result(mock.ValkeyInt64(0))) db.EXPECT().Do(ctx, mock.Match("EXISTS", streamName)).Return(mock.Result(mock.ValkeyInt64(1))) - clientArg := &client.ClientArgs{ - Instance: db, - } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - Tries: []int{1, 2, 3, 4}, - } - - err := c.waitForStream(ctx) - if err != nil { - t.Fatalf("expected nil error, got %v", err) + c := baseConsumer(db) + c.Tries = []int{0, 0, 0, 0} + if err := c.waitForStream(ctx); err != nil { + t.Fatalf("expected nil, got %v", err) } } func TestWaitForStreamError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) - db.EXPECT().Do(ctx, mock.Match("EXISTS", streamName)).Return(mock.Result(mock.ValkeyInt64(0))) - db.EXPECT().Do(ctx, mock.Match("EXISTS", streamName)).Return(mock.Result(mock.ValkeyInt64(0))) - db.EXPECT().Do(ctx, mock.Match("EXISTS", streamName)).Return(mock.Result(mock.ValkeyInt64(0))) - db.EXPECT().Do(ctx, mock.Match("EXISTS", streamName)).Return(mock.Result(mock.ValkeyInt64(0))) + db.EXPECT().Do(ctx, mock.Match("EXISTS", streamName)).Return(mock.Result(mock.ValkeyInt64(0))).Times(4) + + c := baseConsumer(db) + c.Tries = []int{0, 0, 0, 0} + if err := c.waitForStream(ctx); err == nil { + t.Fatal("expected error, got nil") + } +} + +// ── validateConfig ──────────────────────────────────────────────────────────── - clientArg := &client.ClientArgs{ - Instance: db, +func TestInitConsumer_EmptyRatioSlice(t *testing.T) { + c := &Consumer{BatchSize: 1, ClaimBatch: 1, RatioSlice: []int{}, PelWaitSlice: []int{0}, BackoffSlice: []int{0}} + if !errors.Is(validateConfig(c), errors_custom.ErrInvalidConfig) { + t.Fatal("expected ErrInvalidConfig for empty RatioSlice") } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - Tries: []int{1, 2, 3, 4}, +} + +func TestInitConsumer_EmptyPelWaitSlice(t *testing.T) { + c := &Consumer{BatchSize: 1, ClaimBatch: 1, RatioSlice: []int{5}, PelWaitSlice: []int{}, BackoffSlice: []int{0}} + if !errors.Is(validateConfig(c), errors_custom.ErrInvalidConfig) { + t.Fatal("expected ErrInvalidConfig for empty PelWaitSlice") } +} - err := c.waitForStream(ctx) - if err == nil { - t.Fatalf("expected error, got nil") +func TestInitConsumer_EmptyBackoffSlice(t *testing.T) { + c := &Consumer{BatchSize: 1, ClaimBatch: 1, RatioSlice: []int{5}, PelWaitSlice: []int{0}, BackoffSlice: []int{}} + if !errors.Is(validateConfig(c), errors_custom.ErrInvalidConfig) { + t.Fatal("expected ErrInvalidConfig for empty BackoffSlice") } } +func 
TestInitConsumer_ZeroBatchSize(t *testing.T) { + c := &Consumer{BatchSize: 0, ClaimBatch: 1, RatioSlice: []int{5}, PelWaitSlice: []int{0}, BackoffSlice: []int{0}} + if !errors.Is(validateConfig(c), errors_custom.ErrInvalidConfig) { + t.Fatal("expected ErrInvalidConfig for BatchSize=0") + } +} + +func TestInitConsumer_ZeroClaimBatch(t *testing.T) { + c := &Consumer{BatchSize: 1, ClaimBatch: 0, RatioSlice: []int{5}, PelWaitSlice: []int{0}, BackoffSlice: []int{0}} + if !errors.Is(validateConfig(c), errors_custom.ErrInvalidConfig) { + t.Fatal("expected ErrInvalidConfig for ClaimBatch=0") + } +} + +func TestInitConsumer_ValidConfig(t *testing.T) { + c := &Consumer{BatchSize: 1, ClaimBatch: 1, RatioSlice: []int{5}, PelWaitSlice: []int{0}, BackoffSlice: []int{0}} + if err := validateConfig(c); err != nil { + t.Fatalf("expected nil for valid config, got %v", err) + } +} + +// ── StillMine ───────────────────────────────────────────────────────────────── + func TestStillMineSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) messageId := "1676389477-0" - var idleStillMine int64 = 1 + var idle int64 = 1 - db.EXPECT().Do(ctx, mock.Match("XPENDING", streamName, groupName, "IDLE", strconv.FormatInt(idleStillMine, 10), messageId, messageId, "1", consumerName)).Return(mock.Result(mock.ValkeyArray(valkey.ValkeyMessage{}))) - clientArg := &client.ClientArgs{ - Instance: db, - } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - IdleStillMine: idleStillMine, - } + db.EXPECT().Do(ctx, mock.Match("XPENDING", streamName, groupName, "IDLE", strconv.FormatInt(idle, 10), messageId, messageId, "1", consumerName)). + Return(mock.Result(mock.ValkeyArray(valkey.ValkeyMessage{}))) + + c := baseConsumer(db) + c.IdleStillMine = idle ok, err := c.StillMine(ctx, messageId) if err != nil { - t.Fatalf("expected nil error, got %v", err) + t.Fatalf("expected nil, got %v", err) } - if !ok { - t.Fatalf("expected true, got false") + t.Fatal("expected true, got false") } } func TestStillMineError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) messageId := "1676389477-0" - var idleStillMine int64 = 1 + var idle int64 = 1 - db.EXPECT().Do(ctx, mock.Match("XPENDING", streamName, groupName, "IDLE", strconv.FormatInt(idleStillMine, 10), messageId, messageId, "1", consumerName)).Return(mock.Result(mock.ValkeyError("error"))) - clientArg := &client.ClientArgs{ - Instance: db, - } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - IdleStillMine: idleStillMine, - } + db.EXPECT().Do(ctx, mock.Match("XPENDING", streamName, groupName, "IDLE", strconv.FormatInt(idle, 10), messageId, messageId, "1", consumerName)). 
+ Return(mock.Result(mock.ValkeyError("error"))) + + c := baseConsumer(db) + c.IdleStillMine = idle _, err := c.StillMine(ctx, messageId) if err == nil { - t.Fatalf("expected error, got nil") + t.Fatal("expected error, got nil") } } func TestStillMineFalse(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) messageId := "1676389477-0" - var idleStillMine int64 = 1 + var idle int64 = 1 - db.EXPECT().Do(ctx, mock.Match("XPENDING", streamName, groupName, "IDLE", strconv.FormatInt(idleStillMine, 10), messageId, messageId, "1", consumerName)).Return(mock.Result(mock.ValkeyArray())) - clientArg := &client.ClientArgs{ - Instance: db, - } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - IdleStillMine: idleStillMine, - } + db.EXPECT().Do(ctx, mock.Match("XPENDING", streamName, groupName, "IDLE", strconv.FormatInt(idle, 10), messageId, messageId, "1", consumerName)). + Return(mock.Result(mock.ValkeyArray())) + + c := baseConsumer(db) + c.IdleStillMine = idle ok, err := c.StillMine(ctx, messageId) if err != nil { - t.Fatalf("expected nil error, got %v", err) + t.Fatalf("expected nil, got %v", err) } - if ok { - t.Fatalf("expected false, got true") + t.Fatal("expected false, got true") } } +// ── AcknowledgeMessage ──────────────────────────────────────────────────────── + func TestAckSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) messageId := "1676389477-0" - db.EXPECT().Do(ctx, mock.Match("XACK", streamName, groupName, messageId)).Return(mock.Result(mock.ValkeyInt64(1))) - clientArg := &client.ClientArgs{ - Instance: db, - } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - } - err := c.AcknowledgeMessage(ctx, messageId) - if err != nil { - t.Fatalf("expected nil error, got %v", err) + if err := baseConsumer(db).AcknowledgeMessage(ctx, messageId); err != nil { + t.Fatalf("expected nil, got %v", err) } } func TestAckError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) messageId := "1676389477-0" - db.EXPECT().Do(ctx, mock.Match("XACK", streamName, groupName, messageId)).Return(mock.Result(mock.ValkeyError("error"))) - clientArg := &client.ClientArgs{ - Instance: db, - } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - } - err := c.AcknowledgeMessage(ctx, messageId) - if err == nil { - t.Fatalf("expected error, got nil") + if err := baseConsumer(db).AcknowledgeMessage(ctx, messageId); err == nil { + t.Fatal("expected error, got nil") } } +// ── newMessages ─────────────────────────────────────────────────────────────── + func TestNewMessagesSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) - batchSize := int64(1) + db.EXPECT().Do(ctx, mock.Match("XREADGROUP", "GROUP", groupName, consumerName, "COUNT", "1", "BLOCK", "0", "STREAMS", streamName, consumer_NEVER_DELIVERED_TO_OTHER_CONSUMERS_SO_FAR)). 
+ Return(mock.Result(mock.ValkeyArray(mock.ValkeyArray(mock.ValkeyNil(), mock.ValkeyNil())))) - db.EXPECT().Do(ctx, mock.Match("XREADGROUP", "GROUP", groupName, consumerName, "COUNT", "1", "STREAMS", streamName, consumer_NEVER_DELIVERED_TO_OTHER_CONSUMERS_SO_FAR)).Return(mock.Result(mock.ValkeyArray(mock.ValkeyArray(mock.ValkeyNil(), mock.ValkeyNil())))) - clientArg := &client.ClientArgs{ - Instance: db, - } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - BatchSizeNewMessage: &batchSize, + _, err := baseConsumer(db).newMessages(ctx) + if err != nil { + t.Fatalf("expected nil, got %v", err) } +} + +func TestNewMessagesTimeout(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx := context.Background() + db := mock.NewClient(ctrl) - _, err := c.NewMessages(ctx) + db.EXPECT().Do(ctx, mock.Match("XREADGROUP", "GROUP", groupName, consumerName, "COUNT", "1", "BLOCK", "0", "STREAMS", streamName, consumer_NEVER_DELIVERED_TO_OTHER_CONSUMERS_SO_FAR)). + Return(mock.Result(mock.ValkeyNil())) + + msgs, err := baseConsumer(db).newMessages(ctx) if err != nil { - t.Fatalf("expected nil error, got %v", err) + t.Fatalf("block timeout must not be an error, got %v", err) + } + if msgs != nil { + t.Fatalf("expected nil msgs on timeout, got %v", msgs) } } func TestNewMessagesError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + ctx := context.Background() + db := mock.NewClient(ctrl) + + db.EXPECT().Do(ctx, mock.Match("XREADGROUP", "GROUP", groupName, consumerName, "COUNT", "1", "BLOCK", "0", "STREAMS", streamName, consumer_NEVER_DELIVERED_TO_OTHER_CONSUMERS_SO_FAR)). + Return(mock.Result(mock.ValkeyError("error"))) + + _, err := baseConsumer(db).newMessages(ctx) + if err == nil { + t.Fatal("expected error, got nil") + } +} + +// ── autoClaimMessages ───────────────────────────────────────────────────────── +func TestAutoClaimMessagesSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() ctx := context.Background() db := mock.NewClient(ctrl) - batchSize := int64(1) + messageId := "1676389477-0" + nextCursor := "9999-0" + db.EXPECT().Do(ctx, mock.Match("XAUTOCLAIM", streamName, groupName, consumerName, "100", consumer_INITIAL_STREAM_ID, "COUNT", "1")). 
+ Return(mock.Result(mock.ValkeyArray( + mock.ValkeyString(nextCursor), + mock.ValkeyArray(mock.ValkeyArray(mock.ValkeyString(messageId), mock.ValkeyMap(make(map[string]valkey.ValkeyMessage)))), + ))) + + c := baseConsumer(db) + c.nextIdAutoClaim = consumer_INITIAL_STREAM_ID - db.EXPECT().Do(ctx, mock.Match("XREADGROUP", "GROUP", groupName, consumerName, "COUNT", "1", "STREAMS", streamName, consumer_NEVER_DELIVERED_TO_OTHER_CONSUMERS_SO_FAR)).Return(mock.Result(mock.ValkeyError("error"))) - clientArg := &client.ClientArgs{ - Instance: db, + _, err := c.autoClaimMessages(ctx) + if err != nil { + t.Fatalf("expected nil, got %v", err) } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - BatchSizeNewMessage: &batchSize, + if c.pelTraversalComplete { + t.Fatal("expected pelTraversalComplete=false when cursor != 0-0") } - - _, err := c.NewMessages(ctx) - if err == nil { - t.Fatalf("expected error, got nil") + if c.nextIdAutoClaim != nextCursor { + t.Fatalf("expected cursor %q, got %q", nextCursor, c.nextIdAutoClaim) } } -func TestPendingMessagesSuccess(t *testing.T) { +func TestAutoClaimMessagesCursorReset(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() db := mock.NewClient(ctrl) - db.EXPECT().Do(ctx, mock.Match("XREADGROUP", "GROUP", groupName, consumerName, "COUNT", "1", "STREAMS", streamName, consumer_INITIAL_STREAM_ID)).Return(mock.Result(mock.ValkeyArray(mock.ValkeyArray(mock.ValkeyNil(), mock.ValkeyNil())))) - clientArg := &client.ClientArgs{ - Instance: db, + messageId := "1676389477-0" + db.EXPECT().Do(ctx, mock.Match("XAUTOCLAIM", streamName, groupName, consumerName, "100", consumer_INITIAL_STREAM_ID, "COUNT", "1")). + Return(mock.Result(mock.ValkeyArray( + mock.ValkeyString(consumer_INITIAL_STREAM_ID), + mock.ValkeyArray(mock.ValkeyArray(mock.ValkeyString(messageId), mock.ValkeyMap(make(map[string]valkey.ValkeyMessage)))), + ))) + + c := baseConsumer(db) + c.nextIdAutoClaim = consumer_INITIAL_STREAM_ID + + _, err := c.autoClaimMessages(ctx) + if err != nil { + t.Fatalf("expected nil, got %v", err) + } + if !c.pelTraversalComplete { + t.Fatal("expected pelTraversalComplete=true when cursor resets to 0-0") } +} - var S int64 = 1 - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - BatchSizePending: &S, - latestPendingMessageId: consumer_INITIAL_STREAM_ID, +func TestAutoClaimMessagesError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx := context.Background() + db := mock.NewClient(ctrl) + + db.EXPECT().Do(ctx, mock.Match("XAUTOCLAIM", streamName, groupName, consumerName, "100", consumer_INITIAL_STREAM_ID, "COUNT", "1")). + Return(mock.Result(mock.ValkeyError("error"))) + + c := baseConsumer(db) + c.nextIdAutoClaim = consumer_INITIAL_STREAM_ID + + _, err := c.autoClaimMessages(ctx) + if err == nil { + t.Fatal("expected error, got nil") } +} - _, err := c.PendingMessages(ctx) +// ── pelSize ─────────────────────────────────────────────────────────────────── + +func TestPelSizeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx := context.Background() + db := mock.NewClient(ctrl) + + db.EXPECT().Do(ctx, mock.Match("XPENDING", streamName, groupName)). 
+ Return(mock.Result(mock.ValkeyArray(mock.ValkeyInt64(5), mock.ValkeyNil(), mock.ValkeyNil(), mock.ValkeyNil()))) + + size, err := baseConsumer(db).pelSize(ctx) if err != nil { - t.Fatalf("expected nil error, got %v", err) + t.Fatalf("expected nil, got %v", err) + } + if size != 5 { + t.Fatalf("expected 5, got %d", size) } } -func TestPendingMessagesError(t *testing.T) { +func TestPelSizeZero(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + ctx := context.Background() + db := mock.NewClient(ctrl) + + db.EXPECT().Do(ctx, mock.Match("XPENDING", streamName, groupName)). + Return(mock.Result(mock.ValkeyArray(mock.ValkeyInt64(0), mock.ValkeyNil(), mock.ValkeyNil(), mock.ValkeyNil()))) + + size, err := baseConsumer(db).pelSize(ctx) + if err != nil { + t.Fatalf("expected nil, got %v", err) + } + if size != 0 { + t.Fatalf("expected 0, got %d", size) + } +} +func TestPelSizeError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() ctx := context.Background() db := mock.NewClient(ctrl) - db.EXPECT().Do(ctx, mock.Match("XREADGROUP", "GROUP", groupName, consumerName, "COUNT", "1", "STREAMS", streamName, consumer_INITIAL_STREAM_ID)).Return(mock.Result(mock.ValkeyError("error"))) - clientArg := &client.ClientArgs{ - Instance: db, + db.EXPECT().Do(ctx, mock.Match("XPENDING", streamName, groupName)). + Return(mock.Result(mock.ValkeyError("error"))) + + _, err := baseConsumer(db).pelSize(ctx) + if err == nil { + t.Fatal("expected error, got nil") } +} - var S int64 = 1 - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - BatchSizePending: &S, - latestPendingMessageId: consumer_INITIAL_STREAM_ID, +// ── contextSleep ────────────────────────────────────────────────────────────── + +func TestContextSleep_Expiry(t *testing.T) { + if err := contextSleep(context.Background(), time.Millisecond); err != nil { + t.Fatalf("expected nil, got %v", err) } +} - _, err := c.PendingMessages(ctx) - if err == nil { - t.Fatalf("expected error, got nil") +func TestContextSleep_Cancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + if err := contextSleep(ctx, time.Hour); !errors.Is(err, context.Canceled) { + t.Fatalf("expected context.Canceled, got %v", err) } } -func TestAutoClaimedSuccess(t *testing.T) { +func TestContextSleep_ZeroDuration(t *testing.T) { + if err := contextSleep(context.Background(), 0); err != nil { + t.Fatalf("expected nil for zero duration, got %v", err) + } +} + +// ── advanceIdx ──────────────────────────────────────────────────────────────── + +func TestAdvanceIdx_BelowMax(t *testing.T) { + if advanceIdx(2, 4) != 3 { + t.Fatal("expected 3") + } +} + +func TestAdvanceIdx_AtMax(t *testing.T) { + if advanceIdx(4, 4) != 4 { + t.Fatal("expected 4 (stay at max)") + } +} + +func TestAdvanceIdx_ZeroMax(t *testing.T) { + if advanceIdx(0, 0) != 0 { + t.Fatal("expected 0") + } +} + +// ── Stats ───────────────────────────────────────────────────────────────────── + +func TestStats(t *testing.T) { + c := &Consumer{ratioIdx: 1, pelWaitIdx: 2, backoffIdx: 3, prevPelSize: 42} + s := c.Stats() + if s.RatioIdx != 1 || s.PelWaitIdx != 2 || s.BackoffIdx != 3 || s.PrevPelSize != 42 { + t.Fatalf("unexpected stats: %+v", s) + } +} + +// ── DefaultConsumerName ─────────────────────────────────────────────────────── + +func TestDefaultConsumerName(t *testing.T) { + name := DefaultConsumerName() + if name == "" { + t.Fatal("expected non-empty consumer name") + } + for i, ch := 
range name {
+		if ch == '-' && i > 0 {
+			return
+		}
+	}
+	t.Fatalf("expected hostname-pid format, got %q", name)
+}
+
+// ── Consume helpers ───────────────────────────────────────────────────────────
+
+// emptyXRead simulates an XREADGROUP block timeout (nil reply = no messages).
+func emptyXRead() valkey.ValkeyResult {
+	return mock.Result(mock.ValkeyNil())
+}
+
+// oneMessageXRead simulates XREADGROUP returning one message for the stream.
+func oneMessageXRead(msgID string) valkey.ValkeyResult {
+	return mock.Result(mock.ValkeyArray(
+		mock.ValkeyArray(
+			mock.ValkeyString(streamName),
+			mock.ValkeyArray(
+				mock.ValkeyArray(mock.ValkeyString(msgID), mock.ValkeyMap(map[string]valkey.ValkeyMessage{})),
+			),
+		),
+	))
+}
+
+// oneMessageXAutoclaim simulates XAUTOCLAIM returning one message and a next cursor.
+func oneMessageXAutoclaim(msgID, nextCursor string) valkey.ValkeyResult {
+	return mock.Result(mock.ValkeyArray(
+		mock.ValkeyString(nextCursor),
+		mock.ValkeyArray(mock.ValkeyArray(mock.ValkeyString(msgID), mock.ValkeyMap(make(map[string]valkey.ValkeyMessage)))),
+	))
+}
+
+// emptyXAutoclaim simulates XAUTOCLAIM returning no messages with a given next cursor.
+func emptyXAutoclaim(nextCursor string) valkey.ValkeyResult {
+	return mock.Result(mock.ValkeyArray(
+		mock.ValkeyString(nextCursor),
+		mock.ValkeyArray(),
+	))
+}
+
+// pelSummaryResult simulates XPENDING summary with a given pending count.
+func pelSummaryResult(count int64) valkey.ValkeyResult {
+	return mock.Result(mock.ValkeyArray(mock.ValkeyInt64(count), mock.ValkeyNil(), mock.ValkeyNil(), mock.ValkeyNil()))
+}
+
+func xreadgroupMatcher() gomock.Matcher {
+	return mock.Match("XREADGROUP", "GROUP", groupName, consumerName, "COUNT", "1", "BLOCK", strconv.FormatInt(0, 10), "STREAMS", streamName, consumer_NEVER_DELIVERED_TO_OTHER_CONSUMERS_SO_FAR)
+}
+
+func xautoclaimMatcher(startID string) gomock.Matcher {
+	return mock.Match("XAUTOCLAIM", streamName, groupName, consumerName, "100", startID, "COUNT", "1")
+}
+
+func xpendingSummaryMatcher() gomock.Matcher {
+	return mock.Match("XPENDING", streamName, groupName)
+}
+
+// ── Consume – adaptive loop ───────────────────────────────────────────────────
+
+// TestConsume_NewMessages_BelowRatio: new messages below ratio → PEL not triggered.
+func TestConsume_NewMessages_BelowRatio(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
+	ctx := context.Background()
+	db := mock.NewClient(ctrl)
+	db.EXPECT().Do(ctx, xreadgroupMatcher()).Return(oneMessageXRead("1-0"))
+
+	c := baseConsumer(db)
+	c.RatioSlice = []int{5}
+
+	msgs, err := c.Consume(ctx)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if len(msgs) != 1 {
+		t.Fatalf("expected 1 message, got %d", len(msgs))
+	}
+	if c.newMsgCounter != 1 {
+		t.Fatalf("expected newMsgCounter=1, got %d", c.newMsgCounter)
+	}
+}
+
+// TestConsume_RatioMet_TriggersPEL: reaching the ratio threshold triggers XAUTOCLAIM.
+func TestConsume_RatioMet_TriggersPEL(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() ctx := context.Background() db := mock.NewClient(ctrl) - messageId := "1676389477-0" - db.EXPECT().Do(ctx, mock.Match("XAUTOCLAIM", streamName, groupName, consumerName, "100", consumer_INITIAL_STREAM_ID, "COUNT", "1")).Return(mock.Result(mock.ValkeyArray(mock.ValkeyString(messageId), mock.ValkeyArray(mock.ValkeyArray(mock.ValkeyString(messageId), mock.ValkeyMap(make(map[string]valkey.ValkeyMessage))))))) - clientArg := &client.ClientArgs{ - Instance: db, - } - var S int64 = 1 - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - MinIdleAutoClaim: 100, - nextIdAutoClaim: consumer_INITIAL_STREAM_ID, - BatchSizeAutoClaim: &S, - } - - _, err := c.AutoClaimMessages(ctx) + db.EXPECT().Do(ctx, xreadgroupMatcher()).Return(oneMessageXRead("1-0")) + db.EXPECT().Do(ctx, xautoclaimMatcher(consumer_INITIAL_STREAM_ID)).Return(emptyXAutoclaim(consumer_INITIAL_STREAM_ID)) + db.EXPECT().Do(ctx, xpendingSummaryMatcher()).Return(pelSummaryResult(0)) + + c := baseConsumer(db) + c.RatioSlice = []int{1} // every single new message triggers PEL + + msgs, err := c.Consume(ctx) if err != nil { - t.Fatalf("expected nil error, got %v", err) + t.Fatalf("unexpected error: %v", err) + } + if len(msgs) != 1 { + t.Fatalf("expected 1 message, got %d", len(msgs)) + } + if c.newMsgCounter != 0 { + t.Fatalf("expected newMsgCounter reset to 0, got %d", c.newMsgCounter) } } -func TestAutoClaimedError(t *testing.T) { +// TestConsume_NoNewMessages_TriggersPEL: empty XREADGROUP immediately triggers PEL. +func TestConsume_NoNewMessages_TriggersPEL(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + ctx := context.Background() + db := mock.NewClient(ctrl) + + db.EXPECT().Do(ctx, xreadgroupMatcher()).Return(emptyXRead()) + db.EXPECT().Do(ctx, xautoclaimMatcher(consumer_INITIAL_STREAM_ID)).Return(oneMessageXAutoclaim("2-0", "9-0")) + + c := baseConsumer(db) + msgs, err := c.Consume(ctx) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(msgs) != 1 { + t.Fatalf("expected 1 PEL message, got %d", len(msgs)) + } +} + +// TestConsume_BothEmpty_Backoff: both phases empty → backoff index advances. 
+func TestConsume_BothEmpty_Backoff(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() ctx := context.Background() db := mock.NewClient(ctrl) - db.EXPECT().Do(ctx, mock.Match("XAUTOCLAIM", streamName, groupName, consumerName, "100", consumer_INITIAL_STREAM_ID, "COUNT", "1")).Return(mock.Result(mock.ValkeyError("error"))) - clientArg := &client.ClientArgs{ - Instance: db, + db.EXPECT().Do(ctx, xreadgroupMatcher()).Return(emptyXRead()) + db.EXPECT().Do(ctx, xautoclaimMatcher(consumer_INITIAL_STREAM_ID)).Return(emptyXAutoclaim(consumer_INITIAL_STREAM_ID)) + db.EXPECT().Do(ctx, xpendingSummaryMatcher()).Return(pelSummaryResult(0)) + + c := baseConsumer(db) + c.BackoffSlice = []int{0, 0, 0} + + msgs, err := c.Consume(ctx) + if err != nil { + t.Fatalf("unexpected error: %v", err) } - var S int64 = 1 - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - MinIdleAutoClaim: 100, - nextIdAutoClaim: consumer_INITIAL_STREAM_ID, - BatchSizeAutoClaim: &S, + if msgs != nil { + t.Fatalf("expected nil msgs, got %v", msgs) } + if c.backoffIdx != 1 { + t.Fatalf("expected backoffIdx=1, got %d", c.backoffIdx) + } +} - _, err := c.AutoClaimMessages(ctx) - if err == nil { - t.Fatalf("expected error, got nil") +// TestConsume_BackoffAtMax_StaysAtMax: backoffIdx does not exceed slice bounds. +func TestConsume_BackoffAtMax_StaysAtMax(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx := context.Background() + db := mock.NewClient(ctrl) + + db.EXPECT().Do(ctx, xreadgroupMatcher()).Return(emptyXRead()) + db.EXPECT().Do(ctx, xautoclaimMatcher(consumer_INITIAL_STREAM_ID)).Return(emptyXAutoclaim(consumer_INITIAL_STREAM_ID)) + db.EXPECT().Do(ctx, xpendingSummaryMatcher()).Return(pelSummaryResult(0)) + + c := baseConsumer(db) + c.BackoffSlice = []int{0} // single value; must stay at index 0 + + c.Consume(ctx) //nolint:errcheck + if c.backoffIdx != 0 { + t.Fatalf("expected backoffIdx=0 (at max), got %d", c.backoffIdx) } } -func TestConsumeOnlyPending(t *testing.T) { +// TestConsume_BackoffResets: backoffIdx resets when new messages arrive. +func TestConsume_BackoffResets(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + ctx := context.Background() + db := mock.NewClient(ctrl) + db.EXPECT().Do(ctx, xreadgroupMatcher()).Return(oneMessageXRead("1-0")) + + c := baseConsumer(db) + c.backoffIdx = 3 + c.RatioSlice = []int{5} // ratio not met; no PEL call expected + + c.Consume(ctx) //nolint:errcheck + if c.backoffIdx != 0 { + t.Fatalf("expected backoffIdx reset to 0, got %d", c.backoffIdx) + } +} + +// TestConsume_PELTraversal_SizeChanged_ResetsIndices: PEL size change resets protection indices. +func TestConsume_PELTraversal_SizeChanged_ResetsIndices(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() ctx := context.Background() db := mock.NewClient(ctrl) - var S int64 = 1 - db.EXPECT().Do(ctx, mock.Match("XREADGROUP", "GROUP", groupName, consumerName, "COUNT", "1", "STREAMS", streamName, consumer_INITIAL_STREAM_ID)). 
- Return(mock.Result(mock.ValkeyArray(mock.ValkeyArray(mock.ValkeyNil(), mock.ValkeyNil())))) + db.EXPECT().Do(ctx, xreadgroupMatcher()).Return(emptyXRead()) + db.EXPECT().Do(ctx, xautoclaimMatcher(consumer_INITIAL_STREAM_ID)).Return(emptyXAutoclaim(consumer_INITIAL_STREAM_ID)) + db.EXPECT().Do(ctx, xpendingSummaryMatcher()).Return(pelSummaryResult(8)) // changed from 10 - clientArg := &client.ClientArgs{ - Instance: db, + c := baseConsumer(db) + c.RatioSlice = []int{5, 10, 20, 50} + c.PelWaitSlice = []int{0, 0, 0, 0} // zero to keep test fast; index behavior is what we're testing + c.ratioIdx = 2 + c.pelWaitIdx = 2 + c.prevPelSize = 10 + + c.Consume(ctx) //nolint:errcheck + + if c.ratioIdx != 0 || c.pelWaitIdx != 0 { + t.Fatalf("expected indices reset to 0, got ratioIdx=%d pelWaitIdx=%d", c.ratioIdx, c.pelWaitIdx) } - c := &Consumer{ - Client: clientArg, - StreamName: streamName, - GroupName: groupName, - ConsumerName: consumerName, - BatchSizePending: &S, - latestPendingMessageId: consumer_INITIAL_STREAM_ID, + if c.prevPelSize != 8 { + t.Fatalf("expected prevPelSize=8, got %d", c.prevPelSize) } +} + +// TestConsume_PELTraversal_SizeUnchanged_AdvancesIndices: unchanged PEL size advances indices. +func TestConsume_PELTraversal_SizeUnchanged_AdvancesIndices(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx := context.Background() + db := mock.NewClient(ctrl) + + db.EXPECT().Do(ctx, xreadgroupMatcher()).Return(emptyXRead()) + db.EXPECT().Do(ctx, xautoclaimMatcher(consumer_INITIAL_STREAM_ID)).Return(emptyXAutoclaim(consumer_INITIAL_STREAM_ID)) + db.EXPECT().Do(ctx, xpendingSummaryMatcher()).Return(pelSummaryResult(10)) // same as prevPelSize + + c := baseConsumer(db) + c.RatioSlice = []int{5, 10, 20, 50} + c.PelWaitSlice = []int{0, 0, 0, 0} // zero to keep test fast; index behavior is what we're testing + c.ratioIdx = 0 + c.pelWaitIdx = 0 + c.prevPelSize = 10 + + c.Consume(ctx) //nolint:errcheck + + if c.ratioIdx != 1 || c.pelWaitIdx != 1 { + t.Fatalf("expected indices advanced to 1, got ratioIdx=%d pelWaitIdx=%d", c.ratioIdx, c.pelWaitIdx) + } +} + +// TestConsume_PELTraversal_AtMax_StaysAtMax: protection indices do not exceed slice bounds. +func TestConsume_PELTraversal_AtMax_StaysAtMax(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx := context.Background() + db := mock.NewClient(ctrl) + + db.EXPECT().Do(ctx, xreadgroupMatcher()).Return(emptyXRead()) + db.EXPECT().Do(ctx, xautoclaimMatcher(consumer_INITIAL_STREAM_ID)).Return(emptyXAutoclaim(consumer_INITIAL_STREAM_ID)) + db.EXPECT().Do(ctx, xpendingSummaryMatcher()).Return(pelSummaryResult(10)) + + c := baseConsumer(db) + c.RatioSlice = []int{5, 50} + c.PelWaitSlice = []int{0, 0} // zero to keep test fast; index behavior is what we're testing + c.ratioIdx = 1 // already at max + c.pelWaitIdx = 1 // already at max + c.prevPelSize = 10 + + c.Consume(ctx) //nolint:errcheck + + if c.ratioIdx != 1 || c.pelWaitIdx != 1 { + t.Fatalf("expected indices to stay at max=1, got ratioIdx=%d pelWaitIdx=%d", c.ratioIdx, c.pelWaitIdx) + } +} + +// TestConsume_ContextCancelledDuringBackoff: context cancellation during backoff returns error. 
+func TestConsume_ContextCancelledDuringBackoff(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx, cancel := context.WithCancel(context.Background()) + db := mock.NewClient(ctrl) + + db.EXPECT().Do(ctx, xreadgroupMatcher()).Return(emptyXRead()) + db.EXPECT().Do(ctx, xautoclaimMatcher(consumer_INITIAL_STREAM_ID)).Return(emptyXAutoclaim(consumer_INITIAL_STREAM_ID)) + db.EXPECT().Do(ctx, xpendingSummaryMatcher()).Return(pelSummaryResult(0)) + + cancel() // cancel before Consume runs so the backoff sleep exits immediately + + c := baseConsumer(db) + c.BackoffSlice = []int{60} // long sleep; would block without cancellation _, err := c.Consume(ctx) - if err != nil { - t.Fatalf("expected nil error, got %v", err) + if !errors.Is(err, context.Canceled) { + t.Fatalf("expected context.Canceled, got %v", err) } } diff --git a/pkg/errors/error.go b/pkg/errors/error.go index db0fcb3..dea1530 100644 --- a/pkg/errors/error.go +++ b/pkg/errors/error.go @@ -4,7 +4,8 @@ import "errors" var ( ErrStreamNotFound = errors.New("stream key not found") - ErrKeyNotFound = errors.New("key not found") + ErrKeyNotFound = errors.New("key not found") ErrGroupNotCreated = errors.New("group not created") - ErrNoAckedMessage = errors.New("no acked message") + ErrNoAckedMessage = errors.New("no acked message") + ErrInvalidConfig = errors.New("invalid consumer configuration") ) diff --git a/pkg/producer/producer.go b/pkg/producer/producer.go index 84d04f7..6f050b4 100644 --- a/pkg/producer/producer.go +++ b/pkg/producer/producer.go @@ -3,7 +3,7 @@ package producer import ( "context" - "github.com/enerBit/redsumer/v3/pkg/client" + "github.com/enerBit/redsumer/v4/pkg/client" ) type Producer struct { diff --git a/pkg/producer/producer_test.go b/pkg/producer/producer_test.go index 305d350..e5f120c 100644 --- a/pkg/producer/producer_test.go +++ b/pkg/producer/producer_test.go @@ -4,15 +4,13 @@ import ( "context" "testing" - "github.com/enerBit/redsumer/v3/pkg/client" + "github.com/enerBit/redsumer/v4/pkg/client" "github.com/valkey-io/valkey-go/mock" "go.uber.org/mock/gomock" ) const ( streamName string = "stream-test" - groupName string = "group-test" - consumerName string = "consumer-test" ) func TestProduceSuccess(t *testing.T) {
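
The `contextSleep` and `advanceIdx` tests above pin down both helpers entirely through observable behavior: `advanceIdx` steps an index toward its maximum and saturates there, while `contextSleep` returns `nil` once the duration elapses (even a zero duration) and surfaces `context.Canceled` when the context is already cancelled. For readers following the test suite, here is a minimal sketch consistent with those assertions; the bodies are inferred from the tests and are not necessarily the library's actual implementation:

```go
package consumer

import (
	"context"
	"time"
)

// advanceIdx moves an adaptive index one step toward maxIdx and saturates
// there, matching TestAdvanceIdx_BelowMax, _AtMax and _ZeroMax.
func advanceIdx(idx, maxIdx int) int {
	if idx < maxIdx {
		return idx + 1
	}
	return idx
}

// contextSleep waits for d or until ctx is done, whichever happens first,
// matching TestContextSleep_Expiry, _Cancel and _ZeroDuration.
func contextSleep(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err() // context.Canceled or context.DeadlineExceeded
	case <-timer.C:
		return nil
	}
}
```

Saturation rather than wrap-around is what the `*_AtMax_StaysAtMax` tests rely on: `ratioIdx`, `pelWaitIdx` and `backoffIdx` can only climb to the last index of their configured slices and stay there until new activity resets them to zero.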