
[RFC] add concurrency strategies for overlapping messages (queue, debounce, concurrent)#277

Open
cramforce wants to merge 4 commits into main from concurrency-strategies

Conversation

cramforce (Collaborator) commented Mar 20, 2026

Summary

  • Add concurrency option to ChatConfig with four strategies: drop (default), queue, debounce, concurrent
  • Add lockScope option (thread | channel) to control lock granularity — defaults to channel for WhatsApp and Telegram (breaking change)
  • Add enqueue/dequeue/queueDepth to StateAdapter interface with implementations across all state adapters
  • Add MessageContext to handler signatures so handlers can see skipped messages

Problem

Two independent problems with the current lock-and-drop model:

1. Overlapping messages are silently lost. When a message arrives while a handler is processing, the SDK throws LockError and the message is discarded. For AI chatbots, this means user follow-ups sent while the model is streaming are permanently lost.

2. Channel-based platforms lock too broadly. The lock key is always threadId. For WhatsApp and Telegram, threadId === channelId, so the entire conversation is locked. For Telegram forum topics, each topic gets its own lock even though the group chat is the natural conversation unit.

Concurrency Strategies

`drop` (default, unchanged)

Lock acquired or `LockError` thrown. Backward compatible.

`queue`

Messages that arrive while a handler is running are enqueued. When the current handler finishes, the queue is drained: only the latest message is dispatched, with all intermediate messages provided as `context.skipped`:

```typescript
const chat = new Chat({ concurrency: 'queue', ... });

chat.onSubscribedMessage(async (thread, message, context) => {
  if (context?.skipped.length) {
    const allMessages = [...context.skipped, message];
  }
});
```

`debounce`

Every message (including the first) starts/resets a debounce timer. Only the final message in a burst is processed. Uses `waitUntil` to keep the function alive during the debounce window.

```typescript
const chat = new Chat({
  concurrency: { strategy: 'debounce', debounceMs: 1500 },
  ...
});
```

`concurrent`

No locking. Every message is processed immediately, in parallel.

Fine-grained config

```typescript
interface ConcurrencyConfig {
  strategy: 'drop' | 'queue' | 'debounce' | 'concurrent';
  maxQueueSize?: number;                        // Default: 10
  onQueueFull?: 'drop-oldest' | 'drop-newest';  // Default: 'drop-oldest'
  queueEntryTtlMs?: number;                     // Default: 90_000 (90s)
  debounceMs?: number;                          // Default: 1500
  maxConcurrent?: number;                       // Default: Infinity
}
```

Lock Scope

Problem

For channel-based platforms (WhatsApp, Telegram), `threadId === channelId` in most cases. For Telegram forum topics, each topic previously got an independent lock (`telegram:-100:456`), but the natural conversation unit is the group chat (`telegram:-100`).

Solution

New `lockScope` property on adapters and `ChatConfig`:

  • `thread` (default): lock per `threadId` — per reply chain (Slack), per topic (Telegram forums)
  • `channel`: lock per `channelId` — per channel (Slack), per group chat (Telegram)

Breaking change: WhatsApp and Telegram default to `channel`

```typescript
// adapter-whatsapp
readonly lockScope = 'channel';

// adapter-telegram
readonly lockScope = 'channel';
```

For WhatsApp DMs, this is a no-op (`threadId === channelId` already).

For Telegram forum topics, this changes behavior: all topics in a group now share one lock instead of independent per-topic locks. This matches the mental model that the group chat is a single conversation.
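To illustrate the effect, here is a hypothetical helper deriving the lock key from the resolved scope. The `lockKeyFor` name and exact key format are assumptions, modeled on the `telegram:-100:456` example above, with `threadId` assumed to already encode the topic:

```typescript
type LockScope = 'thread' | 'channel';

// Hypothetical: select the lock key based on the resolved scope.
// 'channel' collapses all topics/reply chains onto one key per channel.
function lockKeyFor(
  adapterName: string,
  channelId: string,
  threadId: string,
  scope: LockScope
): string {
  return scope === 'channel'
    ? `${adapterName}:${channelId}`
    : `${adapterName}:${threadId}`;
}
```

With `channel` scope, every forum topic in the Telegram group `-100` maps to the single key `telegram:-100`.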

Users can override per-instance:

```typescript
const chat = new Chat({
  lockScope: 'thread',  // Override adapter default
  ...
});
```

Or dynamically with an async resolver:

```typescript
const chat = new Chat({
  lockScope: async ({ adapter, isDM, threadId, channelId }) => {
    if (adapter.name === 'telegram' && isDM) return 'thread';
    return 'channel';
  },
  ...
});
```

StateAdapter additions

```typescript
interface StateAdapter {
  enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise<number>;
  dequeue(threadId: string): Promise<QueueEntry | null>;
  queueDepth(threadId: string): Promise<number>;
}
```
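As a rough sketch of the contract these methods satisfy — the `QueueEntry` shape here is an assumption, and the trimming behavior mirrors the default `drop-oldest` policy:

```typescript
interface QueueEntry {
  messageId: string;   // assumed shape; the PR's actual QueueEntry may differ
  enqueuedAt: number;  // epoch millis
  payload: unknown;
}

// Minimal in-memory sketch of the three new StateAdapter methods,
// along the lines of the in-process-array MemoryStateAdapter.
class InMemoryQueue {
  private queues = new Map<string, QueueEntry[]>();

  // Push an entry, trim from the front so at most maxSize entries remain
  // (drop-oldest semantics), and return the resulting depth.
  async enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise<number> {
    const q = this.queues.get(threadId) ?? [];
    q.push(entry);
    while (q.length > maxSize) q.shift();
    this.queues.set(threadId, q);
    return q.length;
  }

  async dequeue(threadId: string): Promise<QueueEntry | null> {
    return this.queues.get(threadId)?.shift() ?? null;
  }

  async queueDepth(threadId: string): Promise<number> {
    return this.queues.get(threadId)?.length ?? 0;
  }
}
```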

Implemented in all four adapters:

  • MemoryStateAdapter — in-process array
  • RedisStateAdapter — Lua script (RPUSH + LTRIM + PEXPIRE)
  • IoRedisStateAdapter — same Lua approach
  • PostgresStateAdapter — `chat_state_queues` table with atomic dequeue, expired entry purging

Handler signature change

All message handler types now accept optional `MessageContext`:

```typescript
type MentionHandler = (thread, message, context?) => void | Promise<void>;
type SubscribedMessageHandler = (thread, message, context?) => void | Promise<void>;
type DirectMessageHandler = (thread, message, channel, context?) => void | Promise<void>;
type MessageHandler = (thread, message, context?) => void | Promise<void>;
```

Existing handlers that don't use `context` are unaffected.

Observability

All strategies emit structured log events at `info` level:

| Event                 | Strategy            | Data                                       |
|-----------------------|---------------------|--------------------------------------------|
| `message-queued`      | queue               | threadId, lockKey, messageId, queueDepth   |
| `message-dequeued`    | queue, debounce     | threadId, lockKey, messageId, skippedCount |
| `message-dropped`     | drop, queue (full)  | threadId, lockKey, messageId, reason       |
| `message-expired`     | queue, debounce     | threadId, lockKey, messageId               |
| `message-superseded`  | debounce            | threadId, lockKey, droppedId               |
| `message-debouncing`  | debounce            | threadId, lockKey, messageId, debounceMs   |

Backward Compatibility

  • Default concurrency remains `drop` — no behavior change unless configured
  • `onLockConflict` is deprecated but continues to work
  • Handler signatures are backward-compatible (new `context` param is optional)
  • Deduplication always runs regardless of strategy
  • Breaking: WhatsApp and Telegram adapters now default to `lockScope: 'channel'`

Test plan

  • Queue: processes latest message with skipped context after handler finishes
  • Queue: enqueues when lock is held, drains on release
  • Queue: drop-newest when queue is full
  • Queue: drop-oldest evicts oldest entries
  • Queue: expired entries skipped during drain
  • Queue: works with `onSubscribedMessage` handlers
  • Queue: works with `onNewMessage` pattern handlers
  • Debounce: first message debounces (not processed immediately)
  • Debounce: only last message in burst is processed
  • Concurrent: processes without acquiring lock
  • Lock scope: thread scope uses threadId as lock key
  • Lock scope: channel scope uses channelId as lock key (adapter default)
  • Lock scope: channel scope uses channelId as lock key (config override)
  • Lock scope: async resolver function
  • Lock scope: queue operations use channel-scoped lock key
  • State adapters: enqueue/dequeue/queueDepth (memory — 8 behavioral tests)
  • State adapters: enqueue/dequeue/queueDepth (pg — 12 mock-based tests)
  • State adapters: method existence (redis, ioredis)
  • Existing `onLockConflict` tests still pass

vercel bot commented Mar 20, 2026

| Project              | Deployment | Updated (UTC)       |
|----------------------|------------|---------------------|
| chat                 | Ready      | Mar 21, 2026 2:01am |
| chat-sdk-nextjs-chat | Ready      | Mar 21, 2026 2:01am |

cramforce and others added 2 commits March 20, 2026 16:13
…ounce, concurrent)

## Problem

When multiple messages arrive on the same thread while a handler is still
processing, the SDK has only one behavior: **lock-and-drop**. The incoming
message is silently discarded (or force-released, which creates uncontrolled
concurrency). This is insufficient for most real-world use cases:

- **AI chatbots** lose user follow-up messages sent while the model is streaming
- **Customer support bots** miss messages entirely, breaking conversation flow
- **Collaborative editing bots** need to coalesce rapid corrections into one action

## Solution

Introduce a new `concurrency` option on `ChatConfig` with four strategies:

### `'drop'` (default, backward-compatible)

Existing behavior. Lock acquired or `LockError` thrown. No changes.

### `'queue'`

Messages that arrive while a handler is running are enqueued in the state
adapter. When the current handler finishes, the queue is drained: **only the
latest message is dispatched**, with all intermediate messages provided as
`context.skipped`. This gives the handler full visibility into what happened
while it was busy, without forcing it to re-process every message sequentially.

```typescript
const chat = new Chat({
  concurrency: 'queue',
  // ...
});

chat.onNewMention(async (thread, message, context) => {
  if (context && context.skipped.length > 0) {
    // "You sent 4 messages while I was thinking. Responding to your latest."
    const allMessages = [...context.skipped, message];
    // Pass all messages to the LLM for full context
  }
});
```

Flow:
```
A arrives  → acquire lock → process A
B arrives  → lock busy → enqueue B
C arrives  → lock busy → enqueue C
D arrives  → lock busy → enqueue D
A done     → drain: [B, C, D] → handler(D, { skipped: [B, C] })
D done     → queue empty → release lock
```

### `'debounce'`

Every message (including the first) starts or resets a debounce timer. Only the
**final message in a burst** is processed. The lock-holding function stays alive
through `waitUntil` during the debounce window.

```typescript
const chat = new Chat({
  concurrency: { strategy: 'debounce', debounceMs: 1500 },
  // ...
});
```

Flow:
```
A arrives  → acquire lock → store A as pending → sleep(debounceMs)
B arrives  → lock busy → overwrite pending with B (A dropped)
C arrives  → lock busy → overwrite pending with C (B dropped)
             ... debounceMs elapses with no new message ...
           → process C → release lock
```
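The sleep/check/repeat behavior in the flow above can be sketched as follows. This `debounceLoop` is a self-contained approximation, not the SDK's actual implementation; the pending-message store and sleep primitive are injected as assumptions:

```typescript
// Sketch: keep sleeping while newer messages keep superseding the pending one;
// process only once a full debounce window elapses with no new arrival.
async function debounceLoop<T>(
  getPending: () => T,
  debounceMs: number,
  process: (msg: T) => Promise<void>,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms))
): Promise<void> {
  let last = getPending();
  for (;;) {
    await sleep(debounceMs);
    const current = getPending();
    if (current === last) break; // quiet window: no new message arrived
    last = current;              // a newer message superseded the pending one
  }
  await process(last);
}
```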

### `'concurrent'`

No locking at all. Every message is processed immediately in its own handler
invocation. Suitable for stateless handlers (lookups, translations) where
thread ordering doesn't matter.

```typescript
const chat = new Chat({
  concurrency: 'concurrent',
  // ...
});
```

## API Surface

### ChatConfig

```typescript
interface ChatConfig {
  concurrency?: ConcurrencyStrategy | ConcurrencyConfig;
  /** @deprecated Use `concurrency` instead */
  onLockConflict?: 'force' | 'drop' | ((threadId, message) => ...);
}

type ConcurrencyStrategy = 'drop' | 'queue' | 'debounce' | 'concurrent';

interface ConcurrencyConfig {
  strategy: ConcurrencyStrategy;
  maxQueueSize?: number;           // Default: 10
  onQueueFull?: 'drop-oldest' | 'drop-newest';  // Default: 'drop-oldest'
  queueEntryTtlMs?: number;        // Default: 90_000 (90s)
  debounceMs?: number;              // Default: 1500
  maxConcurrent?: number;           // Default: Infinity
}
```

### MessageContext (new, passed to handlers)

```typescript
interface MessageContext {
  skipped: Message[];               // Intermediate messages, chronological
  totalSinceLastHandler: number;    // skipped.length + 1
}
```

All handler types (`MentionHandler`, `MessageHandler`, `SubscribedMessageHandler`,
`DirectMessageHandler`) now accept an optional `MessageContext` as their last
parameter. Existing handlers that don't use it are unaffected.

### StateAdapter (new methods)

```typescript
interface StateAdapter {
  enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise<number>;
  dequeue(threadId: string): Promise<QueueEntry | null>;
  queueDepth(threadId: string): Promise<number>;
}
```

Implemented across all four state adapters:
- **MemoryStateAdapter**: in-process array
- **RedisStateAdapter**: Lua script (RPUSH + LTRIM + PEXPIRE)
- **IoRedisStateAdapter**: same Lua approach
- **PostgresStateAdapter**: new `chat_state_queues` table with atomic dequeue

## Architecture

`handleIncomingMessage` was refactored into composable pieces:

- `dispatchToHandlers()` — shared handler dispatch logic (mention detection,
  subscription routing, pattern matching). Extracted from the old monolithic
  method so all strategies reuse it.
- `handleDrop()` — original lock-or-fail path (preserves `onLockConflict` compat)
- `handleQueueOrDebounce()` — enqueue if busy, drain or debounce after
- `handleConcurrent()` — skip locking entirely
- `drainQueue()` — collect all pending, dispatch latest with skipped context
- `debounceLoop()` — sleep/check/repeat until no new messages arrive

## Queue Entry TTL

Queued messages have a configurable TTL (`queueEntryTtlMs`, default 90s). Stale
entries are discarded on dequeue with a `message-expired` log event. This
prevents unbounded accumulation and ensures handlers don't process messages
that are no longer relevant.
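A sketch of that expiry check, assuming entries carry an `enqueuedAt` timestamp in epoch milliseconds (the field name is an assumption):

```typescript
interface QueueEntry {
  messageId: string;
  enqueuedAt: number; // epoch millis; assumed field name
}

// Partition drained entries: anything older than ttlMs is discarded
// (and would be logged as `message-expired`) rather than dispatched.
function filterExpired(
  entries: QueueEntry[],
  ttlMs: number,
  now: number = Date.now()
): { live: QueueEntry[]; expired: QueueEntry[] } {
  const live: QueueEntry[] = [];
  const expired: QueueEntry[] = [];
  for (const e of entries) {
    (now - e.enqueuedAt > ttlMs ? expired : live).push(e);
  }
  return { live, expired };
}
```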

## Observability

All strategies emit structured log events at `info` level:

| Event                 | Strategy         | Data                                  |
|-----------------------|------------------|---------------------------------------|
| `message-queued`      | queue            | threadId, messageId, queueDepth       |
| `message-dequeued`    | queue, debounce  | threadId, messageId, skippedCount     |
| `message-dropped`     | drop, queue      | threadId, messageId, reason           |
| `message-expired`     | queue, debounce  | threadId, messageId                   |
| `message-superseded`  | debounce         | threadId, droppedId                   |
| `message-debouncing`  | debounce         | threadId, messageId, debounceMs       |
| `message-debounce-reset` | debounce      | threadId, messageId                   |

## Backward Compatibility

- Default remains `'drop'` — zero breaking changes for existing users
- `onLockConflict` continues to work but is marked `@deprecated`
- Handler signatures are backward-compatible (new `context` param is optional)
- Deduplication always runs regardless of strategy

## Files Changed

- `packages/chat/src/types.ts` — new types, updated handler signatures
- `packages/chat/src/chat.ts` — strategy routing, drain/debounce loops
- `packages/chat/src/index.ts` — export new types
- `packages/chat/src/mock-adapter.ts` — queue methods for test mock
- `packages/state-memory/src/index.ts` — in-memory queue
- `packages/state-redis/src/index.ts` — Redis queue (Lua)
- `packages/state-ioredis/src/index.ts` — ioredis queue (Lua)
- `packages/state-pg/src/index.ts` — Postgres queue table
- `packages/chat/src/chat.test.ts` — tests for queue, debounce, concurrent

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…e methods

Add tests across all state adapters and the Chat class:

**MemoryStateAdapter** (8 new tests):
- enqueue/dequeue single entry
- dequeue from empty queue returns null
- dequeue from nonexistent thread returns null
- queueDepth returns 0 for empty queue
- FIFO ordering across multiple entries
- maxSize trimming (keeps newest)
- maxSize=1 debounce behavior (last-write-wins)
- queue isolation by thread
- queue cleared on disconnect

**PostgresStateAdapter** (8 new tests):
- INSERT query for enqueue
- overflow trimming query
- depth return value
- parsed entry from dequeue
- null from empty dequeue
- atomic DELETE-RETURNING for dequeue
- queueDepth return value
- zero depth for empty queue

**RedisStateAdapter / IoRedisStateAdapter** (3+3 existence checks):
- enqueue, dequeue, queueDepth method existence

**Chat concurrency** (5 new tests):
- drop-newest policy when queue is full
- drop-oldest policy evicts oldest entries
- expired entries skipped during drain
- onNewMessage pattern handlers receive context
- onSubscribedMessage handlers receive skipped context

Total new tests: 27 (780 chat + 33 memory + 59 pg)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
