fix: /event bug

This commit is contained in:
Aiden Cline
2026-05-16 00:05:35 -05:00
parent 0f31fd631b
commit 1a2573dae6
4 changed files with 122 additions and 19 deletions
@@ -1,6 +1,6 @@
import { Bus } from "@/bus"
import * as Log from "@opencode-ai/core/util/log"
import { Effect } from "effect"
import { Effect, Queue } from "effect"
import * as Stream from "effect/Stream"
import { HttpServerResponse } from "effect/unstable/http"
import { HttpApiBuilder } from "effect/unstable/httpapi"
@@ -8,6 +8,7 @@ import * as Sse from "effect/unstable/encoding/Sse"
import { EventApi } from "../groups/event"
const log = Log.create({ service: "server" })
type InstanceEvent = { id: string; type: string; properties: unknown }
function eventData(data: unknown): Sse.Event {
return {
@@ -22,7 +23,7 @@ function eventResponse(bus: Bus.Interface) {
return Effect.gen(function* () {
const context = yield* Effect.context()
const events = bus.subscribeAll().pipe(
const events = connectedEvents(bus).pipe(
Stream.provideContext(context),
Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type),
)
@@ -33,8 +34,8 @@ function eventResponse(bus: Bus.Interface) {
log.info("event connected")
return HttpServerResponse.stream(
Stream.make({ id: Bus.createID(), type: "server.connected", properties: {} }).pipe(
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
events.pipe(
Stream.merge(heartbeat, { haltStrategy: "left" }),
Stream.map(eventData),
Stream.pipeThroughChannel(Sse.encode()),
Stream.encodeText,
@@ -52,6 +53,21 @@ function eventResponse(bus: Bus.Interface) {
})
}
function connectedEvents(bus: Bus.Interface) {
return Stream.callback<InstanceEvent>((queue) =>
Effect.acquireRelease(
bus.subscribeAllCallback((event) => Queue.offerUnsafe(queue, event)).pipe(
Effect.tap(() => Effect.sync(() => Queue.offerUnsafe(queue, connectedEvent()))),
),
(unsubscribe) => Effect.sync(unsubscribe),
),
)
}
function connectedEvent(): InstanceEvent {
return { id: Bus.createID(), type: "server.connected", properties: {} }
}
export const eventHandlers = HttpApiBuilder.group(EventApi, "event", (handlers) =>
Effect.gen(function* () {
const bus = yield* Bus.Service
+12 -7
View File
@@ -518,6 +518,11 @@ export const layer: Layer.Layer<
const storage = yield* Storage.Service
const sync = yield* SyncEvent.Service
const flags = yield* RuntimeFlags.Service
const syncRun = <Def extends SyncEvent.Definition>(
def: Def,
data: SyncEvent.Event<Def>["data"],
options?: { publish?: boolean },
) => sync.run(def, data, { ...options, publishBus: bus })
const createNext = Effect.fn("Session.createNext")(function* (input: {
id?: SessionID
@@ -553,7 +558,7 @@ export const layer: Layer.Layer<
}
log.info("created", result)
yield* sync.run(Event.Created, { sessionID: result.id, info: result })
yield* syncRun(Event.Created, { sessionID: result.id, info: result })
if (!flags.experimentalWorkspaces) {
// This only exist for backwards compatibility. We should not be
@@ -607,7 +612,7 @@ export const layer: Layer.Layer<
yield* remove(child.id)
}
yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
yield* syncRun(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
yield* sync.remove(sessionID)
} catch (e) {
log.error(e)
@@ -616,13 +621,13 @@ export const layer: Layer.Layer<
const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })
yield* syncRun(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })
return msg
}).pipe(Effect.withSpan("Session.updateMessage"))
const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* sync.run(MessageV2.Event.PartUpdated, {
yield* syncRun(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
@@ -717,7 +722,7 @@ export const layer: Layer.Layer<
return session
})
const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info })
const patch = (sessionID: SessionID, info: Patch) => syncRun(Event.Updated, { sessionID, info })
const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) {
yield* patch(sessionID, { time: { updated: Date.now() } })
@@ -788,7 +793,7 @@ export const layer: Layer.Layer<
sessionID: SessionID
messageID: MessageID
}) {
yield* sync.run(MessageV2.Event.Removed, {
yield* syncRun(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
})
@@ -800,7 +805,7 @@ export const layer: Layer.Layer<
messageID: MessageID
partID: PartID
}) {
yield* sync.run(MessageV2.Event.PartRemoved, {
yield* syncRun(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
+30 -6
View File
@@ -11,9 +11,10 @@ import type { InstanceContext } from "@/project/instance-context"
import { EventSequenceTable, EventTable } from "./event.sql"
import type { WorkspaceID } from "@/control-plane/schema"
import { EventID } from "./schema"
import { Context, Effect, Layer, Schema as EffectSchema } from "effect"
import { Context, Effect, Layer, Option, Schema as EffectSchema } from "effect"
import type { DeepMutable } from "@opencode-ai/core/schema"
import { EventV2 } from "@opencode-ai/core/event"
import { EffectBridge } from "@/effect/bridge"
import { serviceUse } from "@/effect/service-use"
import { InstanceState } from "@/effect/instance-state"
import { RuntimeFlags } from "@/effect/runtime-flags"
@@ -54,12 +55,13 @@ type PublishContext = {
instance?: InstanceContext
workspace?: WorkspaceID
}
type Publish = <Def extends Definition>(def: Def, data: Properties<Def>, eventID: string) => void
export interface Interface {
readonly run: <Def extends Definition>(
def: Def,
data: Event<Def>["data"],
options?: { publish?: boolean },
options?: { publish?: boolean; publishBus?: ProjectBus.Interface },
) => Effect.Effect<void>
readonly replay: (event: SerializedEvent, options?: { publish: boolean; ownerID?: string }) => Effect.Effect<void>
readonly replayAll: (
@@ -113,6 +115,7 @@ export const layer = Layer.effect(Service)(
: undefined
process(def, event, {
publish,
publishWith: publish ? yield* makePublish() : undefined,
context,
ownerID: options?.ownerID,
experimentalWorkspaces: flags.experimentalWorkspaces,
@@ -157,7 +160,9 @@ export const layer = Layer.effect(Service)(
workspace: yield* InstanceState.workspaceID,
}
: undefined
const publishWith = publish
? yield* makePublish(options?.publishBus)
: undefined
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
@@ -172,7 +177,7 @@ export const layer = Layer.effect(Service)(
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish, context, experimentalWorkspaces: flags.experimentalWorkspaces })
process(def, event, { publish, publishWith, context, experimentalWorkspaces: flags.experimentalWorkspaces })
},
{
behavior: "immediate",
@@ -303,7 +308,13 @@ function register(def: Definition) {
function process<Def extends Definition>(
def: Def,
event: Event<Def>,
options: { publish: boolean; context?: PublishContext; ownerID?: string; experimentalWorkspaces: boolean },
options: {
publish: boolean
publishWith?: Publish
context?: PublishContext
ownerID?: string
experimentalWorkspaces: boolean
},
) {
if (projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
@@ -348,7 +359,10 @@ function process<Def extends Definition>(
}
const result = convertEvent(def.type, event.data)
const publish = (data: unknown) => ProjectBus.publish(def, data as Properties<Def>, { id: event.id })
const publish = (data: unknown) =>
options.publishWith
? options.publishWith(def, data as Properties<Def>, event.id)
: ProjectBus.publish(def, data as Properties<Def>, { id: event.id })
if (result instanceof Promise) {
void result.then(publish)
} else {
@@ -372,6 +386,16 @@ function process<Def extends Definition>(
})
}
function makePublish(bus?: ProjectBus.Interface) {
return Effect.gen(function* () {
const activeBus = bus ?? Option.getOrUndefined(yield* Effect.serviceOption(ProjectBus.Service))
if (!activeBus) return undefined
const bridge = yield* EffectBridge.make()
return (<Def extends Definition>(def: Def, data: Properties<Def>, eventID: string) =>
bridge.fork(activeBus.publish(def, data, { id: eventID }))) satisfies Publish
})
}
export function effectPayloads() {
return [
...registry
@@ -1,5 +1,5 @@
import { afterEach, describe, expect } from "bun:test"
import { ConfigProvider, Effect, Layer } from "effect"
import { ConfigProvider, Deferred, Effect, Layer } from "effect"
import type * as Scope from "effect/Scope"
import { HttpRouter } from "effect/unstable/http"
import { ChildProcessSpawner } from "effect/unstable/process"
@@ -22,7 +22,7 @@ import { TestLLMServer } from "../lib/llm-server"
import path from "path"
import { resetDatabase } from "../fixture/db"
import { disposeAllInstances, TestInstance, tmpdirScoped } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { awaitWithTimeout, testEffect } from "../lib/effect"
const noopBootstrap = Layer.succeed(InstanceBootstrap.Service, InstanceBootstrap.Service.of({ run: Effect.void }))
const it = testEffect(
@@ -359,6 +359,23 @@ function seedMessage(directory: string, sessionID: string) {
)
}
function collectPartUpdated(
events: Awaited<ReturnType<Sdk["event"]["subscribe"]>>,
ready: Deferred.Deferred<void>,
received: Deferred.Deferred<unknown>,
) {
return call(async () => {
for await (const event of events.stream) {
const payload = record(event).payload ?? event
if (record(payload).type === "server.connected") Deferred.doneUnsafe(ready, Effect.void)
if (record(payload).type === MessageV2.Event.PartUpdated.type) {
Deferred.doneUnsafe(received, Effect.succeed(payload))
return
}
}
})
}
afterEach(async () => {
Flag.OPENCODE_SERVER_PASSWORD = original.OPENCODE_SERVER_PASSWORD
Flag.OPENCODE_SERVER_USERNAME = original.OPENCODE_SERVER_USERNAME
@@ -439,6 +456,47 @@ describe("HttpApi SDK", () => {
),
)
serverPathParity("streams sync-backed part updates on SDK instance events", (serverPath) =>
withStandardProject(serverPath, ({ sdk }) =>
Effect.gen(function* () {
const session = yield* capture(() => sdk.session.create({ title: "reasoning part event" }))
const sessionID = String(record(session.data).id)
const messageID = MessageID.ascending()
const partID = PartID.ascending()
const controller = new AbortController()
yield* Effect.addFinalizer(() => Effect.sync(() => controller.abort()))
const events = yield* call(() => sdk.event.subscribe(undefined, { signal: controller.signal }))
yield* Effect.addFinalizer(() =>
call(async () => void (await events.stream.return?.(undefined))).pipe(Effect.ignore),
)
const ready = yield* Deferred.make<void>()
const received = yield* Deferred.make<unknown>()
yield* collectPartUpdated(events, ready, received).pipe(Effect.forkScoped)
yield* awaitWithTimeout(Deferred.await(ready), "timed out waiting for /event connection")
const updated = yield* capture(() =>
sdk.part.update({
sessionID,
messageID,
partID,
part: {
id: partID,
sessionID,
messageID,
type: "reasoning",
text: "updated reasoning",
time: { start: Date.now() },
},
}),
)
expect(updated.status).toBe(200)
const event = yield* awaitWithTimeout(Deferred.await(received), "timed out waiting for message.part.updated")
expect(record(record(event).properties).part).toMatchObject({ id: partID, type: "reasoning" })
}),
),
)
serverPathParity("matches generated SDK missing session errors", (serverPath) =>
withStandardProject(serverPath, ({ sdk }) =>
Effect.gen(function* () {