// ------------------------------------------------------------------- //
// POST /memory/sync (ADR-003 Phase 2)                                  //
// ------------------------------------------------------------------- //

describe('POST /memory/sync', () => {
  const SYNC_URL = '/api/agents/runtime/memory/sync';
  const MEMORY_URL = '/api/agents/runtime/memory';

  // Request helpers. `runtimeToken` is read at call time (not captured when
  // the suite is defined) so whatever the surrounding setup assigns is used.
  const withAuth = (req) => req.set('Authorization', `Bearer ${runtimeToken}`);
  /** POST /memory/sync with the runtime token. */
  const postSync = (body) => withAuth(request(app).post(SYNC_URL)).send(body);
  /** PUT /memory with the runtime token (the non-sync writer). */
  const putMemory = (body) => withAuth(request(app).put(MEMORY_URL)).send(body);
  /** GET /memory with the runtime token. */
  const getMemory = () => withAuth(request(app).get(MEMORY_URL));

  it('rejects requests without sections', async () => {
    const res = await postSync({ mode: 'full' });
    expect(res.status).toBe(400);
    expect(res.body.message).toMatch(/sections is required/);
  });

  it('rejects requests without a valid mode', async () => {
    // Missing mode.
    const resA = await postSync({ sections: { long_term: { content: 'x' } } });
    expect(resA.status).toBe(400);
    expect(resA.body.message).toMatch(/mode must be 'full' or 'patch'/);

    // Unknown mode.
    const resB = await postSync({ sections: { long_term: { content: 'x' } }, mode: 'merge' });
    expect(resB.status).toBe(400);
  });

  it('rejects invalid YYYY-MM-DD date on daily entries', async () => {
    const res = await postSync({
      sections: { daily: [{ date: '2026/04/14', content: 'x' }] },
      mode: 'full',
    });
    expect(res.status).toBe(400);
    expect(res.body.message).toMatch(/YYYY-MM-DD/);
  });

  it('also rejects calendar-invalid dates (feb 30)', async () => {
    const res = await postSync({
      sections: { daily: [{ date: '2026-02-30', content: 'x' }] },
      mode: 'full',
    });
    expect(res.status).toBe(400);
    expect(res.body.message).toMatch(/YYYY-MM-DD/);
  });

  it('full mode: replaces the entire sections envelope', async () => {
    // Seed with long_term + shared.
    await putMemory({
      sections: { long_term: { content: 'old' }, shared: { content: 'bio', visibility: 'public' } },
    }).expect(200);

    // full sync with only dedup_state — long_term/shared should be gone.
    await postSync({
      sections: { dedup_state: { content: '## Commented\n{}' } },
      mode: 'full',
      sourceRuntime: 'openclaw',
    }).expect(200);

    const get = await getMemory();
    expect(get.body.sections.long_term).toBeUndefined();
    expect(get.body.sections.shared).toBeUndefined();
    expect(get.body.sections.dedup_state.content).toContain('Commented');
    expect(get.body.sourceRuntime).toBe('openclaw');
  });

  it('patch mode: preserves sibling sections and merges daily by date', async () => {
    // Seed.
    await postSync({
      sections: {
        long_term: { content: 'keep me' },
        daily: [
          { date: '2026-04-12', content: 'mon' },
          { date: '2026-04-13', content: 'tue' },
        ],
      },
      mode: 'full',
    }).expect(200);

    // Patch with updated tue + new wed; long_term should survive.
    await postSync({
      sections: {
        daily: [
          { date: '2026-04-13', content: 'tue-updated' },
          { date: '2026-04-14', content: 'wed' },
        ],
      },
      mode: 'patch',
    }).expect(200);

    const get = await getMemory();
    expect(get.body.sections.long_term.content).toBe('keep me');
    const byDate = Object.fromEntries(get.body.sections.daily.map((d) => [d.date, d.content]));
    expect(byDate['2026-04-12']).toBe('mon');
    expect(byDate['2026-04-13']).toBe('tue-updated');
    expect(byDate['2026-04-14']).toBe('wed');
  });

  it('patch mode: merges relationships by otherInstanceId', async () => {
    await postSync({
      sections: {
        relationships: [
          { otherInstanceId: 'nova', notes: 'old nova' },
          { otherInstanceId: 'theo', notes: 'old theo' },
        ],
      },
      mode: 'full',
    }).expect(200);

    await postSync({
      sections: {
        relationships: [
          { otherInstanceId: 'nova', notes: 'new nova' },
          { otherInstanceId: 'liz', notes: 'new liz' },
        ],
      },
      mode: 'patch',
    }).expect(200);

    const get = await getMemory();
    const byId = Object.fromEntries(
      get.body.sections.relationships.map((r) => [r.otherInstanceId, r.notes]),
    );
    expect(byId.nova).toBe('new nova');
    expect(byId.theo).toBe('old theo');
    expect(byId.liz).toBe('new liz');
  });

  it('mirrors v1 content when patch mode includes long_term', async () => {
    await postSync({
      sections: { long_term: { content: 'sync-mirrored' } },
      mode: 'patch',
    }).expect(200);

    const get = await getMemory();
    expect(get.body.content).toBe('sync-mirrored');
  });

  it('dedupes identical payloads within the same day bucket', async () => {
    const body = {
      sections: { long_term: { content: 'stable' } },
      sourceRuntime: 'openclaw',
      mode: 'patch',
    };

    const first = await postSync(body);
    expect(first.status).toBe(200);
    expect(first.body.ok).toBe(true);
    expect(first.body.deduped).toBeUndefined();

    const second = await postSync(body);
    expect(second.status).toBe(200);
    expect(second.body.ok).toBe(true);
    expect(second.body.deduped).toBe(true);

    // Count should still be 1.
    expect(await AgentMemory.countDocuments({})).toBe(1);
  });

  it('does NOT dedupe when the payload content changes', async () => {
    await postSync({
      sections: { long_term: { content: 'first' } },
      sourceRuntime: 'openclaw',
      mode: 'patch',
    }).expect(200);

    const res = await postSync({
      sections: { long_term: { content: 'second' } },
      sourceRuntime: 'openclaw',
      mode: 'patch',
    });
    // Assert the status as well: an error body also has no `deduped` field,
    // so the check below alone would pass vacuously on a failed request.
    expect(res.status).toBe(200);
    expect(res.body.deduped).toBeUndefined();

    const get = await getMemory();
    expect(get.body.sections.long_term.content).toBe('second');
  });

  it('server-stamps byteSize on sync writes', async () => {
    // The client-supplied byteSize (9999) must be ignored in favour of the
    // real UTF-8 length of the content.
    await postSync({
      sections: { long_term: { content: '😀 hi', byteSize: 9999 } },
      mode: 'full',
    }).expect(200);

    const get = await getMemory();
    expect(get.body.sections.long_term.byteSize).toBe(Buffer.byteLength('😀 hi', 'utf8'));
  });

  it('rejects unauthenticated sync requests', async () => {
    // Deliberately bypasses the helpers: no Authorization header.
    const res = await request(app)
      .post(SYNC_URL)
      .send({ sections: { long_term: { content: 'x' } }, mode: 'full' });
    expect(res.status).toBe(401);
  });

  it('full mode without long_term wipes the v1 content mirror', async () => {
    // Seed with v1 content via mirror.
    await postSync({ sections: { long_term: { content: 'v1 mirror source' } }, mode: 'full' })
      .expect(200);
    let get = await getMemory();
    expect(get.body.content).toBe('v1 mirror source');

    // full sync that omits long_term — v1 content must be blanked, not stale.
    await postSync({ sections: { dedup_state: { content: '## Commented\n{}' } }, mode: 'full' })
      .expect(200);
    get = await getMemory();
    expect(get.body.content).toBe('');
    expect(get.body.sections.long_term).toBeUndefined();
  });

  it('PUT /memory invalidates the sync dedup cache (cross-writer safety)', async () => {
    const body = {
      sections: { long_term: { content: 'dedup-me' } },
      sourceRuntime: 'openclaw',
      mode: 'patch',
    };

    // Sync once so lastSyncKey is populated.
    await postSync(body).expect(200);

    // A non-sync writer mutates sections directly (human operator / v1 tool).
    await putMemory({ sections: { long_term: { content: 'stomped by PUT' } } }).expect(200);

    // The same sync payload must NOT be deduped now — kernel state drifted.
    const second = await postSync(body);
    // Status asserted too, so an error response cannot satisfy the
    // `deduped` check by simply lacking the field.
    expect(second.status).toBe(200);
    expect(second.body.deduped).toBeUndefined();

    const get = await getMemory();
    expect(get.body.sections.long_term.content).toBe('dedup-me');
  });
});
describe('isValidYMD', () => {
  // Feeds every sample to isValidYMD and expects the same verdict for each.
  const expectVerdict = (samples: unknown[], verdict: boolean) => {
    for (const sample of samples) {
      expect(isValidYMD(sample)).toBe(verdict);
    }
  };

  it('accepts valid YYYY-MM-DD', () => {
    expectVerdict(
      [
        '2026-04-14',
        '2000-01-01',
        '2024-02-29', // leap year
      ],
      true,
    );
  });

  it('rejects malformed strings', () => {
    expectVerdict(
      [
        '2026-4-14', // not zero-padded
        '2026/04/14',
        '14-04-2026',
        '',
        'Apr 14',
      ],
      false,
    );
  });

  it('rejects calendar-invalid dates', () => {
    expectVerdict(
      [
        '2026-02-30', // no Feb 30
        '2026-13-01', // month 13
        '2026-00-01', // month 0
        '2026-04-00', // day 0
        '2023-02-29', // non-leap year
      ],
      false,
    );
  });

  it('rejects non-strings', () => {
    expectVerdict([20260414, undefined, null, new Date()], false);
  });
});

