fix(session): enrich status events with parent id

This commit is contained in:
Shoubhit Dash
2026-05-31 19:03:23 +05:30
parent fd14c00481
commit c7faad0f8f
5 changed files with 66 additions and 10 deletions
+11 -2
View File
@@ -4,6 +4,8 @@ import { NonNegativeInt } from "@opencode-ai/core/schema"
import { Effect, Layer, Context, Schema } from "effect"
import { EventV2Bridge } from "@/event-v2-bridge"
import { EventV2 } from "@opencode-ai/core/event"
import { NotFoundError } from "@/storage/storage"
import { Session } from "./session"
export const Info = Schema.Union([
Schema.Struct({
@@ -37,6 +39,7 @@ export const Event = {
schema: {
sessionID: SessionID,
status: Info,
parentID: Schema.optional(SessionID),
},
}),
// deprecated
@@ -60,6 +63,7 @@ export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const events = yield* EventV2Bridge.Service
const sessions = yield* Session.Service
const state = yield* InstanceState.make(
Effect.fn("SessionStatus.state")(() => Effect.succeed(new Map<SessionID, Info>())),
@@ -76,7 +80,12 @@ export const layer = Layer.effect(
const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info) {
const data = yield* InstanceState.get(state)
yield* events.publish(Event.Status, { sessionID, status })
const session = yield* sessions.get(sessionID).pipe(Effect.catchIf(NotFoundError.isInstance, () => Effect.void))
yield* events.publish(Event.Status, {
sessionID,
status,
...(session?.parentID ? { parentID: session.parentID } : {}),
})
if (status.type === "idle") {
yield* events.publish(Event.Idle, { sessionID })
data.delete(sessionID)
@@ -89,6 +98,6 @@ export const layer = Layer.effect(
}),
)
export const defaultLayer = layer.pipe(Layer.provide(EventV2Bridge.defaultLayer))
export const defaultLayer = layer.pipe(Layer.provide(Session.defaultLayer), Layer.provide(EventV2Bridge.defaultLayer))
export * as SessionStatus from "./status"
@@ -269,7 +269,7 @@ function withCompaction(options?: CompactionProcessOptions) {
function compactionProcessLayer(options?: CompactionProcessOptions) {
const events = EventV2Bridge.defaultLayer
const status = SessionStatus.layer.pipe(Layer.provide(events))
const status = SessionStatus.layer.pipe(Layer.provide(SessionNs.defaultLayer), Layer.provide(events))
const processor = options?.llm
? SessionProcessorModule.SessionProcessor.layer.pipe(
Layer.provide(summary),
@@ -171,7 +171,10 @@ const assistant = Effect.fn("TestSession.assistant")(function* (
return msg
})
const status = SessionStatus.layer.pipe(Layer.provideMerge(EventV2Bridge.defaultLayer))
const status = SessionStatus.layer.pipe(
Layer.provide(Session.defaultLayer),
Layer.provideMerge(EventV2Bridge.defaultLayer),
)
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
const deps = Layer.mergeAll(
Session.defaultLayer,
@@ -588,15 +591,18 @@ it.live("session.processor effect tests publish retry status updates", () =>
yield* llm.error(503, { error: "boom" })
yield* llm.text("")
const chat = yield* session.create({})
const root = yield* session.create({})
const chat = yield* session.create({ parentID: root.id })
const parent = yield* user(chat.id, "retry")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const states: number[] = []
const states: Array<{ attempt: number; parentID?: SessionID }> = []
const off = yield* events.listen((evt) => {
if (evt.type !== SessionStatus.Event.Status.type) return Effect.void
const data = evt.data as typeof SessionStatus.Event.Status.data.Type
if (data.sessionID === chat.id && data.status.type === "retry") states.push(data.status.attempt)
if (data.sessionID === chat.id && data.status.type === "retry") {
states.push({ attempt: data.status.attempt, parentID: data.parentID })
}
return Effect.void
})
const handle = yield* processors.create({
@@ -626,7 +632,7 @@ it.live("session.processor effect tests publish retry status updates", () =>
expect(value).toBe("continue")
expect(yield* llm.calls).toBe(2)
expect(states).toStrictEqual([1])
expect(states).toStrictEqual([{ attempt: 1, parentID: root.id }])
}),
{ config: (url) => providerCfg(url) },
),
+39 -1
View File
@@ -153,7 +153,10 @@ const lsp = Layer.succeed(
}),
)
const status = SessionStatus.layer.pipe(Layer.provideMerge(EventV2Bridge.defaultLayer))
const status = SessionStatus.layer.pipe(
Layer.provide(Session.defaultLayer),
Layer.provideMerge(EventV2Bridge.defaultLayer),
)
const run = SessionRunState.layer.pipe(Layer.provide(status))
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
@@ -867,6 +870,41 @@ it.instance(
3_000,
)
it.instance(
"loop status events identify child sessions through cancellation",
() =>
Effect.gen(function* () {
const { llm } = yield* useServerConfig(providerCfg)
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const events = yield* EventV2Bridge.Service
const statuses: Array<typeof SessionStatus.Event.Status.data.Type> = []
yield* llm.hang
const parent = yield* sessions.create({})
const child = yield* sessions.create({ parentID: parent.id })
yield* user(child.id, "hi")
const off = yield* events.listen((event) => {
if (event.type !== SessionStatus.Event.Status.type) return Effect.void
const data = event.data as typeof SessionStatus.Event.Status.data.Type
if (data.sessionID === child.id) statuses.push(data)
return Effect.void
})
yield* Effect.addFinalizer(() => off)
const fiber = yield* prompt.loop({ sessionID: child.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
yield* prompt.cancel(child.id)
yield* Fiber.await(fiber)
expect(statuses.some((event) => event.status.type === "idle")).toBe(true)
expect(statuses.every((event) => event.parentID === parent.id)).toBe(true)
}),
3_000,
)
// Cancel semantics
it.instance(
@@ -109,7 +109,10 @@ const lsp = Layer.succeed(
}),
)
const status = SessionStatus.layer.pipe(Layer.provideMerge(EventV2Bridge.defaultLayer))
const status = SessionStatus.layer.pipe(
Layer.provide(Session.defaultLayer),
Layer.provideMerge(EventV2Bridge.defaultLayer),
)
const run = SessionRunState.layer.pipe(Layer.provide(status))
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)