feat: add durable pending memory journal

This commit is contained in:
Ralph Chang
2026-04-27 02:20:26 +08:00
parent 026c75a5e4
commit e7c7a5cfb2
6 changed files with 471 additions and 25 deletions
+4
View File
@@ -20,6 +20,10 @@ export async function workspaceMemoryPath(root: string): Promise<string> {
return join(await memoryRoot(root), "workspace-memory.json");
}
export async function workspacePendingJournalPath(root: string): Promise<string> {
return join(await memoryRoot(root), "workspace-pending-journal.json");
}
export async function sessionStatePath(root: string, sessionID: string): Promise<string> {
const safeSessionID = createHash("sha256").update(sessionID).digest("hex").slice(0, 32);
return join(await memoryRoot(root), "sessions", `${safeSessionID}.json`);
+102
View File
@@ -0,0 +1,102 @@
import type { LongTermMemoryEntry, PendingMemoryJournalStore } from "./types.ts";
import { workspaceKey, workspacePendingJournalPath } from "./paths.ts";
import { atomicWriteJSON, readJSON, updateJSON } from "./storage.ts";
function normalizeMemoryText(text: string): string {
return text
.normalize("NFKC")
.toLowerCase()
.replace(/[\s\p{P}]+/gu, " ")
.trim();
}
export function memoryKey(entry: Pick<LongTermMemoryEntry, "type" | "text">): string {
return `${entry.type}:${normalizeMemoryText(entry.text)}`;
}
export async function emptyPendingJournal(root: string): Promise<PendingMemoryJournalStore> {
return {
version: 1,
workspace: { root, key: await workspaceKey(root) },
entries: [],
updatedAt: new Date().toISOString(),
};
}
function dedupeByText(entries: LongTermMemoryEntry[]): LongTermMemoryEntry[] {
const seen = new Set<string>();
const result: LongTermMemoryEntry[] = [];
for (const entry of entries) {
const key = memoryKey(entry);
if (seen.has(key)) continue;
seen.add(key);
result.push(entry);
}
return result;
}
function normalizeJournal(
root: string,
store: PendingMemoryJournalStore,
): Promise<PendingMemoryJournalStore> {
return workspaceKey(root).then(key => ({
version: 1,
workspace: { root, key },
entries: dedupeByText(Array.isArray(store.entries) ? store.entries : []),
updatedAt: new Date().toISOString(),
}));
}
export async function loadPendingJournal(root: string): Promise<PendingMemoryJournalStore> {
const path = await workspacePendingJournalPath(root);
const fallback = await emptyPendingJournal(root);
const loaded = await readJSON(path, () => fallback) as Partial<PendingMemoryJournalStore>;
return normalizeJournal(root, {
version: loaded.version ?? 1,
workspace: loaded.workspace ?? fallback.workspace,
entries: Array.isArray(loaded.entries) ? loaded.entries : [],
updatedAt: loaded.updatedAt ?? fallback.updatedAt,
});
}
export async function savePendingJournal(root: string, store: PendingMemoryJournalStore): Promise<void> {
await atomicWriteJSON(await workspacePendingJournalPath(root), await normalizeJournal(root, store));
}
export async function updatePendingJournal(
root: string,
updater: (store: PendingMemoryJournalStore) => PendingMemoryJournalStore | Promise<PendingMemoryJournalStore>,
): Promise<PendingMemoryJournalStore> {
const path = await workspacePendingJournalPath(root);
const fallback = await emptyPendingJournal(root);
return updateJSON(path, () => fallback, async current => {
const normalized = await normalizeJournal(root, current);
return normalizeJournal(root, await updater(normalized));
});
}
export async function appendPendingMemories(root: string, memories: LongTermMemoryEntry[]): Promise<void> {
if (memories.length === 0) return;
await updatePendingJournal(root, store => {
store.entries.push(...memories);
return store;
});
}
export async function hasPendingJournalEntries(root: string): Promise<boolean> {
const journal = await loadPendingJournal(root);
return journal.entries.length > 0;
}
export async function clearPendingMemories(root: string, keys?: Set<string>): Promise<void> {
await updatePendingJournal(root, store => {
if (!keys || keys.size === 0) {
store.entries = [];
return store;
}
store.entries = store.entries.filter(entry => !keys.has(memoryKey(entry)));
return store;
});
}
+74 -20
View File
@@ -28,6 +28,13 @@ import {
updateWorkspaceMemory,
renderWorkspaceMemory,
} from "./workspace-memory.ts";
import {
appendPendingMemories,
clearPendingMemories,
hasPendingJournalEntries,
loadPendingJournal,
memoryKey,
} from "./pending-journal.ts";
import {
loadSessionState,
updateSessionState,
@@ -178,19 +185,13 @@ export const MemoryV2Plugin: Plugin = async (input) => {
const memories = extractExplicitMemories(latestMessage.text);
const decisions = memories.filter(memory => memory.type === "decision");
let workspaceMemory: Awaited<ReturnType<typeof loadWorkspaceMemory>> | undefined;
if (memories.length > 0) {
workspaceMemory = await updateWorkspaceMemory(directory, store => {
store.entries.push(...memories);
return store;
await updateSessionState(directory, sessionID, state => {
state.pendingMemories.push(...memories);
return state;
});
// Update frozen cache
const cached = frozenWorkspaceMemoryCache.get(sessionID);
if (cached) {
cached.store = workspaceMemory;
}
await appendPendingMemories(directory, memories);
}
if (decisions.length > 0) {
@@ -210,6 +211,45 @@ export const MemoryV2Plugin: Plugin = async (input) => {
processedUserMessages.set(sessionID, processedForSession);
}
async function promotePendingMemories(sessionID?: string): Promise<void> {
const [journal, sessionState] = await Promise.all([
loadPendingJournal(directory),
sessionID ? loadSessionState(directory, sessionID) : Promise.resolve(undefined),
]);
const pending = [
...(sessionState?.pendingMemories ?? []),
...journal.entries,
];
if (pending.length === 0) return;
const promotedKeys = new Set<string>();
await updateWorkspaceMemory(directory, workspaceMemory => {
const existingKeys = new Set(workspaceMemory.entries.map(memory => memoryKey(memory)));
for (const memory of pending) {
const key = memoryKey(memory);
if (!existingKeys.has(key)) {
workspaceMemory.entries.push(memory);
existingKeys.add(key);
}
promotedKeys.add(key);
}
return workspaceMemory;
});
if (sessionID) {
await updateSessionState(directory, sessionID, state => {
state.pendingMemories = state.pendingMemories.filter(memory => !promotedKeys.has(memoryKey(memory)));
return state;
});
clearFrozenWorkspaceMemoryCache(sessionID);
}
await clearPendingMemories(directory, promotedKeys);
}
function bashExitCode(hookOutput: unknown): number | undefined {
const output = hookOutput as {
exitCode?: unknown;
@@ -273,6 +313,13 @@ export const MemoryV2Plugin: Plugin = async (input) => {
// Sub-agents are short-lived - skip memory system
if (await isSubAgent(sessionID)) return;
// Before first snapshot in this session, promote durable pending memories from
// prior sessions. Keep this before processing latest user text so current-turn
// explicit memory remains pending (not immediately frozen into system[1]).
if (!frozenWorkspaceMemoryCache.has(sessionID) && await hasPendingJournalEntries(directory)) {
await promotePendingMemories();
}
// Process explicit user memory even on no-tool turns.
await processLatestUserMessage(sessionID);
@@ -419,25 +466,32 @@ export const MemoryV2Plugin: Plugin = async (input) => {
// Sub-agents don't need post-compaction processing
if (await isSubAgent(sessionID)) return;
// Parse latest compaction summary for memory candidates
// Parse latest compaction summary for memory candidates, stage them into
// durable pending journal, then promote pending memories.
const summary = await latestCompactionSummary(client, sessionID);
if (summary) {
const candidates = parseWorkspaceMemoryCandidates(summary);
const candidates = summary ? parseWorkspaceMemoryCandidates(summary) : [];
if (candidates.length > 0) {
await updateWorkspaceMemory(directory, workspaceMemory => {
workspaceMemory.entries.push(...candidates);
return workspaceMemory;
});
// Clear frozen cache so next session reloads with new memories
clearFrozenWorkspaceMemoryCache(sessionID);
await appendPendingMemories(directory, candidates);
}
try {
await promotePendingMemories(sessionID);
} catch {
// Keep pending memories in session/journal for retry on next event/session.
}
}
if (event.type === "session.deleted") {
const sessionID = (event.properties as { info?: { id?: string } })?.info?.id;
if (sessionID) {
// Promote pending memories before deleting per-session state.
// If promotion fails, leave session state and journal intact.
try {
await promotePendingMemories(sessionID);
} catch {
return;
}
// Clean up caches
frozenWorkspaceMemoryCache.delete(sessionID);
processedUserMessages.delete(sessionID);
+29 -2
View File
@@ -1,8 +1,9 @@
import { relative } from "path";
import { sessionStatePath } from "./paths.ts";
import { atomicWriteJSON, readJSON, updateJSON } from "./storage.ts";
import type { ActiveFile, OpenError, SessionDecision, SessionState } from "./types.ts";
import type { ActiveFile, LongTermMemoryEntry, OpenError, SessionDecision, SessionState } from "./types.ts";
import { HOT_STATE_LIMITS } from "./types.ts";
import { memoryKey } from "./pending-journal.ts";
const ACTION_WEIGHT: Record<ActiveFile["action"], number> = {
edit: 50,
@@ -20,6 +21,7 @@ export function createEmptySessionState(sessionID: string): SessionState {
activeFiles: [],
openErrors: [],
recentDecisions: [],
pendingMemories: [],
};
}
@@ -30,6 +32,7 @@ export async function loadSessionState(root: string, sessionID: string): Promise
loaded.activeFiles = Array.isArray(loaded.activeFiles) ? loaded.activeFiles : [];
loaded.openErrors = Array.isArray(loaded.openErrors) ? loaded.openErrors : [];
loaded.recentDecisions = Array.isArray(loaded.recentDecisions) ? loaded.recentDecisions : [];
loaded.pendingMemories = Array.isArray(loaded.pendingMemories) ? loaded.pendingMemories : [];
return loaded;
}
@@ -48,6 +51,7 @@ export async function updateSessionState(
current.activeFiles = Array.isArray(current.activeFiles) ? current.activeFiles : [];
current.openErrors = Array.isArray(current.openErrors) ? current.openErrors : [];
current.recentDecisions = Array.isArray(current.recentDecisions) ? current.recentDecisions : [];
current.pendingMemories = Array.isArray(current.pendingMemories) ? current.pendingMemories : [];
return normalizeSessionState(await updater(current));
});
}
@@ -57,9 +61,23 @@ function normalizeSessionState(state: SessionState): SessionState {
state.activeFiles = state.activeFiles.slice(0, HOT_STATE_LIMITS.maxActiveFilesStored);
state.openErrors = state.openErrors.slice(0, HOT_STATE_LIMITS.maxOpenErrorsStored);
state.recentDecisions = state.recentDecisions.slice(0, HOT_STATE_LIMITS.maxRecentDecisionsStored);
state.pendingMemories = dedupePendingMemories(Array.isArray(state.pendingMemories) ? state.pendingMemories : [])
.slice(-HOT_STATE_LIMITS.maxPendingMemoriesStored);
return state;
}
function dedupePendingMemories(memories: LongTermMemoryEntry[]): LongTermMemoryEntry[] {
const seen = new Set<string>();
const deduped: LongTermMemoryEntry[] = [];
for (const memory of memories) {
const key = memoryKey(memory);
if (seen.has(key)) continue;
seen.add(key);
deduped.push(memory);
}
return deduped;
}
export function touchActiveFile(state: SessionState, filePath: string, action: ActiveFile["action"]): void {
const now = Date.now();
const existing = state.activeFiles.find(item => item.path === filePath);
@@ -177,8 +195,10 @@ export function renderHotSessionState(state: SessionState, workspaceRoot: string
.sort((a, b) => b.lastSeen - a.lastSeen)
.slice(0, HOT_STATE_LIMITS.maxOpenErrorsRendered);
const decisions = state.recentDecisions.slice(-HOT_STATE_LIMITS.maxRecentDecisionsStored);
const pendingMemories = dedupePendingMemories(state.pendingMemories)
.slice(-HOT_STATE_LIMITS.maxPendingMemoriesRendered);
if (activeFiles.length === 0 && openErrors.length === 0 && decisions.length === 0) return "";
if (activeFiles.length === 0 && openErrors.length === 0 && decisions.length === 0 && pendingMemories.length === 0) return "";
const lines: string[] = ["Hot session state (current session):"];
@@ -204,6 +224,13 @@ export function renderHotSessionState(state: SessionState, workspaceRoot: string
}
}
if (pendingMemories.length > 0) {
lines.push("pending_memories:");
for (const memory of pendingMemories) {
lines.push(`- [${memory.type}] ${memory.text}`);
}
}
return lines.join("\n").slice(0, HOT_STATE_LIMITS.maxRenderedChars);
}
+13
View File
@@ -32,6 +32,16 @@ export type WorkspaceMemoryStore = {
updatedAt: string;
};
export type PendingMemoryJournalStore = {
version: 1;
workspace: {
root: string;
key: string;
};
entries: LongTermMemoryEntry[];
updatedAt: string;
};
export type ActiveFile = {
path: string;
action: "read" | "grep" | "edit" | "write";
@@ -69,6 +79,7 @@ export type SessionState = {
activeFiles: ActiveFile[];
openErrors: OpenError[];
recentDecisions: SessionDecision[];
pendingMemories: LongTermMemoryEntry[];
};
export const LONG_TERM_LIMITS = {
@@ -86,4 +97,6 @@ export const HOT_STATE_LIMITS = {
maxOpenErrorsStored: 5,
maxOpenErrorsRendered: 3,
maxRecentDecisionsStored: 8,
maxPendingMemoriesStored: 12,
maxPendingMemoriesRendered: 6,
} as const;
+248 -2
View File
@@ -1,18 +1,39 @@
import test from "node:test";
import assert from "node:assert/strict";
import { mkdtemp, rm } from "node:fs/promises";
import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { dirname, join } from "node:path";
import { MemoryV2Plugin } from "../src/plugin.ts";
import { loadSessionState, saveSessionState } from "../src/session-state.ts";
import { parseWorkspaceMemoryCandidates } from "../src/extractors.ts";
import type { OpenError } from "../src/types.ts";
import { workspaceMemoryPath, workspacePendingJournalPath } from "../src/paths.ts";
import { loadPendingJournal, savePendingJournal } from "../src/pending-journal.ts";
// Mock client for root session (not a sub-agent)
function mockRootClient() {
return {
session: {
get: async () => ({ data: { parentID: null } }),
messages: async () => ({ data: [] }),
todo: async () => ({ data: [] }),
},
};
}
function mockClientWithLatestUser(text: string, messageID: string) {
return {
session: {
get: async () => ({ data: { parentID: null } }),
messages: async () => ({
data: [
{
info: { role: "user", id: messageID },
parts: [{ type: "text", text }],
},
],
}),
todo: async () => ({ data: [] }),
},
};
}
@@ -27,6 +48,7 @@ function createSessionWithError(sessionID: string, error: OpenError) {
activeFiles: [],
openErrors: [error],
recentDecisions: [],
pendingMemories: [],
};
}
@@ -379,3 +401,227 @@ test("chat system transform reuses frozen rendered workspace snapshot", async ()
await rm(tmpDir, { recursive: true, force: true });
}
});
test("no compaction: explicit memory is promoted on next session start from durable journal", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "memory-plugin-test-"));
try {
const firstClient = mockClientWithLatestUser("remember this: Prefer boring cache boundaries.", "msg-remember-1");
const firstPlugin = await MemoryV2Plugin({ directory: tmpDir, client: firstClient });
await (firstPlugin as Record<string, Function>)["experimental.chat.system.transform"](
{ sessionID: "session-without-compaction", model: {} },
{ system: ["base header"] },
);
const secondPlugin = await MemoryV2Plugin({ directory: tmpDir, client: mockRootClient() });
const output = { system: ["base header"] };
await (secondPlugin as Record<string, Function>)["experimental.chat.system.transform"](
{ sessionID: "new-session", model: {} },
output,
);
const workspacePrompt = output.system.find((part: string) => part.startsWith("Workspace memory"));
assert.match(workspacePrompt ?? "", /Prefer boring cache boundaries/);
} finally {
await rm(tmpDir, { recursive: true, force: true });
}
});
test("session.deleted promotes pending memories before deleting session state", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "memory-plugin-test-"));
try {
const client = mockClientWithLatestUser("remember this: Promote pending memories before delete.", "msg-delete-1");
const plugin = await MemoryV2Plugin({ directory: tmpDir, client });
await (plugin as Record<string, Function>)["experimental.chat.system.transform"](
{ sessionID: "delete-session", model: {} },
{ system: ["base header"] },
);
await (plugin as Record<string, Function>)["event"]({
event: {
type: "session.deleted",
properties: { info: { id: "delete-session" } },
},
});
const nextPlugin = await MemoryV2Plugin({ directory: tmpDir, client: mockRootClient() });
const output = { system: ["base header"] };
await (nextPlugin as Record<string, Function>)["experimental.chat.system.transform"](
{ sessionID: "after-delete-session", model: {} },
output,
);
const workspacePrompt = output.system.find((part: string) => part.startsWith("Workspace memory"));
assert.match(workspacePrompt ?? "", /Promote pending memories before delete/);
} finally {
await rm(tmpDir, { recursive: true, force: true });
}
});
test("duplicate explicit memories dedupe by normalized type and text, not generated id", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "memory-plugin-test-"));
try {
const pluginA = await MemoryV2Plugin({
directory: tmpDir,
client: mockClientWithLatestUser("remember this: Prefer stable cache boundaries.", "msg-a"),
});
await (pluginA as Record<string, Function>)["experimental.chat.system.transform"](
{ sessionID: "dedupe-session", model: {} },
{ system: ["base header"] },
);
const pluginB = await MemoryV2Plugin({
directory: tmpDir,
client: mockClientWithLatestUser("remember this: prefer stable cache boundaries.", "msg-b"),
});
await (pluginB as Record<string, Function>)["experimental.chat.system.transform"](
{ sessionID: "dedupe-session", model: {} },
{ system: ["base header"] },
);
await (pluginB as Record<string, Function>)["event"]({
event: { type: "session.compacted", properties: { sessionID: "dedupe-session" } },
});
const output = { system: ["base header"] };
const pluginC = await MemoryV2Plugin({ directory: tmpDir, client: mockRootClient() });
await (pluginC as Record<string, Function>)["experimental.chat.system.transform"](
{ sessionID: "dedupe-next", model: {} },
output,
);
const joined = output.system.join("\n");
assert.equal((joined.match(/stable cache boundaries/gi) ?? []).length, 1);
} finally {
await rm(tmpDir, { recursive: true, force: true });
}
});
test("session.compacted promotes pending memories to workspace memory and clears pending list", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "memory-plugin-test-"));
try {
const client = mockRootClient();
const plugin = await MemoryV2Plugin({ directory: tmpDir, client });
await saveSessionState(tmpDir, {
version: 1,
sessionID: "promote-session",
turn: 1,
updatedAt: new Date().toISOString(),
activeFiles: [],
openErrors: [],
recentDecisions: [],
pendingMemories: [{
id: "mem_pending_1",
type: "decision",
text: "Use frozen rendered snapshots for cache stability.",
source: "explicit",
confidence: 1,
status: "active",
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
}],
});
await (plugin as Record<string, Function>)["event"]({
event: {
type: "session.compacted",
properties: { sessionID: "promote-session" },
},
});
const state = await loadSessionState(tmpDir, "promote-session");
assert.equal(state.pendingMemories.length, 0,
"pending memories should be cleared after promotion");
const after = { system: ["base header"] };
await (plugin as Record<string, Function>)["experimental.chat.system.transform"](
{ sessionID: "new-session-after-promotion", model: {} },
after,
);
const workspacePrompt = after.system.find((part: string) => part.startsWith("Workspace memory"));
assert.match(workspacePrompt ?? "", /Use frozen rendered snapshots for cache stability/);
} finally {
await rm(tmpDir, { recursive: true, force: true });
}
});
test("promotion failure does not clear pending memories in session or journal", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "memory-plugin-test-"));
try {
const client = mockRootClient();
const plugin = await MemoryV2Plugin({ directory: tmpDir, client });
const now = new Date().toISOString();
await saveSessionState(tmpDir, {
version: 1,
sessionID: "failure-session",
turn: 0,
updatedAt: now,
activeFiles: [],
openErrors: [],
recentDecisions: [],
pendingMemories: [{
id: "mem_pending_failure",
type: "decision",
text: "Keep pending when promotion fails",
source: "explicit",
confidence: 1,
status: "active",
createdAt: now,
updatedAt: now,
}],
});
const journalPath = await workspacePendingJournalPath(tmpDir);
await mkdir(dirname(journalPath), { recursive: true });
const journal = await loadPendingJournal(tmpDir);
journal.entries = [{
id: "mem_pending_failure_journal",
type: "decision",
text: "Keep pending when promotion fails",
source: "explicit",
confidence: 1,
status: "active",
createdAt: now,
updatedAt: now,
}];
await savePendingJournal(tmpDir, journal);
const wmPath = await workspaceMemoryPath(tmpDir);
await rm(wmPath, { force: true }).catch(() => undefined);
await mkdir(wmPath, { recursive: true });
let didThrow = false;
try {
await (plugin as Record<string, Function>)["event"]({
event: {
type: "session.compacted",
properties: { sessionID: "failure-session" },
},
});
} catch {
didThrow = true;
}
assert.equal(didThrow, false,
"promotion failure should not throw from session.compacted handler");
const state = await loadSessionState(tmpDir, "failure-session");
assert.equal(state.pendingMemories.length, 1,
"session pending memories should remain when promotion fails");
const pendingAfter = await loadPendingJournal(tmpDir);
assert.equal(pendingAfter.entries.length, 1,
"journal pending memories should remain when promotion fails");
} finally {
await rm(tmpDir, { recursive: true, force: true });
}
});