describe('mergePatchSections', () => {
  // Fixture builders; every entry is private and stamped "now", exactly as
  // the literal fixtures they replace.
  const section = (content: string, byteSize: number, visibility = 'private') => ({
    content,
    visibility,
    updatedAt: new Date(),
    byteSize,
  });
  const day = (date: string, content: string) => ({ date, content, visibility: 'private' });
  const rel = (otherInstanceId: string, notes: string) => ({
    otherInstanceId,
    notes,
    visibility: 'private',
    updatedAt: new Date(),
  });

  it('merges single-object sections per-key, preserving siblings', () => {
    const stored = {
      long_term: section('keep', 4),
      shared: section('bio', 3, 'public'),
    };
    const patch = { dedup_state: section('## C\n{}', 8) };

    const merged = mergePatchSections(stored, patch);

    expect(merged.long_term?.content).toBe('keep');
    expect(merged.shared?.content).toBe('bio');
    expect(merged.dedup_state?.content).toBe('## C\n{}');
  });

  it('replaces a single-object section when incoming has the same key', () => {
    const merged = mergePatchSections(
      { long_term: section('old', 3) },
      { long_term: section('new', 3) },
    );
    expect(merged.long_term?.content).toBe('new');
  });

  it('merges daily entries by date (replace same-date, keep other dates)', () => {
    const stored = { daily: [day('2026-04-12', 'mon'), day('2026-04-13', 'tue')] };
    const patch = { daily: [day('2026-04-13', 'tue-updated'), day('2026-04-14', 'wed')] };

    const merged = mergePatchSections(stored, patch);

    const contentByDate = Object.fromEntries(
      (merged.daily || []).map((entry) => [entry.date, entry.content]),
    );
    expect(contentByDate['2026-04-12']).toBe('mon'); // preserved
    expect(contentByDate['2026-04-13']).toBe('tue-updated'); // replaced
    expect(contentByDate['2026-04-14']).toBe('wed'); // added
    expect(merged.daily?.length).toBe(3);
  });

  it('merges relationships by otherInstanceId', () => {
    const stored = { relationships: [rel('nova', 'old nova'), rel('theo', 'old theo')] };
    const patch = { relationships: [rel('nova', 'new nova'), rel('liz', 'new liz')] };

    const merged = mergePatchSections(stored, patch);

    const notesById = Object.fromEntries(
      (merged.relationships || []).map((entry) => [entry.otherInstanceId, entry.notes]),
    );
    expect(notesById.nova).toBe('new nova');
    expect(notesById.theo).toBe('old theo');
    expect(notesById.liz).toBe('new liz');
    expect(merged.relationships?.length).toBe(3);
  });

  it('handles missing existing doc (returns stamped incoming as-is)', () => {
    const merged = mergePatchSections(undefined, { long_term: section('x', 1) });
    expect(merged.long_term?.content).toBe('x');
  });

  it('does not lose sections from existing that incoming omitted', () => {
    const stored = {
      soul: section('who', 3),
      long_term: section('why', 3),
    };
    const merged = mergePatchSections(stored, {});
    expect(merged.soul?.content).toBe('who');
    expect(merged.long_term?.content).toBe('why');
  });
});

