diff --git a/src/paths.ts b/src/paths.ts index f9433a4..1d11c29 100644 --- a/src/paths.ts +++ b/src/paths.ts @@ -20,6 +20,10 @@ export async function workspaceMemoryPath(root: string): Promise { return join(await memoryRoot(root), "workspace-memory.json"); } +export async function workspacePendingJournalPath(root: string): Promise { + return join(await memoryRoot(root), "workspace-pending-journal.json"); +} + export async function sessionStatePath(root: string, sessionID: string): Promise { const safeSessionID = createHash("sha256").update(sessionID).digest("hex").slice(0, 32); return join(await memoryRoot(root), "sessions", `${safeSessionID}.json`); diff --git a/src/pending-journal.ts b/src/pending-journal.ts new file mode 100644 index 0000000..0c5e49a --- /dev/null +++ b/src/pending-journal.ts @@ -0,0 +1,102 @@ +import type { LongTermMemoryEntry, PendingMemoryJournalStore } from "./types.ts"; +import { workspaceKey, workspacePendingJournalPath } from "./paths.ts"; +import { atomicWriteJSON, readJSON, updateJSON } from "./storage.ts"; + +function normalizeMemoryText(text: string): string { + return text + .normalize("NFKC") + .toLowerCase() + .replace(/[\s\p{P}]+/gu, " ") + .trim(); +} + +export function memoryKey(entry: Pick): string { + return `${entry.type}:${normalizeMemoryText(entry.text)}`; +} + +export async function emptyPendingJournal(root: string): Promise { + return { + version: 1, + workspace: { root, key: await workspaceKey(root) }, + entries: [], + updatedAt: new Date().toISOString(), + }; +} + +function dedupeByText(entries: LongTermMemoryEntry[]): LongTermMemoryEntry[] { + const seen = new Set(); + const result: LongTermMemoryEntry[] = []; + + for (const entry of entries) { + const key = memoryKey(entry); + if (seen.has(key)) continue; + seen.add(key); + result.push(entry); + } + + return result; +} + +function normalizeJournal( + root: string, + store: PendingMemoryJournalStore, +): Promise { + return workspaceKey(root).then(key => ({ + version: 1, + workspace: { root, key }, + entries: dedupeByText(Array.isArray(store.entries) ? store.entries : []), + updatedAt: new Date().toISOString(), + })); +} + +export async function loadPendingJournal(root: string): Promise { + const path = await workspacePendingJournalPath(root); + const fallback = await emptyPendingJournal(root); + const loaded = await readJSON(path, () => fallback) as Partial; + return normalizeJournal(root, { + version: loaded.version ?? 1, + workspace: loaded.workspace ?? fallback.workspace, + entries: Array.isArray(loaded.entries) ? loaded.entries : [], + updatedAt: loaded.updatedAt ?? fallback.updatedAt, + }); +} + +export async function savePendingJournal(root: string, store: PendingMemoryJournalStore): Promise { + await atomicWriteJSON(await workspacePendingJournalPath(root), await normalizeJournal(root, store)); +} + +export async function updatePendingJournal( + root: string, + updater: (store: PendingMemoryJournalStore) => PendingMemoryJournalStore | Promise, +): Promise { + const path = await workspacePendingJournalPath(root); + const fallback = await emptyPendingJournal(root); + return updateJSON(path, () => fallback, async current => { + const normalized = await normalizeJournal(root, current); + return normalizeJournal(root, await updater(normalized)); + }); +} + +export async function appendPendingMemories(root: string, memories: LongTermMemoryEntry[]): Promise { + if (memories.length === 0) return; + await updatePendingJournal(root, store => { + store.entries.push(...memories); + return store; + }); +} + +export async function hasPendingJournalEntries(root: string): Promise { + const journal = await loadPendingJournal(root); + return journal.entries.length > 0; +} + +export async function clearPendingMemories(root: string, keys?: Set): Promise { + await updatePendingJournal(root, store => { + if (!keys || keys.size === 0) { + store.entries = []; + return store; + } + store.entries = store.entries.filter(entry => !keys.has(memoryKey(entry))); + return store; + }); +} diff --git a/src/plugin.ts b/src/plugin.ts index c822621..8ea2afb 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -28,6 +28,13 @@ import { updateWorkspaceMemory, renderWorkspaceMemory, } from "./workspace-memory.ts"; +import { + appendPendingMemories, + clearPendingMemories, + hasPendingJournalEntries, + loadPendingJournal, + memoryKey, +} from "./pending-journal.ts"; import { loadSessionState, updateSessionState, @@ -178,19 +185,13 @@ export const MemoryV2Plugin: Plugin = async (input) => { const memories = extractExplicitMemories(latestMessage.text); const decisions = memories.filter(memory => memory.type === "decision"); - let workspaceMemory: Awaited> | undefined; if (memories.length > 0) { - workspaceMemory = await updateWorkspaceMemory(directory, store => { - store.entries.push(...memories); - return store; + await updateSessionState(directory, sessionID, state => { + state.pendingMemories.push(...memories); + return state; }); - - // Update frozen cache - const cached = frozenWorkspaceMemoryCache.get(sessionID); - if (cached) { - cached.store = workspaceMemory; - } + await appendPendingMemories(directory, memories); } if (decisions.length > 0) { @@ -210,6 +211,45 @@ export const MemoryV2Plugin: Plugin = async (input) => { processedUserMessages.set(sessionID, processedForSession); } + async function promotePendingMemories(sessionID?: string): Promise { + const [journal, sessionState] = await Promise.all([ + loadPendingJournal(directory), + sessionID ? loadSessionState(directory, sessionID) : Promise.resolve(undefined), + ]); + + const pending = [ + ...(sessionState?.pendingMemories ?? []), + ...journal.entries, + ]; + if (pending.length === 0) return; + + const promotedKeys = new Set(); + await updateWorkspaceMemory(directory, workspaceMemory => { + const existingKeys = new Set(workspaceMemory.entries.map(memory => memoryKey(memory))); + + for (const memory of pending) { + const key = memoryKey(memory); + if (!existingKeys.has(key)) { + workspaceMemory.entries.push(memory); + existingKeys.add(key); + } + promotedKeys.add(key); + } + + return workspaceMemory; + }); + + if (sessionID) { + await updateSessionState(directory, sessionID, state => { + state.pendingMemories = state.pendingMemories.filter(memory => !promotedKeys.has(memoryKey(memory))); + return state; + }); + clearFrozenWorkspaceMemoryCache(sessionID); + } + + await clearPendingMemories(directory, promotedKeys); + } + function bashExitCode(hookOutput: unknown): number | undefined { const output = hookOutput as { exitCode?: unknown; @@ -273,6 +313,13 @@ export const MemoryV2Plugin: Plugin = async (input) => { // Sub-agents are short-lived - skip memory system if (await isSubAgent(sessionID)) return; + // Before first snapshot in this session, promote durable pending memories from + // prior sessions. Keep this before processing latest user text so current-turn + // explicit memory remains pending (not immediately frozen into system[1]). + if (!frozenWorkspaceMemoryCache.has(sessionID) && await hasPendingJournalEntries(directory)) { + await promotePendingMemories(); + } + // Process explicit user memory even on no-tool turns. await processLatestUserMessage(sessionID); @@ -419,25 +466,32 @@ export const MemoryV2Plugin: Plugin = async (input) => { // Sub-agents don't need post-compaction processing if (await isSubAgent(sessionID)) return; - // Parse latest compaction summary for memory candidates + // Parse latest compaction summary for memory candidates, stage them into + // durable pending journal, then promote pending memories. const summary = await latestCompactionSummary(client, sessionID); - if (summary) { - const candidates = parseWorkspaceMemoryCandidates(summary); - if (candidates.length > 0) { - await updateWorkspaceMemory(directory, workspaceMemory => { - workspaceMemory.entries.push(...candidates); - return workspaceMemory; - }); + const candidates = summary ? parseWorkspaceMemoryCandidates(summary) : []; + if (candidates.length > 0) { + await appendPendingMemories(directory, candidates); + } - // Clear frozen cache so next session reloads with new memories - clearFrozenWorkspaceMemoryCache(sessionID); - } + try { + await promotePendingMemories(sessionID); + } catch { + // Keep pending memories in session/journal for retry on next event/session. } } if (event.type === "session.deleted") { const sessionID = (event.properties as { info?: { id?: string } })?.info?.id; if (sessionID) { + // Promote pending memories before deleting per-session state. + // If promotion fails, leave session state and journal intact. + try { + await promotePendingMemories(sessionID); + } catch { + return; + } + // Clean up caches frozenWorkspaceMemoryCache.delete(sessionID); processedUserMessages.delete(sessionID); diff --git a/src/session-state.ts b/src/session-state.ts index db0614b..c168459 100644 --- a/src/session-state.ts +++ b/src/session-state.ts @@ -1,8 +1,9 @@ import { relative } from "path"; import { sessionStatePath } from "./paths.ts"; import { atomicWriteJSON, readJSON, updateJSON } from "./storage.ts"; -import type { ActiveFile, OpenError, SessionDecision, SessionState } from "./types.ts"; +import type { ActiveFile, LongTermMemoryEntry, OpenError, SessionDecision, SessionState } from "./types.ts"; import { HOT_STATE_LIMITS } from "./types.ts"; +import { memoryKey } from "./pending-journal.ts"; const ACTION_WEIGHT: Record = { edit: 50, @@ -20,6 +21,7 @@ export function createEmptySessionState(sessionID: string): SessionState { activeFiles: [], openErrors: [], recentDecisions: [], + pendingMemories: [], }; } @@ -30,6 +32,7 @@ export async function loadSessionState(root: string, sessionID: string): Promise loaded.activeFiles = Array.isArray(loaded.activeFiles) ? loaded.activeFiles : []; loaded.openErrors = Array.isArray(loaded.openErrors) ? loaded.openErrors : []; loaded.recentDecisions = Array.isArray(loaded.recentDecisions) ? loaded.recentDecisions : []; + loaded.pendingMemories = Array.isArray(loaded.pendingMemories) ? loaded.pendingMemories : []; return loaded; } @@ -48,6 +51,7 @@ export async function updateSessionState( current.activeFiles = Array.isArray(current.activeFiles) ? current.activeFiles : []; current.openErrors = Array.isArray(current.openErrors) ? current.openErrors : []; current.recentDecisions = Array.isArray(current.recentDecisions) ? current.recentDecisions : []; + current.pendingMemories = Array.isArray(current.pendingMemories) ? current.pendingMemories : []; return normalizeSessionState(await updater(current)); }); } @@ -57,9 +61,23 @@ function normalizeSessionState(state: SessionState): SessionState { state.activeFiles = state.activeFiles.slice(0, HOT_STATE_LIMITS.maxActiveFilesStored); state.openErrors = state.openErrors.slice(0, HOT_STATE_LIMITS.maxOpenErrorsStored); state.recentDecisions = state.recentDecisions.slice(0, HOT_STATE_LIMITS.maxRecentDecisionsStored); + state.pendingMemories = dedupePendingMemories(Array.isArray(state.pendingMemories) ? state.pendingMemories : []) + .slice(-HOT_STATE_LIMITS.maxPendingMemoriesStored); return state; } +function dedupePendingMemories(memories: LongTermMemoryEntry[]): LongTermMemoryEntry[] { + const seen = new Set(); + const deduped: LongTermMemoryEntry[] = []; + for (const memory of memories) { + const key = memoryKey(memory); + if (seen.has(key)) continue; + seen.add(key); + deduped.push(memory); + } + return deduped; +} + export function touchActiveFile(state: SessionState, filePath: string, action: ActiveFile["action"]): void { const now = Date.now(); const existing = state.activeFiles.find(item => item.path === filePath); @@ -177,8 +195,10 @@ export function renderHotSessionState(state: SessionState, workspaceRoot: string .sort((a, b) => b.lastSeen - a.lastSeen) .slice(0, HOT_STATE_LIMITS.maxOpenErrorsRendered); const decisions = state.recentDecisions.slice(-HOT_STATE_LIMITS.maxRecentDecisionsStored); + const pendingMemories = dedupePendingMemories(state.pendingMemories) + .slice(-HOT_STATE_LIMITS.maxPendingMemoriesRendered); - if (activeFiles.length === 0 && openErrors.length === 0 && decisions.length === 0) return ""; + if (activeFiles.length === 0 && openErrors.length === 0 && decisions.length === 0 && pendingMemories.length === 0) return ""; const lines: string[] = ["Hot session state (current session):"]; @@ -204,6 +224,13 @@ export function renderHotSessionState(state: SessionState, workspaceRoot: string } } + if (pendingMemories.length > 0) { + lines.push("pending_memories:"); + for (const memory of pendingMemories) { + lines.push(`- [${memory.type}] ${memory.text}`); + } + } + return lines.join("\n").slice(0, HOT_STATE_LIMITS.maxRenderedChars); } diff --git a/src/types.ts b/src/types.ts index 1a5e58d..efa170f 100644 --- a/src/types.ts +++ b/src/types.ts @@ -32,6 +32,16 @@ export type WorkspaceMemoryStore = { updatedAt: string; }; +export type PendingMemoryJournalStore = { + version: 1; + workspace: { + root: string; + key: string; + }; + entries: LongTermMemoryEntry[]; + updatedAt: string; +}; + export type ActiveFile = { path: string; action: "read" | "grep" | "edit" | "write"; @@ -69,6 +79,7 @@ export type SessionState = { activeFiles: ActiveFile[]; openErrors: OpenError[]; recentDecisions: SessionDecision[]; + pendingMemories: LongTermMemoryEntry[]; }; export const LONG_TERM_LIMITS = { @@ -86,4 +97,6 @@ export const HOT_STATE_LIMITS = { maxOpenErrorsStored: 5, maxOpenErrorsRendered: 3, maxRecentDecisionsStored: 8, + maxPendingMemoriesStored: 12, + maxPendingMemoriesRendered: 6, } as const; diff --git a/tests/plugin.test.ts b/tests/plugin.test.ts index c1d55f2..a40ea9d 100644 --- a/tests/plugin.test.ts +++ b/tests/plugin.test.ts @@ -1,18 +1,39 @@ import test from "node:test"; import assert from "node:assert/strict"; -import { mkdtemp, rm } from "node:fs/promises"; +import { mkdir, mkdtemp, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; -import { join } from "node:path"; +import { dirname, join } from "node:path"; import { MemoryV2Plugin } from "../src/plugin.ts"; import { loadSessionState, saveSessionState } from "../src/session-state.ts"; import { parseWorkspaceMemoryCandidates } from "../src/extractors.ts"; import type { OpenError } from "../src/types.ts"; +import { workspaceMemoryPath, workspacePendingJournalPath } from "../src/paths.ts"; +import { loadPendingJournal, savePendingJournal } from "../src/pending-journal.ts"; // Mock client for root session (not a sub-agent) function mockRootClient() { return { session: { get: async () => ({ data: { parentID: null } }), + messages: async () => ({ data: [] }), + todo: async () => ({ data: [] }), + }, + }; +} + +function mockClientWithLatestUser(text: string, messageID: string) { + return { + session: { + get: async () => ({ data: { parentID: null } }), + messages: async () => ({ + data: [ + { + info: { role: "user", id: messageID }, + parts: [{ type: "text", text }], + }, + ], + }), + todo: async () => ({ data: [] }), }, }; } @@ -27,6 +48,7 @@ function createSessionWithError(sessionID: string, error: OpenError) { activeFiles: [], openErrors: [error], recentDecisions: [], + pendingMemories: [], }; } @@ -379,3 +401,227 @@ test("chat system transform reuses frozen rendered workspace snapshot", async () await rm(tmpDir, { recursive: true, force: true }); } }); + +test("no compaction: explicit memory is promoted on next session start from durable journal", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "memory-plugin-test-")); + + try { + const firstClient = mockClientWithLatestUser("remember this: Prefer boring cache boundaries.", "msg-remember-1"); + const firstPlugin = await MemoryV2Plugin({ directory: tmpDir, client: firstClient }); + + await (firstPlugin as Record)["experimental.chat.system.transform"]( + { sessionID: "session-without-compaction", model: {} }, + { system: ["base header"] }, + ); + + const secondPlugin = await MemoryV2Plugin({ directory: tmpDir, client: mockRootClient() }); + const output = { system: ["base header"] }; + + await (secondPlugin as Record)["experimental.chat.system.transform"]( + { sessionID: "new-session", model: {} }, + output, + ); + + const workspacePrompt = output.system.find((part: string) => part.startsWith("Workspace memory")); + assert.match(workspacePrompt ?? "", /Prefer boring cache boundaries/); + } finally { + await rm(tmpDir, { recursive: true, force: true }); + } +}); + +test("session.deleted promotes pending memories before deleting session state", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "memory-plugin-test-")); + + try { + const client = mockClientWithLatestUser("remember this: Promote pending memories before delete.", "msg-delete-1"); + const plugin = await MemoryV2Plugin({ directory: tmpDir, client }); + + await (plugin as Record)["experimental.chat.system.transform"]( + { sessionID: "delete-session", model: {} }, + { system: ["base header"] }, + ); + + await (plugin as Record)["event"]({ + event: { + type: "session.deleted", + properties: { info: { id: "delete-session" } }, + }, + }); + + const nextPlugin = await MemoryV2Plugin({ directory: tmpDir, client: mockRootClient() }); + const output = { system: ["base header"] }; + await (nextPlugin as Record)["experimental.chat.system.transform"]( + { sessionID: "after-delete-session", model: {} }, + output, + ); + + const workspacePrompt = output.system.find((part: string) => part.startsWith("Workspace memory")); + assert.match(workspacePrompt ?? "", /Promote pending memories before delete/); + } finally { + await rm(tmpDir, { recursive: true, force: true }); + } +}); + +test("duplicate explicit memories dedupe by normalized type and text, not generated id", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "memory-plugin-test-")); + + try { + const pluginA = await MemoryV2Plugin({ + directory: tmpDir, + client: mockClientWithLatestUser("remember this: Prefer stable cache boundaries.", "msg-a"), + }); + await (pluginA as Record)["experimental.chat.system.transform"]( + { sessionID: "dedupe-session", model: {} }, + { system: ["base header"] }, + ); + + const pluginB = await MemoryV2Plugin({ + directory: tmpDir, + client: mockClientWithLatestUser("remember this: prefer stable cache boundaries.", "msg-b"), + }); + await (pluginB as Record)["experimental.chat.system.transform"]( + { sessionID: "dedupe-session", model: {} }, + { system: ["base header"] }, + ); + + await (pluginB as Record)["event"]({ + event: { type: "session.compacted", properties: { sessionID: "dedupe-session" } }, + }); + + const output = { system: ["base header"] }; + const pluginC = await MemoryV2Plugin({ directory: tmpDir, client: mockRootClient() }); + await (pluginC as Record)["experimental.chat.system.transform"]( + { sessionID: "dedupe-next", model: {} }, + output, + ); + + const joined = output.system.join("\n"); + assert.equal((joined.match(/stable cache boundaries/gi) ?? []).length, 1); + } finally { + await rm(tmpDir, { recursive: true, force: true }); + } +}); + +test("session.compacted promotes pending memories to workspace memory and clears pending list", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "memory-plugin-test-")); + + try { + const client = mockRootClient(); + const plugin = await MemoryV2Plugin({ directory: tmpDir, client }); + + await saveSessionState(tmpDir, { + version: 1, + sessionID: "promote-session", + turn: 1, + updatedAt: new Date().toISOString(), + activeFiles: [], + openErrors: [], + recentDecisions: [], + pendingMemories: [{ + id: "mem_pending_1", + type: "decision", + text: "Use frozen rendered snapshots for cache stability.", + source: "explicit", + confidence: 1, + status: "active", + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }], + }); + + await (plugin as Record)["event"]({ + event: { + type: "session.compacted", + properties: { sessionID: "promote-session" }, + }, + }); + + const state = await loadSessionState(tmpDir, "promote-session"); + assert.equal(state.pendingMemories.length, 0, + "pending memories should be cleared after promotion"); + + const after = { system: ["base header"] }; + await (plugin as Record)["experimental.chat.system.transform"]( + { sessionID: "new-session-after-promotion", model: {} }, + after, + ); + + const workspacePrompt = after.system.find((part: string) => part.startsWith("Workspace memory")); + assert.match(workspacePrompt ?? "", /Use frozen rendered snapshots for cache stability/); + } finally { + await rm(tmpDir, { recursive: true, force: true }); + } +}); + +test("promotion failure does not clear pending memories in session or journal", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "memory-plugin-test-")); + + try { + const client = mockRootClient(); + const plugin = await MemoryV2Plugin({ directory: tmpDir, client }); + + const now = new Date().toISOString(); + await saveSessionState(tmpDir, { + version: 1, + sessionID: "failure-session", + turn: 0, + updatedAt: now, + activeFiles: [], + openErrors: [], + recentDecisions: [], + pendingMemories: [{ + id: "mem_pending_failure", + type: "decision", + text: "Keep pending when promotion fails", + source: "explicit", + confidence: 1, + status: "active", + createdAt: now, + updatedAt: now, + }], + }); + + const journalPath = await workspacePendingJournalPath(tmpDir); + await mkdir(dirname(journalPath), { recursive: true }); + const journal = await loadPendingJournal(tmpDir); + journal.entries = [{ + id: "mem_pending_failure_journal", + type: "decision", + text: "Keep pending when promotion fails", + source: "explicit", + confidence: 1, + status: "active", + createdAt: now, + updatedAt: now, + }]; + await savePendingJournal(tmpDir, journal); + + const wmPath = await workspaceMemoryPath(tmpDir); + await rm(wmPath, { force: true }).catch(() => undefined); + await mkdir(wmPath, { recursive: true }); + + let didThrow = false; + try { + await (plugin as Record)["event"]({ + event: { + type: "session.compacted", + properties: { sessionID: "failure-session" }, + }, + }); + } catch { + didThrow = true; + } + assert.equal(didThrow, false, + "promotion failure should not throw from session.compacted handler"); + + const state = await loadSessionState(tmpDir, "failure-session"); + assert.equal(state.pendingMemories.length, 1, + "session pending memories should remain when promotion fails"); + + const pendingAfter = await loadPendingJournal(tmpDir); + assert.equal(pendingAfter.entries.length, 1, + "journal pending memories should remain when promotion fails"); + } finally { + await rm(tmpDir, { recursive: true, force: true }); + } +});