From 66ab7332de2b5087170b786fc8959e6048a0dd75 Mon Sep 17 00:00:00 2001 From: James Long Date: Wed, 18 Mar 2026 17:28:04 -0400 Subject: [PATCH] fix(core): switch /event route to effect and implement queue --- packages/opencode/src/server/routes/event.ts | 91 ++++++++++++++++++++ packages/opencode/src/server/server.ts | 63 +------------- 2 files changed, 93 insertions(+), 61 deletions(-) create mode 100644 packages/opencode/src/server/routes/event.ts diff --git a/packages/opencode/src/server/routes/event.ts b/packages/opencode/src/server/routes/event.ts new file mode 100644 index 00000000000..4e54a3b6491 --- /dev/null +++ b/packages/opencode/src/server/routes/event.ts @@ -0,0 +1,91 @@ +import { Hono } from "hono" +import { describeRoute, resolver } from "hono-openapi" +import { streamSSE } from "hono/streaming" +import { Effect, Queue, Stream } from "effect" +import { Log } from "@/util/log" +import { BusEvent } from "@/bus/bus-event" +import { Bus } from "@/bus" +import { lazy } from "../../util/lazy" + +const log = Log.create({ service: "server" }) + +export const EventRoutes = lazy(() => + new Hono().get( + "/event", + describeRoute({ + summary: "Subscribe to events", + description: "Get events", + operationId: "event.subscribe", + responses: { + 200: { + description: "Event stream", + content: { + "text/event-stream": { + schema: resolver(BusEvent.payloads()), + }, + }, + }, + }, + }), + async (c) => { + log.info("event connected") + c.header("X-Accel-Buffering", "no") + c.header("X-Content-Type-Options", "nosniff") + return streamSSE(c, async (stream) => { + await Effect.runPromise( + Stream.callback((q) => + Effect.acquireRelease( + Effect.sync(() => { + stream.onAbort(() => { + Queue.endUnsafe(q) + }) + + Queue.offerUnsafe( + q, + JSON.stringify({ + type: "server.connected", + properties: {}, + }), + ) + + const unsub = Bus.subscribeAll((event) => { + Queue.offerUnsafe(q, JSON.stringify(event)) + if (event.type === Bus.InstanceDisposed.type) { + Queue.endUnsafe(q) + } + }) + + // Send heartbeat every 10s to prevent stalled proxy streams. + const heartbeat = setInterval(() => { + Queue.offerUnsafe( + q, + JSON.stringify({ + type: "server.heartbeat", + properties: {}, + }), + ) + }, 10_000) + + return { heartbeat, unsub } + }), + (x) => + Effect.sync(() => { + clearInterval(x.heartbeat) + x.unsub() + Queue.endUnsafe(q) + log.info("event disconnected") + }), + ), + ).pipe( + Stream.runForEach((data) => + Effect.tryPromise({ + try: () => stream.writeSSE({ data }), + catch: () => {}, + }), + ), + ), + ) + }) + }, + ), +) diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index c485654fdf8..a68becb1fba 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -1,10 +1,7 @@ -import { BusEvent } from "@/bus/bus-event" -import { Bus } from "@/bus" import { Log } from "../util/log" import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi" import { Hono } from "hono" import { cors } from "hono/cors" -import { streamSSE } from "hono/streaming" import { proxy } from "hono/proxy" import { basicAuth } from "hono/basic-auth" import z from "zod" @@ -34,6 +31,7 @@ import { FileRoutes } from "./routes/file" import { ConfigRoutes } from "./routes/config" import { ExperimentalRoutes } from "./routes/experimental" import { ProviderRoutes } from "./routes/provider" +import { EventRoutes } from "./routes/event" import { InstanceBootstrap } from "../project/bootstrap" import { NotFoundError } from "../storage/db" import type { ContentfulStatusCode } from "hono/utils/http-status" @@ -251,6 +249,7 @@ export namespace Server { .route("/question", QuestionRoutes()) .route("/provider", ProviderRoutes()) .route("/", FileRoutes()) + .route("/", EventRoutes()) .route("/mcp", McpRoutes()) .route("/tui", TuiRoutes()) .post( @@ -498,64 +497,6 @@ export namespace Server { return c.json(await Format.status()) }, ) - .get( - "/event", - describeRoute({ - summary: "Subscribe to events", - description: "Get events", - operationId: "event.subscribe", - responses: { - 200: { - description: "Event stream", - content: { - "text/event-stream": { - schema: resolver(BusEvent.payloads()), - }, - }, - }, - }, - }), - async (c) => { - log.info("event connected") - c.header("X-Accel-Buffering", "no") - c.header("X-Content-Type-Options", "nosniff") - return streamSSE(c, async (stream) => { - stream.writeSSE({ - data: JSON.stringify({ - type: "server.connected", - properties: {}, - }), - }) - const unsub = Bus.subscribeAll(async (event) => { - await stream.writeSSE({ - data: JSON.stringify(event), - }) - if (event.type === Bus.InstanceDisposed.type) { - stream.close() - } - }) - - // Send heartbeat every 10s to prevent stalled proxy streams. - const heartbeat = setInterval(() => { - stream.writeSSE({ - data: JSON.stringify({ - type: "server.heartbeat", - properties: {}, - }), - }) - }, 10_000) - - await new Promise((resolve) => { - stream.onAbort(() => { - clearInterval(heartbeat) - unsub() - resolve() - log.info("event disconnected") - }) - }) - }) - }, - ) .all("/*", async (c) => { const path = c.req.path