fix(acp): honor session/cancel by aborting the running turn (#30145)

Co-authored-by: Shoubhit Dash <shoubhit2005@gmail.com>
This commit is contained in:
smagnuso
2026-06-01 01:38:28 -07:00
committed by GitHub
parent fd2278eefc
commit 50b4ad89b3
2 changed files with 52 additions and 47 deletions
+20 -13
View File
@@ -330,25 +330,34 @@ export function make(input: {
} }
}) })
const abortBackingSession = Effect.fn("ACP.abortBackingSession")(function* (current: ACPSession.Info) {
yield* request(
() => input.sdk.session.abort({ directory: current.cwd, sessionID: current.id }, { throwOnError: true }),
"session",
).pipe(
Effect.catch((error) =>
Effect.sync(() => {
log.error("failed to abort ACP backing session", { error, sessionID: current.id })
}),
),
)
})
const closeSession = Effect.fn("ACP.closeSession")(function* (params: CloseSessionRequest) { const closeSession = Effect.fn("ACP.closeSession")(function* (params: CloseSessionRequest) {
const removed = yield* session.remove(params.sessionId) const removed = yield* session.remove(params.sessionId)
registeredMcp.delete(params.sessionId) registeredMcp.delete(params.sessionId)
sessionSnapshots.delete(params.sessionId) sessionSnapshots.delete(params.sessionId)
if (!removed) return {} if (!removed) return {}
yield* request( yield* abortBackingSession(removed)
() => input.sdk.session.abort({ directory: removed.cwd, sessionID: params.sessionId }, { throwOnError: true }),
"session",
).pipe(
Effect.catch((error) =>
Effect.sync(() => {
log.error("failed to abort session while closing ACP session", { error, sessionID: params.sessionId })
}),
),
)
return {} return {}
}) })
const cancel = Effect.fn("ACP.cancel")(function* (params: CancelNotification) {
const current = yield* session.get(params.sessionId)
yield* abortBackingSession(current)
})
const forkSession = Effect.fn("ACP.forkSession")(function* (params: ForkSessionRequest) { const forkSession = Effect.fn("ACP.forkSession")(function* (params: ForkSessionRequest) {
const snapshot = yield* directorySnapshot(params.cwd) const snapshot = yield* directorySnapshot(params.cwd)
const forked = yield* request( const forked = yield* request(
@@ -563,9 +572,7 @@ export function make(input: {
yield* sendUsageUpdate(input.usage, input.sdk, input.connection, current.id, current.cwd) yield* sendUsageUpdate(input.usage, input.sdk, input.connection, current.id, current.cwd)
return promptResponse(undefined, params.messageId) return promptResponse(undefined, params.messageId)
}), }),
cancel: Effect.fn("ACP.cancel")(function* (_input: CancelNotification) { cancel,
return yield* new ACPError.UnsupportedOperationError({ method: "session/cancel" })
}),
} }
} }
@@ -12,10 +12,9 @@ import type {
} from "@agentclientprotocol/sdk" } from "@agentclientprotocol/sdk"
import type { OpencodeClient } from "@opencode-ai/sdk/v2" import type { OpencodeClient } from "@opencode-ai/sdk/v2"
import { ProviderV2 } from "@opencode-ai/core/provider" import { ProviderV2 } from "@opencode-ai/core/provider"
import { Effect, ManagedRuntime } from "effect" import { Effect } from "effect"
import * as ACPService from "@/acp/service" import * as ACPService from "@/acp/service"
import * as ACPError from "@/acp/error" import * as ACPError from "@/acp/error"
import { ACPSession } from "@/acp/session"
import { UsageService } from "@/acp/usage" import { UsageService } from "@/acp/usage"
import type { Provider } from "@/provider/provider" import type { Provider } from "@/provider/provider"
@@ -142,7 +141,10 @@ const provider: Provider.Info = {
} }
describe("ACP service sessions", () => { describe("ACP service sessions", () => {
const makeService = (messages: readonly { info: unknown; parts: readonly unknown[] }[] = []) => { const makeService = (
messages: readonly { info: unknown; parts: readonly unknown[] }[] = [],
options?: { abort?: (input: { sessionID: string }) => Promise<{ data: boolean }> },
) => {
const updates: SessionNotification[] = [] const updates: SessionNotification[] = []
const mcpAdds: string[] = [] const mcpAdds: string[] = []
const aborts: string[] = [] const aborts: string[] = []
@@ -220,10 +222,12 @@ describe("ACP service sessions", () => {
summarizes.push(input) summarizes.push(input)
return Promise.resolve({ data: true }) return Promise.resolve({ data: true })
}, },
abort: (input: { sessionID: string }) => { abort:
options?.abort ??
((input: { sessionID: string }) => {
aborts.push(input.sessionID) aborts.push(input.sessionID)
return Promise.resolve({ data: true }) return Promise.resolve({ data: true })
}, }),
fork: (input: { sessionID: string }) => { fork: (input: { sessionID: string }) => {
forks.push(input.sessionID) forks.push(input.sessionID)
return Promise.resolve({ data: { id: `fork_${input.sessionID}` } }) return Promise.resolve({ data: { id: `fork_${input.sessionID}` } })
@@ -381,34 +385,28 @@ describe("ACP service sessions", () => {
expect(await Effect.runPromise(service.closeSession({ sessionId: "missing" }))).toEqual({}) expect(await Effect.runPromise(service.closeSession({ sessionId: "missing" }))).toEqual({})
}) })
it("does not fail close when backing abort fails", async () => { it("cancel aborts the backing session and keeps the ACP session", async () => {
const sessionService = ManagedRuntime.make(ACPSession.defaultLayer).runSync( const { service, aborts } = makeService()
ACPSession.Service.use((service) => Effect.succeed(service)), const created = await Effect.runPromise(service.newSession({ cwd: "/workspace", mcpServers: [] }))
)
const { service } = makeService()
const sdk = {
config: {
providers: () => Promise.resolve({ data: { providers: [provider], default: { test: modelID } } }),
get: () => Promise.resolve({ data: {} }),
},
app: {
agents: () => Promise.resolve({ data: [{ name: "build", mode: "primary", permission: [], options: {} }] }),
skills: () => Promise.resolve({ data: [] }),
},
command: {
list: () => Promise.resolve({ data: [] }),
},
session: {
abort: () => Promise.reject(new Error("nope")),
},
mcp: {
add: () => Promise.resolve({ data: {} }),
},
} as unknown as OpencodeClient
const closing = ACPService.make({ sdk, session: sessionService })
await Effect.runPromise(sessionService.create({ id: "ses_close", cwd: "/workspace" }))
expect(await Effect.runPromise(closing.closeSession({ sessionId: "ses_close" }))).toEqual({}) await Effect.runPromise(service.cancel({ sessionId: created.sessionId }))
// The running turn was aborted via the core session API.
expect(aborts).toEqual([created.sessionId])
// Unlike closeSession, the ACP session is still present afterwards so
// the client can keep prompting.
const stillUsable = await Effect.runPromise(
service.setSessionConfigOption({ sessionId: created.sessionId, configId: "effort", value: "high" }),
)
expect(stillUsable).toBeDefined()
})
it("does not fail cancel or close when the backing abort fails", async () => {
const { service } = makeService([], { abort: () => Promise.reject(new Error("nope")) })
const created = await Effect.runPromise(service.newSession({ cwd: "/workspace", mcpServers: [] }))
await Effect.runPromise(service.cancel({ sessionId: created.sessionId }))
expect(await Effect.runPromise(service.closeSession({ sessionId: created.sessionId }))).toEqual({})
expect(await Effect.runPromise(service.closeSession({ sessionId: "missing" }))).toEqual({}) expect(await Effect.runPromise(service.closeSession({ sessionId: "missing" }))).toEqual({})
}) })