diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 8e4babd6192..0fca0ee0e90 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -897,6 +897,108 @@ export namespace MessageV2 { return result } + // ── Lightweight conversation loading ────────────────────────────────── + // + // filterCompactedLazy avoids materializing the full WithParts[] array. + // Phase 1: scan message *info only* (no parts) newest→oldest to find + // the compaction boundary and collect message IDs. + // Phase 2: load parts only for messages after the boundary. + // + // For a 7,000-message session with no compaction this still loads all + // parts, but for compacted sessions it skips everything before the + // summary — which is the common case for long-running sessions. + + /** Scan info-only (no parts) newest→oldest. Returns message rows from + * the compaction boundary forward, in oldest-first order. */ + async function scanBoundary(sessionID: SessionID) { + const size = 50 + let before: string | undefined + const rows: (typeof MessageTable.$inferSelect)[] = [] + const completed = new Set() + + while (true) { + const cursor_before = before ? cursor.decode(before) : undefined + const where = cursor_before + ? and(eq(MessageTable.session_id, sessionID), older(cursor_before)) + : eq(MessageTable.session_id, sessionID) + const batch = Database.use((db) => + db + .select() + .from(MessageTable) + .where(where) + .orderBy(desc(MessageTable.time_created), desc(MessageTable.id)) + .limit(size + 1) + .all(), + ) + if (batch.length === 0) break + const more = batch.length > size + const page = more ? 
batch.slice(0, size) : batch + + let found = false + for (const row of page) { + rows.push(row) + const msg = info(row) + if ( + msg.role === "assistant" && + (msg as Assistant).summary && + (msg as Assistant).finish && + !(msg as Assistant).error + ) + completed.add(msg.parentID) + if (msg.role === "user" && completed.has(msg.id)) { + // Potential boundary — need to check parts for compaction type. + // Only load parts for THIS message to check. + const partRows = Database.use((db) => + db.select().from(PartTable).where(eq(PartTable.message_id, row.id)).all(), + ) + if (partRows.some((p) => (p.data as any).type === "compaction")) { + found = true + break + } + } + } + if (found || !more) break + const tail = page.at(-1)! + before = cursor.encode({ id: tail.id, time: tail.time_created }) + } + rows.reverse() + return rows + } + + /** Load conversation from compaction boundary forward, with full parts. + * For compacted sessions: two-pass (info scan → selective hydrate) is + * much cheaper. For uncompacted sessions: falls back to the original + * single-pass filterCompacted(stream()) to avoid the extra info scan. */ + export async function filterCompactedLazy(sessionID: SessionID) { + // Quick probe: check newest 50 message infos for any compaction summary. + // One DB query, no parts loaded. + const probe = Database.use((db) => + db + .select() + .from(MessageTable) + .where(eq(MessageTable.session_id, sessionID)) + .orderBy(desc(MessageTable.time_created), desc(MessageTable.id)) + .limit(50) + .all(), + ) + const compacted = probe.some((row) => { + const msg = info(row) + return ( + msg.role === "assistant" && (msg as Assistant).summary && (msg as Assistant).finish && !(msg as Assistant).error + ) + }) + if (!compacted) { + // No recent compaction summary — fall back to single-pass which + // loads parts alongside info (avoids 155+ wasted info-only queries + // for uncompacted sessions). 
+ return filterCompacted(stream(sessionID)) + } + // Compacted session: two-pass is efficient — scan info to find boundary, + // then hydrate only messages after it. + const rows = await scanBoundary(sessionID) + return hydrate(rows) + } + export function fromError(e: unknown, ctx: { providerID: ProviderID }): NonNullable { switch (true) { case e instanceof DOMException && e.name === "AbortError": diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 5bde2608f0b..a1eabe37e13 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -294,11 +294,20 @@ export namespace SessionPrompt { let step = 0 const session = await Session.get(sessionID) + // filterCompactedLazy scans message info without loading parts to find + // the compaction boundary, then hydrates parts only for messages after + // it. For a 7K-message session with compaction at message #100, this + // loads ~100 messages' parts instead of all 7K. 
+      let msgs = await MessageV2.filterCompactedLazy(sessionID)
+      let needsFullReload = false
     while (true) {
+      if (needsFullReload) {
+        msgs = await MessageV2.filterCompactedLazy(sessionID)
+        needsFullReload = false
+      }
       SessionStatus.set(sessionID, { type: "busy" })
       log.info("loop", { step, sessionID })
       if (abort.aborted) break
-      let msgs = await MessageV2.filterCompacted(MessageV2.stream(sessionID))
       let lastUser: MessageV2.User | undefined
       let lastAssistant: MessageV2.Assistant | undefined
@@ -525,6 +534,7 @@ export namespace SessionPrompt {
           } satisfies MessageV2.TextPart)
         }
