From 5ddd079d9f7a1ecc100a38694d8ebb7d58a718d2 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 18 Mar 2026 15:27:45 -0400 Subject: [PATCH 1/5] feat(filesystem): add AppFileSystem service wrapping Effect FileSystem App-owned service that extends Effect's built-in FileSystem with opencode helpers: isDir, isFile, readJson, writeJson, ensureDir, writeWithDirs, findUp, up, globUp. Pure path helpers (mimeType, resolve, contains, overlaps, windowsPath, normalizePath) remain as static functions on the namespace. --- packages/opencode/src/filesystem/index.ts | 194 +++++++++++ packages/opencode/src/tool/streaming-event.ts | 161 +++++++++ .../test/filesystem/filesystem.test.ts | 319 ++++++++++++++++++ .../test/tool/streaming-event.test.ts | 75 ++++ 4 files changed, 749 insertions(+) create mode 100644 packages/opencode/src/filesystem/index.ts create mode 100644 packages/opencode/src/tool/streaming-event.ts create mode 100644 packages/opencode/test/filesystem/filesystem.test.ts create mode 100644 packages/opencode/test/tool/streaming-event.test.ts diff --git a/packages/opencode/src/filesystem/index.ts b/packages/opencode/src/filesystem/index.ts new file mode 100644 index 00000000000..3c8c358fb8e --- /dev/null +++ b/packages/opencode/src/filesystem/index.ts @@ -0,0 +1,194 @@ +import { dirname, join, relative, resolve as pathResolve } from "path" +import { realpathSync } from "fs" +import { lookup } from "mime-types" +import { Effect, FileSystem, Layer, Schema, ServiceMap } from "effect" +import type { PlatformError } from "effect/PlatformError" +import { Glob } from "../util/glob" + +export namespace AppFileSystem { + export class FileSystemError extends Schema.TaggedErrorClass()("FileSystemError", { + method: Schema.String, + cause: Schema.optional(Schema.Defect), + }) {} + + export type Error = PlatformError | FileSystemError + + export interface Interface extends FileSystem.FileSystem { + readonly isDir: (path: string) => Effect.Effect + readonly isFile: (path: string) => Effect.Effect + readonly readJson: (path: string) => Effect.Effect + readonly writeJson: (path: string, data: unknown, mode?: number) => Effect.Effect + readonly ensureDir: (path: string) => Effect.Effect + readonly writeWithDirs: (path: string, content: string | Uint8Array, mode?: number) => Effect.Effect + readonly findUp: (target: string, start: string, stop?: string) => Effect.Effect + readonly up: (options: { targets: string[]; start: string; stop?: string }) => Effect.Effect + readonly globUp: (pattern: string, start: string, stop?: string) => Effect.Effect + readonly glob: (pattern: string, options?: Glob.Options) => Effect.Effect + readonly globMatch: (pattern: string, filepath: string) => boolean + } + + export class Service extends ServiceMap.Service()("@opencode/FileSystem") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem + + const isDir = Effect.fn("FileSystem.isDir")(function* (path: string) { + const info = yield* fs.stat(path).pipe(Effect.catch(() => Effect.void)) + return info?.type === "Directory" + }) + + const isFile = Effect.fn("FileSystem.isFile")(function* (path: string) { + const info = yield* fs.stat(path).pipe(Effect.catch(() => Effect.void)) + return info?.type === "File" + }) + + const readJson = Effect.fn("FileSystem.readJson")(function* (path: string) { + const text = yield* fs.readFileString(path) + return JSON.parse(text) + }) + + const writeJson = Effect.fn("FileSystem.writeJson")(function* (path: string, data: unknown, mode?: number) { + const content = JSON.stringify(data, null, 2) + yield* fs.writeFileString(path, content) + if (mode) yield* fs.chmod(path, mode) + }) + + const ensureDir = Effect.fn("FileSystem.ensureDir")(function* (path: string) { + yield* fs.makeDirectory(path, { recursive: true }) + }) + + const writeWithDirs = Effect.fn("FileSystem.writeWithDirs")(function* ( + path: string, + content: string | Uint8Array, + mode?: number, + ) { + const write = typeof content === "string" ? fs.writeFileString(path, content) : fs.writeFile(path, content) + + yield* write.pipe( + Effect.catchIf( + (e) => e.reason._tag === "NotFound", + () => + Effect.gen(function* () { + yield* fs.makeDirectory(dirname(path), { recursive: true }) + yield* write + }), + ), + ) + if (mode) yield* fs.chmod(path, mode) + }) + + const glob = Effect.fn("FileSystem.glob")(function* (pattern: string, options?: Glob.Options) { + return yield* Effect.tryPromise({ + try: () => Glob.scan(pattern, options), + catch: (cause) => new FileSystemError({ method: "glob", cause }), + }) + }) + + const findUp = Effect.fn("FileSystem.findUp")(function* (target: string, start: string, stop?: string) { + const result: string[] = [] + let current = start + while (true) { + const search = join(current, target) + if (yield* fs.exists(search)) result.push(search) + if (stop === current) break + const parent = dirname(current) + if (parent === current) break + current = parent + } + return result + }) + + const up = Effect.fn("FileSystem.up")(function* (options: { targets: string[]; start: string; stop?: string }) { + const result: string[] = [] + let current = options.start + while (true) { + for (const target of options.targets) { + const search = join(current, target) + if (yield* fs.exists(search)) result.push(search) + } + if (options.stop === current) break + const parent = dirname(current) + if (parent === current) break + current = parent + } + return result + }) + + const globUp = Effect.fn("FileSystem.globUp")(function* (pattern: string, start: string, stop?: string) { + const result: string[] = [] + let current = start + while (true) { + const matches = yield* glob(pattern, { cwd: current, absolute: true, include: "file", dot: true }).pipe( + Effect.catch(() => Effect.succeed([] as string[])), + ) + result.push(...matches) + if (stop === current) break + const parent = dirname(current) + if (parent === current) break + current = parent + } + return result + }) + + return Service.of({ + ...fs, + isDir, + isFile, + readJson, + writeJson, + ensureDir, + writeWithDirs, + findUp, + up, + globUp, + glob, + globMatch: Glob.match, + }) + }), + ) + + // Pure helpers that don't need Effect (path manipulation, sync operations) + export function mimeType(p: string): string { + return lookup(p) || "application/octet-stream" + } + + export function normalizePath(p: string): string { + if (process.platform !== "win32") return p + try { + return realpathSync.native(p) + } catch { + return p + } + } + + export function resolve(p: string): string { + const resolved = pathResolve(windowsPath(p)) + try { + return normalizePath(realpathSync(resolved)) + } catch (e: any) { + if (e?.code === "ENOENT") return normalizePath(resolved) + throw e + } + } + + export function windowsPath(p: string): string { + if (process.platform !== "win32") return p + return p + .replace(/^\/([a-zA-Z]):(?:[\\/]|$)/, (_, drive) => `${drive.toUpperCase()}:/`) + .replace(/^\/([a-zA-Z])(?:\/|$)/, (_, drive) => `${drive.toUpperCase()}:/`) + .replace(/^\/cygdrive\/([a-zA-Z])(?:\/|$)/, (_, drive) => `${drive.toUpperCase()}:/`) + .replace(/^\/mnt\/([a-zA-Z])(?:\/|$)/, (_, drive) => `${drive.toUpperCase()}:/`) + } + + export function overlaps(a: string, b: string) { + const relA = relative(a, b) + const relB = relative(b, a) + return !relA || !relA.startsWith("..") || !relB || !relB.startsWith("..") + } + + export function contains(parent: string, child: string) { + return !relative(parent, child).startsWith("..") + } +} diff --git a/packages/opencode/src/tool/streaming-event.ts b/packages/opencode/src/tool/streaming-event.ts new file mode 100644 index 00000000000..c5b12321a03 --- /dev/null +++ b/packages/opencode/src/tool/streaming-event.ts @@ -0,0 +1,161 @@ +import { Bus } from "@/bus" +import { BusEvent } from "@/bus/bus-event" +import { Instance } from "@/project/instance" +import { Effect, Fiber, Layer, ManagedRuntime, Queue, ServiceMap, Stream } from "effect" +import * as readline from "node:readline" +import z from "zod" +import { Log } from "../util/log" + +export namespace EventStream { + const log = Log.create({ service: "streaming-event" }) + + export interface Event { + readonly type: string + readonly properties: unknown + } + + export interface Api { + readonly eventStream: () => Stream.Stream + } + + export class Service extends ServiceMap.Service()("@opencode/EventStream") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + // Bridge callback-based Bus events into a Stream, but only while the + // shared stream has at least one active consumer. + const stream = yield* Stream.callback((queue) => + Effect.acquireRelease( + Effect.sync(() => { + log.info("subscribe all created") + return Bus.subscribeAll((event) => { + log.info("event", event) + Queue.offerUnsafe(queue, event) + }) + }), + (unsub) => + Effect.sync(() => { + log.info("subscribe all released") + unsub() + }), + ), + ).pipe(Stream.share({ capacity: "unbounded" })) + + return Service.of({ + eventStream: () => stream, + }) + }), + ) + + export const defaultLayer = layer + export const runtime = ManagedRuntime.make(defaultLayer) + + export function eventStream() { + return runtime.runPromise(Service.use((svc) => Effect.succeed(svc.eventStream()))) + } + + export function dispose() { + return runtime.dispose() + } +} + +const NumberEvent = BusEvent.define( + "test.number", + z.object({ + value: z.number(), + }), +) + +function consumer(name: string, stream: Stream.Stream) { + // Each consumer runs independently, but Stream.share ensures they all fan + // out from a single underlying Bus subscription. + return EventStream.runtime.runFork( + stream.pipe( + Stream.tap((event) => Effect.sync(() => console.log(`[${name}]`, event))), + Stream.runDrain, + ), + ) +} + +async function main() { + // The promise API mirrors how this service would be consumed from a server: + // get a stream once, then attach and detach listeners over time. + const stream = await EventStream.eventStream() + const subs = new Map>() + let id = 0 + + const add = () => { + id += 1 + const name = `consumer-${id}` + subs.set(name, consumer(name, stream)) + console.log(`added ${name}`) + } + + const remove = async () => { + const name = [...subs.keys()].at(-1) + if (!name) { + console.log("no subscribers") + return + } + const fiber = subs.get(name) + subs.delete(name) + if (fiber) await EventStream.runtime.runPromise(Fiber.interrupt(fiber)) + console.log(`removed ${name}`) + } + + add() + add() + + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + }) + + // `+` and `-` let us verify the subscribe / release logs as the ref-count + // moves between zero and one-or-more consumers. + console.log("Commands: '+' add subscriber, '-' remove subscriber, number emit event, 'q' quit.") + + try { + await new Promise((resolve) => { + rl.on("line", async (line) => { + const text = line.trim() + if (text === "q") { + rl.close() + resolve() + return + } + if (text === "+") { + add() + return + } + if (text === "-") { + await remove() + return + } + + const value = Number(text) + if (Number.isNaN(value)) { + console.log("use '+', '-', a number, or 'q'") + return + } + + await Bus.publish(NumberEvent, { value }) + }) + + rl.on("close", () => { + resolve() + }) + }) + } finally { + await Promise.all([...subs.values()].map((fiber) => EventStream.runtime.runPromise(Fiber.interrupt(fiber)))) + await EventStream.dispose() + } +} + +if (import.meta.main) { + void Instance.provide({ + directory: process.cwd(), + fn: () => main(), + }).catch(console.error) +} diff --git a/packages/opencode/test/filesystem/filesystem.test.ts b/packages/opencode/test/filesystem/filesystem.test.ts new file mode 100644 index 00000000000..ca73b3336bc --- /dev/null +++ b/packages/opencode/test/filesystem/filesystem.test.ts @@ -0,0 +1,319 @@ +import { describe, test, expect } from "bun:test" +import { Effect, Layer } from "effect" +import { NodeFileSystem } from "@effect/platform-node" +import { AppFileSystem } from "../../src/filesystem" +import { testEffect } from "../lib/effect" +import path from "path" + +const live = AppFileSystem.layer.pipe(Layer.provide(NodeFileSystem.layer)) +const { effect: it } = testEffect(live) + +describe("AppFileSystem", () => { + describe("isDir", () => { + it( + "returns true for directories", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + expect(yield* fs.isDir(tmp)).toBe(true) + }), + ) + + it( + "returns false for files", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const file = path.join(tmp, "test.txt") + yield* fs.writeFileString(file, "hello") + expect(yield* fs.isDir(file)).toBe(false) + }), + ) + + it( + "returns false for non-existent paths", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + expect(yield* fs.isDir("/tmp/nonexistent-" + Math.random())).toBe(false) + }), + ) + }) + + describe("isFile", () => { + it( + "returns true for files", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const file = path.join(tmp, "test.txt") + yield* fs.writeFileString(file, "hello") + expect(yield* fs.isFile(file)).toBe(true) + }), + ) + + it( + "returns false for directories", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + expect(yield* fs.isFile(tmp)).toBe(false) + }), + ) + }) + + describe("readJson / writeJson", () => { + it( + "round-trips JSON data", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const file = path.join(tmp, "data.json") + const data = { name: "test", count: 42, nested: { ok: true } } + + yield* fs.writeJson(file, data) + const result = yield* fs.readJson(file) + + expect(result).toEqual(data) + }), + ) + }) + + describe("ensureDir", () => { + it( + "creates nested directories", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const nested = path.join(tmp, "a", "b", "c") + + yield* fs.ensureDir(nested) + + const info = yield* fs.stat(nested) + expect(info.type).toBe("Directory") + }), + ) + + it( + "is idempotent", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const dir = path.join(tmp, "existing") + yield* fs.makeDirectory(dir) + + yield* fs.ensureDir(dir) + + const info = yield* fs.stat(dir) + expect(info.type).toBe("Directory") + }), + ) + }) + + describe("writeWithDirs", () => { + it( + "creates parent directories if missing", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const file = path.join(tmp, "deep", "nested", "file.txt") + + yield* fs.writeWithDirs(file, "hello") + + expect(yield* fs.readFileString(file)).toBe("hello") + }), + ) + + it( + "writes directly when parent exists", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const file = path.join(tmp, "direct.txt") + + yield* fs.writeWithDirs(file, "world") + + expect(yield* fs.readFileString(file)).toBe("world") + }), + ) + + it( + "writes Uint8Array content", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const file = path.join(tmp, "binary.bin") + const content = new Uint8Array([0x00, 0x01, 0x02, 0x03]) + + yield* fs.writeWithDirs(file, content) + + const result = yield* fs.readFile(file) + expect(new Uint8Array(result)).toEqual(content) + }), + ) + }) + + describe("findUp", () => { + it( + "finds target in start directory", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + yield* fs.writeFileString(path.join(tmp, "target.txt"), "found") + + const result = yield* fs.findUp("target.txt", tmp) + expect(result).toEqual([path.join(tmp, "target.txt")]) + }), + ) + + it( + "finds target in parent directories", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + yield* fs.writeFileString(path.join(tmp, "marker"), "root") + const child = path.join(tmp, "a", "b") + yield* fs.makeDirectory(child, { recursive: true }) + + const result = yield* fs.findUp("marker", child, tmp) + expect(result).toEqual([path.join(tmp, "marker")]) + }), + ) + + it( + "returns empty array when not found", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const result = yield* fs.findUp("nonexistent", tmp, tmp) + expect(result).toEqual([]) + }), + ) + }) + + describe("up", () => { + it( + "finds multiple targets walking up", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + yield* fs.writeFileString(path.join(tmp, "a.txt"), "a") + yield* fs.writeFileString(path.join(tmp, "b.txt"), "b") + const child = path.join(tmp, "sub") + yield* fs.makeDirectory(child) + yield* fs.writeFileString(path.join(child, "a.txt"), "a-child") + + const result = yield* fs.up({ targets: ["a.txt", "b.txt"], start: child, stop: tmp }) + + expect(result).toContain(path.join(child, "a.txt")) + expect(result).toContain(path.join(tmp, "a.txt")) + expect(result).toContain(path.join(tmp, "b.txt")) + }), + ) + }) + + describe("glob", () => { + it( + "finds files matching pattern", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + yield* fs.writeFileString(path.join(tmp, "a.ts"), "a") + yield* fs.writeFileString(path.join(tmp, "b.ts"), "b") + yield* fs.writeFileString(path.join(tmp, "c.json"), "c") + + const result = yield* fs.glob("*.ts", { cwd: tmp }) + expect(result.sort()).toEqual(["a.ts", "b.ts"]) + }), + ) + + it( + "supports absolute paths", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + yield* fs.writeFileString(path.join(tmp, "file.txt"), "hello") + + const result = yield* fs.glob("*.txt", { cwd: tmp, absolute: true }) + expect(result).toEqual([path.join(tmp, "file.txt")]) + }), + ) + }) + + describe("globMatch", () => { + it( + "matches patterns", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + expect(fs.globMatch("*.ts", "foo.ts")).toBe(true) + expect(fs.globMatch("*.ts", "foo.json")).toBe(false) + expect(fs.globMatch("src/**", "src/a/b.ts")).toBe(true) + }), + ) + }) + + describe("globUp", () => { + it( + "finds files walking up directories", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + yield* fs.writeFileString(path.join(tmp, "root.md"), "root") + const child = path.join(tmp, "a", "b") + yield* fs.makeDirectory(child, { recursive: true }) + yield* fs.writeFileString(path.join(child, "leaf.md"), "leaf") + + const result = yield* fs.globUp("*.md", child, tmp) + expect(result).toContain(path.join(child, "leaf.md")) + expect(result).toContain(path.join(tmp, "root.md")) + }), + ) + }) + + describe("built-in passthrough", () => { + it( + "exists works", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const file = path.join(tmp, "exists.txt") + yield* fs.writeFileString(file, "yes") + + expect(yield* fs.exists(file)).toBe(true) + expect(yield* fs.exists(file + ".nope")).toBe(false) + }), + ) + + it( + "remove works", + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const tmp = yield* fs.makeTempDirectoryScoped() + const file = path.join(tmp, "delete-me.txt") + yield* fs.writeFileString(file, "bye") + + yield* fs.remove(file) + + expect(yield* fs.exists(file)).toBe(false) + }), + ) + }) + + describe("pure helpers", () => { + test("mimeType returns correct types", () => { + expect(AppFileSystem.mimeType("file.json")).toBe("application/json") + expect(AppFileSystem.mimeType("image.png")).toBe("image/png") + expect(AppFileSystem.mimeType("unknown.qzx")).toBe("application/octet-stream") + }) + + test("contains checks path containment", () => { + expect(AppFileSystem.contains("/a/b", "/a/b/c")).toBe(true) + expect(AppFileSystem.contains("/a/b", "/a/c")).toBe(false) + }) + + test("overlaps detects overlapping paths", () => { + expect(AppFileSystem.overlaps("/a/b", "/a/b/c")).toBe(true) + expect(AppFileSystem.overlaps("/a/b/c", "/a/b")).toBe(true) + expect(AppFileSystem.overlaps("/a", "/b")).toBe(false) + }) + }) +}) diff --git a/packages/opencode/test/tool/streaming-event.test.ts b/packages/opencode/test/tool/streaming-event.test.ts new file mode 100644 index 00000000000..162c752207d --- /dev/null +++ b/packages/opencode/test/tool/streaming-event.test.ts @@ -0,0 +1,75 @@ +import { afterAll, afterEach, describe, expect, test } from "bun:test" +import { Fiber, Stream } from "effect" +import z from "zod" +import { Bus } from "../../src/bus" +import { BusEvent } from "../../src/bus/bus-event" +import { Instance } from "../../src/project/instance" +import { EventStream } from "../../src/tool/streaming-event" +import { tmpdir } from "../fixture/fixture" + +const NumberEvent = BusEvent.define( + "test.streaming.number", + z.object({ + value: z.number(), + }), +) + +const onlyNumber = Stream.filter((event) => event.type === NumberEvent.type) + +describe("EventStream", () => { + afterEach(async () => { + await Instance.disposeAll() + }) + + afterAll(async () => { + await EventStream.dispose() + }) + + test("fans out one bus event to concurrent subscribers", async () => { + await using tmp = await tmpdir() + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const stream = await EventStream.eventStream() + const a = EventStream.runtime.runFork(stream.pipe(onlyNumber, Stream.take(1), Stream.runCollect)) + const b = EventStream.runtime.runFork(stream.pipe(onlyNumber, Stream.take(1), Stream.runCollect)) + + await Bun.sleep(10) + + await Bus.publish(NumberEvent, { value: 1 }) + + const left = await EventStream.runtime.runPromise(Fiber.join(a)) + const right = await EventStream.runtime.runPromise(Fiber.join(b)) + + expect(left).toEqual([{ type: "test.streaming.number", properties: { value: 1 } }]) + expect(right).toEqual([{ type: "test.streaming.number", properties: { value: 1 } }]) + }, + }) + }) + + test("reacquires after the last subscriber exits", async () => { + await using tmp = await tmpdir() + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const stream = await EventStream.eventStream() + + const a = EventStream.runtime.runFork(stream.pipe(onlyNumber, Stream.take(1), Stream.runCollect)) + await Bun.sleep(10) + await Bus.publish(NumberEvent, { value: 1 }) + expect(await EventStream.runtime.runPromise(Fiber.join(a))).toEqual([ + { type: "test.streaming.number", properties: { value: 1 } }, + ]) + + const b = EventStream.runtime.runFork(stream.pipe(onlyNumber, Stream.take(1), Stream.runCollect)) + await Bun.sleep(10) + await Bus.publish(NumberEvent, { value: 2 }) + expect(await EventStream.runtime.runPromise(Fiber.join(b))).toEqual([ + { type: "test.streaming.number", properties: { value: 2 } }, + ]) + }, + }) + }) +}) From 41f702e51c64025313128d6b734dee94f79b38a3 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 18 Mar 2026 17:11:47 -0400 Subject: [PATCH 2/5] refactor(snapshot): use AppFileSystem instead of raw FileSystem Replace local filesystem helper wrappers (exists, mkdir, writeFile) with AppFileSystem.Service. Snapshot now yields the app-owned service and uses ensureDir, writeFileString directly with .orDie for write paths. Kept thin wrappers for readFile (empty fallback) and removeFile (swallow errors) since those are Snapshot-specific error handling. --- packages/opencode/package.json | 1 + packages/opencode/src/filesystem/index.ts | 3 +++ packages/opencode/src/snapshot/index.ts | 21 +++++++++++---------- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/packages/opencode/package.json b/packages/opencode/package.json index 4bdc3a963cd..7c037772748 100644 --- a/packages/opencode/package.json +++ b/packages/opencode/package.json @@ -6,6 +6,7 @@ "license": "MIT", "private": true, "scripts": { + "prepare": "effect-language-service patch", "typecheck": "tsgo --noEmit", "test": "bun test --timeout 30000", "build": "bun run script/build.ts", diff --git a/packages/opencode/src/filesystem/index.ts b/packages/opencode/src/filesystem/index.ts index 3c8c358fb8e..d8f7d6053e7 100644 --- a/packages/opencode/src/filesystem/index.ts +++ b/packages/opencode/src/filesystem/index.ts @@ -1,3 +1,4 @@ +import { NodeFileSystem } from "@effect/platform-node" import { dirname, join, relative, resolve as pathResolve } from "path" import { realpathSync } from "fs" import { lookup } from "mime-types" @@ -149,6 +150,8 @@ export namespace AppFileSystem { }), ) + export const defaultLayer = layer.pipe(Layer.provide(NodeFileSystem.layer)) + // Pure helpers that don't need Effect (path manipulation, sync operations) export function mimeType(p: string): string { return lookup(p) || "application/octet-stream" diff --git a/packages/opencode/src/snapshot/index.ts b/packages/opencode/src/snapshot/index.ts index 9f0eef56bc6..887bce33416 100644 --- a/packages/opencode/src/snapshot/index.ts +++ b/packages/opencode/src/snapshot/index.ts @@ -1,10 +1,11 @@ import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node" -import { Cause, Duration, Effect, FileSystem, Layer, Schedule, ServiceMap, Stream } from "effect" +import { Cause, Duration, Effect, Layer, Schedule, ServiceMap, Stream } from "effect" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import path from "path" import z from "zod" import { InstanceContext } from "@/effect/instance-context" import { runPromiseInstance } from "@/effect/runtime" +import { AppFileSystem } from "@/filesystem" import { Config } from "../config/config" import { Global } from "../global" import { Log } from "../util/log" @@ -85,12 +86,12 @@ export namespace Snapshot { export const layer: Layer.Layer< Service, never, - InstanceContext | FileSystem.FileSystem | ChildProcessSpawner.ChildProcessSpawner + InstanceContext | AppFileSystem.Service | ChildProcessSpawner.ChildProcessSpawner > = Layer.effect( Service, Effect.gen(function* () { const ctx = yield* InstanceContext - const fs = yield* FileSystem.FileSystem + const fs = yield* AppFileSystem.Service const spawner = yield* ChildProcessSpawner.ChildProcessSpawner const directory = ctx.directory const worktree = ctx.worktree @@ -124,9 +125,8 @@ export namespace Snapshot { ), ) + // Snapshot-specific error handling on top of AppFileSystem const exists = (file: string) => fs.exists(file).pipe(Effect.orDie) - const mkdir = (dir: string) => fs.makeDirectory(dir, { recursive: true }).pipe(Effect.orDie) - const write = (file: string, text: string) => fs.writeFileString(file, text).pipe(Effect.orDie) const read = (file: string) => fs.readFileString(file).pipe(Effect.catch(() => Effect.succeed(""))) const remove = (file: string) => fs.remove(file).pipe(Effect.catch(() => Effect.void)) @@ -148,12 +148,12 @@ export namespace Snapshot { const sync = Effect.fnUntraced(function* () { const file = yield* excludes() const target = path.join(gitdir, "info", "exclude") - yield* mkdir(path.join(gitdir, "info")) + yield* fs.ensureDir(path.join(gitdir, "info")).pipe(Effect.orDie) if (!file) { - yield* write(target, "") + yield* fs.writeFileString(target, "").pipe(Effect.orDie) return } - yield* write(target, yield* read(file)) + yield* fs.writeFileString(target, yield* read(file)).pipe(Effect.orDie) }) const add = Effect.fnUntraced(function* () { @@ -178,7 +178,7 @@ export namespace Snapshot { const track = Effect.fn("Snapshot.track")(function* () { if (!(yield* enabled())) return const existed = yield* exists(gitdir) - yield* mkdir(gitdir) + yield* fs.ensureDir(gitdir).pipe(Effect.orDie) if (!existed) { yield* git(["init"], { env: { GIT_DIR: gitdir, GIT_WORK_TREE: worktree }, @@ -342,7 +342,8 @@ export namespace Snapshot { export const defaultLayer = layer.pipe( Layer.provide(NodeChildProcessSpawner.layer), - Layer.provide(NodeFileSystem.layer), + Layer.provide(AppFileSystem.defaultLayer), + Layer.provide(NodeFileSystem.layer), // needed by NodeChildProcessSpawner Layer.provide(NodePath.layer), ) } From 2f07653bcf31d12cb381ad27c304e4b32d7cc383 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 18 Mar 2026 17:47:38 -0400 Subject: [PATCH 3/5] refactor: migrate Discovery and TruncateEffect to AppFileSystem - Discovery: use AppFileSystem.Service, writeWithDirs for downloads - TruncateEffect: use AppFileSystem.Service, ensureDir for output dir --- packages/opencode/src/skill/discovery.ts | 17 +++++++---------- packages/opencode/src/tool/truncate-effect.ts | 11 ++++++----- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/packages/opencode/src/skill/discovery.ts b/packages/opencode/src/skill/discovery.ts index e5279503d90..e1039750321 100644 --- a/packages/opencode/src/skill/discovery.ts +++ b/packages/opencode/src/skill/discovery.ts @@ -1,7 +1,8 @@ -import { NodeFileSystem, NodePath } from "@effect/platform-node" -import { Effect, FileSystem, Layer, Path, Schema, ServiceMap } from "effect" +import { NodePath } from "@effect/platform-node" +import { Effect, Layer, Path, Schema, ServiceMap } from "effect" import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" import { withTransientReadRetry } from "@/util/effect-http-client" +import { AppFileSystem } from "@/filesystem" import { Global } from "../global" import { Log } from "../util/log" @@ -24,12 +25,12 @@ export namespace Discovery { export class Service extends ServiceMap.Service()("@opencode/SkillDiscovery") {} - export const layer: Layer.Layer = + export const layer: Layer.Layer = Layer.effect( Service, Effect.gen(function* () { const log = Log.create({ service: "skill-discovery" }) - const fs = yield* FileSystem.FileSystem + const fs = yield* AppFileSystem.Service const path = yield* Path.Path const http = HttpClient.filterStatusOk(withTransientReadRetry(yield* HttpClient.HttpClient)) const cache = path.join(Global.Path.cache, "skills") @@ -40,11 +41,7 @@ export namespace Discovery { return yield* HttpClientRequest.get(url).pipe( http.execute, Effect.flatMap((res) => res.arrayBuffer), - Effect.flatMap((body) => - fs - .makeDirectory(path.dirname(dest), { recursive: true }) - .pipe(Effect.flatMap(() => fs.writeFile(dest, new Uint8Array(body)))), - ), + Effect.flatMap((body) => fs.writeWithDirs(dest, new Uint8Array(body))), Effect.as(true), Effect.catch((err) => Effect.sync(() => { @@ -113,7 +110,7 @@ export namespace Discovery { export const defaultLayer: Layer.Layer = layer.pipe( Layer.provide(FetchHttpClient.layer), - Layer.provide(NodeFileSystem.layer), + Layer.provide(AppFileSystem.defaultLayer), Layer.provide(NodePath.layer), ) } diff --git a/packages/opencode/src/tool/truncate-effect.ts b/packages/opencode/src/tool/truncate-effect.ts index 60b9d0fa843..4431c18f839 100644 --- a/packages/opencode/src/tool/truncate-effect.ts +++ b/packages/opencode/src/tool/truncate-effect.ts @@ -1,7 +1,8 @@ -import { NodeFileSystem, NodePath } from "@effect/platform-node" -import { Cause, Duration, Effect, FileSystem, Layer, Schedule, ServiceMap } from "effect" +import { NodePath } from "@effect/platform-node" +import { Cause, Duration, Effect, Layer, Schedule, ServiceMap } from "effect" import path from "path" import type { Agent } from "../agent/agent" +import { AppFileSystem } from "@/filesystem" import { PermissionNext } from "../permission" import { Identifier } from "../id/id" import { Log } from "../util/log" @@ -44,7 +45,7 @@ export namespace TruncateEffect { export const layer = Layer.effect( Service, Effect.gen(function* () { - const fs = yield* FileSystem.FileSystem + const fs = yield* AppFileSystem.Service const cleanup = Effect.fn("Truncate.cleanup")(function* () { const cutoff = Identifier.timestamp(Identifier.create("tool", false, Date.now() - Duration.toMillis(RETENTION))) @@ -101,7 +102,7 @@ export namespace TruncateEffect { const preview = out.join("\n") const file = path.join(TRUNCATION_DIR, ToolID.ascending()) - yield* fs.makeDirectory(TRUNCATION_DIR, { recursive: true }).pipe(Effect.orDie) + yield* fs.ensureDir(TRUNCATION_DIR).pipe(Effect.orDie) yield* fs.writeFileString(file, text).pipe(Effect.orDie) const hint = hasTaskTool(agent) @@ -132,5 +133,5 @@ export namespace TruncateEffect { }), ) - export const defaultLayer = layer.pipe(Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer)) + export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer), Layer.provide(NodePath.layer)) } From 2f74c8c35e58fa61e0c5e5423edf8c227f0a99e4 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 18 Mar 2026 19:12:03 -0400 Subject: [PATCH 4/5] fix: make effect-language-service patch non-fatal in prepare script --- packages/opencode/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/opencode/package.json b/packages/opencode/package.json index 7c037772748..a6caca8adfb 100644 --- a/packages/opencode/package.json +++ b/packages/opencode/package.json @@ -6,7 +6,7 @@ "license": "MIT", "private": true, "scripts": { - "prepare": "effect-language-service patch", + "prepare": "effect-language-service patch || true", "typecheck": "tsgo --noEmit", "test": "bun test --timeout 30000", "build": "bun run script/build.ts", From f74294989715ae32d40104818946416f565802f7 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 18 Mar 2026 19:34:02 -0400 Subject: [PATCH 5/5] chore: remove accidentally committed streaming-event files --- packages/opencode/src/tool/streaming-event.ts | 161 ------------------ .../test/tool/streaming-event.test.ts | 75 -------- 2 files changed, 236 deletions(-) delete mode 100644 packages/opencode/src/tool/streaming-event.ts delete mode 100644 packages/opencode/test/tool/streaming-event.test.ts diff --git a/packages/opencode/src/tool/streaming-event.ts b/packages/opencode/src/tool/streaming-event.ts deleted file mode 100644 index c5b12321a03..00000000000 --- a/packages/opencode/src/tool/streaming-event.ts +++ /dev/null @@ -1,161 +0,0 @@ -import { Bus } from "@/bus" -import { BusEvent } from "@/bus/bus-event" -import { Instance } from "@/project/instance" -import { Effect, Fiber, Layer, ManagedRuntime, Queue, ServiceMap, Stream } from "effect" -import * as readline from "node:readline" -import z from "zod" -import { Log } from "../util/log" - -export namespace EventStream { - const log = Log.create({ service: "streaming-event" }) - - export interface Event { - readonly type: string - readonly properties: unknown - } - - export interface Api { - readonly eventStream: () => Stream.Stream - } - - export class Service extends ServiceMap.Service()("@opencode/EventStream") {} - - export const layer = Layer.effect( - Service, - Effect.gen(function* () { - // Bridge callback-based Bus events into a Stream, but only while the - // shared stream has at least one active consumer. - const stream = yield* Stream.callback((queue) => - Effect.acquireRelease( - Effect.sync(() => { - log.info("subscribe all created") - return Bus.subscribeAll((event) => { - log.info("event", event) - Queue.offerUnsafe(queue, event) - }) - }), - (unsub) => - Effect.sync(() => { - log.info("subscribe all released") - unsub() - }), - ), - ).pipe(Stream.share({ capacity: "unbounded" })) - - return Service.of({ - eventStream: () => stream, - }) - }), - ) - - export const defaultLayer = layer - export const runtime = ManagedRuntime.make(defaultLayer) - - export function eventStream() { - return runtime.runPromise(Service.use((svc) => Effect.succeed(svc.eventStream()))) - } - - export function dispose() { - return runtime.dispose() - } -} - -const NumberEvent = BusEvent.define( - "test.number", - z.object({ - value: z.number(), - }), -) - -function consumer(name: string, stream: Stream.Stream) { - // Each consumer runs independently, but Stream.share ensures they all fan - // out from a single underlying Bus subscription. - return EventStream.runtime.runFork( - stream.pipe( - Stream.tap((event) => Effect.sync(() => console.log(`[${name}]`, event))), - Stream.runDrain, - ), - ) -} - -async function main() { - // The promise API mirrors how this service would be consumed from a server: - // get a stream once, then attach and detach listeners over time. - const stream = await EventStream.eventStream() - const subs = new Map>() - let id = 0 - - const add = () => { - id += 1 - const name = `consumer-${id}` - subs.set(name, consumer(name, stream)) - console.log(`added ${name}`) - } - - const remove = async () => { - const name = [...subs.keys()].at(-1) - if (!name) { - console.log("no subscribers") - return - } - const fiber = subs.get(name) - subs.delete(name) - if (fiber) await EventStream.runtime.runPromise(Fiber.interrupt(fiber)) - console.log(`removed ${name}`) - } - - add() - add() - - const rl = readline.createInterface({ - input: process.stdin, - output: process.stdout, - }) - - // `+` and `-` let us verify the subscribe / release logs as the ref-count - // moves between zero and one-or-more consumers. - console.log("Commands: '+' add subscriber, '-' remove subscriber, number emit event, 'q' quit.") - - try { - await new Promise((resolve) => { - rl.on("line", async (line) => { - const text = line.trim() - if (text === "q") { - rl.close() - resolve() - return - } - if (text === "+") { - add() - return - } - if (text === "-") { - await remove() - return - } - - const value = Number(text) - if (Number.isNaN(value)) { - console.log("use '+', '-', a number, or 'q'") - return - } - - await Bus.publish(NumberEvent, { value }) - }) - - rl.on("close", () => { - resolve() - }) - }) - } finally { - await Promise.all([...subs.values()].map((fiber) => EventStream.runtime.runPromise(Fiber.interrupt(fiber)))) - await EventStream.dispose() - } -} - -if (import.meta.main) { - void Instance.provide({ - directory: process.cwd(), - fn: () => main(), - }).catch(console.error) -} diff --git a/packages/opencode/test/tool/streaming-event.test.ts b/packages/opencode/test/tool/streaming-event.test.ts deleted file mode 100644 index 162c752207d..00000000000 --- a/packages/opencode/test/tool/streaming-event.test.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { afterAll, afterEach, describe, expect, test } from "bun:test" -import { Fiber, Stream } from "effect" -import z from "zod" -import { Bus } from "../../src/bus" -import { BusEvent } from "../../src/bus/bus-event" -import { Instance } from "../../src/project/instance" -import { EventStream } from "../../src/tool/streaming-event" -import { tmpdir } from "../fixture/fixture" - -const NumberEvent = BusEvent.define( - "test.streaming.number", - z.object({ - value: z.number(), - }), -) - -const onlyNumber = Stream.filter((event) => event.type === NumberEvent.type) - -describe("EventStream", () => { - afterEach(async () => { - await Instance.disposeAll() - }) - - afterAll(async () => { - await EventStream.dispose() - }) - - test("fans out one bus event to concurrent subscribers", async () => { - await using tmp = await tmpdir() - - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const stream = await EventStream.eventStream() - const a = EventStream.runtime.runFork(stream.pipe(onlyNumber, Stream.take(1), Stream.runCollect)) - const b = EventStream.runtime.runFork(stream.pipe(onlyNumber, Stream.take(1), Stream.runCollect)) - - await Bun.sleep(10) - - await Bus.publish(NumberEvent, { value: 1 }) - - const left = await EventStream.runtime.runPromise(Fiber.join(a)) - const right = await EventStream.runtime.runPromise(Fiber.join(b)) - - expect(left).toEqual([{ type: "test.streaming.number", properties: { value: 1 } }]) - expect(right).toEqual([{ type: "test.streaming.number", properties: { value: 1 } }]) - }, - }) - }) - - test("reacquires after the last subscriber exits", async () => { - await using tmp = await tmpdir() - - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const stream = await EventStream.eventStream() - - const a = EventStream.runtime.runFork(stream.pipe(onlyNumber, Stream.take(1), Stream.runCollect)) - await Bun.sleep(10) - await Bus.publish(NumberEvent, { value: 1 }) - expect(await EventStream.runtime.runPromise(Fiber.join(a))).toEqual([ - { type: "test.streaming.number", properties: { value: 1 } }, - ]) - - const b = EventStream.runtime.runFork(stream.pipe(onlyNumber, Stream.take(1), Stream.runCollect)) - await Bun.sleep(10) - await Bus.publish(NumberEvent, { value: 2 }) - expect(await EventStream.runtime.runPromise(Fiber.join(b))).toEqual([ - { type: "test.streaming.number", properties: { value: 2 } }, - ]) - }, - }) - }) -})