describe('computeSyncDedupKey', () => {
  const FIXED = new Date('2026-04-14T12:00:00Z');

  // Shorthand: most cases vary exactly one argument from this baseline.
  const keyFor = (
    sections: unknown,
    runtime = 'openclaw',
    mode = 'patch',
    at: Date = FIXED,
  ) => computeSyncDedupKey(sections, runtime, mode, at);

  it('produces the same key for identical sections + runtime + mode on the same day', () => {
    const sections = { long_term: { content: 'x' } };
    const first = keyFor(sections);
    const second = keyFor(sections);
    expect(first).toBe(second);
  });

  it('produces a different key when mode differs', () => {
    const sections = { long_term: { content: 'x' } };
    expect(keyFor(sections, 'openclaw', 'full')).not.toBe(keyFor(sections, 'openclaw', 'patch'));
  });

  it('produces a different key when sourceRuntime differs', () => {
    const sections = { long_term: { content: 'x' } };
    expect(keyFor(sections, 'openclaw')).not.toBe(keyFor(sections, 'webhook'));
  });

  it('produces a different key when the day rolls over', () => {
    const sections = { long_term: { content: 'x' } };
    const beforeMidnight = new Date('2026-04-14T23:59:59Z');
    const afterMidnight = new Date('2026-04-15T00:00:01Z');
    expect(keyFor(sections, 'openclaw', 'patch', beforeMidnight))
      .not.toBe(keyFor(sections, 'openclaw', 'patch', afterMidnight));
  });

  it('produces a different key when section content differs by a byte', () => {
    expect(keyFor({ long_term: { content: 'x' } }, 'oc'))
      .not.toBe(keyFor({ long_term: { content: 'y' } }, 'oc'));
  });

  it('key starts with the UTC day', () => {
    expect(keyFor({ long_term: { content: 'x' } })).toMatch(/^2026-04-14:/);
  });

  it('is order-invariant on object keys (canonical stringify)', () => {
    const contentFirst = { long_term: { content: 'x', visibility: 'private' } };
    const visibilityFirst = { long_term: { visibility: 'private', content: 'x' } };
    expect(keyFor(contentFirst)).toBe(keyFor(visibilityFirst));
  });

  it('is order-invariant across multiple sections', () => {
    const longTermFirst = { long_term: { content: 'x' }, shared: { content: 'y' } };
    const sharedFirst = { shared: { content: 'y' }, long_term: { content: 'x' } };
    expect(keyFor(longTermFirst)).toBe(keyFor(sharedFirst));
  });
});
+ lastSyncKey?: string; + lastSyncAt?: Date; createdAt: Date; updatedAt: Date; } @@ -105,6 +110,8 @@ const agentMemorySchema = new Schema( sections: { type: agentMemorySectionsSchema, default: undefined }, sourceRuntime: { type: String, default: undefined }, schemaVersion: { type: Number, default: undefined }, + lastSyncKey: { type: String, default: undefined }, + lastSyncAt: { type: Date, default: undefined }, }, { timestamps: true }, ); diff --git a/backend/routes/agentsRuntime.ts b/backend/routes/agentsRuntime.ts index 356d007b..d599796e 100644 --- a/backend/routes/agentsRuntime.ts +++ b/backend/routes/agentsRuntime.ts @@ -21,7 +21,13 @@ const { requireApiTokenScopes } = require('../middleware/apiTokenScopes'); const Integration = require('../models/Integration'); const AgentMemory = require('../models/AgentMemory'); -const { mirrorContentFromSections, stampSectionsForWrite } = require('../services/agentMemoryService'); +const { + mirrorContentFromSections, + stampSectionsForWrite, + mergePatchSections, + computeSyncDedupKey, + isValidYMD, +} = require('../services/agentMemoryService'); const DMService = require('../services/dmService'); const ChatSummarizerService = require('../services/chatSummarizerService'); const AgentMentionService = require('../services/agentMentionService'); @@ -1236,7 +1242,7 @@ function validateSectionsPayload(sections: any): string | null { if (sections.daily !== undefined) { if (!Array.isArray(sections.daily)) return 'sections.daily must be an array'; for (const d of sections.daily) { - if (typeof d?.date !== 'string') return 'sections.daily[].date must be a string'; + if (!isValidYMD(d?.date)) return 'sections.daily[].date must be YYYY-MM-DD'; if (d.visibility !== undefined && !VALID_VISIBILITIES.has(d.visibility)) { return 'sections.daily[].visibility must be one of private|pod|public'; } @@ -1340,9 +1346,13 @@ router.put('/memory', agentRuntimeAuth, async (req: any, res: any) => { if (content !== undefined) setOps.content = 
content; if (sourceRuntime !== undefined) setOps.sourceRuntime = sourceRuntime; + // Invalidate the /memory/sync dedup cache: any non-sync writer mutates + // state the sync dedup key may no longer reflect. Without this, a sync + // path that promoted the same bytes earlier in the day will get wrongly + // short-circuited after a PUT/native-runtime write landed between. await AgentMemory.findOneAndUpdate( { agentName, instanceId }, - { $set: setOps }, + { $set: setOps, $unset: { lastSyncKey: '', lastSyncAt: '' } }, { upsert: true, new: true, setDefaultsOnInsert: true }, ); console.log('[agent-memory PUT]', { @@ -1359,6 +1369,115 @@ router.put('/memory', agentRuntimeAuth, async (req: any, res: any) => { } }); +/** + * POST /memory/sync (agent runtime token auth) + * ADR-003 Phase 2. Runtime-driver promotion of memory into the kernel. + * + * Body: + * { + * sections: {...}, // required, validated as in PUT /memory + * sourceRuntime?: string, // driver self-id (e.g. "openclaw") + * mode: "full" | "patch" // required + * } + * + * Modes: + * - "full": replaces `sections` wholesale with the payload. Sections not + * in the payload are cleared. Use when the driver is pushing a + * complete snapshot. + * - "patch": merges with existing state. Single-object sections are $set + * per-key (siblings preserved). Array sections (`daily`, + * `relationships`) merge element-wise, keyed by `date` and + * `otherInstanceId`. Use for incremental updates. + * + * Idempotency: repeated identical payloads within the same UTC day bucket + * are deduped (no write, returns `{ deduped: true }`). Key is + * `(dayBucket, sourceRuntime, sha256(sections+mode))`. + * + * `byteSize` and `updatedAt` are server-stamped. `schemaVersion` auto-set to 2. + * v1 `content` is mirrored from `long_term.content` (same rule as PUT). 
+ */ +router.post('/memory/sync', agentRuntimeAuth, async (req: any, res: any) => { + try { + const { agentName, instanceId } = resolveMemoryIdentity(req); + if (!agentName) { + return res.status(403).json({ message: 'Could not resolve agent identity' }); + } + const { sections, sourceRuntime, mode } = req.body || {}; + const rejectAndLog = (msg: string) => { + console.log('[agent-memory SYNC reject]', { agentName, instanceId, msg }); + return res.status(400).json({ message: msg }); + }; + if (sections === undefined) return rejectAndLog('sections is required'); + const sectionsError = validateSectionsPayload(sections); + if (sectionsError) return rejectAndLog(sectionsError); + if (sourceRuntime !== undefined && typeof sourceRuntime !== 'string') { + return rejectAndLog('sourceRuntime must be a string'); + } + if (mode !== 'full' && mode !== 'patch') { + return rejectAndLog("mode must be 'full' or 'patch'"); + } + + const now = new Date(); + const dedupKey = computeSyncDedupKey(sections, sourceRuntime, mode, now); + + const existing = await AgentMemory.findOne({ agentName, instanceId }).lean(); + if (existing?.lastSyncKey === dedupKey) { + console.log('[agent-memory SYNC deduped]', { agentName, instanceId, mode, sourceRuntime }); + return res.json({ ok: true, deduped: true }); + } + + const stamped = stampSectionsForWrite(sections, now); + + let finalSections: any; + if (mode === 'full') { + finalSections = stamped; + } else { + finalSections = mergePatchSections(existing?.sections, stamped); + } + + const update: Record = { + sections: finalSections, + schemaVersion: 2, + lastSyncKey: dedupKey, + lastSyncAt: now, + }; + if (sourceRuntime !== undefined) update.sourceRuntime = sourceRuntime; + + // v1 `content` mirror rules: + // - full mode: always reflects whatever `long_term` is in the new + // sections — including `''` when the caller omitted long_term, since + // full mode means "no long_term from now on." 
Otherwise v1 readers + // see phantom data the kernel no longer stores. + // - patch mode: only mirrored when the caller explicitly wrote + // long_term (so an incremental patch that ignored long_term doesn't + // stomp v1 content). + if (mode === 'full') { + update.content = mirrorContentFromSections(finalSections); + } else if ((stamped as any).long_term !== undefined) { + update.content = mirrorContentFromSections(stamped); + } + + await AgentMemory.findOneAndUpdate( + { agentName, instanceId }, + { $set: update }, + { upsert: true, new: true, setDefaultsOnInsert: true }, + ); + + console.log('[agent-memory SYNC]', { + agentName, + instanceId, + mode, + sectionKeys: Object.keys(stamped), + sourceRuntime, + }); + + return res.json({ ok: true, schemaVersion: 2 }); + } catch (err: any) { + console.error('POST /memory/sync error:', err); + return res.status(500).json({ message: 'Failed to sync agent memory' }); + } +}); + /** * POST /posts (agent runtime token auth) * Create a post in the feed as the agent's bot user diff --git a/backend/services/agentMemoryService.ts b/backend/services/agentMemoryService.ts index 822d5f27..5caec9ab 100644 --- a/backend/services/agentMemoryService.ts +++ b/backend/services/agentMemoryService.ts @@ -1,3 +1,5 @@ +import crypto from 'crypto'; + import type { IAgentMemorySections, IDailySection, @@ -6,6 +8,24 @@ import type { MemoryVisibility, } from '../models/AgentMemory'; +// Valid YYYY-MM-DD — ADR-003 daily[].date shape. Strict: must be a calendar- +// valid date (Date.parse rejects e.g. 2026-02-30). 
/** Shape of an ADR-003 `daily[].date`: four-digit year, two-digit month, two-digit day. */
export const YMD_RE = /^(\d{4})-(\d{2})-(\d{2})$/;

/**
 * Returns true only for strings that are both `YYYY-MM-DD`-shaped and name a
 * real calendar day.
 *
 * The shape check alone would accept e.g. `2026-02-30`. A JavaScript engine
 * may either refuse to parse such a date (NaN) or silently roll it over into
 * the next month, so the candidate is parsed as UTC midnight and accepted
 * only if it serializes back to exactly the same `YYYY-MM-DD` prefix.
 *
 * @param s Any value; non-strings are always rejected.
 * @returns Whether `s` is a calendar-valid `YYYY-MM-DD` string.
 */
export function isValidYMD(s: unknown): boolean {
  if (typeof s !== 'string' || !YMD_RE.test(s)) return false;

  const utcMidnight = new Date(`${s}T00:00:00Z`);
  // Guard first: toISOString() throws on an invalid Date.
  if (Number.isNaN(utcMidnight.getTime())) return false;

  // A rolled-over date (Feb 30 -> Mar 2) no longer matches its input.
  return utcMidnight.toISOString().slice(0, 10) === s;
}
/**
 * Overlays `updates` onto `current`, identifying entries by `keyOf`.
 * An update replaces the existing entry with the same key in place (the
 * original position is kept); updates with new keys are appended in order.
 */
function mergeKeyed<T, K>(
  current: readonly T[] | null | undefined,
  updates: readonly T[],
  keyOf: (item: T) => K,
): T[] {
  const byKey = new Map<K, T>();
  for (const item of current ?? []) byKey.set(keyOf(item), item);
  for (const item of updates) byKey.set(keyOf(item), item);
  return Array.from(byKey.values());
}

/**
 * ADR-003 Phase 2: merge for `/memory/sync` requests with `mode: 'patch'`.
 *
 * - `daily` is merged element-wise, keyed by `date`.
 * - `relationships` is merged element-wise, keyed by `otherInstanceId`.
 * - Any other section present in `incoming` replaces the existing section of
 *   the same name wholesale.
 * - Sections that `incoming` omits (or sets to `undefined`) are carried over
 *   from `existing` untouched.
 *
 * Neither argument is mutated; the result is a new envelope object that
 * shares section/entry references with its inputs (shallow merge).
 *
 * @param existing Currently stored sections, or `undefined` when no document
 *                 exists yet.
 * @param incoming Sections from the request. The function copies them as-is,
 *                 so callers should pass already-stamped sections.
 * @returns The merged sections envelope.
 */
export function mergePatchSections(
  existing: IAgentMemorySections | undefined,
  incoming: IAgentMemorySections,
): IAgentMemorySections {
  const out: IAgentMemorySections = { ...(existing ?? {}) };

  for (const key of Object.keys(incoming) as (keyof IAgentMemorySections)[]) {
    const inc = incoming[key];
    if (inc === undefined) continue;

    if (key === 'daily') {
      out.daily = mergeKeyed(existing?.daily, inc as IDailySection[], (d) => d.date);
      continue;
    }

    if (key === 'relationships') {
      out.relationships = mergeKeyed(
        existing?.relationships,
        inc as IRelationshipNote[],
        (r) => r.otherInstanceId,
      );
      continue;
    }

    out[key] = inc as IMemorySection;
  }

  return out;
}

/**
 * Canonical stringify: like `JSON.stringify`, but object keys are emitted in
 * sorted order at every depth, so semantically identical payloads produce
 * identical text regardless of property insertion order. Used only for dedup
 * key computation.
 *
 * Values JSON cannot represent (`undefined`, functions, symbols) follow
 * `JSON.stringify` semantics: they are omitted as object properties and
 * rendered as `null` inside arrays. At the top level they render as `null`
 * so the function always returns a string. Custom `toJSON` methods are not
 * honoured, except for `Date`, which is delegated to `JSON.stringify`.
 */
function canonicalStringify(value: unknown): string {
  if (value === null || typeof value !== 'object') {
    // JSON.stringify yields undefined (not a string) for undefined,
    // functions and symbols; normalise that to "null".
    return JSON.stringify(value) ?? 'null';
  }
  if (value instanceof Date) return JSON.stringify(value);
  if (Array.isArray(value)) {
    // Array.from visits holes as undefined, so sparse slots become null too.
    return `[${Array.from(value, (item) => canonicalStringify(item)).join(',')}]`;
  }

  const rec = value as Record<string, unknown>;
  const parts: string[] = [];
  for (const k of Object.keys(rec).sort()) {
    const v = rec[k];
    if (v === undefined || typeof v === 'function' || typeof v === 'symbol') continue;
    parts.push(`${JSON.stringify(k)}:${canonicalStringify(v)}`);
  }
  return `{${parts.join(',')}}`;
}
/**
 * Builds the idempotency key for `/memory/sync`.
 *
 * Format: `<UTC day>:<sourceRuntime or "-">:<hash>`, where `<hash>` is the
 * first 32 hex characters (128 bits) of the SHA-256 of the canonical
 * stringification of `{ sections, sourceRuntime, mode }`. Because the input is
 * canonicalized, payloads that differ only in object key order yield the same
 * key; any change in content, runtime, mode or UTC day yields a different one.
 *
 * Note on the truncated hash: two distinct payloads mapping to the same key
 * would cause the second one to be treated as a duplicate. With a 128-bit
 * prefix (birthday bound around 2^64 payloads) that is not a practical concern
 * for a key compared only against one document's last sync.
 *
 * @param sections      Sections payload as received in the request.
 * @param sourceRuntime Driver self-identifier; `undefined` is hashed as `null`
 *                      and rendered as `-` in the key.
 * @param mode          Sync mode; part of the hash so a `full` and a `patch`
 *                      with identical sections never dedupe each other.
 * @param now           Clock used for the day bucket (defaults to the current
 *                      time).
 * @returns The dedup key string.
 */
export function computeSyncDedupKey(
  sections: IAgentMemorySections,
  sourceRuntime: string | undefined,
  mode: 'full' | 'patch',
  now: Date = new Date(),
): string {
  // Day bucket first: YYYY-MM-DD is the leading 10 characters of the ISO form.
  const dayBucket = now.toISOString().slice(0, 10);

  const runtimeForHash = sourceRuntime ?? null;
  const canonicalPayload = canonicalStringify({
    sections,
    sourceRuntime: runtimeForHash,
    mode,
  });

  const fullDigest = crypto.createHash('sha256').update(canonicalPayload).digest('hex');
  const shortDigest = fullDigest.slice(0, 32);

  return [dayBucket, sourceRuntime ?? '-', shortDigest].join(':');
}