+        needsFullReload = true
         continue
       }
@@ -539,6 +549,7 @@ export namespace SessionPrompt {
           overflow: task.overflow,
         })
         if (result === "stop") break
+        needsFullReload = true
         continue
       }
@@ -554,6 +565,7 @@ export namespace SessionPrompt {
           model: lastUser.model,
           auto: true,
         })
+        needsFullReload = true
         continue
       }
@@ -663,6 +675,24 @@ export namespace SessionPrompt {
         system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
       }
+      // Context-window windowing: only convert messages that fit in the
+      // LLM context window to ModelMessage format. This avoids creating
+      // ~300MB of wrapper objects for messages the provider will discard.
+      // NOTE(review): budget uses a chars ≈ 4×tokens heuristic — confirm
+      // model.limit.input is a token count, not characters.
+      const budget = (model.limit.input || model.limit.context || 200_000) * 4 // chars
+      let used = 0
+      let windowStart = msgs.length
+      // NOTE(review): if the NEWEST message alone exceeds 'budget', the loop
+      // breaks before windowStart is ever assigned, so it stays msgs.length and
+      // 'window' below is EMPTY — the provider would receive no conversation.
+      // Clamp so at least the most recent user turn is always included.
+      // NOTE(review): slicing at an arbitrary index can strand tool-result parts
+      // whose originating assistant tool-call falls outside the window; several
+      // provider APIs reject such histories — consider snapping windowStart to
+      // the nearest user-message boundary.
+      for (let i = msgs.length - 1; i >= 0; i--) {
+        for (const part of msgs[i].parts) {
+          if (part.type === "text") used += part.text.length
+          else if (part.type === "tool" && part.state.status === "completed")
+            used += (part.state.output?.length ?? 0) + JSON.stringify(part.state.input).length
+          else if (part.type === "reasoning") used += part.text.length
+        }
+        if (used > budget) break
+        windowStart = i
+      }
+      const window = windowStart > 0 ? msgs.slice(windowStart) : msgs
       const result = await processor.process({
         user: lastUser,
         agent,
@@ -671,7 +701,7 @@
         sessionID,
         system,
         messages: [
-          ...MessageV2.toModelMessages(msgs, model),
+          ...MessageV2.toModelMessages(window, model),
           ...(isLastStep
             ? [
                 {
@@ -719,6 +749,17 @@ export namespace SessionPrompt {
           auto: true,
           overflow: !processor.message.finish,
         })
+        needsFullReload = true
+      } else {
+        // Normal tool-call continuation: fetch the latest page to pick up
+        // new assistant messages and tool results, then merge with the
+        // cached history to avoid reloading the entire conversation.
+        // NOTE(review): page() presumably returns the newest messages WITHOUT
+        // applying the compaction filter — when fewer than 200 messages follow
+        // the boundary, this merge re-introduces pre-compaction messages that
+        // filterCompactedLazy deliberately dropped. Confirm, or re-filter the
+        // merged list against the boundary.
+        // NOTE(review): lexicographic sort on info.id assumes IDs are monotonic
+        // in creation time — confirm; other queries order by (time_created, id).
+        const fresh = await MessageV2.page({ sessionID, limit: 200 })
+        const existing = new Map(msgs.map((m) => [m.info.id, m]))
+        for (const msg of fresh.items) existing.set(msg.info.id, msg)
+        msgs = Array.from(existing.values()).sort((a, b) =>
+          a.info.id < b.info.id ? -1 : a.info.id > b.info.id ? 1 : 0,
+        )
       }
       continue
     }