fix(tui): preserve live parts during session hydration (#30300)

This commit is contained in:
Kit Langton
2026-06-01 21:40:29 -04:00
committed by GitHub
parent 8dc2ffd48f
commit 1cae8f89c5
2 changed files with 361 additions and 22 deletions
@@ -113,6 +113,14 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
const kv = useKV()
const fullSyncedSessions = new Set<string>()
const syncingSessions = new Map<string, Promise<void>>()
const hydratingSessions = new Map<string, { messages: Set<string>; parts: Set<string> }>()
const touchMessage = (sessionID: string, messageID: string) => {
hydratingSessions.get(sessionID)?.messages.add(messageID)
}
const touchPart = (sessionID: string, partID: string) => {
hydratingSessions.get(sessionID)?.parts.add(partID)
}
function sessionListQuery(): { scope?: "project"; path?: string } {
if (!kv.get("session_directory_filter_enabled", true)) return { scope: "project" }
@@ -251,6 +259,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
}
case "message.updated": {
touchMessage(event.properties.info.sessionID, event.properties.info.id)
const messages = store.message[event.properties.info.sessionID]
if (!messages) {
setStore("message", event.properties.info.sessionID, [event.properties.info])
@@ -290,6 +299,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
break
}
case "message.removed": {
touchMessage(event.properties.sessionID, event.properties.messageID)
const messages = store.message[event.properties.sessionID]
const result = Binary.search(messages, event.properties.messageID, (m) => m.id)
if (result.found) {
@@ -304,6 +314,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
break
}
case "message.part.updated": {
touchPart(event.properties.part.sessionID, event.properties.part.id)
const parts = store.part[event.properties.part.messageID]
if (!parts) {
setStore("part", event.properties.part.messageID, [event.properties.part])
@@ -329,6 +340,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
if (!parts) break
const result = Binary.search(parts, event.properties.partID, (p) => p.id)
if (!result.found) break
touchPart(event.properties.sessionID, event.properties.partID)
setStore(
"part",
event.properties.messageID,
@@ -343,6 +355,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
}
case "message.part.removed": {
touchPart(event.properties.sessionID, event.properties.partID)
const parts = store.part[event.properties.messageID]
const result = Binary.search(parts, event.properties.partID, (p) => p.id)
if (result.found) {
@@ -520,28 +533,76 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
},
async sync(sessionID: string) {
if (fullSyncedSessions.has(sessionID)) return
const [session, messages, todo, diff] = await Promise.all([
sdk.client.session.get({ sessionID }, { throwOnError: true }),
sdk.client.session.messages({ sessionID, limit: 100 }),
sdk.client.session.todo({ sessionID }),
sdk.client.session.diff({ sessionID }),
])
setStore(
produce((draft) => {
const match = Binary.search(draft.session, sessionID, (s) => s.id)
if (match.found) draft.session[match.index] = session.data!
if (!match.found) draft.session.splice(match.index, 0, session.data!)
draft.todo[sessionID] = todo.data ?? []
const infos: (typeof draft.message)[string] = []
for (const message of messages.data ?? []) {
infos.push(message.info)
draft.part[message.info.id] = message.parts
}
draft.message[sessionID] = infos
draft.session_diff[sessionID] = diff.data ?? []
}),
)
fullSyncedSessions.add(sessionID)
const syncing = syncingSessions.get(sessionID)
if (syncing) return syncing
const tracker = { messages: new Set<string>(), parts: new Set<string>() }
hydratingSessions.set(sessionID, tracker)
const task = (async () => {
const [session, messages, todo, diff] = await Promise.all([
sdk.client.session.get({ sessionID }, { throwOnError: true }),
sdk.client.session.messages({ sessionID, limit: 100 }),
sdk.client.session.todo({ sessionID }),
sdk.client.session.diff({ sessionID }),
])
setStore(
produce((draft) => {
const match = Binary.search(draft.session, sessionID, (s) => s.id)
if (match.found) draft.session[match.index] = session.data!
if (!match.found) draft.session.splice(match.index, 0, session.data!)
draft.todo[sessionID] = todo.data ?? []
const currentMessages = draft.message[sessionID] ?? []
const infos = (messages.data ?? []).flatMap((message) => {
if (!tracker.messages.has(message.info.id)) return [message.info]
const current = currentMessages.find((item) => item.id === message.info.id)
return current ? [current] : []
})
infos.push(
...currentMessages.filter(
(message) => tracker.messages.has(message.id) && !infos.some((item) => item.id === message.id),
),
)
const removed = infos.slice(0, -100)
const visible = infos.slice(-100)
const visibleIDs = new Set(visible.map((message) => message.id))
for (const message of messages.data ?? []) {
if (!visibleIDs.has(message.info.id)) {
delete draft.part[message.info.id]
continue
}
const currentParts = draft.part[message.info.id] ?? []
const parts = message.parts.flatMap((part) => {
const current = currentParts.find((item) => item.id === part.id)
if (tracker.parts.has(part.id)) return current ? [current] : []
if (
current &&
(part.type === "text" || part.type === "reasoning") &&
(current.type === "text" || current.type === "reasoning") &&
part.text.length === 0 &&
current.text.length > 0
) {
return [current]
}
return [part]
})
parts.push(
...currentParts.filter(
(part) => tracker.parts.has(part.id) && !parts.some((item) => item.id === part.id),
),
)
draft.part[message.info.id] = parts
}
for (const message of removed) delete draft.part[message.id]
draft.message[sessionID] = visible
draft.session_diff[sessionID] = diff.data ?? []
}),
)
fullSyncedSessions.add(sessionID)
})().finally(() => {
syncingSessions.delete(sessionID)
hydratingSessions.delete(sessionID)
})
syncingSessions.set(sessionID, task)
return task
},
},
bootstrap,
@@ -0,0 +1,278 @@
/** @jsxImportSource @opentui/solid */
import { expect, test } from "bun:test"
import { Global } from "@opencode-ai/core/global"
import type { GlobalEvent } from "@opencode-ai/sdk/v2"
import { tmpdir } from "../../../fixture/fixture"
import { json, mount, wait } from "./sync-fixture"
const sessionID = "ses_hydration_race"
const messageID = "msg_hydration_race"
const partID = "prt_hydration_race"
const session = {
id: sessionID,
title: "race",
time: { created: 0, updated: 0 },
version: "1.15.13",
directory: "/tmp/opencode/packages/opencode",
}
const assistant = {
id: messageID,
sessionID,
role: "assistant" as const,
agent: "build",
modelID: "model",
providerID: "test",
mode: "build",
parentID: "msg_user",
path: { cwd: session.directory, root: session.directory },
cost: 0,
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
time: { created: 1, completed: 2 },
}
function global(payload: GlobalEvent["payload"]): GlobalEvent {
return { directory: "/tmp/other", project: "proj_test", payload }
}
test("stale session hydration does not overwrite live message parts", async () => {
const previous = Global.Path.state
await using tmp = await tmpdir()
Global.Path.state = tmp.path
await Bun.write(`${tmp.path}/kv.json`, "{}")
let resolveMessages!: (response: Response) => void
const messages = new Promise<Response>((resolve) => {
resolveMessages = resolve
})
let requested = false
const { app, emit, sync } = await mount((url) => {
if (url.pathname === `/session/${sessionID}`) return json(session)
if (url.pathname === `/session/${sessionID}/message`) {
requested = true
return messages
}
if (url.pathname === `/session/${sessionID}/todo` || url.pathname === `/session/${sessionID}/diff`) return json([])
return undefined
})
try {
const hydrate = sync.session.sync(sessionID)
await wait(() => requested)
emit(global({ id: "evt_message", type: "message.updated", properties: { sessionID, info: assistant } }))
emit(
global({
id: "evt_part",
type: "message.part.updated",
properties: {
sessionID,
time: 2,
part: { id: partID, sessionID, messageID, type: "text", text: "visible live content" },
},
}),
)
await wait(() => sync.data.part[messageID]?.[0]?.type === "text")
resolveMessages(
json([
{
info: assistant,
parts: [{ id: partID, sessionID, messageID, type: "text", text: "" }],
},
]),
)
await hydrate
expect(sync.data.part[messageID][0]).toMatchObject({ text: "visible live content" })
} finally {
app.renderer.destroy()
Global.Path.state = previous
}
})
test("orphan live deltas do not suppress hydrated parts", async () => {
const previous = Global.Path.state
await using tmp = await tmpdir()
Global.Path.state = tmp.path
await Bun.write(`${tmp.path}/kv.json`, "{}")
let resolveMessages!: (response: Response) => void
const messages = new Promise<Response>((resolve) => {
resolveMessages = resolve
})
let requested = false
const { app, emit, sync } = await mount((url) => {
if (url.pathname === `/session/${sessionID}`) return json(session)
if (url.pathname === `/session/${sessionID}/message`) {
requested = true
return messages
}
if (url.pathname === `/session/${sessionID}/todo` || url.pathname === `/session/${sessionID}/diff`) return json([])
return undefined
})
try {
const hydrate = sync.session.sync(sessionID)
await wait(() => requested)
emit(
global({
id: "evt_delta",
type: "message.part.delta",
properties: { sessionID, messageID, partID, field: "text", delta: "ignored until part exists" },
}),
)
resolveMessages(
json([{ info: assistant, parts: [{ id: partID, sessionID, messageID, type: "text", text: "hydrated" }] }]),
)
await hydrate
expect(sync.data.part[messageID][0]).toMatchObject({ text: "hydrated" })
} finally {
app.renderer.destroy()
Global.Path.state = previous
}
})
test("hydration does not clear text streamed before it starts", async () => {
const previous = Global.Path.state
await using tmp = await tmpdir()
Global.Path.state = tmp.path
await Bun.write(`${tmp.path}/kv.json`, "{}")
let resolveMessages!: (response: Response) => void
const messages = new Promise<Response>((resolve) => {
resolveMessages = resolve
})
let requested = false
const { app, emit, sync } = await mount((url) => {
if (url.pathname === `/session/${sessionID}`) return json(session)
if (url.pathname === `/session/${sessionID}/message`) {
requested = true
return messages
}
if (url.pathname === `/session/${sessionID}/todo` || url.pathname === `/session/${sessionID}/diff`) return json([])
return undefined
})
try {
emit(global({ id: "evt_message", type: "message.updated", properties: { sessionID, info: assistant } }))
emit(
global({
id: "evt_part",
type: "message.part.updated",
properties: {
sessionID,
time: 1,
part: { id: partID, sessionID, messageID, type: "text", text: "" },
},
}),
)
emit(
global({
id: "evt_delta",
type: "message.part.delta",
properties: { sessionID, messageID, partID, field: "text", delta: "visible streamed content" },
}),
)
await wait(() => sync.data.part[messageID]?.[0]?.type === "text" && sync.data.part[messageID][0].text !== "")
const hydrate = sync.session.sync(sessionID)
await wait(() => requested)
resolveMessages(json([{ info: assistant, parts: [{ id: partID, sessionID, messageID, type: "text", text: "" }] }]))
await hydrate
expect(sync.data.part[messageID][0]).toMatchObject({ text: "visible streamed content" })
} finally {
app.renderer.destroy()
Global.Path.state = previous
}
})
test("live messages merged during hydration retain the 100 message window", async () => {
const previous = Global.Path.state
await using tmp = await tmpdir()
Global.Path.state = tmp.path
await Bun.write(`${tmp.path}/kv.json`, "{}")
let resolveMessages!: (response: Response) => void
const messages = new Promise<Response>((resolve) => {
resolveMessages = resolve
})
let requested = false
const { app, emit, sync } = await mount((url) => {
if (url.pathname === `/session/${sessionID}`) return json(session)
if (url.pathname === `/session/${sessionID}/message`) {
requested = true
return messages
}
if (url.pathname === `/session/${sessionID}/todo` || url.pathname === `/session/${sessionID}/diff`) return json([])
return undefined
})
try {
const hydrate = sync.session.sync(sessionID)
await wait(() => requested)
const live = { ...assistant, id: "msg_z_live" }
emit(global({ id: "evt_live", type: "message.updated", properties: { sessionID, info: live } }))
await wait(() => sync.data.message[sessionID]?.some((message) => message.id === live.id) ?? false)
resolveMessages(
json(
Array.from({ length: 100 }, (_, index) => {
const id = `msg_${String(index).padStart(3, "0")}`
return {
info: { ...assistant, id },
parts: [{ id: `prt_${id}`, sessionID, messageID: id, type: "text", text: id }],
}
}),
),
)
await hydrate
expect(sync.data.message[sessionID]).toHaveLength(100)
expect(sync.data.message[sessionID].at(-1)?.id).toBe(live.id)
expect(sync.data.message[sessionID].some((message) => message.id === "msg_000")).toBe(false)
expect(sync.data.part.msg_000).toBeUndefined()
} finally {
app.renderer.destroy()
Global.Path.state = previous
}
})
test("a message removed during hydration does not regain stale parts", async () => {
const previous = Global.Path.state
await using tmp = await tmpdir()
Global.Path.state = tmp.path
await Bun.write(`${tmp.path}/kv.json`, "{}")
let resolveMessages!: (response: Response) => void
const messages = new Promise<Response>((resolve) => {
resolveMessages = resolve
})
let requested = false
const { app, emit, sync } = await mount((url) => {
if (url.pathname === `/session/${sessionID}`) return json(session)
if (url.pathname === `/session/${sessionID}/message`) {
requested = true
return messages
}
if (url.pathname === `/session/${sessionID}/todo` || url.pathname === `/session/${sessionID}/diff`) return json([])
return undefined
})
try {
emit(global({ id: "evt_message", type: "message.updated", properties: { sessionID, info: assistant } }))
await wait(() => sync.data.message[sessionID]?.length === 1)
const hydrate = sync.session.sync(sessionID)
await wait(() => requested)
emit(global({ id: "evt_removed", type: "message.removed", properties: { sessionID, messageID } }))
await wait(() => sync.data.message[sessionID]?.length === 0)
resolveMessages(
json([{ info: assistant, parts: [{ id: partID, sessionID, messageID, type: "text", text: "stale" }] }]),
)
await hydrate
expect(sync.data.message[sessionID]).toEqual([])
expect(sync.data.part[messageID]).toBeUndefined()
} finally {
app.renderer.destroy()
Global.Path.state = previous
}
})