mirror of
https://github.com/anomalyco/opencode.git
synced 2026-06-02 06:16:48 +02:00
revert(session): remove threaded status context
This commit is contained in:
@@ -57,7 +57,6 @@ type Input = {
|
||||
assistantMessage: SessionLegacy.Assistant
|
||||
sessionID: SessionID
|
||||
model: Provider.Model
|
||||
statusContext?: SessionStatus.SetContext
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
@@ -778,7 +777,7 @@ export const layer = Layer.effect(
|
||||
sessionID: ctx.assistantMessage.sessionID,
|
||||
error: ctx.assistantMessage.error,
|
||||
})
|
||||
yield* status.set(ctx.sessionID, { type: "idle" }, ctx.statusContext)
|
||||
yield* status.set(ctx.sessionID, { type: "idle" })
|
||||
})
|
||||
|
||||
const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
|
||||
@@ -790,7 +789,7 @@ export const layer = Layer.effect(
|
||||
yield* Effect.gen(function* () {
|
||||
ctx.currentText = undefined
|
||||
ctx.reasoningMap = {}
|
||||
yield* status.set(ctx.sessionID, { type: "busy" }, ctx.statusContext)
|
||||
yield* status.set(ctx.sessionID, { type: "busy" })
|
||||
const stream = llm.stream(streamInput)
|
||||
|
||||
yield* stream.pipe(
|
||||
@@ -830,17 +829,13 @@ export const layer = Layer.effect(
|
||||
: Effect.void
|
||||
return event.pipe(
|
||||
Effect.andThen(
|
||||
status.set(
|
||||
ctx.sessionID,
|
||||
{
|
||||
type: "retry",
|
||||
attempt: info.attempt,
|
||||
message: info.message,
|
||||
action: info.action,
|
||||
next: info.next,
|
||||
},
|
||||
ctx.statusContext,
|
||||
),
|
||||
status.set(ctx.sessionID, {
|
||||
type: "retry",
|
||||
attempt: info.attempt,
|
||||
message: info.message,
|
||||
action: info.action,
|
||||
next: info.next,
|
||||
}),
|
||||
),
|
||||
)
|
||||
},
|
||||
|
||||
@@ -140,8 +140,7 @@ export const layer = Layer.effect(
|
||||
|
||||
const cancel = Effect.fn("SessionPrompt.cancel")(function* (sessionID: SessionID) {
|
||||
yield* elog.info("cancel", { sessionID })
|
||||
const session = yield* sessions.get(sessionID).pipe(Effect.catchCause(() => Effect.succeed(undefined)))
|
||||
yield* state.cancel(sessionID, session?.parentID ? { parentID: session.parentID } : undefined)
|
||||
yield* state.cancel(sessionID)
|
||||
})
|
||||
|
||||
const resolvePromptParts = Effect.fn("SessionPrompt.resolvePromptParts")(function* (template: string) {
|
||||
@@ -1252,10 +1251,9 @@ export const layer = Layer.effect(
|
||||
let structured: unknown
|
||||
let step = 0
|
||||
const session = yield* sessions.get(sessionID).pipe(Effect.orDie)
|
||||
const statusContext = session.parentID ? { parentID: session.parentID } : {}
|
||||
|
||||
while (true) {
|
||||
yield* status.set(sessionID, { type: "busy" }, statusContext)
|
||||
yield* status.set(sessionID, { type: "busy" })
|
||||
yield* slog.info("loop", { step })
|
||||
|
||||
let msgs = yield* MessageV2.filterCompactedEffect(sessionID).pipe(
|
||||
@@ -1383,7 +1381,6 @@ export const layer = Layer.effect(
|
||||
assistantMessage: msg,
|
||||
sessionID,
|
||||
model,
|
||||
statusContext,
|
||||
})
|
||||
.pipe(Effect.onInterrupt(() => finalizeInterruptedAssistant))
|
||||
|
||||
@@ -1507,13 +1504,7 @@ export const layer = Layer.effect(
|
||||
|
||||
const loop: (input: LoopInput) => Effect.Effect<SessionLegacy.WithParts> = Effect.fn("SessionPrompt.loop")(
|
||||
function* (input: LoopInput) {
|
||||
const session = yield* sessions.get(input.sessionID).pipe(Effect.orDie)
|
||||
return yield* state.ensureRunning(
|
||||
input.sessionID,
|
||||
session.parentID ? { parentID: session.parentID } : {},
|
||||
lastAssistant(input.sessionID),
|
||||
runLoop(input.sessionID),
|
||||
)
|
||||
return yield* state.ensureRunning(input.sessionID, lastAssistant(input.sessionID), runLoop(input.sessionID))
|
||||
},
|
||||
)
|
||||
|
||||
@@ -1521,14 +1512,7 @@ export const layer = Layer.effect(
|
||||
"SessionPrompt.shell",
|
||||
)(function* (input: ShellInput) {
|
||||
const ready = yield* Latch.make()
|
||||
const session = yield* sessions.get(input.sessionID).pipe(Effect.orDie)
|
||||
return yield* state.startShell(
|
||||
input.sessionID,
|
||||
session.parentID ? { parentID: session.parentID } : {},
|
||||
lastAssistant(input.sessionID),
|
||||
shellImpl(input, ready),
|
||||
ready,
|
||||
)
|
||||
return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready)
|
||||
})
|
||||
|
||||
const command = Effect.fn("SessionPrompt.command")(function* (input: CommandInput) {
|
||||
|
||||
@@ -10,16 +10,14 @@ import { SessionStatus } from "./status"
|
||||
|
||||
export interface Interface {
|
||||
readonly assertNotBusy: (sessionID: SessionID) => Effect.Effect<void, Session.BusyError>
|
||||
readonly cancel: (sessionID: SessionID, statusContext?: SessionStatus.SetContext) => Effect.Effect<void>
|
||||
readonly cancel: (sessionID: SessionID) => Effect.Effect<void>
|
||||
readonly ensureRunning: (
|
||||
sessionID: SessionID,
|
||||
statusContext: SessionStatus.SetContext,
|
||||
onInterrupt: Effect.Effect<SessionLegacy.WithParts>,
|
||||
work: Effect.Effect<SessionLegacy.WithParts>,
|
||||
) => Effect.Effect<SessionLegacy.WithParts>
|
||||
readonly startShell: (
|
||||
sessionID: SessionID,
|
||||
statusContext: SessionStatus.SetContext,
|
||||
onInterrupt: Effect.Effect<SessionLegacy.WithParts>,
|
||||
work: Effect.Effect<SessionLegacy.WithParts>,
|
||||
ready?: Latch.Latch,
|
||||
@@ -53,7 +51,6 @@ export const layer = Layer.effect(
|
||||
|
||||
const runner = Effect.fn("SessionRunState.runner")(function* (
|
||||
sessionID: SessionID,
|
||||
statusContext: SessionStatus.SetContext,
|
||||
onInterrupt: Effect.Effect<SessionLegacy.WithParts>,
|
||||
) {
|
||||
const data = yield* InstanceState.get(state)
|
||||
@@ -62,9 +59,9 @@ export const layer = Layer.effect(
|
||||
const next = Runner.make<SessionLegacy.WithParts>(data.scope, {
|
||||
onIdle: Effect.gen(function* () {
|
||||
data.runners.delete(sessionID)
|
||||
yield* status.set(sessionID, { type: "idle" }, statusContext)
|
||||
yield* status.set(sessionID, { type: "idle" })
|
||||
}),
|
||||
onBusy: status.set(sessionID, { type: "busy" }, statusContext),
|
||||
onBusy: status.set(sessionID, { type: "busy" }),
|
||||
onInterrupt,
|
||||
})
|
||||
data.runners.set(sessionID, next)
|
||||
@@ -77,15 +74,12 @@ export const layer = Layer.effect(
|
||||
if (existing?.busy) yield* busyError(sessionID)
|
||||
})
|
||||
|
||||
const cancel = Effect.fn("SessionRunState.cancel")(function* (
|
||||
sessionID: SessionID,
|
||||
statusContext?: SessionStatus.SetContext,
|
||||
) {
|
||||
const cancel = Effect.fn("SessionRunState.cancel")(function* (sessionID: SessionID) {
|
||||
yield* cancelBackgroundJobs(background, sessionID)
|
||||
const data = yield* InstanceState.get(state)
|
||||
const existing = data.runners.get(sessionID)
|
||||
if (!existing || !existing.busy) {
|
||||
yield* status.set(sessionID, { type: "idle" }, statusContext)
|
||||
yield* status.set(sessionID, { type: "idle" })
|
||||
return
|
||||
}
|
||||
yield* existing.cancel
|
||||
@@ -93,21 +87,19 @@ export const layer = Layer.effect(
|
||||
|
||||
const ensureRunning = Effect.fn("SessionRunState.ensureRunning")(function* (
|
||||
sessionID: SessionID,
|
||||
statusContext: SessionStatus.SetContext,
|
||||
onInterrupt: Effect.Effect<SessionLegacy.WithParts>,
|
||||
work: Effect.Effect<SessionLegacy.WithParts>,
|
||||
) {
|
||||
return yield* (yield* runner(sessionID, statusContext, onInterrupt)).ensureRunning(work)
|
||||
return yield* (yield* runner(sessionID, onInterrupt)).ensureRunning(work)
|
||||
})
|
||||
|
||||
const startShell = Effect.fn("SessionRunState.startShell")(function* (
|
||||
sessionID: SessionID,
|
||||
statusContext: SessionStatus.SetContext,
|
||||
onInterrupt: Effect.Effect<SessionLegacy.WithParts>,
|
||||
work: Effect.Effect<SessionLegacy.WithParts>,
|
||||
ready?: Latch.Latch,
|
||||
) {
|
||||
return yield* (yield* runner(sessionID, statusContext, onInterrupt))
|
||||
return yield* (yield* runner(sessionID, onInterrupt))
|
||||
.startShell(work, ready)
|
||||
.pipe(Effect.catchTag("RunnerBusy", () => Effect.fail(busyError(sessionID))))
|
||||
})
|
||||
|
||||
@@ -31,18 +31,12 @@ export const Info = Schema.Union([
|
||||
]).annotate({ identifier: "SessionStatus" })
|
||||
export type Info = Schema.Schema.Type<typeof Info>
|
||||
|
||||
export const SetContext = Schema.Struct({
|
||||
parentID: Schema.optional(SessionID),
|
||||
})
|
||||
export type SetContext = Schema.Schema.Type<typeof SetContext>
|
||||
|
||||
export const Event = {
|
||||
Status: EventV2.define({
|
||||
type: "session.status",
|
||||
schema: {
|
||||
sessionID: SessionID,
|
||||
status: Info,
|
||||
parentID: Schema.optional(SessionID),
|
||||
},
|
||||
}),
|
||||
// deprecated
|
||||
@@ -57,7 +51,7 @@ export const Event = {
|
||||
export interface Interface {
|
||||
readonly get: (sessionID: SessionID) => Effect.Effect<Info>
|
||||
readonly list: () => Effect.Effect<Map<SessionID, Info>>
|
||||
readonly set: (sessionID: SessionID, status: Info, context?: SetContext) => Effect.Effect<void>
|
||||
readonly set: (sessionID: SessionID, status: Info) => Effect.Effect<void>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/SessionStatus") {}
|
||||
@@ -80,13 +74,9 @@ export const layer = Layer.effect(
|
||||
return new Map(yield* InstanceState.get(state))
|
||||
})
|
||||
|
||||
const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info, context?: SetContext) {
|
||||
const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info) {
|
||||
const data = yield* InstanceState.get(state)
|
||||
yield* events.publish(Event.Status, {
|
||||
sessionID,
|
||||
status,
|
||||
...(context?.parentID ? { parentID: context.parentID } : {}),
|
||||
})
|
||||
yield* events.publish(Event.Status, { sessionID, status })
|
||||
if (status.type === "idle") {
|
||||
yield* events.publish(Event.Idle, { sessionID })
|
||||
data.delete(sessionID)
|
||||
|
||||
@@ -867,40 +867,6 @@ it.instance(
|
||||
3_000,
|
||||
)
|
||||
|
||||
it.instance(
|
||||
"loop includes parentID on child session status events",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { llm } = yield* useServerConfig(providerCfg)
|
||||
const prompt = yield* SessionPrompt.Service
|
||||
const sessions = yield* Session.Service
|
||||
const events = yield* EventV2Bridge.Service
|
||||
const ready = yield* Deferred.make<void>()
|
||||
|
||||
yield* llm.hang
|
||||
|
||||
const parent = yield* sessions.create({})
|
||||
const child = yield* sessions.create({ parentID: parent.id })
|
||||
yield* user(child.id, "hi")
|
||||
|
||||
const off = yield* events.listen((evt) => {
|
||||
if (evt.type !== SessionStatus.Event.Status.type) return Effect.void
|
||||
const data = evt.data as typeof SessionStatus.Event.Status.data.Type
|
||||
if (data.sessionID !== child.id || data.status.type !== "busy") return Effect.void
|
||||
if (data.parentID !== parent.id) return Effect.void
|
||||
Deferred.doneUnsafe(ready, Effect.void)
|
||||
return Effect.void
|
||||
})
|
||||
yield* Effect.addFinalizer(() => off)
|
||||
|
||||
const fiber = yield* prompt.loop({ sessionID: child.id }).pipe(Effect.forkChild)
|
||||
yield* Deferred.await(ready).pipe(Effect.timeout("2 seconds"))
|
||||
yield* prompt.cancel(child.id)
|
||||
yield* Fiber.await(fiber)
|
||||
}),
|
||||
3_000,
|
||||
)
|
||||
|
||||
// Cancel semantics
|
||||
|
||||
it.instance(
|
||||
|
||||
@@ -1188,7 +1188,6 @@ export type GlobalEvent = {
|
||||
properties: {
|
||||
sessionID: string
|
||||
status: SessionStatus
|
||||
parentID?: string
|
||||
}
|
||||
}
|
||||
| {
|
||||
@@ -4276,7 +4275,6 @@ export type EventSessionStatus = {
|
||||
properties: {
|
||||
sessionID: string
|
||||
status: SessionStatus
|
||||
parentID?: string
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user