# [RFC] add concurrency strategies for overlapping messages (queue, debounce, concurrent) (#277)
## Problem
When multiple messages arrive on the same thread while a handler is still
processing, the SDK has only one behavior: **lock-and-drop**. The incoming
message is silently discarded (or force-released, which creates uncontrolled
concurrency). This is insufficient for most real-world use cases:
- **AI chatbots** lose user follow-up messages sent while the model is streaming
- **Customer support bots** miss messages entirely, breaking conversation flow
- **Collaborative editing bots** need to coalesce rapid corrections into one action
## Solution
Introduce a new `concurrency` option on `ChatConfig` with four strategies:
### `'drop'` (default, backward-compatible)
Existing behavior. Lock acquired or `LockError` thrown. No changes.
### `'queue'`
Messages that arrive while a handler is running are enqueued in the state
adapter. When the current handler finishes, the queue is drained: **only the
latest message is dispatched**, with all intermediate messages provided as
`context.skipped`. This gives the handler full visibility into what happened
while it was busy, without forcing it to re-process every message sequentially.
```typescript
const chat = new Chat({
  concurrency: 'queue',
  // ...
});

chat.onNewMention(async (thread, message, context) => {
  if (context && context.skipped.length > 0) {
    // "You sent 4 messages while I was thinking. Responding to your latest."
    const allMessages = [...context.skipped, message];
    // Pass all messages to the LLM for full context
  }
});
```
Flow:
```
A arrives → acquire lock → process A
B arrives → lock busy → enqueue B
C arrives → lock busy → enqueue C
D arrives → lock busy → enqueue D
A done → drain: [B, C, D] → handler(D, { skipped: [B, C] })
D done → queue empty → release lock
```
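The drain step in the flow above can be sketched as a pure function. This is illustrative only (the real `drainQueue()` works against the state adapter); `QueueEntry` is reduced to a plain string here:

```typescript
// Sketch of the drain semantics: dispatch only the newest queued
// message, exposing all intermediate ones as `skipped`.
interface DrainResult {
  message: string;   // latest queued message, the one dispatched
  skipped: string[]; // intermediate messages, chronological
}

function drain(queue: string[]): DrainResult | null {
  if (queue.length === 0) return null;
  return {
    message: queue[queue.length - 1],
    skipped: queue.slice(0, -1),
  };
}
```

With the flow above, `drain(['B', 'C', 'D'])` produces `D` as the dispatched message and `['B', 'C']` as `skipped`.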
### `'debounce'`
Every message (including the first) starts or resets a debounce timer. Only the
**final message in a burst** is processed. The lock-holding function stays alive
through `waitUntil` during the debounce window.
```typescript
const chat = new Chat({
  concurrency: { strategy: 'debounce', debounceMs: 1500 },
  // ...
});
```
Flow:
```
A arrives → acquire lock → store A as pending → sleep(debounceMs)
B arrives → lock busy → overwrite pending with B (A dropped)
C arrives → lock busy → overwrite pending with C (B dropped)
... debounceMs elapses with no new message ...
→ process C → release lock
```
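The "overwrite pending" behavior in the flow above is last-write-wins. A minimal sketch of that slot, with the timer mechanics omitted (class and field names are hypothetical):

```typescript
// Last-write-wins pending slot: each arrival supersedes the previous
// pending message; whatever is pending when the debounce window
// elapses is the one processed.
class PendingSlot<T> {
  private pending: T | null = null;
  private superseded: T[] = [];

  offer(message: T): void {
    if (this.pending !== null) this.superseded.push(this.pending); // dropped
    this.pending = message;
  }

  // Called once debounceMs elapses with no new arrivals.
  take(): { message: T | null; superseded: T[] } {
    const result = { message: this.pending, superseded: this.superseded };
    this.pending = null;
    this.superseded = [];
    return result;
  }
}
```

Offering A, then B, then C and calling `take()` yields C, with A and B superseded (matching the `message-superseded` events in the Observability table).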
### `'concurrent'`
No locking at all. Every message is processed immediately in its own handler
invocation. Suitable for stateless handlers (lookups, translations) where
thread ordering doesn't matter.
```typescript
const chat = new Chat({
  concurrency: 'concurrent',
  // ...
});
```
## API Surface
### ChatConfig
```typescript
interface ChatConfig {
  concurrency?: ConcurrencyStrategy | ConcurrencyConfig;

  /** @deprecated Use `concurrency` instead */
  onLockConflict?: 'force' | 'drop' | ((threadId, message) => ...);
}

type ConcurrencyStrategy = 'drop' | 'queue' | 'debounce' | 'concurrent';

interface ConcurrencyConfig {
  strategy: ConcurrencyStrategy;
  maxQueueSize?: number;                       // Default: 10
  onQueueFull?: 'drop-oldest' | 'drop-newest'; // Default: 'drop-oldest'
  queueEntryTtlMs?: number;                    // Default: 90_000 (90s)
  debounceMs?: number;                         // Default: 1500
  maxConcurrent?: number;                      // Default: Infinity
}
```
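Since `concurrency` accepts either a shorthand string or a full config object, a normalization step with the defaults listed above is implied. A sketch (the function name `normalizeConcurrency` is an assumption, not part of the proposed API):

```typescript
type ConcurrencyStrategy = 'drop' | 'queue' | 'debounce' | 'concurrent';

interface ConcurrencyConfig {
  strategy: ConcurrencyStrategy;
  maxQueueSize?: number;
  onQueueFull?: 'drop-oldest' | 'drop-newest';
  queueEntryTtlMs?: number;
  debounceMs?: number;
  maxConcurrent?: number;
}

// Hypothetical normalizer: expand the shorthand string form and
// apply the documented defaults to any omitted fields.
function normalizeConcurrency(
  input: ConcurrencyStrategy | ConcurrencyConfig = 'drop',
): Required<ConcurrencyConfig> {
  const cfg = typeof input === 'string' ? { strategy: input } : input;
  return {
    strategy: cfg.strategy,
    maxQueueSize: cfg.maxQueueSize ?? 10,
    onQueueFull: cfg.onQueueFull ?? 'drop-oldest',
    queueEntryTtlMs: cfg.queueEntryTtlMs ?? 90_000,
    debounceMs: cfg.debounceMs ?? 1500,
    maxConcurrent: cfg.maxConcurrent ?? Infinity,
  };
}
```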
### MessageContext (new, passed to handlers)
```typescript
interface MessageContext {
  skipped: Message[];            // Intermediate messages, chronological
  totalSinceLastHandler: number; // skipped.length + 1
}
```
All handler types (`MentionHandler`, `MessageHandler`, `SubscribedMessageHandler`,
`DirectMessageHandler`) now accept an optional `MessageContext` as their last
parameter. Existing handlers that don't use it are unaffected.
### StateAdapter (new methods)
```typescript
interface StateAdapter {
  enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise<number>;
  dequeue(threadId: string): Promise<QueueEntry | null>;
  queueDepth(threadId: string): Promise<number>;
}
```
Implemented across all four state adapters:
- **MemoryStateAdapter**: in-process array
- **RedisStateAdapter**: Lua script (RPUSH + LTRIM + PEXPIRE)
- **IoRedisStateAdapter**: same Lua approach
- **PostgresStateAdapter**: new `chat_state_queues` table with atomic dequeue
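For the in-process case, the three methods reduce to array operations. A self-contained sketch, assuming the MemoryStateAdapter's overflow behavior of keeping the newest entries (per the test plan's "maxSize trimming (keeps newest)"); `QueueEntry` is simplified here:

```typescript
interface QueueEntry {
  messageId: string;
  enqueuedAt: number; // epoch ms, used later for TTL checks
}

// Sketch of the in-memory queue methods: FIFO dequeue, per-thread
// isolation, and oldest-first eviction when maxSize is exceeded.
class InMemoryQueue {
  private queues = new Map<string, QueueEntry[]>();

  async enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise<number> {
    const q = this.queues.get(threadId) ?? [];
    q.push(entry);
    while (q.length > maxSize) q.shift(); // drop oldest, keep newest
    this.queues.set(threadId, q);
    return q.length; // new queue depth
  }

  async dequeue(threadId: string): Promise<QueueEntry | null> {
    return this.queues.get(threadId)?.shift() ?? null;
  }

  async queueDepth(threadId: string): Promise<number> {
    return this.queues.get(threadId)?.length ?? 0;
  }
}
```

The Redis variants get the same push-trim-expire semantics atomically via a single Lua script; Postgres uses an atomic `DELETE ... RETURNING` for dequeue.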
## Architecture
`handleIncomingMessage` was refactored into composable pieces:
- `dispatchToHandlers()` — shared handler dispatch logic (mention detection,
subscription routing, pattern matching). Extracted from the old monolithic
method so all strategies reuse it.
- `handleDrop()` — original lock-or-fail path (preserves `onLockConflict` compat)
- `handleQueueOrDebounce()` — enqueue if busy, drain or debounce after
- `handleConcurrent()` — skip locking entirely
- `drainQueue()` — collect all pending, dispatch latest with skipped context
- `debounceLoop()` — sleep/check/repeat until no new messages arrive
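The top-level routing between these pieces is a straightforward dispatch on the configured strategy. A sketch of that wiring (method names mirror the list above, but the routing function itself is illustrative):

```typescript
type ConcurrencyStrategy = 'drop' | 'queue' | 'debounce' | 'concurrent';

// Sketch of handleIncomingMessage's strategy routing: drop keeps the
// legacy lock-or-fail path, queue and debounce share the enqueue-if-busy
// path, and concurrent bypasses locking entirely.
function routeStrategy(strategy: ConcurrencyStrategy): string {
  switch (strategy) {
    case 'drop':
      return 'handleDrop';
    case 'queue':
    case 'debounce':
      return 'handleQueueOrDebounce';
    case 'concurrent':
      return 'handleConcurrent';
  }
}
```

All three paths end in the same `dispatchToHandlers()`, which is what keeps mention detection and routing identical across strategies.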
## Queue Entry TTL
Queued messages have a configurable TTL (`queueEntryTtlMs`, default 90s). Stale
entries are discarded on dequeue with a `message-expired` log event. This
prevents unbounded accumulation and ensures handlers don't process messages
that are no longer relevant.
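The expiry check at dequeue time amounts to an age filter. A sketch (hypothetical helper; entry field names are assumed, and the real path also emits the `message-expired` log event per dropped entry):

```typescript
interface QueueEntry {
  messageId: string;
  enqueuedAt: number; // epoch ms
}

// Keep only entries younger than queueEntryTtlMs at drain time.
function dropExpired(
  entries: QueueEntry[],
  now: number,
  ttlMs: number = 90_000,
): QueueEntry[] {
  return entries.filter((e) => now - e.enqueuedAt <= ttlMs);
}
```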
## Observability
All strategies emit structured log events at `info` level:
| Event | Strategy | Data |
|-----------------------|------------------|---------------------------------------|
| `message-queued` | queue | threadId, messageId, queueDepth |
| `message-dequeued` | queue, debounce | threadId, messageId, skippedCount |
| `message-dropped` | drop, queue | threadId, messageId, reason |
| `message-expired` | queue, debounce | threadId, messageId |
| `message-superseded` | debounce | threadId, droppedId |
| `message-debouncing` | debounce | threadId, messageId, debounceMs |
| `message-debounce-reset` | debounce | threadId, messageId |
## Backward Compatibility
- Default remains `'drop'` — zero breaking changes for existing users
- `onLockConflict` continues to work but is marked `@deprecated`
- Handler signatures are backward-compatible (new `context` param is optional)
- Deduplication always runs regardless of strategy
## Files Changed
- `packages/chat/src/types.ts` — new types, updated handler signatures
- `packages/chat/src/chat.ts` — strategy routing, drain/debounce loops
- `packages/chat/src/index.ts` — export new types
- `packages/chat/src/mock-adapter.ts` — queue methods for test mock
- `packages/state-memory/src/index.ts` — in-memory queue
- `packages/state-redis/src/index.ts` — Redis queue (Lua)
- `packages/state-ioredis/src/index.ts` — ioredis queue (Lua)
- `packages/state-pg/src/index.ts` — Postgres queue table
- `packages/chat/src/chat.test.ts` — tests for queue, debounce, concurrent
…e methods

Add tests across all state adapters and the Chat class:

**MemoryStateAdapter** (8 new tests):
- enqueue/dequeue single entry
- dequeue from empty queue returns null
- dequeue from nonexistent thread returns null
- queueDepth returns 0 for empty queue
- FIFO ordering across multiple entries
- maxSize trimming (keeps newest)
- maxSize=1 debounce behavior (last-write-wins)
- queue isolation by thread
- queue cleared on disconnect

**PostgresStateAdapter** (8 new tests):
- INSERT query for enqueue
- overflow trimming query
- depth return value
- parsed entry from dequeue
- null from empty dequeue
- atomic DELETE-RETURNING for dequeue
- queueDepth return value
- zero depth for empty queue

**RedisStateAdapter / IoRedisStateAdapter** (3+3 existence checks):
- enqueue, dequeue, queueDepth method existence

**Chat concurrency** (5 new tests):
- drop-newest policy when queue is full
- drop-oldest policy evicts oldest entries
- expired entries skipped during drain
- onNewMessage pattern handlers receive context
- onSubscribedMessage handlers receive skipped context

Total new tests: 27 (780 chat + 33 memory + 59 pg)
## Summary

- Adds a `concurrency` option to `ChatConfig` with four strategies: `drop` (default), `queue`, `debounce`, `concurrent`
- Adds a `lockScope` option (`thread` | `channel`) to control lock granularity — defaults to `channel` for WhatsApp and Telegram (breaking change)
- Adds `enqueue`/`dequeue`/`queueDepth` to the `StateAdapter` interface with implementations across all state adapters
- Adds `MessageContext` to handler signatures so handlers can see skipped messages

## Problem

Two independent problems with the current lock-and-drop model:

1. **Overlapping messages are silently lost.** When a message arrives while a handler is processing, the SDK throws `LockError` and the message is discarded. For AI chatbots, this means user follow-ups sent while the model is streaming are permanently lost.
2. **Channel-based platforms lock too broadly.** The lock key is always `threadId`. For WhatsApp and Telegram, `threadId === channelId`, so the entire conversation is locked. For Telegram forum topics, each topic gets its own lock even though the group chat is the natural conversation unit.

## Concurrency Strategies
### `drop` (default, unchanged)

Lock acquired or `LockError` thrown. Backward compatible.

### `queue`

Messages are enqueued while a handler is running. On completion, the queue is drained: only the latest message is dispatched, with all intermediate messages provided as `context.skipped`.

### `debounce`

Every message (including the first) starts or resets a debounce timer. Only the final message in a burst is processed. Uses `waitUntil` to keep the function alive during the debounce window.

### `concurrent`

No locking. Every message is processed immediately in parallel.
### Fine-grained config

## Lock Scope

### Problem

For channel-based platforms (WhatsApp, Telegram), `threadId === channelId` in most cases. For Telegram forum topics, each topic previously got an independent lock (`telegram:-100:456`), but the natural conversation unit is the group chat (`telegram:-100`).

### Solution

New `lockScope` property on adapters and `ChatConfig`.

**Breaking change:** WhatsApp and Telegram default to `channel`.

For WhatsApp DMs, this is a no-op (`threadId === channelId` already). For Telegram forum topics, this changes behavior: all topics in a group now share one lock instead of independent per-topic locks. This matches the mental model that the group chat is a single conversation.

Users can override per-instance, or dynamically with an async resolver.
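The effect of `lockScope` on lock-key resolution can be sketched as follows (the `lockKey` helper is hypothetical; the example IDs follow the Telegram format used above, where `telegram:-100:456` is topic 456 inside group -100):

```typescript
type LockScope = 'thread' | 'channel';

// Sketch: under 'channel' scope the lock key collapses to the channel,
// so all forum topics in one group contend for a single lock; under
// 'thread' scope each topic keeps its own lock.
function lockKey(scope: LockScope, channelId: string, threadId: string): string {
  return scope === 'channel' ? channelId : threadId;
}
```

With scope `'channel'`, messages in `telegram:-100:456` and `telegram:-100:789` both lock on `telegram:-100`; with `'thread'` they lock independently.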
## StateAdapter additions

Implemented in all four adapters.

## Handler signature change

All message handler types now accept an optional `MessageContext`. Existing handlers that don't use `context` are unaffected.

## Observability

All strategies emit structured log events at `info` level (see the event table above).

## Backward Compatibility

## Test plan