From df831941c8a27e536d105c7ce4ea694d3d296b52 Mon Sep 17 00:00:00 2001 From: Lily Shen Date: Tue, 14 Apr 2026 12:49:45 -0700 Subject: [PATCH] =?UTF-8?q?feat(agent-memory):=20ADR-003=20Phase=202a=20?= =?UTF-8?q?=E2=80=94=20POST=20/memory/sync=20with=20mode=20+=20dedup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the kernel promotion endpoint every runtime driver will target. Phase 2b (new OpenClaw tools that call it) is a separate change in the submodule. ## What ships **New endpoint: `POST /api/agents/runtime/memory/sync`** - Required body: `{ sections, sourceRuntime?, mode }` - `mode: "full"` — replaces the entire sections envelope. Sections omitted from the payload are cleared. Use for driver snapshots. - `mode: "patch"` — merges with existing state. Single-object sections are `$set` per key (siblings preserved). Array sections (daily, relationships) merge element-wise keyed by `date` / `otherInstanceId`. Use for incremental promotion. - Idempotent within the same UTC day: repeated identical payloads return `{ deduped: true }` without writing. Dedup key is `(dayBucket, sourceRuntime, sha256-trunc32 of canonical-stringified body)`. Canonical stringify sorts object keys recursively so semantically identical payloads with different key order collapse to one write. **Supporting work** - `AgentMemory.lastSyncKey` + `lastSyncAt` track the dedup cache state. - Strict `'YYYY-MM-DD'` validation on `daily[].date` — regex + Date round-trip so calendar-invalid dates (2026-02-30) are rejected. - `mergePatchSections` in agentMemoryService implements the element-level array merge as a pure function; unit-tested separately from route wiring. - Full-mode handler always updates v1 `content` mirror from final sections, including blanking it when the new snapshot omits long_term. Avoids phantom data on GET when v1 readers still exist. 
## Critical fix caught by review - `PUT /memory` and `nativeRuntimeService.commonly_write_memory` now clear `lastSyncKey`/`lastSyncAt` on every write. Without this, a non-sync writer mutating state between two identical syncs caused the second sync to be silently deduped, leaving the kernel stuck on the intervening write while the driver believed its promotion succeeded. A regression test on the `PUT /memory` path locks the invariant. ## Other review-driven changes - Canonical stringify (json-stable-order) replaces JSON.stringify in the dedup key so webhook/Python drivers with different emit order don't silently miss dedup. - Full-mode v1 mirror rule made symmetric with sections (full replace → full mirror refresh, including empty when long_term omitted). - Structured logs now fire on both dedup hits and validation rejects (REVIEW.md §Maintainability, kernel-surface observability). - Concurrency caveat noted inline on `mergePatchSections` — read-merge-write assumes per-instance serialization; revisit when webhook drivers arrive in Phase 2b/later. ## Tests (17 new; 95/95 agent-memory pass; 705/705 backend total) Unit (on top of existing): - `isValidYMD` (4 tests — accept valid, reject malformed, reject calendar-invalid incl. 
leap years, reject non-strings) - `mergePatchSections` (6 tests — single-object per-key merge, replacement, daily by date, relationships by otherInstanceId, missing-existing, preserve-omitted) - `computeSyncDedupKey` (8 tests — same-day-same-payload collapse, mode/runtime/day/content sensitivity, key prefix, canonical-stringify order invariance across object keys AND across top-level sections) Integration (on top of existing): - Rejects without sections, without valid mode, with malformed date, with calendar-invalid date - full mode wipes envelope, patch mode preserves siblings + merges daily by date and relationships by otherInstanceId - full mode WITHOUT long_term blanks v1 content mirror (Important fix) - Patch mode with long_term mirrors v1 content - Same-day dedup returns deduped:true; cross-content re-sync does not - PUT /memory invalidates sync dedup cache (Critical fix) - Server-stamps byteSize; rejects unauthenticated Co-Authored-By: Claude Opus 4.6 (1M context) --- .../integration/agent-memory-envelope.test.js | 310 ++++++++++++++++++ .../unit/services/agentMemoryService.test.ts | 170 ++++++++++ backend/models/AgentMemory.ts | 7 + backend/routes/agentsRuntime.ts | 125 ++++++- backend/services/agentMemoryService.ts | 94 ++++++ backend/services/nativeRuntimeService.ts | 5 +- 6 files changed, 707 insertions(+), 4 deletions(-) diff --git a/backend/__tests__/integration/agent-memory-envelope.test.js b/backend/__tests__/integration/agent-memory-envelope.test.js index 194f657a..8d45b843 100644 --- a/backend/__tests__/integration/agent-memory-envelope.test.js +++ b/backend/__tests__/integration/agent-memory-envelope.test.js @@ -548,4 +548,314 @@ describe('AgentMemory envelope — GET/PUT /memory + backfill', () => { expect(after.sections).toBeUndefined(); }); }); + + // ------------------------------------------------------------------- // + // POST /memory/sync (ADR-003 Phase 2) // + // ------------------------------------------------------------------- // + + 
describe('POST /memory/sync', () => { + it('rejects requests without sections', async () => { + const res = await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ mode: 'full' }); + expect(res.status).toBe(400); + expect(res.body.message).toMatch(/sections is required/); + }); + + it('rejects requests without a valid mode', async () => { + const resA = await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ sections: { long_term: { content: 'x' } } }); + expect(resA.status).toBe(400); + expect(resA.body.message).toMatch(/mode must be 'full' or 'patch'/); + + const resB = await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ sections: { long_term: { content: 'x' } }, mode: 'merge' }); + expect(resB.status).toBe(400); + }); + + it('rejects invalid YYYY-MM-DD date on daily entries', async () => { + const res = await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ + sections: { daily: [{ date: '2026/04/14', content: 'x' }] }, + mode: 'full', + }); + expect(res.status).toBe(400); + expect(res.body.message).toMatch(/YYYY-MM-DD/); + }); + + it('also rejects calendar-invalid dates (feb 30)', async () => { + const res = await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ + sections: { daily: [{ date: '2026-02-30', content: 'x' }] }, + mode: 'full', + }); + expect(res.status).toBe(400); + expect(res.body.message).toMatch(/YYYY-MM-DD/); + }); + + it('full mode: replaces the entire sections envelope', async () => { + // Seed with long_term + shared. 
+ await request(app) + .put('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ sections: { long_term: { content: 'old' }, shared: { content: 'bio', visibility: 'public' } } }) + .expect(200); + + // full sync with only dedup_state — long_term/shared should be gone. + await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ + sections: { dedup_state: { content: '## Commented\n{}' } }, + mode: 'full', + sourceRuntime: 'openclaw', + }) + .expect(200); + + const get = await request(app) + .get('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`); + expect(get.body.sections.long_term).toBeUndefined(); + expect(get.body.sections.shared).toBeUndefined(); + expect(get.body.sections.dedup_state.content).toContain('Commented'); + expect(get.body.sourceRuntime).toBe('openclaw'); + }); + + it('patch mode: preserves sibling sections and merges daily by date', async () => { + // Seed. + await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ + sections: { + long_term: { content: 'keep me' }, + daily: [ + { date: '2026-04-12', content: 'mon' }, + { date: '2026-04-13', content: 'tue' }, + ], + }, + mode: 'full', + }) + .expect(200); + + // Patch with updated tue + new wed; long_term should survive. 
+ await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ + sections: { + daily: [ + { date: '2026-04-13', content: 'tue-updated' }, + { date: '2026-04-14', content: 'wed' }, + ], + }, + mode: 'patch', + }) + .expect(200); + + const get = await request(app) + .get('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`); + expect(get.body.sections.long_term.content).toBe('keep me'); + const byDate = Object.fromEntries(get.body.sections.daily.map((d) => [d.date, d.content])); + expect(byDate['2026-04-12']).toBe('mon'); + expect(byDate['2026-04-13']).toBe('tue-updated'); + expect(byDate['2026-04-14']).toBe('wed'); + }); + + it('patch mode: merges relationships by otherInstanceId', async () => { + await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ + sections: { + relationships: [ + { otherInstanceId: 'nova', notes: 'old nova' }, + { otherInstanceId: 'theo', notes: 'old theo' }, + ], + }, + mode: 'full', + }) + .expect(200); + + await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ + sections: { + relationships: [ + { otherInstanceId: 'nova', notes: 'new nova' }, + { otherInstanceId: 'liz', notes: 'new liz' }, + ], + }, + mode: 'patch', + }) + .expect(200); + + const get = await request(app) + .get('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`); + const byId = Object.fromEntries( + get.body.sections.relationships.map((r) => [r.otherInstanceId, r.notes]), + ); + expect(byId.nova).toBe('new nova'); + expect(byId.theo).toBe('old theo'); + expect(byId.liz).toBe('new liz'); + }); + + it('mirrors v1 content when patch mode includes long_term', async () => { + await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ + sections: { long_term: { content: 
'sync-mirrored' } }, + mode: 'patch', + }) + .expect(200); + + const get = await request(app) + .get('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`); + expect(get.body.content).toBe('sync-mirrored'); + }); + + it('dedupes identical payloads within the same day bucket', async () => { + const body = { + sections: { long_term: { content: 'stable' } }, + sourceRuntime: 'openclaw', + mode: 'patch', + }; + + const first = await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send(body); + expect(first.status).toBe(200); + expect(first.body.ok).toBe(true); + expect(first.body.deduped).toBeUndefined(); + + const second = await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send(body); + expect(second.status).toBe(200); + expect(second.body.ok).toBe(true); + expect(second.body.deduped).toBe(true); + + // Count should still be 1. + expect(await AgentMemory.countDocuments({})).toBe(1); + }); + + it('does NOT dedupe when the payload content changes', async () => { + await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ + sections: { long_term: { content: 'first' } }, sourceRuntime: 'openclaw', mode: 'patch', + }) + .expect(200); + + const res = await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ + sections: { long_term: { content: 'second' } }, sourceRuntime: 'openclaw', mode: 'patch', + }); + expect(res.body.deduped).toBeUndefined(); + const get = await request(app) + .get('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`); + expect(get.body.sections.long_term.content).toBe('second'); + }); + + it('server-stamps byteSize on sync writes', async () => { + await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer 
${runtimeToken}`) + .send({ + sections: { long_term: { content: '😀 hi', byteSize: 9999 } }, + mode: 'full', + }) + .expect(200); + const get = await request(app) + .get('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`); + expect(get.body.sections.long_term.byteSize).toBe(Buffer.byteLength('😀 hi', 'utf8')); + }); + + it('rejects unauthenticated sync requests', async () => { + const res = await request(app) + .post('/api/agents/runtime/memory/sync') + .send({ sections: { long_term: { content: 'x' } }, mode: 'full' }); + expect(res.status).toBe(401); + }); + + it('full mode without long_term wipes the v1 content mirror', async () => { + // Seed with v1 content via mirror. + await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ sections: { long_term: { content: 'v1 mirror source' } }, mode: 'full' }) + .expect(200); + let get = await request(app) + .get('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`); + expect(get.body.content).toBe('v1 mirror source'); + + // full sync that omits long_term — v1 content must be blanked, not stale. + await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ sections: { dedup_state: { content: '## Commented\n{}' } }, mode: 'full' }) + .expect(200); + get = await request(app) + .get('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`); + expect(get.body.content).toBe(''); + expect(get.body.sections.long_term).toBeUndefined(); + }); + + it('PUT /memory invalidates the sync dedup cache (cross-writer safety)', async () => { + const body = { + sections: { long_term: { content: 'dedup-me' } }, + sourceRuntime: 'openclaw', + mode: 'patch', + }; + + // Sync once so lastSyncKey is populated. 
+ await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send(body) + .expect(200); + + // A non-sync writer mutates sections directly (human operator / v1 tool). + await request(app) + .put('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`) + .send({ sections: { long_term: { content: 'stomped by PUT' } } }) + .expect(200); + + // The same sync payload must NOT be deduped now — kernel state drifted. + const second = await request(app) + .post('/api/agents/runtime/memory/sync') + .set('Authorization', `Bearer ${runtimeToken}`) + .send(body); + expect(second.body.deduped).toBeUndefined(); + + const get = await request(app) + .get('/api/agents/runtime/memory') + .set('Authorization', `Bearer ${runtimeToken}`); + expect(get.body.sections.long_term.content).toBe('dedup-me'); + }); + }); }); diff --git a/backend/__tests__/unit/services/agentMemoryService.test.ts b/backend/__tests__/unit/services/agentMemoryService.test.ts index 49c2ee8b..96901360 100644 --- a/backend/__tests__/unit/services/agentMemoryService.test.ts +++ b/backend/__tests__/unit/services/agentMemoryService.test.ts @@ -7,6 +7,9 @@ const { buildSectionsFromLegacyContent, mirrorContentFromSections, stampSectionsForWrite, + mergePatchSections, + computeSyncDedupKey, + isValidYMD, } = require('../../../services/agentMemoryService'); describe('parseContentIntoSections', () => { @@ -225,3 +228,170 @@ describe('stampSectionsForWrite', () => { expect(b).toEqual(a); }); }); + +describe('isValidYMD', () => { + it('accepts valid YYYY-MM-DD', () => { + expect(isValidYMD('2026-04-14')).toBe(true); + expect(isValidYMD('2000-01-01')).toBe(true); + expect(isValidYMD('2024-02-29')).toBe(true); // leap year + }); + + it('rejects malformed strings', () => { + expect(isValidYMD('2026-4-14')).toBe(false); // not zero-padded + expect(isValidYMD('2026/04/14')).toBe(false); + expect(isValidYMD('14-04-2026')).toBe(false); + 
expect(isValidYMD('')).toBe(false); + expect(isValidYMD('Apr 14')).toBe(false); + }); + + it('rejects calendar-invalid dates', () => { + expect(isValidYMD('2026-02-30')).toBe(false); // no Feb 30 + expect(isValidYMD('2026-13-01')).toBe(false); // month 13 + expect(isValidYMD('2026-00-01')).toBe(false); // month 0 + expect(isValidYMD('2026-04-00')).toBe(false); // day 0 + expect(isValidYMD('2023-02-29')).toBe(false); // non-leap year + }); + + it('rejects non-strings', () => { + expect(isValidYMD(20260414)).toBe(false); + expect(isValidYMD(undefined)).toBe(false); + expect(isValidYMD(null)).toBe(false); + expect(isValidYMD(new Date())).toBe(false); + }); +}); + +describe('mergePatchSections', () => { + it('merges single-object sections per-key, preserving siblings', () => { + const existing = { + long_term: { content: 'keep', visibility: 'private', updatedAt: new Date(), byteSize: 4 }, + shared: { content: 'bio', visibility: 'public', updatedAt: new Date(), byteSize: 3 }, + }; + const incoming = { + dedup_state: { content: '## C\n{}', visibility: 'private', updatedAt: new Date(), byteSize: 8 }, + }; + const out = mergePatchSections(existing, incoming); + expect(out.long_term?.content).toBe('keep'); + expect(out.shared?.content).toBe('bio'); + expect(out.dedup_state?.content).toBe('## C\n{}'); + }); + + it('replaces a single-object section when incoming has the same key', () => { + const existing = { long_term: { content: 'old', visibility: 'private', updatedAt: new Date(), byteSize: 3 } }; + const incoming = { long_term: { content: 'new', visibility: 'private', updatedAt: new Date(), byteSize: 3 } }; + const out = mergePatchSections(existing, incoming); + expect(out.long_term?.content).toBe('new'); + }); + + it('merges daily entries by date (replace same-date, keep other dates)', () => { + const existing = { + daily: [ + { date: '2026-04-12', content: 'mon', visibility: 'private' }, + { date: '2026-04-13', content: 'tue', visibility: 'private' }, + ], + }; + const 
incoming = { + daily: [ + { date: '2026-04-13', content: 'tue-updated', visibility: 'private' }, + { date: '2026-04-14', content: 'wed', visibility: 'private' }, + ], + }; + const out = mergePatchSections(existing, incoming); + const byDate = Object.fromEntries((out.daily || []).map((d) => [d.date, d.content])); + expect(byDate['2026-04-12']).toBe('mon'); // preserved + expect(byDate['2026-04-13']).toBe('tue-updated'); // replaced + expect(byDate['2026-04-14']).toBe('wed'); // added + expect(out.daily?.length).toBe(3); + }); + + it('merges relationships by otherInstanceId', () => { + const existing = { + relationships: [ + { otherInstanceId: 'nova', notes: 'old nova', visibility: 'private', updatedAt: new Date() }, + { otherInstanceId: 'theo', notes: 'old theo', visibility: 'private', updatedAt: new Date() }, + ], + }; + const incoming = { + relationships: [ + { otherInstanceId: 'nova', notes: 'new nova', visibility: 'private', updatedAt: new Date() }, + { otherInstanceId: 'liz', notes: 'new liz', visibility: 'private', updatedAt: new Date() }, + ], + }; + const out = mergePatchSections(existing, incoming); + const byId = Object.fromEntries((out.relationships || []).map((r) => [r.otherInstanceId, r.notes])); + expect(byId.nova).toBe('new nova'); + expect(byId.theo).toBe('old theo'); + expect(byId.liz).toBe('new liz'); + expect(out.relationships?.length).toBe(3); + }); + + it('handles missing existing doc (returns stamped incoming as-is)', () => { + const incoming = { long_term: { content: 'x', visibility: 'private', updatedAt: new Date(), byteSize: 1 } }; + const out = mergePatchSections(undefined, incoming); + expect(out.long_term?.content).toBe('x'); + }); + + it('does not lose sections from existing that incoming omitted', () => { + const existing = { + soul: { content: 'who', visibility: 'private', updatedAt: new Date(), byteSize: 3 }, + long_term: { content: 'why', visibility: 'private', updatedAt: new Date(), byteSize: 3 }, + }; + const out = 
mergePatchSections(existing, {}); + expect(out.soul?.content).toBe('who'); + expect(out.long_term?.content).toBe('why'); + }); +}); + +describe('computeSyncDedupKey', () => { + const FIXED = new Date('2026-04-14T12:00:00Z'); + + it('produces the same key for identical sections + runtime + mode on the same day', () => { + const s = { long_term: { content: 'x' } }; + const a = computeSyncDedupKey(s, 'openclaw', 'patch', FIXED); + const b = computeSyncDedupKey(s, 'openclaw', 'patch', FIXED); + expect(a).toBe(b); + }); + + it('produces a different key when mode differs', () => { + const s = { long_term: { content: 'x' } }; + expect(computeSyncDedupKey(s, 'openclaw', 'full', FIXED)) + .not.toBe(computeSyncDedupKey(s, 'openclaw', 'patch', FIXED)); + }); + + it('produces a different key when sourceRuntime differs', () => { + const s = { long_term: { content: 'x' } }; + expect(computeSyncDedupKey(s, 'openclaw', 'patch', FIXED)) + .not.toBe(computeSyncDedupKey(s, 'webhook', 'patch', FIXED)); + }); + + it('produces a different key when the day rolls over', () => { + const s = { long_term: { content: 'x' } }; + const day1 = new Date('2026-04-14T23:59:59Z'); + const day2 = new Date('2026-04-15T00:00:01Z'); + expect(computeSyncDedupKey(s, 'openclaw', 'patch', day1)) + .not.toBe(computeSyncDedupKey(s, 'openclaw', 'patch', day2)); + }); + + it('produces a different key when section content differs by a byte', () => { + expect(computeSyncDedupKey({ long_term: { content: 'x' } }, 'oc', 'patch', FIXED)) + .not.toBe(computeSyncDedupKey({ long_term: { content: 'y' } }, 'oc', 'patch', FIXED)); + }); + + it('key starts with the UTC day', () => { + const k = computeSyncDedupKey({ long_term: { content: 'x' } }, 'openclaw', 'patch', FIXED); + expect(k).toMatch(/^2026-04-14:/); + }); + + it('is order-invariant on object keys (canonical stringify)', () => { + const a = { long_term: { content: 'x', visibility: 'private' } }; + const b = { long_term: { visibility: 'private', content: 'x' } }; 
+ expect(computeSyncDedupKey(a, 'openclaw', 'patch', FIXED)) + .toBe(computeSyncDedupKey(b, 'openclaw', 'patch', FIXED)); + }); + + it('is order-invariant across multiple sections', () => { + const a = { long_term: { content: 'x' }, shared: { content: 'y' } }; + const b = { shared: { content: 'y' }, long_term: { content: 'x' } }; + expect(computeSyncDedupKey(a, 'openclaw', 'patch', FIXED)) + .toBe(computeSyncDedupKey(b, 'openclaw', 'patch', FIXED)); + }); +}); diff --git a/backend/models/AgentMemory.ts b/backend/models/AgentMemory.ts index 4a1c6602..e378792a 100644 --- a/backend/models/AgentMemory.ts +++ b/backend/models/AgentMemory.ts @@ -43,6 +43,11 @@ export interface IAgentMemory extends Document { sections?: IAgentMemorySections; sourceRuntime?: string; schemaVersion?: number; + // ADR-003 Phase 2: idempotent-dedup key for POST /memory/sync, scoped + // (dayBucket + sourceRuntime + contentHash). Repeated identical syncs + // within the same day bucket return early with { deduped: true }. 
+ lastSyncKey?: string; + lastSyncAt?: Date; createdAt: Date; updatedAt: Date; } @@ -105,6 +110,8 @@ const agentMemorySchema = new Schema( sections: { type: agentMemorySectionsSchema, default: undefined }, sourceRuntime: { type: String, default: undefined }, schemaVersion: { type: Number, default: undefined }, + lastSyncKey: { type: String, default: undefined }, + lastSyncAt: { type: Date, default: undefined }, }, { timestamps: true }, ); diff --git a/backend/routes/agentsRuntime.ts b/backend/routes/agentsRuntime.ts index 356d007b..d599796e 100644 --- a/backend/routes/agentsRuntime.ts +++ b/backend/routes/agentsRuntime.ts @@ -21,7 +21,13 @@ const { requireApiTokenScopes } = require('../middleware/apiTokenScopes'); const Integration = require('../models/Integration'); const AgentMemory = require('../models/AgentMemory'); -const { mirrorContentFromSections, stampSectionsForWrite } = require('../services/agentMemoryService'); +const { + mirrorContentFromSections, + stampSectionsForWrite, + mergePatchSections, + computeSyncDedupKey, + isValidYMD, +} = require('../services/agentMemoryService'); const DMService = require('../services/dmService'); const ChatSummarizerService = require('../services/chatSummarizerService'); const AgentMentionService = require('../services/agentMentionService'); @@ -1236,7 +1242,7 @@ function validateSectionsPayload(sections: any): string | null { if (sections.daily !== undefined) { if (!Array.isArray(sections.daily)) return 'sections.daily must be an array'; for (const d of sections.daily) { - if (typeof d?.date !== 'string') return 'sections.daily[].date must be a string'; + if (!isValidYMD(d?.date)) return 'sections.daily[].date must be YYYY-MM-DD'; if (d.visibility !== undefined && !VALID_VISIBILITIES.has(d.visibility)) { return 'sections.daily[].visibility must be one of private|pod|public'; } @@ -1340,9 +1346,13 @@ router.put('/memory', agentRuntimeAuth, async (req: any, res: any) => { if (content !== undefined) setOps.content = 
content; if (sourceRuntime !== undefined) setOps.sourceRuntime = sourceRuntime; + // Invalidate the /memory/sync dedup cache: any non-sync writer mutates + // state the sync dedup key may no longer reflect. Without this, a sync + // path that promoted the same bytes earlier in the day will get wrongly + // short-circuited after a PUT/native-runtime write landed between. await AgentMemory.findOneAndUpdate( { agentName, instanceId }, - { $set: setOps }, + { $set: setOps, $unset: { lastSyncKey: '', lastSyncAt: '' } }, { upsert: true, new: true, setDefaultsOnInsert: true }, ); console.log('[agent-memory PUT]', { @@ -1359,6 +1369,115 @@ router.put('/memory', agentRuntimeAuth, async (req: any, res: any) => { } }); +/** + * POST /memory/sync (agent runtime token auth) + * ADR-003 Phase 2. Runtime-driver promotion of memory into the kernel. + * + * Body: + * { + * sections: {...}, // required, validated as in PUT /memory + * sourceRuntime?: string, // driver self-id (e.g. "openclaw") + * mode: "full" | "patch" // required + * } + * + * Modes: + * - "full": replaces `sections` wholesale with the payload. Sections not + * in the payload are cleared. Use when the driver is pushing a + * complete snapshot. + * - "patch": merges with existing state. Single-object sections are $set + * per-key (siblings preserved). Array sections (`daily`, + * `relationships`) merge element-wise, keyed by `date` and + * `otherInstanceId`. Use for incremental updates. + * + * Idempotency: repeated identical payloads within the same UTC day bucket + * are deduped (no write, returns `{ deduped: true }`). Key is + * `(dayBucket, sourceRuntime, sha256(sections+mode))`. + * + * `byteSize` and `updatedAt` are server-stamped. `schemaVersion` auto-set to 2. + * v1 `content` is mirrored from `long_term.content` (same rule as PUT). 
+ */ +router.post('/memory/sync', agentRuntimeAuth, async (req: any, res: any) => { + try { + const { agentName, instanceId } = resolveMemoryIdentity(req); + if (!agentName) { + return res.status(403).json({ message: 'Could not resolve agent identity' }); + } + const { sections, sourceRuntime, mode } = req.body || {}; + const rejectAndLog = (msg: string) => { + console.log('[agent-memory SYNC reject]', { agentName, instanceId, msg }); + return res.status(400).json({ message: msg }); + }; + if (sections === undefined) return rejectAndLog('sections is required'); + const sectionsError = validateSectionsPayload(sections); + if (sectionsError) return rejectAndLog(sectionsError); + if (sourceRuntime !== undefined && typeof sourceRuntime !== 'string') { + return rejectAndLog('sourceRuntime must be a string'); + } + if (mode !== 'full' && mode !== 'patch') { + return rejectAndLog("mode must be 'full' or 'patch'"); + } + + const now = new Date(); + const dedupKey = computeSyncDedupKey(sections, sourceRuntime, mode, now); + + const existing = await AgentMemory.findOne({ agentName, instanceId }).lean(); + if (existing?.lastSyncKey === dedupKey) { + console.log('[agent-memory SYNC deduped]', { agentName, instanceId, mode, sourceRuntime }); + return res.json({ ok: true, deduped: true }); + } + + const stamped = stampSectionsForWrite(sections, now); + + let finalSections: any; + if (mode === 'full') { + finalSections = stamped; + } else { + finalSections = mergePatchSections(existing?.sections, stamped); + } + + const update: Record = { + sections: finalSections, + schemaVersion: 2, + lastSyncKey: dedupKey, + lastSyncAt: now, + }; + if (sourceRuntime !== undefined) update.sourceRuntime = sourceRuntime; + + // v1 `content` mirror rules: + // - full mode: always reflects whatever `long_term` is in the new + // sections — including `''` when the caller omitted long_term, since + // full mode means "no long_term from now on." 
Otherwise v1 readers + // see phantom data the kernel no longer stores. + // - patch mode: only mirrored when the caller explicitly wrote + // long_term (so an incremental patch that ignored long_term doesn't + // stomp v1 content). + if (mode === 'full') { + update.content = mirrorContentFromSections(finalSections); + } else if ((stamped as any).long_term !== undefined) { + update.content = mirrorContentFromSections(stamped); + } + + await AgentMemory.findOneAndUpdate( + { agentName, instanceId }, + { $set: update }, + { upsert: true, new: true, setDefaultsOnInsert: true }, + ); + + console.log('[agent-memory SYNC]', { + agentName, + instanceId, + mode, + sectionKeys: Object.keys(stamped), + sourceRuntime, + }); + + return res.json({ ok: true, schemaVersion: 2 }); + } catch (err: any) { + console.error('POST /memory/sync error:', err); + return res.status(500).json({ message: 'Failed to sync agent memory' }); + } +}); + /** * POST /posts (agent runtime token auth) * Create a post in the feed as the agent's bot user diff --git a/backend/services/agentMemoryService.ts b/backend/services/agentMemoryService.ts index 822d5f27..5caec9ab 100644 --- a/backend/services/agentMemoryService.ts +++ b/backend/services/agentMemoryService.ts @@ -1,3 +1,5 @@ +import crypto from 'crypto'; + import type { IAgentMemorySections, IDailySection, @@ -6,6 +8,24 @@ import type { MemoryVisibility, } from '../models/AgentMemory'; +// Valid YYYY-MM-DD — ADR-003 daily[].date shape. Strict: must be a calendar- +// valid date (Date.parse rejects e.g. 2026-02-30). 
+export const YMD_RE = /^(\d{4})-(\d{2})-(\d{2})$/; + +export function isValidYMD(s: unknown): boolean { + if (typeof s !== 'string') return false; + const m = s.match(YMD_RE); + if (!m) return false; + const [, y, mo, d] = m; + const dt = new Date(`${y}-${mo}-${d}T00:00:00Z`); + if (Number.isNaN(dt.getTime())) return false; + return ( + dt.getUTCFullYear() === Number(y) + && dt.getUTCMonth() + 1 === Number(mo) + && dt.getUTCDate() === Number(d) + ); +} + // ADR-003 Phase 1: parse legacy v1 `content` blobs into a v2 section envelope. // The parser splits markdown on top-level `## ` headers and buckets each // section into `dedup_state` (if the header name matches a known dedup tag) @@ -167,3 +187,77 @@ export function stampSectionsForWrite( } return out; } + +// ADR-003 Phase 2: element-level merge for array sections under +// `mode: 'patch'`. `daily[]` is keyed by `date`; `relationships[]` is keyed by +// `otherInstanceId`. Incoming entries overwrite existing ones with the same +// key; existing entries with keys not in the payload are preserved. Caller +// is expected to pass already-stamped incoming sections. +// +// Concurrency caveat: the `/memory/sync` handler does a read-merge-write +// (findOne → mergePatchSections → findOneAndUpdate). Drivers are expected to +// serialize promotions per (agentName, instanceId) — typical heartbeat +// cadence. When webhook drivers land (Phase 2b / later), revisit with an +// update-pipeline `$mergeObjects` / optimistic-concurrency version check. 
+export function mergePatchSections( + existing: IAgentMemorySections | undefined, + incoming: IAgentMemorySections, +): IAgentMemorySections { + const out: IAgentMemorySections = { ...(existing || {}) }; + + for (const key of Object.keys(incoming) as (keyof IAgentMemorySections)[]) { + const inc = incoming[key]; + if (inc === undefined) continue; + + if (key === 'daily') { + const byDate = new Map(); + for (const d of existing?.daily || []) byDate.set(d.date, d); + for (const d of inc as IDailySection[]) byDate.set(d.date, d); + out.daily = Array.from(byDate.values()); + continue; + } + + if (key === 'relationships') { + const byId = new Map(); + for (const r of existing?.relationships || []) byId.set(r.otherInstanceId, r); + for (const r of inc as IRelationshipNote[]) byId.set(r.otherInstanceId, r); + out.relationships = Array.from(byId.values()); + continue; + } + + out[key] = inc as IMemorySection; + } + + return out; +} + +// Canonical stringify: sorts object keys recursively so semantically identical +// payloads hash identically regardless of emit order. JSON.stringify is NOT +// canonical. Used only for dedup key computation. +function canonicalStringify(value: unknown): string { + if (value === null || typeof value !== 'object') return JSON.stringify(value); + if (value instanceof Date) return JSON.stringify(value); + if (Array.isArray(value)) return `[${value.map(canonicalStringify).join(',')}]`; + const rec = value as Record; + const keys = Object.keys(rec).sort(); + const parts = keys.map((k) => `${JSON.stringify(k)}:${canonicalStringify(rec[k])}`); + return `{${parts.join(',')}}`; +} + +// Dedup key for /memory/sync — combines day bucket (UTC) with sourceRuntime +// and a stable content hash of the sections payload. Two syncs with the same +// *semantic* payload on the same day collapse to one write (see +// canonicalStringify: key order is normalized). 
+// 32 hex chars = 128 bits of collision resistance — ample for a per-instance, +// per-day dedup where a miss just means one extra write. +export function computeSyncDedupKey( + sections: IAgentMemorySections, + sourceRuntime: string | undefined, + mode: 'full' | 'patch', + now: Date = new Date(), +): string { + const day = now.toISOString().slice(0, 10); // YYYY-MM-DD + const payload = canonicalStringify({ sections, sourceRuntime: sourceRuntime ?? null, mode }); + const hash = crypto.createHash('sha256').update(payload).digest('hex').slice(0, 32); + return `${day}:${sourceRuntime ?? '-'}:${hash}`; +} diff --git a/backend/services/nativeRuntimeService.ts b/backend/services/nativeRuntimeService.ts index 6941f873..6bef65e9 100644 --- a/backend/services/nativeRuntimeService.ts +++ b/backend/services/nativeRuntimeService.ts @@ -240,9 +240,12 @@ async function dispatchTool( }); const prior = (existing?.content as string) || ''; const nextContent = `${prior}${prior ? '\n\n' : ''}[${new Date().toISOString()}] ${content}`; + // ADR-003 Phase 2: clear /memory/sync dedup cache when a non-sync + // writer mutates the doc, so the next sync promotion isn't wrongly + // short-circuited on a stale hash. await AgentMemory.findOneAndUpdate( { agentName: ctx.agentName, instanceId: ctx.instanceId }, - { $set: { content: nextContent } }, + { $set: { content: nextContent }, $unset: { lastSyncKey: '', lastSyncAt: '' } }, { upsert: true, new: true, setDefaultsOnInsert: true }, ); return { content: { ok: true, written: true } };