add run --replay mode (#30239)

This commit is contained in:
Simon Klee
2026-06-01 15:11:45 +02:00
committed by GitHub
parent d85f8cd4d8
commit 6a3b2f339a
16 changed files with 1292 additions and 58 deletions
+15 -15
View File
@@ -611,9 +611,9 @@
"typescript": "catalog:",
},
"peerDependencies": {
"@opentui/core": ">=0.3.0",
"@opentui/keymap": ">=0.3.0",
"@opentui/solid": ">=0.3.0",
"@opentui/core": ">=0.3.1",
"@opentui/keymap": ">=0.3.1",
"@opentui/solid": ">=0.3.1",
},
"optionalPeers": [
"@opentui/core",
@@ -862,9 +862,9 @@
"@npmcli/arborist": "9.4.0",
"@octokit/rest": "22.0.0",
"@openauthjs/openauth": "0.0.0-20250322224806",
"@opentui/core": "0.3.0",
"@opentui/keymap": "0.3.0",
"@opentui/solid": "0.3.0",
"@opentui/core": "0.3.1",
"@opentui/keymap": "0.3.1",
"@opentui/solid": "0.3.1",
"@pierre/diffs": "1.1.0-beta.18",
"@playwright/test": "1.59.1",
"@sentry/solid": "10.36.0",
@@ -1758,23 +1758,23 @@
"@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.40.0", "", {}, "sha512-cifvXDhcqMwwTlTK04GBNeIe7yyo28Mfby85QXFe1Yk8nmi36Ab/5UQwptOx84SsoGNRg+EVSjwzfSZMy6pmlw=="],
"@opentui/core": ["@opentui/core@0.3.0", "", { "dependencies": { "bun-ffi-structs": "0.2.2", "diff": "9.0.0", "marked": "17.0.1", "string-width": "7.2.0", "strip-ansi": "7.1.2", "yoga-layout": "3.2.1" }, "optionalDependencies": { "@opentui/core-darwin-arm64": "0.3.0", "@opentui/core-darwin-x64": "0.3.0", "@opentui/core-linux-arm64": "0.3.0", "@opentui/core-linux-x64": "0.3.0", "@opentui/core-win32-arm64": "0.3.0", "@opentui/core-win32-x64": "0.3.0" }, "peerDependencies": { "web-tree-sitter": "0.25.10" } }, "sha512-wvNESYGYGRLuvarZ3QY4CTB+BziZ/j6Snd9qRKD4fQ7SF6G4UpYElLTFrg7uzRo1v7WJTqbquymcTvWEHMnpYA=="],
"@opentui/core": ["@opentui/core@0.3.1", "", { "dependencies": { "bun-ffi-structs": "0.2.2", "diff": "9.0.0", "marked": "17.0.1", "string-width": "7.2.0", "strip-ansi": "7.1.2", "yoga-layout": "3.2.1" }, "optionalDependencies": { "@opentui/core-darwin-arm64": "0.3.1", "@opentui/core-darwin-x64": "0.3.1", "@opentui/core-linux-arm64": "0.3.1", "@opentui/core-linux-x64": "0.3.1", "@opentui/core-win32-arm64": "0.3.1", "@opentui/core-win32-x64": "0.3.1" }, "peerDependencies": { "web-tree-sitter": "0.25.10" } }, "sha512-kQFSsSCgtlasSqTigCgKmM67xaquGvTg+vwimDnFSZtcBEt4E3dz7qLrbeh5FVvTA+RMbwe+Bozq03PW+SgjXw=="],
"@opentui/core-darwin-arm64": ["@opentui/core-darwin-arm64@0.3.0", "", { "os": "darwin", "cpu": "arm64" }, "sha512-/eDfAcutAHJqR9spwHMLuo6LMqngymev/m+i6uqlk98gX1EJiJe2pJ16sKbp3RctgH/Gz/8TYOhVHpPGYJl7yQ=="],
"@opentui/core-darwin-arm64": ["@opentui/core-darwin-arm64@0.3.1", "", { "os": "darwin", "cpu": "arm64" }, "sha512-krvVfiBpeBY+727R8yogdqIcxkK3RUVcI97bqjl8jTeDMcWOkFFfHezssRMPmbR5x++1tX669Fz3fuxoe7XUIg=="],
"@opentui/core-darwin-x64": ["@opentui/core-darwin-x64@0.3.0", "", { "os": "darwin", "cpu": "x64" }, "sha512-/j6EWAvdwhz1wU/mWfXepAf3+NuMYz2Ic5ozaid5LdwIpPomIkM9yCUDm76mQhRBbjsAl/7UeSeUA0qSCMSZBg=="],
"@opentui/core-darwin-x64": ["@opentui/core-darwin-x64@0.3.1", "", { "os": "darwin", "cpu": "x64" }, "sha512-D/6ec5H8SPpSBMr01/sqgSddIl1Qc1QMKsDl/wV5MpbxYc7Qvie9qlNvvoSsWNfAXAbafLRb1jQBzouk41cp1w=="],
"@opentui/core-linux-arm64": ["@opentui/core-linux-arm64@0.3.0", "", { "os": "linux", "cpu": "arm64" }, "sha512-uUFVT3V35KkM1m8gaLmRcTV9dsJzXnxwM+dv6+NjScx0W/Y0CJKbW9wDYwnLyPnBNgaFUi171zmJra5gTtFTsw=="],
"@opentui/core-linux-arm64": ["@opentui/core-linux-arm64@0.3.1", "", { "os": "linux", "cpu": "arm64" }, "sha512-E/FFBoAsWJyS/EO/cF7h7DuEENYa9nAdSv1W/TIyKXpBisN6K3U1Xgbk528TkfWjrwJjhGs+9OMYdXuAHd5LTw=="],
"@opentui/core-linux-x64": ["@opentui/core-linux-x64@0.3.0", "", { "os": "linux", "cpu": "x64" }, "sha512-73bNNNU2OaqZQLIlvzDOdAzQmzBAqf+cSilmJ+Y9JnybrBn1d6VShC66+V4xxIgonq1swk7BD+SUHYbwwGilQA=="],
"@opentui/core-linux-x64": ["@opentui/core-linux-x64@0.3.1", "", { "os": "linux", "cpu": "x64" }, "sha512-Btb7Q4BOC55Aj2qCs0VoxGuj87DNfUEaSx0z89oeU4npTN+6SpJApyGZTCNNeSe2sdmOGeh/8eAR4X96ORjcKg=="],
"@opentui/core-win32-arm64": ["@opentui/core-win32-arm64@0.3.0", "", { "os": "win32", "cpu": "arm64" }, "sha512-jg5KrV/4mVQ0mdkcL9CtQVtBk0NAtQ+2rCKoZ/jNHB6GxGK0ot9vDV6P3X68hZVkvpb2pdXfg6GRsZJ+Np4hZA=="],
"@opentui/core-win32-arm64": ["@opentui/core-win32-arm64@0.3.1", "", { "os": "win32", "cpu": "arm64" }, "sha512-+lt24u3KwEPG69oXDOLz9N484wPcAHvrPbDNU77OT6DvWew+StAjh40eY+Zeu0TkTNDWfj7qnQKV0GKWtFA3cw=="],
"@opentui/core-win32-x64": ["@opentui/core-win32-x64@0.3.0", "", { "os": "win32", "cpu": "x64" }, "sha512-kiM3C5bwQBTfrJKAOfb+L3U6MMkPSQlMhAERlLMjqSurc+llcyqygr/wbXSvfAqJtKlIpf3MKJRnVFTyfRIdng=="],
"@opentui/core-win32-x64": ["@opentui/core-win32-x64@0.3.1", "", { "os": "win32", "cpu": "x64" }, "sha512-eVkKMYirYgpn92lI0YT/GKru4J+UiXjzwyzNRFX+P59OHXvL3GFdqJMcJmX4/zvyjg4c8HDnU79YLnyG+TlXLw=="],
"@opentui/keymap": ["@opentui/keymap@0.3.0", "", { "dependencies": { "@opentui/core": "0.3.0" }, "peerDependencies": { "@opentui/react": "0.3.0", "@opentui/solid": "0.3.0", "react": ">=19.2.0", "solid-js": "1.9.12" }, "optionalPeers": ["@opentui/react", "@opentui/solid", "react", "solid-js"] }, "sha512-lJN57DanKujy3u0IhfSMCShvXIobRjhprdkrdM3brQoX6wxk7gTFE8fTCCz9z1nINkXNsKHQ6grZO1dsT/0mzA=="],
"@opentui/keymap": ["@opentui/keymap@0.3.1", "", { "dependencies": { "@opentui/core": "0.3.1" }, "peerDependencies": { "@opentui/react": "0.3.1", "@opentui/solid": "0.3.1", "react": ">=19.2.0", "solid-js": "1.9.12" }, "optionalPeers": ["@opentui/react", "@opentui/solid", "react", "solid-js"] }, "sha512-BTj+ggsarO2uyvd6CWzvgfsekA8c4aEclbAPKPZGVjBI3Fo5+KAHUrXvteFO5qpGMANfEJTtVHoRu5cic1Nlaw=="],
"@opentui/solid": ["@opentui/solid@0.3.0", "", { "dependencies": { "@babel/core": "7.28.0", "@babel/preset-typescript": "7.27.1", "@opentui/core": "0.3.0", "babel-plugin-module-resolver": "5.0.2", "babel-preset-solid": "1.9.12", "entities": "7.0.1", "s-js": "^0.4.9" }, "peerDependencies": { "solid-js": "1.9.12" } }, "sha512-AUtNzvgkdW81Ftl0sahAy3tY1LIPSMzBw3APBC8jiDAzzPv4kYVdyWXryTxLbU2q+Pgtr57VwKwHgc5wsNrd2w=="],
"@opentui/solid": ["@opentui/solid@0.3.1", "", { "dependencies": { "@babel/core": "7.28.0", "@babel/preset-typescript": "7.27.1", "@opentui/core": "0.3.1", "babel-plugin-module-resolver": "5.0.2", "babel-preset-solid": "1.9.12", "entities": "7.0.1", "s-js": "^0.4.9" }, "peerDependencies": { "solid-js": "1.9.12" } }, "sha512-2R6wEijfMub9COTBCm8IKVj2y7+Sc4fZZjJawxk8sE6+++mzeUaokKNJTlYhZXpMju4LKMv6j9CjWkG8JYfbcg=="],
"@oslojs/asn1": ["@oslojs/asn1@1.0.0", "", { "dependencies": { "@oslojs/binary": "1.0.0" } }, "sha512-zw/wn0sj0j0QKbIXfIlnEcTviaCzYOY3V5rAyjR6YtOByFtJiT574+8p9Wlach0lZH9fddD4yb9laEAIl4vXQA=="],
+3 -3
View File
@@ -38,9 +38,9 @@
"@types/cross-spawn": "6.0.6",
"@octokit/rest": "22.0.0",
"@hono/zod-validator": "0.4.2",
"@opentui/core": "0.3.0",
"@opentui/keymap": "0.3.0",
"@opentui/solid": "0.3.0",
"@opentui/core": "0.3.1",
"@opentui/keymap": "0.3.1",
"@opentui/solid": "0.3.1",
"ulid": "3.0.1",
"@kobalte/core": "0.13.11",
"@types/luxon": "3.7.1",
+1 -1
View File
@@ -221,7 +221,7 @@ export const RunCommand = effectCmd({
.option("replay", {
type: "boolean",
default: false,
describe: "replay visible session history on interactive resume",
describe: "replay interactive session history on resume and after resize",
})
.option("replay-limit", {
type: "number",
+32 -8
View File
@@ -171,6 +171,7 @@ export class RunFooter implements FooterApi {
private queue: StreamCommit[] = []
private pending = false
private flushing: Promise<void> = Promise.resolve()
private flushError: unknown
// Fixed portion of footer height above the textarea.
private base: number
private rows = TEXTAREA_MIN_ROWS
@@ -204,6 +205,15 @@ export class RunFooter implements FooterApi {
private requestExitHandler: (() => boolean) | undefined
private scrollback: RunScrollbackStream
private createScrollback(wrote: boolean): RunScrollbackStream {
return new RunScrollbackStream(this.renderer, this.options.theme, {
diffStyle: this.options.diffStyle,
wrote,
sessionID: this.options.sessionID,
treeSitterClient: this.options.treeSitterClient,
})
}
constructor(
private renderer: CliRenderer,
private options: RunFooterOptions,
@@ -257,12 +267,7 @@ export class RunFooter implements FooterApi {
this.queuedPrompts = queuedPrompts
this.setQueuedPrompts = setQueuedPrompts
this.base = Math.max(1, renderer.footerHeight - TEXTAREA_MIN_ROWS)
this.scrollback = new RunScrollbackStream(renderer, options.theme, {
diffStyle: options.diffStyle,
wrote: options.wrote,
sessionID: options.sessionID,
treeSitterClient: options.treeSitterClient,
})
this.scrollback = this.createScrollback(options.wrote ?? false)
this.renderer.on(CliRenderEvents.DESTROY, this.handleDestroy)
@@ -465,7 +470,9 @@ export class RunFooter implements FooterApi {
},
),
)
.catch(() => {})
.catch((error) => {
this.flushError = error
})
}
private present(view: FooterView): void {
@@ -523,6 +530,12 @@ export class RunFooter implements FooterApi {
}
return this.flushing.then(async () => {
if (this.flushError !== undefined) {
const error = this.flushError
this.flushError = undefined
throw error
}
if (this.isGone) {
return
}
@@ -535,6 +548,15 @@ export class RunFooter implements FooterApi {
})
}
public resetForReplay(wrote: boolean): void {
if (this.isGone) {
return
}
this.scrollback.destroy()
this.scrollback = this.createScrollback(wrote)
}
public close(): void {
if (this.closed) {
return
@@ -936,6 +958,8 @@ export class RunFooter implements FooterApi {
},
),
)
.catch(() => {})
.catch((error) => {
this.flushError = error
})
}
}
@@ -8,7 +8,7 @@
//
// Also wires SIGINT so Ctrl-c clears a live prompt draft first, then falls
// back to the usual two-press exit sequence through RunFooter.requestExit().
import { createCliRenderer, type CliRenderer, type ScrollbackWriter } from "@opentui/core"
import { CliRenderEvents, createCliRenderer, type CliRenderer, type ScrollbackWriter } from "@opentui/core"
import { createDefaultOpenTuiKeymap } from "@opentui/keymap/opentui"
import { Session as SessionApi } from "@/session/session"
import { registerOpencodeKeymap } from "@/cli/cmd/tui/keymap"
@@ -75,6 +75,8 @@ export type LifecycleInput = {
export type Lifecycle = {
footer: FooterApi
onResize(fn: () => void): () => void
resetForReplay(input: { sessionTitle?: string; sessionID?: string; history: RunPrompt[] }): Promise<void>
close(input: { showExit: boolean; sessionTitle?: string; sessionID?: string; history?: RunPrompt[] }): Promise<void>
}
@@ -307,6 +309,46 @@ export async function createRuntimeLifecycle(input: LifecycleInput): Promise<Lif
return {
footer,
onResize(fn) {
let width = renderer.terminalWidth
let height = renderer.terminalHeight
const resize = () => {
if (width === renderer.terminalWidth && height === renderer.terminalHeight) {
return
}
width = renderer.terminalWidth
height = renderer.terminalHeight
fn()
}
renderer.on(CliRenderEvents.RESIZE, resize)
return () => renderer.off(CliRenderEvents.RESIZE, resize)
},
async resetForReplay(next) {
if (closed || renderer.isDestroyed || footer.isClosed) {
throw new Error("runtime closed")
}
await footer.idle()
if (closed || renderer.isDestroyed || footer.isClosed) {
throw new Error("runtime closed")
}
footer.resetForReplay(true)
renderer.resetSplitFooterForReplay({ clearSavedLines: true })
const splash = splashInfo(next.sessionTitle ?? input.sessionTitle, next.history)
renderer.writeToScrollback(
entrySplash({
...splashMeta({
title: splash.title,
session_id: next.sessionID ?? input.getSessionID?.() ?? input.sessionID,
}),
theme: theme.splash,
showSession: splash.showSession,
}),
)
renderer.requestRender()
},
close,
}
} catch (error) {
@@ -162,7 +162,14 @@ export async function runPromptQueue(input: QueueInput): Promise<void> {
continue
}
state.active = prompt
const sent =
prompt.mode === "shell"
? prompt
: {
...prompt,
messageID: prompt.messageID ?? queued?.messageID ?? MessageID.ascending(),
}
state.active = sent
emit(
{
@@ -185,18 +192,24 @@ export async function runPromptQueue(input: QueueInput): Promise<void> {
break
}
if (prompt.mode !== "shell") {
const commit = { kind: "user", text: prompt.text, phase: "start", source: "system" } as const
if (sent.mode !== "shell") {
const commit = {
kind: "user",
text: sent.text,
phase: "start",
source: "system",
messageID: sent.messageID,
} as const
input.trace?.write("ui.commit", commit)
input.footer.append(commit)
}
input.onSend?.(prompt)
input.onSend?.(sent)
if (state.closed) {
break
}
const task = input.run(prompt, ctrl.signal).then(
const task = input.run(sent, ctrl.signal).then(
() => ({ type: "done" as const }),
(error) => ({ type: "error" as const, error }),
)
+77 -4
View File
@@ -14,13 +14,14 @@
// 4. runs the prompt queue until the footer closes.
import { createOpencodeClient } from "@opencode-ai/sdk/v2"
import { Flag } from "@opencode-ai/core/flag/flag"
import { MessageID } from "@/session/schema"
import { createRunDemo } from "./demo"
import { resolveModelInfo, resolveRunTuiConfig, resolveSessionInfo } from "./runtime.boot"
import { createRuntimeLifecycle } from "./runtime.lifecycle"
import { recordRunSpanError, setRunSpanAttributes, withRunSpan } from "./otel"
import { trace } from "./trace"
import { cycleVariant, formatModelLabel, resolveSavedVariant, resolveVariant, saveVariant } from "./variant.shared"
import type { RunInput, RunPrompt, RunProvider } from "./types"
import type { LocalReplayAnchor, LocalReplayRow, RunInput, RunPrompt, RunProvider, StreamCommit } from "./types"
/** @internal Exported for testing */
export { pickVariant, resolveVariant } from "./variant.shared"
@@ -114,6 +115,7 @@ type RuntimeState = {
activeVariant: string | undefined
sessionID: string
history: RunPrompt[]
localRows: LocalReplayRow[]
sessionTitle?: string
agent: string | undefined
switching?: Promise<void>
@@ -139,6 +141,9 @@ function variantsFor(providers: RunProvider[], model: RunInput["model"]) {
return Object.keys(providers.find((item) => item.id === model.providerID)?.models?.[model.modelID]?.variants ?? {})
}
const REPLAY_RESIZE_DELAY = 250
const LOCAL_REPLAY_ROW_LIMIT = 100
async function resolveExitTitle(
ctx: BootContext,
input: RunRuntimeInput,
@@ -196,6 +201,7 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
activeVariant: resolveVariant(ctx.variant, session.variant, savedVariant, []),
sessionID: ctx.sessionID,
history: [...session.history],
localRows: [],
sessionTitle: ctx.sessionTitle,
agent: ctx.agent,
}
@@ -374,6 +380,9 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
},
})
const footer = shell.footer
const rememberLocal = (commit: StreamCommit, after?: LocalReplayAnchor) => {
state.localRows = [...state.localRows, { commit, after }].slice(-LOCAL_REPLAY_ROW_LIMIT)
}
const loadCatalog = async (): Promise<void> => {
if (footer.isClosed) {
@@ -510,6 +519,36 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
return next
}
let replayResizeTimer: ReturnType<typeof setTimeout> | undefined
const offResize = input.replay
? shell.onResize(() => {
if (replayResizeTimer) {
clearTimeout(replayResizeTimer)
}
replayResizeTimer = setTimeout(() => {
replayResizeTimer = undefined
if (footer.isClosed || !state.stream) {
return
}
void state.stream
.then((item) =>
item.handle.replayOnResize({
localRows: () => state.localRows,
reset: () =>
shell.resetForReplay({
sessionTitle: state.sessionTitle,
sessionID: state.sessionID,
history: state.history,
}),
}),
)
.catch(() => {})
}, REPLAY_RESIZE_DELAY)
})
: () => {}
const runQueue = async () => {
let includeFiles = true
if (state.demo) {
@@ -525,6 +564,15 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
onSend: (prompt) => {
state.shown = true
state.history.push(prompt)
if (prompt.mode !== "shell") {
rememberLocal({
kind: "user",
text: prompt.text,
phase: "start",
source: "system",
messageID: prompt.messageID,
})
}
},
onNewSession: createSession
? async () => {
@@ -545,6 +593,7 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
state.sessionTitle = created.sessionTitle
state.agent = created.agent ?? state.agent
state.history = []
state.localRows = []
includeFiles = true
state.demo = input.demo
? createRunDemo({
@@ -598,12 +647,15 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
status: "failed to start new session",
},
})
footer.append({
const commit = {
kind: "error",
text: error instanceof Error ? error.message : String(error),
phase: "start",
source: "system",
})
messageID: MessageID.ascending(),
} as const
rememberLocal(commit)
footer.append(commit)
}
}
: undefined,
@@ -614,6 +666,7 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
await state.switching?.catch(() => {})
let outputAnchor: LocalReplayAnchor | undefined
return withRunSpan(
"RunInteractive.turn",
{
@@ -644,8 +697,16 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
prompt,
files: input.files,
includeFiles,
onVisibleOutput: (anchor) => {
outputAnchor = anchor
},
signal,
})
if (prompt.messageID) {
state.localRows = state.localRows.filter(
(row) => row.commit.kind !== "user" || row.commit.messageID !== prompt.messageID,
)
}
includeFiles = false
} catch (error) {
if (signal.aborted || footer.isClosed) {
@@ -656,7 +717,15 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
const text =
(await state.stream?.then((item) => item.mod).catch(() => undefined))?.formatUnknownError(error) ??
(error instanceof Error ? error.message : String(error))
footer.append({ kind: "error", text, phase: "start", source: "system" })
const commit = {
kind: "error",
text,
phase: "start",
source: "system",
messageID: prompt.messageID,
} as const
rememberLocal(commit, outputAnchor)
footer.append(commit)
}
},
)
@@ -683,6 +752,10 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
try {
await runQueue()
} finally {
if (replayResizeTimer) {
clearTimeout(replayResizeTimer)
}
offResize()
await state.stream?.then((item) => item.handle.close()).catch(() => {})
}
} finally {
@@ -60,6 +60,7 @@ type SessionCommit = StreamCommit
// - part: part ID → "assistant" | "reasoning" (text parts only)
// - text: part ID → full accumulated text so far
// - sent: part ID → byte offset of last flushed text (for incremental output)
// - visible: part ID → rendered text for an active part after display transforms
// - end: part IDs whose time.end has arrived (part is finished)
// - shell: shell call ID → chosen transcript source for direct shell calls
// - echo: message ID → bash outputs to strip from the next assistant chunk
@@ -82,6 +83,7 @@ export type SessionData = {
part: Map<string, PartKind>
text: Map<string, string>
sent: Map<string, number>
visible: Map<string, string>
end: Set<string>
echo: Map<string, Set<string>>
}
@@ -119,6 +121,7 @@ export function createSessionData(
part: new Map(),
text: new Map(),
sent: new Map(),
visible: new Map(),
end: new Set(),
echo: new Map(),
}
@@ -538,6 +541,7 @@ function flushPart(data: SessionData, commits: SessionCommit[], partID: string,
if (chunk) {
data.sent.set(partID, text.length)
data.visible.set(partID, (data.visible.get(partID) ?? "") + chunk)
commits.push({
kind,
text: chunk,
@@ -567,6 +571,7 @@ function drop(data: SessionData, partID: string) {
data.part.delete(partID)
data.text.delete(partID)
data.sent.delete(partID)
data.visible.delete(partID)
data.msg.delete(partID)
data.end.delete(partID)
}
@@ -1,7 +1,7 @@
import type { Event, PermissionRequest, QuestionRequest } from "@opencode-ai/sdk/v2"
import { bootstrapSessionData, createSessionData, reduceSessionData, type SessionData } from "./session-data"
import { messagePrompt, type SessionMessages } from "./session.shared"
import type { FooterPatch, StreamCommit } from "./types"
import type { FooterPatch, LocalReplayRow, StreamCommit } from "./types"
type ReplayInput = {
messages: SessionMessages
@@ -186,3 +186,112 @@ export function replaySession(input: ReplayInput): SessionReplay {
patch: replayPatch(data, patch),
}
}
export function replayLocalRows(messages: SessionMessages, commits: StreamCommit[], rows: LocalReplayRow[]): StreamCommit[] {
const persisted = new Set(messages.map((message) => message.info.id))
return rows.reduce((out, local) => {
const row = local.commit
if (row.kind === "user" && row.messageID && persisted.has(row.messageID)) {
return out
}
if (!row.messageID) {
return [...out, row]
}
const exact = local.after
? out.findIndex(
(commit) =>
commit.kind === local.after?.kind &&
commit.text === local.after.text &&
commit.phase === local.after.phase &&
commit.toolState === local.after.toolState &&
(local.after.partID ? commit.partID === local.after.partID : commit.messageID === local.after.messageID),
)
: -1
const anchored =
exact !== -1
? exact
: local.after
? out.findLastIndex((commit) =>
local.after?.partID
? commit.partID === local.after.partID
: commit.kind === local.after?.kind && commit.messageID === local.after.messageID,
)
: -1
if (anchored !== -1) {
const commit = out[anchored]
const visible = local.after?.visible
if (commit && visible && commit.text.startsWith(visible) && commit.text.length > visible.length) {
return [
...out.slice(0, anchored),
{ ...commit, text: visible },
row,
{ ...commit, text: commit.text.slice(visible.length) },
...out.slice(anchored + 1),
]
}
return [...out.slice(0, anchored + 1), row, ...out.slice(anchored + 1)]
}
const after = out.findIndex((commit) => commit.kind === "user" && commit.messageID === row.messageID)
if (after !== -1) {
return [...out.slice(0, after + 1), row, ...out.slice(after + 1)]
}
const before = out.findIndex((commit) => commit.messageID && row.messageID! < commit.messageID)
if (before === -1) {
return [...out, row]
}
return [...out.slice(0, before), row, ...out.slice(before)]
}, commits)
}
export function replayActiveText(data: SessionData, current: SessionData): StreamCommit[] {
return [...current.part.entries()].flatMap(([partID, kind]) => {
if (kind === "user" || current.end.has(partID) || data.ids.has(partID)) {
return []
}
const text = current.text.get(partID) ?? ""
const existing = data.text.get(partID) ?? ""
const sent = current.sent.get(partID) ?? 0
const existingSent = data.sent.get(partID) ?? 0
const visible = current.visible.get(partID) ?? ""
const existingVisible = data.visible.get(partID) ?? ""
if (!text.startsWith(existing) || existingSent > sent || !visible.startsWith(existingVisible)) {
return []
}
data.part.set(partID, kind)
data.text.set(partID, text)
data.sent.set(partID, sent)
data.visible.set(partID, visible)
const messageID = current.msg.get(partID)
if (messageID) {
data.msg.set(partID, messageID)
const role = current.role.get(messageID)
if (role) {
data.role.set(messageID, role)
}
}
const chunk = visible.slice(existingVisible.length)
if (!chunk) {
return []
}
return [
{
kind,
text: chunk,
phase: "progress",
source: kind,
...(messageID ? { messageID } : {}),
partID,
},
] satisfies StreamCommit[]
})
}
@@ -27,7 +27,7 @@ import {
reduceSessionData,
type SessionData,
} from "./session-data"
import { replaySession } from "./session-replay"
import { replayActiveText, replayLocalRows, replaySession } from "./session-replay"
import {
bootstrapSubagentCalls,
bootstrapSubagentData,
@@ -51,6 +51,8 @@ import type {
FooterSubagentState,
FooterSubagentTab,
FooterView,
LocalReplayAnchor,
LocalReplayRow,
RunFilePart,
RunInput,
RunPrompt,
@@ -81,6 +83,7 @@ type Wait = {
tick: number
armed: boolean
live: boolean
onVisibleOutput?: (anchor: LocalReplayAnchor) => void
done: Deferred.Deferred<void, unknown>
}
@@ -91,15 +94,22 @@ export type SessionTurnInput = {
prompt: RunPrompt
files: RunFilePart[]
includeFiles: boolean
onVisibleOutput?: (anchor: LocalReplayAnchor) => void
signal?: AbortSignal
}
export type SessionTransport = {
runPromptTurn(input: SessionTurnInput): Promise<void>
selectSubagent(sessionID: string | undefined): void
replayOnResize(input: SessionResizeReplayInput): Promise<boolean>
close(): Promise<void>
}
export type SessionResizeReplayInput = {
localRows: () => LocalReplayRow[]
reset: () => Promise<void>
}
type State = {
data: SessionData
subagent: SubagentData
@@ -115,6 +125,7 @@ type State = {
type TransportService = {
readonly runPromptTurn: (input: SessionTurnInput) => Effect.Effect<void, unknown>
readonly selectSubagent: (sessionID: string | undefined) => Effect.Effect<void>
readonly replayOnResize: (input: SessionResizeReplayInput) => Effect.Effect<boolean>
readonly close: () => Effect.Effect<void>
}
@@ -440,6 +451,9 @@ function createLayer(input: StreamInput) {
blockers: new Map(),
}
let booting = true
let replaying = false
let replayDisabled = false
let replayPending: SessionResizeReplayInput | undefined
const buffered: Event[] = []
const replayedParts = new Set<string>()
const recovering = new Set<string>()
@@ -594,6 +608,38 @@ function createLayer(input: StreamInput) {
Effect.orElseSucceed(() => []),
)
const replayMessages = () =>
Effect.promise(() =>
input.sdk.session.messages({
sessionID: input.sessionID,
...(input.replayLimit === undefined
? {}
: { limit: Math.max(input.replayLimit, SUBAGENT_BOOTSTRAP_LIMIT) }),
}),
).pipe(Effect.flatMap((item) => (item.error ? Effect.fail(item.error) : Effect.succeed(item.data ?? []))))
const replayRequests = () =>
Effect.all(
[
Effect.promise(() => input.sdk.permission.list()).pipe(
Effect.flatMap((item) => (item.error ? Effect.fail(item.error) : Effect.succeed(item.data ?? []))),
),
Effect.promise(() => input.sdk.question.list()).pipe(
Effect.flatMap((item) => (item.error ? Effect.fail(item.error) : Effect.succeed(item.data ?? []))),
),
],
{ concurrency: "unbounded" },
)
const markReplayedParts = (data: SessionData) => {
replayedParts.clear()
for (const [partID] of data.text) {
if (data.part.has(partID)) {
replayedParts.add(partID)
}
}
}
const bootstrapSubagentHistory = Effect.fn("RunStreamTransport.bootstrapSubagentHistory")(function* (
sessions: string[],
) {
@@ -681,7 +727,6 @@ function createLayer(input: StreamInput) {
})
: history
replayedParts.clear()
if (history) {
state.data = history.data
}
@@ -695,14 +740,8 @@ function createLayer(input: StreamInput) {
})
}
if (replay) {
for (const [partID] of replay.data.text) {
if (!replay.data.part.has(partID)) {
continue
}
replayedParts.add(partID)
}
if (history) {
markReplayedParts(history.data)
}
bootstrapSubagentData({
@@ -862,6 +901,20 @@ function createLayer(input: StreamInput) {
limits: input.limits(),
})
state.data = next.data
const visible = next.commits.at(-1)
if (visible) {
state.wait?.onVisibleOutput?.({
kind: visible.kind,
text: visible.text,
phase: visible.phase,
messageID: visible.messageID,
partID: visible.partID,
toolState: visible.toolState,
...(visible.partID && state.data.visible.has(visible.partID)
? { visible: state.data.visible.get(visible.partID) }
: {}),
})
}
if (
event.type === "message.part.updated" &&
@@ -910,15 +963,163 @@ function createLayer(input: StreamInput) {
yield* applyEvent(event)
}
if (!changed) {
const arrived = buffered.splice(0)
if (!changed && arrived.length === 0) {
buffered.push(...next)
return
}
pending = next
pending = [...next, ...arrived]
}
})
const replayOnResize: (next: SessionResizeReplayInput) => Effect.Effect<boolean> = Effect.fn(
"RunStreamTransport.replayOnResize",
)(function* (next: SessionResizeReplayInput) {
if (!input.replay || replayDisabled || booting || closed || input.footer.isClosed) {
return false
}
if (replaying) {
replayPending = next
return false
}
const finish: () => Effect.Effect<void> = Effect.fnUntraced(function* () {
yield* drainBuffered()
const pending = replayPending
replayPending = undefined
if (!pending || replayDisabled || closed || input.footer.isClosed) {
replaying = false
return
}
replaying = false
yield* replayOnResize(pending).pipe(Effect.asVoid)
})
replayedParts.clear()
replaying = true
input.trace?.write("replay.resize.start", {
sessionID: input.sessionID,
})
const source = yield* Effect.all([replayMessages(), replayRequests()], { concurrency: "unbounded" }).pipe(
Effect.exit,
)
if (Exit.isFailure(source)) {
input.trace?.write("replay.resize.abort", {
sessionID: input.sessionID,
phase: "snapshot",
})
yield* finish()
return false
}
const [messagesList, [permissions, questions]] = source.value
const sessionPermissions = permissions.filter((item) => item.sessionID === input.sessionID)
const sessionQuestions = questions.filter((item) => item.sessionID === input.sessionID)
const snapshot = yield* Effect.try({
try: () => {
const history = replaySession({
messages: messagesList,
permissions: sessionPermissions,
questions: sessionQuestions,
thinking: input.thinking,
limits: input.limits(),
})
const activeCommits = replayActiveText(history.data, state.data)
return {
history,
activeCommits,
patch:
history.data.part.size > 0 || history.data.tools.size > 0
? { ...history.patch, phase: "running" as const }
: history.patch,
visible:
input.replayLimit !== undefined && messagesList.length > input.replayLimit
? replaySession({
messages: messagesList.slice(-input.replayLimit),
permissions: sessionPermissions,
questions: sessionQuestions,
thinking: input.thinking,
limits: input.limits(),
})
: history,
}
},
catch: (error) => error,
}).pipe(Effect.exit)
if (Exit.isFailure(snapshot)) {
input.trace?.write("replay.resize.abort", {
sessionID: input.sessionID,
phase: "snapshot",
})
yield* finish()
return false
}
const idle = yield* Effect.promise(() => input.footer.idle()).pipe(Effect.exit)
if (Exit.isFailure(idle) || closed || input.footer.isClosed) {
yield* finish()
return false
}
const reset = yield* Effect.promise(() => next.reset()).pipe(Effect.exit)
if (Exit.isFailure(reset)) {
replayDisabled = true
input.trace?.write("replay.resize.disable", {
sessionID: input.sessionID,
phase: "reset",
})
input.footer.append({
kind: "error",
text: "resize replay failed; disabled for this session",
phase: "start",
source: "system",
})
yield* finish()
return false
}
state.data = snapshot.value.history.data
for (const request of [...state.data.permissions, ...state.data.questions]) {
seedBlocker(request.id)
}
for (const commit of replayLocalRows(
messagesList,
[...snapshot.value.visible.commits, ...snapshot.value.activeCommits],
next.localRows(),
)) {
input.trace?.write("ui.commit", commit)
input.footer.append(commit)
}
syncFooter([], snapshot.value.patch, currentSubagentState())
const rebuilt = yield* Effect.promise(() => input.footer.idle()).pipe(Effect.exit)
if (Exit.isFailure(rebuilt)) {
replayDisabled = true
input.trace?.write("replay.resize.disable", {
sessionID: input.sessionID,
phase: "rebuild",
})
input.footer.append({
kind: "error",
text: "resize replay failed; disabled for this session",
phase: "start",
source: "system",
})
yield* finish()
return false
}
input.trace?.write("replay.resize.complete", {
sessionID: input.sessionID,
})
yield* finish()
return true
})
const watch = Effect.fn("RunStreamTransport.watch")(() =>
Stream.fromAsyncIterable(events.stream, (error) =>
error instanceof Error ? error : new Error(String(error)),
@@ -943,7 +1144,7 @@ function createLayer(input: StreamInput) {
}
const sessionID = sid(event)
if (booting) {
if (booting || replaying) {
if (sessionID) {
input.trace?.write("recv.event", event)
buffered.push(event)
@@ -1005,6 +1206,7 @@ function createLayer(input: StreamInput) {
tick: state.tick,
armed: false,
live: false,
onVisibleOutput: next.onVisibleOutput,
done: yield* Deferred.make<void, unknown>(),
}
state.wait = item
@@ -1020,6 +1222,7 @@ function createLayer(input: StreamInput) {
const req = {
sessionID: input.sessionID,
messageID: next.prompt.messageID,
agent: next.agent,
model: next.model,
variant: next.variant,
@@ -1081,6 +1284,7 @@ function createLayer(input: StreamInput) {
input.sdk.session.command(
{
sessionID: input.sessionID,
messageID: next.prompt.messageID,
agent: next.agent,
model: next.model ? `${next.model.providerID}/${next.model.modelID}` : undefined,
variant: next.variant,
@@ -1231,6 +1435,7 @@ function createLayer(input: StreamInput) {
return Service.of({
runPromptTurn,
selectSubagent,
replayOnResize,
close,
})
}),
@@ -1254,6 +1459,7 @@ export async function createSessionTransport(input: StreamInput): Promise<Sessio
return {
runPromptTurn: (next) => runtime.runPromise((svc) => svc.runPromptTurn(next)),
selectSubagent: (sessionID) => runtime.runSync((svc) => svc.selectSubagent(sessionID)),
replayOnResize: (next) => runtime.runPromise((svc) => svc.replayOnResize(next)),
close: () => runtime.runPromise((svc) => svc.close()),
}
}
@@ -309,6 +309,21 @@ export type StreamCommit = {
}
}
export type LocalReplayAnchor = {
kind: EntryKind
text: string
phase: StreamPhase
messageID?: string
partID?: string
toolState?: StreamToolState
visible?: string
}
export type LocalReplayRow = {
commit: StreamCommit
after?: LocalReplayAnchor
}
// The public contract between the stream transport / prompt queue and
// the footer. RunFooter implements this. The transport and queue never
// touch the renderer directly -- they go through this interface.
@@ -103,7 +103,7 @@ Options:
--variant model variant (provider-specific reasoning effort, e.g., high,
max, minimal) [string]
--thinking show thinking blocks [boolean]
--replay replay visible session history on interactive resume
--replay replay interactive session history on resume and after resize
[boolean] [default: false]
--replay-limit cap visible interactive replay to the newest N messages
[number]
@@ -143,6 +143,7 @@ describe("run runtime queue", () => {
text: "hello",
phase: "start",
source: "system",
messageID: expect.any(String),
},
])
})
@@ -225,6 +226,7 @@ describe("run runtime queue", () => {
text: " hello ",
phase: "start",
source: "system",
messageID: expect.any(String),
},
])
})
@@ -260,6 +262,7 @@ describe("run runtime queue", () => {
text: "/fmt bash",
phase: "start",
source: "system",
messageID: expect.any(String),
},
])
ui.api.close()
@@ -321,7 +324,7 @@ describe("run runtime queue", () => {
await Promise.resolve()
expect(turns.map((item) => item.text)).toEqual(["one"])
expect(turns[0]?.messageID).toBeUndefined()
expect(turns[0]?.messageID).toEqual(expect.any(String))
expect(ui.commits.map((item) => item.text)).toEqual(["one"])
const first = ui.events.find((item) => item.type === "queued.prompts")
const event = ui.events.findLast((item) => item.type === "queued.prompts")
@@ -1,5 +1,5 @@
import { describe, expect, test } from "bun:test"
import { replaySession } from "@/cli/cmd/run/session-replay"
import { replayLocalRows, replaySession } from "@/cli/cmd/run/session-replay"
import type { SessionMessages } from "@/cli/cmd/run/session.shared"
function userMessage(id: string, text: string): SessionMessages[number] {
@@ -156,4 +156,286 @@ describe("run session replay", () => {
}),
)
})
test("merges failed local rows ahead of later persisted prompts", () => {
const persisted = {
kind: "user",
text: "successful",
phase: "start",
source: "system",
messageID: "msg-user-2",
} as const
const failed = {
kind: "user",
text: "failed",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
const error = {
kind: "error",
text: "network unavailable",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
expect(replayLocalRows([userMessage("msg-user-2", "successful")], [persisted], [{ commit: failed }, { commit: error }])).toEqual([
failed,
error,
persisted,
])
})
test("retains local errors but not duplicate local prompts once a prompt persists", () => {
const persisted = {
kind: "user",
text: "failed after persistence",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
const error = {
kind: "error",
text: "connection closed",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
expect(replayLocalRows([userMessage("msg-user-1", "failed after persistence")], [persisted], [{ commit: persisted }, { commit: error }])).toEqual([
persisted,
error,
])
})
test("keeps a local turn failure below assistant output already visible for that turn", () => {
const first = {
kind: "user",
text: "start",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
const answer = {
kind: "assistant",
text: "partial answer",
phase: "progress",
source: "assistant",
messageID: "msg-assistant-1",
} as const
const error = {
kind: "error",
text: "stream failed",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
const second = {
kind: "user",
text: "retry",
phase: "start",
source: "system",
messageID: "msg-user-2",
} as const
expect(
replayLocalRows(
[userMessage("msg-user-1", "start"), userMessage("msg-user-2", "retry")],
[first, answer, second],
[
{
commit: error,
after: { kind: "assistant", text: "partial answer", phase: "progress", messageID: "msg-assistant-1" },
},
],
),
).toEqual([first, answer, error, second])
})
test("keeps a local failure above assistant output received after the failure", () => {
const first = {
kind: "user",
text: "start",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
const error = {
kind: "error",
text: "request failed",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
const late = {
kind: "assistant",
text: "late answer",
phase: "progress",
source: "assistant",
messageID: "msg-assistant-1",
} as const
expect(replayLocalRows([userMessage("msg-user-1", "start")], [first, late], [{ commit: error }])).toEqual([
first,
error,
late,
])
})
test("inserts a local failure between persisted output chunks spanning that failure", () => {
const first = {
kind: "user",
text: "start",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
const complete = {
kind: "assistant",
text: "before after",
phase: "progress",
source: "assistant",
messageID: "msg-assistant-1",
partID: "part-1",
} as const
const error = {
kind: "error",
text: "stream failed",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
expect(
replayLocalRows([userMessage("msg-user-1", "start")], [first, complete], [
{
commit: error,
after: {
kind: "assistant",
text: "before ",
phase: "progress",
messageID: "msg-assistant-1",
partID: "part-1",
visible: "before ",
},
},
]),
).toEqual([first, { ...complete, text: "before " }, error, { ...complete, text: "after" }])
})
test("places an unpersisted failed prompt before live output from that turn", () => {
const prompt = {
kind: "user",
text: "start",
phase: "start",
source: "system",
messageID: "msg-1",
} as const
const answer = {
kind: "assistant",
text: "partial answer",
phase: "progress",
source: "assistant",
messageID: "msg-2",
} as const
const error = {
kind: "error",
text: "stream failed",
phase: "start",
source: "system",
messageID: "msg-1",
} as const
expect(
replayLocalRows([], [answer], [
{ commit: prompt },
{
commit: error,
after: { kind: "assistant", text: "partial answer", phase: "progress", messageID: "msg-2" },
},
]),
).toEqual([prompt, answer, error])
})
test("anchors a failure after the visible start of a tool that later completes", () => {
const prompt = {
kind: "user",
text: "run ls",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
const running = {
kind: "tool",
text: "running bash",
phase: "start",
source: "tool",
messageID: "msg-assistant-1",
partID: "part-tool-1",
toolState: "running",
} as const
const completed = {
kind: "tool",
text: "file.txt",
phase: "final",
source: "tool",
messageID: "msg-assistant-1",
partID: "part-tool-1",
toolState: "completed",
} as const
const error = {
kind: "error",
text: "connection lost",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
expect(
replayLocalRows([userMessage("msg-user-1", "run ls")], [prompt, running, completed], [
{
commit: error,
after: {
kind: "tool",
text: "running bash",
phase: "start",
messageID: "msg-assistant-1",
partID: "part-tool-1",
toolState: "running",
},
},
]),
).toEqual([prompt, running, error, completed])
})
test("retains an unpersisted local diagnostic before later persisted prompts", () => {
const first = {
kind: "user",
text: "before",
phase: "start",
source: "system",
messageID: "msg-user-1",
} as const
const error = {
kind: "error",
text: "failed to start new session",
phase: "start",
source: "system",
messageID: "msg-user-2",
} as const
const second = {
kind: "user",
text: "after",
phase: "start",
source: "system",
messageID: "msg-user-3",
} as const
expect(
replayLocalRows([userMessage("msg-user-1", "before"), userMessage("msg-user-3", "after")], [first, second], [
{ commit: error },
]),
).toEqual([first, error, second])
})
})
@@ -1,7 +1,7 @@
import { afterEach, describe, expect, mock, spyOn, test } from "bun:test"
import { OpencodeClient, type GlobalEvent } from "@opencode-ai/sdk/v2"
import { createSessionTransport } from "@/cli/cmd/run/stream.transport"
import type { FooterApi, FooterEvent, RunFilePart, StreamCommit } from "@/cli/cmd/run/types"
import type { FooterApi, FooterEvent, LocalReplayRow, RunFilePart, StreamCommit } from "@/cli/cmd/run/types"
type EventStream = Awaited<ReturnType<OpencodeClient["event"]["subscribe"]>>["stream"]
type GlobalEventStream = Awaited<ReturnType<OpencodeClient["global"]["event"]>>["stream"]
@@ -11,6 +11,7 @@ type SessionChild = NonNullable<Awaited<ReturnType<OpencodeClient["session"]["ch
type SessionToolPart = Extract<SessionMessage["parts"][number], { type: "tool" }>
type SessionStatusMap = NonNullable<Awaited<ReturnType<OpencodeClient["session"]["status"]>>["data"]>
type TextPart = Extract<SessionMessage["parts"][number], { type: "text" }>
type ReasoningPart = Extract<SessionMessage["parts"][number], { type: "reasoning" }>
afterEach(() => {
mock.restore()
@@ -298,6 +299,29 @@ function textUpdated(part: TextPart): SdkEvent {
}
}
function reasoningPart(id: string, messageID: string, text: string): ReasoningPart {
return {
id,
sessionID: "session-1",
messageID,
type: "reasoning",
text,
time: { start: 1 },
}
}
function reasoningUpdated(part: ReasoningPart): SdkEvent {
return {
id: `evt-${part.id}-updated`,
type: "message.part.updated",
properties: {
sessionID: part.sessionID,
part,
time: 1,
},
}
}
function toolUpdated(part: SessionToolPart): SdkEvent {
return {
id: `evt-${part.id}-updated`,
@@ -721,6 +745,444 @@ describe("run stream transport", () => {
}
})
test("rebuilds session output on resize and continues live deltas from replayed state", async () => {
const src = eventFeed()
const ui = footer()
let calls = 0
const transport = await createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async () => {
calls += 1
if (calls === 1) {
return ok([])
}
return ok([
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [textPart("text-1", "msg-1", "Hello")],
}),
])
},
}),
sessionID: "session-1",
thinking: true,
replay: true,
limits: () => ({}),
footer: ui.api,
})
const localRows: LocalReplayRow[] = [
{ commit: { kind: "user", text: "pending prompt", phase: "start", source: "system", messageID: "msg-pending" } },
]
const reset = mock(() => {
localRows.push({
commit: {
kind: "user",
text: "sent during reset",
phase: "start",
source: "system",
messageID: "msg-during-reset",
},
})
return Promise.resolve()
})
try {
expect(
await transport.replayOnResize({
localRows: () => localRows,
reset,
}),
).toBe(true)
expect(reset).toHaveBeenCalledTimes(1)
expect(ui.commits).toEqual(
expect.arrayContaining([
expect.objectContaining({ kind: "assistant", text: "Hello" }),
expect.objectContaining({ kind: "user", text: "sent during reset", messageID: "msg-during-reset" }),
]),
)
src.push(textUpdated(textPart("text-1", "msg-1", "Hello world")))
await waitFor(() => ui.commits.find((commit) => commit.kind === "assistant" && commit.text === " world"))
expect(ui.commits.filter((commit) => commit.kind === "assistant").map((commit) => commit.text)).toEqual([
"Hello",
" world",
])
} finally {
src.close()
await transport.close()
}
})
test("coalesces active resize requests into one trailing replay", async () => {
const src = eventFeed()
const ui = footer()
const firstReset = defer()
const resetA = mock(() => firstReset.promise)
const resetB = mock(() => Promise.resolve())
const resetC = mock(() => Promise.resolve())
const transport = await createSessionTransport({
sdk: sdk({ stream: src.stream }),
sessionID: "session-1",
thinking: true,
replay: true,
limits: () => ({}),
footer: ui.api,
})
try {
const active = transport.replayOnResize({ localRows: () => [], reset: resetA })
await waitFor(() => (resetA.mock.calls.length === 1 ? true : undefined))
expect(await transport.replayOnResize({ localRows: () => [], reset: resetB })).toBe(false)
expect(await transport.replayOnResize({ localRows: () => [], reset: resetC })).toBe(false)
expect(resetB).not.toHaveBeenCalled()
firstReset.resolve()
expect(await active).toBe(true)
expect(resetA).toHaveBeenCalledTimes(1)
expect(resetB).not.toHaveBeenCalled()
expect(resetC).toHaveBeenCalledTimes(1)
} finally {
src.close()
await transport.close()
}
})
test("keeps coalescing resize requests while buffered events drain", async () => {
const src = eventFeed()
const ui = footer()
const firstReset = defer()
const statusGate = defer()
const statusStarted = defer()
let blockStatus = false
const trace = mock((_type: string, _data?: unknown) => {})
const resetA = mock(() => firstReset.promise)
const resetB = mock(() => Promise.resolve())
const resetC = mock(() => Promise.resolve())
const transport = await createSessionTransport({
sdk: sdk({
stream: src.stream,
status: async () => {
if (blockStatus) {
statusStarted.resolve()
await statusGate.promise
}
return ok(statusMap(true))
},
}),
sessionID: "session-1",
thinking: true,
replay: true,
limits: () => ({}),
footer: ui.api,
trace: { write: trace },
})
const turn = transport.runPromptTurn({
agent: undefined,
model: undefined,
variant: undefined,
prompt: { text: "active", parts: [] },
files: [],
includeFiles: false,
})
try {
await waitFor(() => ui.events.find((event) => event.type === "turn.wait"))
const active = transport.replayOnResize({ localRows: () => [], reset: resetA })
await waitFor(() => (resetA.mock.calls.length === 1 ? true : undefined))
blockStatus = true
src.push(busy())
src.push(idle())
await waitFor(() => (trace.mock.calls.filter((call) => call[0] === "recv.event").length >= 2 ? true : undefined))
expect(await transport.replayOnResize({ localRows: () => [], reset: resetB })).toBe(false)
firstReset.resolve()
await Promise.race([
statusStarted.promise,
Bun.sleep(1_000).then(() => {
throw new Error("timed out waiting for buffered status drain")
}),
])
expect(await transport.replayOnResize({ localRows: () => [], reset: resetC })).toBe(false)
expect(resetC).not.toHaveBeenCalled()
blockStatus = false
statusGate.resolve()
expect(
await Promise.race([
active,
Bun.sleep(1_000).then(() => {
throw new Error("timed out waiting for trailing resize replay")
}),
]),
).toBe(true)
expect(resetB).not.toHaveBeenCalled()
expect(resetC).toHaveBeenCalledTimes(1)
} finally {
src.close()
await transport.close()
await turn
}
})
test("preserves assistant deltas not yet persisted when replaying during a live stream", async () => {
const src = eventFeed()
const ui = footer()
let calls = 0
const transport = await createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async () => {
calls += 1
if (calls === 1) {
return ok([])
}
return ok([
assistantMessage({
sessionID: "session-1",
id: "msg-live",
parts: [textPart("text-live", "msg-live", "")],
}),
])
},
}),
sessionID: "session-1",
thinking: true,
replay: true,
limits: () => ({}),
footer: ui.api,
})
try {
src.push(assistant("msg-live"))
src.push(textUpdated(textPart("text-live", "msg-live", "")))
src.push(textDelta("msg-live", "text-live", "Hello"))
await waitFor(() => ui.commits.find((commit) => commit.kind === "assistant" && commit.text === "Hello"))
ui.commits.length = 0
expect(await transport.replayOnResize({ localRows: () => [], reset: () => Promise.resolve() })).toBe(true)
src.push(textDelta("msg-live", "text-live", "Hello"))
src.push(
textUpdated({
...textPart("text-live", "msg-live", "HelloHello"),
time: { start: 1, end: 2 },
}),
)
await waitFor(() =>
ui.commits.filter((commit) => commit.kind === "assistant" && commit.text === "Hello").length === 2
? true
: undefined,
)
expect(
ui.commits.filter((commit) => commit.kind === "assistant" && commit.text).map((commit) => commit.text),
).toEqual(["Hello", "Hello"])
} finally {
src.close()
await transport.close()
}
})
test("preserves the display prefix for active reasoning restored during replay", async () => {
const src = eventFeed()
const ui = footer()
let calls = 0
const transport = await createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async () => {
calls += 1
if (calls === 1) {
return ok([])
}
return ok([
assistantMessage({
sessionID: "session-1",
id: "msg-thinking",
parts: [reasoningPart("thinking-1", "msg-thinking", "")],
}),
])
},
}),
sessionID: "session-1",
thinking: true,
replay: true,
limits: () => ({}),
footer: ui.api,
})
try {
src.push(assistant("msg-thinking"))
src.push(reasoningUpdated(reasoningPart("thinking-1", "msg-thinking", "")))
src.push(textDelta("msg-thinking", "thinking-1", "plan"))
await waitFor(() => ui.commits.find((commit) => commit.kind === "reasoning" && commit.text === "Thinking: plan"))
ui.commits.length = 0
expect(await transport.replayOnResize({ localRows: () => [], reset: () => Promise.resolve() })).toBe(true)
expect(ui.commits.filter((commit) => commit.kind === "reasoning").map((commit) => commit.text)).toEqual([
"Thinking: plan",
])
} finally {
src.close()
await transport.close()
}
})
test("does not overlay stale active text when persistence completes during replay", async () => {
const src = eventFeed()
const ui = footer()
let calls = 0
const transport = await createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async () => {
calls += 1
if (calls === 1) {
return ok([])
}
return ok([
assistantMessage({
sessionID: "session-1",
id: "msg-finished",
parts: [
{
...textPart("text-finished", "msg-finished", "Hello"),
time: { start: 1, end: 2 },
},
],
}),
])
},
}),
sessionID: "session-1",
thinking: true,
replay: true,
limits: () => ({}),
footer: ui.api,
})
try {
src.push(assistant("msg-finished"))
src.push(textUpdated(textPart("text-finished", "msg-finished", "")))
src.push(textDelta("msg-finished", "text-finished", "Hello"))
await waitFor(() => ui.commits.find((commit) => commit.kind === "assistant" && commit.text === "Hello"))
ui.commits.length = 0
expect(await transport.replayOnResize({ localRows: () => [], reset: () => Promise.resolve() })).toBe(true)
expect(
ui.commits.filter((commit) => commit.kind === "assistant" && commit.text).map((commit) => commit.text),
).toEqual(["Hello"])
} finally {
src.close()
await transport.close()
}
})
test("does not clear the terminal when resize replay snapshot fetch fails", async () => {
const src = eventFeed()
const ui = footer()
let calls = 0
const transport = await createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async () => {
calls += 1
if (calls === 1) {
return ok([])
}
throw new Error("snapshot failed")
},
}),
sessionID: "session-1",
thinking: true,
replay: true,
limits: () => ({}),
footer: ui.api,
})
const reset = mock(() => Promise.resolve())
try {
expect(await transport.replayOnResize({ localRows: () => [], reset })).toBe(false)
expect(reset).not.toHaveBeenCalled()
expect(ui.commits).toEqual([])
} finally {
src.close()
await transport.close()
}
})
test("disables resize replay for the session after terminal reset fails", async () => {
const src = eventFeed()
const ui = footer()
const transport = await createSessionTransport({
sdk: sdk({ stream: src.stream }),
sessionID: "session-1",
thinking: true,
replay: true,
limits: () => ({}),
footer: ui.api,
})
const reset = mock(() => Promise.reject(new Error("clear failed")))
try {
expect(await transport.replayOnResize({ localRows: () => [], reset })).toBe(false)
expect(await transport.replayOnResize({ localRows: () => [], reset })).toBe(false)
expect(reset).toHaveBeenCalledTimes(1)
expect(ui.commits).toContainEqual({
kind: "error",
text: "resize replay failed; disabled for this session",
phase: "start",
source: "system",
})
} finally {
src.close()
await transport.close()
}
})
test("disables resize replay when rebuilding scrollback fails after terminal reset", async () => {
const src = eventFeed()
const ui = footer()
let cleared = false
const idle = ui.api.idle
ui.api.idle = () => (cleared ? Promise.reject(new Error("render failed")) : idle())
const transport = await createSessionTransport({
sdk: sdk({ stream: src.stream }),
sessionID: "session-1",
thinking: true,
replay: true,
limits: () => ({}),
footer: ui.api,
})
const reset = mock(() => {
cleared = true
return Promise.resolve()
})
try {
expect(await transport.replayOnResize({ localRows: () => [], reset })).toBe(false)
expect(await transport.replayOnResize({ localRows: () => [], reset })).toBe(false)
expect(reset).toHaveBeenCalledTimes(1)
expect(ui.commits).toContainEqual({
kind: "error",
text: "resize replay failed; disabled for this session",
phase: "start",
source: "system",
})
} finally {
src.close()
await transport.close()
}
})
test("drops completed historical subagent tabs during bootstrap", async () => {
const src = eventFeed()
const ui = footer()
+3 -3
View File
@@ -22,9 +22,9 @@
"zod": "catalog:"
},
"peerDependencies": {
"@opentui/core": ">=0.3.0",
"@opentui/keymap": ">=0.3.0",
"@opentui/solid": ">=0.3.0"
"@opentui/core": ">=0.3.1",
"@opentui/keymap": ">=0.3.1",
"@opentui/solid": ">=0.3.1"
},
"peerDependenciesMeta": {
"@opentui/core": {