diff --git a/CHANGELOG.md b/CHANGELOG.md index a560f5b3e03..2e64738e392 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ Docs: https://docs.openclaw.ai - Matrix/exec approvals: clarify unavailable-approval replies so Matrix no longer claims chat approvals are unsupported when native exec approvals are merely unconfigured. (#61424) Thanks @gumadeiras. - Docs/IRC: replace public IRC hostname examples with `irc.example.com` and recommend private servers for bot coordination while listing common public networks for intentional use. - Memory/dreaming: write dreaming trail content to top-level `DREAMS.md` instead of daily memory notes, update `/dreaming` help text to point there, and keep `DREAMS.md` available for explicit reads without pulling it into default recall. Thanks @davemorin. +- Plugins/Lobster: run bundled Lobster workflows in process instead of spawning the external CLI, reducing transport overhead and unblocking native runtime integration. (#61523) Thanks @mbelinky. ### Fixes diff --git a/extensions/lobster/package.json b/extensions/lobster/package.json index e070faf1ec0..1918a55ea67 100644 --- a/extensions/lobster/package.json +++ b/extensions/lobster/package.json @@ -4,6 +4,7 @@ "description": "Lobster workflow tool plugin (typed pipelines + resumable approvals)", "type": "module", "dependencies": { + "@clawdbot/lobster": "2026.1.24", "@sinclair/typebox": "0.34.49" }, "openclaw": { diff --git a/extensions/lobster/src/lobster-runner.test.ts b/extensions/lobster/src/lobster-runner.test.ts new file mode 100644 index 00000000000..28c910c9292 --- /dev/null +++ b/extensions/lobster/src/lobster-runner.test.ts @@ -0,0 +1,286 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createEmbeddedLobsterRunner, resolveLobsterCwd } from "./lobster-runner.js"; + +describe("resolveLobsterCwd", () => { + it("defaults to the current working directory", () => { + expect(resolveLobsterCwd(undefined)).toBe(process.cwd()); + }); + + it("keeps relative paths inside the repo root", () => { + expect(resolveLobsterCwd("extensions/lobster")).toBe( + path.resolve(process.cwd(), "extensions/lobster"), + ); + }); +}); + +describe("createEmbeddedLobsterRunner", () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("runs inline pipelines through the embedded runtime", async () => { + const runtime = { + runToolRequest: vi.fn().mockResolvedValue({ + ok: true, + protocolVersion: 1, + status: "ok", + output: [{ hello: "world" }], + requiresApproval: null, + }), + resumeToolRequest: vi.fn(), + }; + + const runner = createEmbeddedLobsterRunner({ + loadRuntime: vi.fn().mockResolvedValue(runtime), + }); + + const envelope = await runner.run({ + action: "run", + pipeline: "exec --json=true echo hi", + cwd: process.cwd(), + timeoutMs: 2000, + maxStdoutBytes: 4096, + }); + + expect(runtime.runToolRequest).toHaveBeenCalledTimes(1); + expect(runtime.runToolRequest).toHaveBeenCalledWith({ + pipeline: "exec --json=true echo hi", + ctx: expect.objectContaining({ + cwd: process.cwd(), + mode: "tool", + signal: expect.any(AbortSignal), + }), + }); + expect(envelope).toEqual({ + ok: true, + status: "ok", + output: [{ hello: "world" }], + requiresApproval: null, + }); + }); + + it("detects workflow files and parses argsJson", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lobster-runner-")); + const workflowPath = path.join(tempDir, "workflow.lobster"); + await fs.writeFile(workflowPath, "steps: []\n", "utf8"); + + try { + const runtime = { + runToolRequest: vi.fn().mockResolvedValue({ + ok: true, + protocolVersion: 1, + status: "ok", + output: [], + requiresApproval: null, + }), + resumeToolRequest: vi.fn(), + }; + + const runner = createEmbeddedLobsterRunner({ + loadRuntime: vi.fn().mockResolvedValue(runtime), + }); + + await runner.run({ + action: "run", + pipeline: "workflow.lobster", + argsJson: '{"limit":3}', + cwd: tempDir, + timeoutMs: 2000, + maxStdoutBytes: 4096, + }); + + expect(runtime.runToolRequest).toHaveBeenCalledWith({ + filePath: workflowPath, + args: { limit: 3 }, + ctx: expect.objectContaining({ + cwd: tempDir, + mode: "tool", + }), + }); + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + + it("returns a parse error when workflow args are invalid JSON", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lobster-runner-")); + const workflowPath = path.join(tempDir, "workflow.lobster"); + await fs.writeFile(workflowPath, "steps: []\n", "utf8"); + + try { + const runtime = { + runToolRequest: vi.fn(), + resumeToolRequest: vi.fn(), + }; + const runner = createEmbeddedLobsterRunner({ + loadRuntime: vi.fn().mockResolvedValue(runtime), + }); + + await expect( + runner.run({ + action: "run", + pipeline: "workflow.lobster", + argsJson: "{bad", + cwd: tempDir, + timeoutMs: 2000, + maxStdoutBytes: 4096, + }), + ).rejects.toThrow("run --args-json must be valid JSON"); + expect(runtime.runToolRequest).not.toHaveBeenCalled(); + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + + it("throws when the embedded runtime returns an error envelope", async () => { + const runtime = { + runToolRequest: vi.fn().mockResolvedValue({ + ok: false, + protocolVersion: 1, + error: { + type: "runtime_error", + message: "boom", + }, + }), + resumeToolRequest: vi.fn(), + }; + + const runner = createEmbeddedLobsterRunner({ + loadRuntime: vi.fn().mockResolvedValue(runtime), + }); + + await expect( + runner.run({ + action: "run", + pipeline: "exec --json=true echo hi", + cwd: process.cwd(), + timeoutMs: 2000, + maxStdoutBytes: 4096, + }), + ).rejects.toThrow("boom"); + }); + + it("routes resume through the embedded runtime", async () => { + const runtime = { + runToolRequest: vi.fn(), + resumeToolRequest: vi.fn().mockResolvedValue({ + ok: true, + protocolVersion: 1, + status: "cancelled", + output: [], + requiresApproval: null, + }), + }; + + const runner = createEmbeddedLobsterRunner({ + loadRuntime: vi.fn().mockResolvedValue(runtime), + }); + + const envelope = await runner.run({ + action: "resume", + token: "resume-token", + approve: false, + cwd: process.cwd(), + timeoutMs: 2000, + maxStdoutBytes: 4096, + }); + + expect(runtime.resumeToolRequest).toHaveBeenCalledWith({ + token: "resume-token", + approved: false, + ctx: expect.objectContaining({ + cwd: process.cwd(), + mode: "tool", + signal: expect.any(AbortSignal), + }), + }); + expect(envelope).toEqual({ + ok: true, + status: "cancelled", + output: [], + requiresApproval: null, + }); + }); + + it("requires a pipeline for run", async () => { + const runner = createEmbeddedLobsterRunner({ + loadRuntime: vi.fn().mockResolvedValue({ + runToolRequest: vi.fn(), + resumeToolRequest: vi.fn(), + }), + }); + + await expect( + runner.run({ + action: "run", + cwd: process.cwd(), + timeoutMs: 2000, + maxStdoutBytes: 4096, + }), + ).rejects.toThrow(/pipeline required/); + }); + + it("requires token and approve for resume", async () => { + const runner = createEmbeddedLobsterRunner({ + loadRuntime: vi.fn().mockResolvedValue({ + runToolRequest: vi.fn(), + resumeToolRequest: vi.fn(), + }), + }); + + await expect( + runner.run({ + action: "resume", + approve: true, + cwd: process.cwd(), + timeoutMs: 2000, + maxStdoutBytes: 4096, + }), + ).rejects.toThrow(/token required/); + + await expect( + runner.run({ + action: "resume", + token: "resume-token", + cwd: process.cwd(), + timeoutMs: 2000, + maxStdoutBytes: 4096, + }), + ).rejects.toThrow(/approve required/); + }); + + it("aborts long-running embedded work", async () => { + const runtime = { + runToolRequest: vi.fn( + async ({ ctx }: { ctx?: { signal?: AbortSignal } }) => + await new Promise((resolve, reject) => { + ctx?.signal?.addEventListener("abort", () => { + reject(ctx.signal?.reason ?? new Error("aborted")); + }); + setTimeout( + () => resolve({ ok: true, status: "ok", output: [], requiresApproval: null }), + 500, + ); + }), + ), + resumeToolRequest: vi.fn(), + }; + + const runner = createEmbeddedLobsterRunner({ + loadRuntime: vi.fn().mockResolvedValue(runtime), + }); + + await expect( + runner.run({ + action: "run", + pipeline: "exec --json=true echo hi", + cwd: process.cwd(), + timeoutMs: 200, + maxStdoutBytes: 4096, + }), + ).rejects.toThrow(/timed out|aborted/); + }); +}); diff --git a/extensions/lobster/src/lobster-runner.ts b/extensions/lobster/src/lobster-runner.ts new file mode 100644 index 00000000000..93598594e12 --- /dev/null +++ b/extensions/lobster/src/lobster-runner.ts @@ -0,0 +1,728 @@ +import { randomUUID } from "node:crypto"; +import { createRequire } from "node:module"; +import path from "node:path"; +import { Readable, Writable } from "node:stream"; +import { pathToFileURL } from "node:url"; + +export type LobsterEnvelope = + | { + ok: true; + status: "ok" | "needs_approval" | "cancelled"; + output: unknown[]; + requiresApproval: null | { + type: "approval_request"; + prompt: string; + items: unknown[]; + resumeToken?: string; + }; + } + | { + ok: false; + error: { type?: string; message: string }; + }; + +export type LobsterRunnerParams = { + action: "run" | "resume"; + pipeline?: string; + argsJson?: string; + token?: string; + approve?: boolean; + cwd: string; + timeoutMs: number; + maxStdoutBytes: number; +}; + +export type LobsterRunner = { + run: (params: LobsterRunnerParams) => Promise; +}; + +type EmbeddedToolContext = { + cwd?: string; + env?: Record; + mode?: "tool" | "human" | "sdk"; + stdin?: NodeJS.ReadableStream; + stdout?: NodeJS.WritableStream; + stderr?: NodeJS.WritableStream; + signal?: AbortSignal; + registry?: unknown; + llmAdapters?: Record; +}; + +type EmbeddedToolEnvelope = { + protocolVersion?: number; + ok: boolean; + status?: "ok" | "needs_approval" | "cancelled"; + output?: unknown[]; + requiresApproval?: { + type?: "approval_request"; + prompt: string; + items: unknown[]; + preview?: string; + resumeToken?: string; + } | null; + error?: { + type?: string; + message: string; + }; +}; + +type EmbeddedToolRuntime = { + runToolRequest: (params: { + pipeline?: string; + filePath?: string; + args?: Record; + ctx?: EmbeddedToolContext; + }) => Promise; + resumeToolRequest: (params: { + token: string; + approved: boolean; + ctx?: EmbeddedToolContext; + }) => Promise; +}; + +type ToolRuntimeDeps = { + createDefaultRegistry: () => unknown; + parsePipeline: (pipeline: string) => Array<{ + name: string; + args: Record; + raw: string; + }>; + decodeResumeToken: (token: string) => { + kind?: string; + stateKey?: string; + filePath?: string; + }; + encodeToken: (payload: Record) => string; + runPipeline: (params: { + pipeline: Array<{ name: string; args: Record; raw: string }>; + registry: unknown; + input: AsyncIterable | unknown[]; + stdin: NodeJS.ReadableStream; + stdout: NodeJS.WritableStream; + stderr: NodeJS.WritableStream; + env: Record; + mode: "tool"; + cwd: string; + llmAdapters?: Record; + signal?: AbortSignal; + }) => Promise<{ + items: unknown[]; + halted?: boolean; + haltedAt?: { index?: number }; + }>; + runWorkflowFile: (params: { + filePath: string; + args?: Record; + ctx: EmbeddedToolContext; + resume?: Record; + approved?: boolean; + }) => Promise<{ + status: "ok" | "needs_approval" | "cancelled"; + output: unknown[]; + requiresApproval?: EmbeddedToolEnvelope["requiresApproval"]; + }>; + readStateJson: (params: { + env: Record; + key: string; + }) => Promise; + writeStateJson: (params: { + env: Record; + key: string; + value: unknown; + }) => Promise; + deleteStateJson: (params: { + env: Record; + key: string; + }) => Promise; +}; + +type PipelineResumeState = { + pipeline: Array<{ name: string; args: Record; raw: string }>; + resumeAtIndex: number; + items: unknown[]; + prompt?: string; + createdAt: string; +}; + +type LoadEmbeddedToolRuntime = () => Promise; + +type ApprovalRequestItem = { + type: "approval_request"; + prompt: string; + items: unknown[]; + resumeToken?: string; +}; + +function normalizeForCwdSandbox(p: string): string { + const normalized = path.normalize(p); + return process.platform === "win32" ? normalized.toLowerCase() : normalized; +} + +export function resolveLobsterCwd(cwdRaw: unknown): string { + if (typeof cwdRaw !== "string" || !cwdRaw.trim()) { + return process.cwd(); + } + const cwd = cwdRaw.trim(); + if (path.isAbsolute(cwd)) { + throw new Error("cwd must be a relative path"); + } + const base = process.cwd(); + const resolved = path.resolve(base, cwd); + + const rel = path.relative(normalizeForCwdSandbox(base), normalizeForCwdSandbox(resolved)); + if (rel === "" || rel === ".") { + return resolved; + } + if (rel.startsWith("..") || path.isAbsolute(rel)) { + throw new Error("cwd must stay within the gateway working directory"); + } + return resolved; +} + +function createLimitedSink(maxBytes: number, label: "stdout" | "stderr") { + let bytes = 0; + return new Writable({ + write(chunk, _encoding, callback) { + bytes += Buffer.byteLength(String(chunk), "utf8"); + if (bytes > maxBytes) { + callback(new Error(`lobster ${label} exceeded maxStdoutBytes`)); + return; + } + callback(); + }, + }); +} + +function normalizeEnvelope(envelope: EmbeddedToolEnvelope): LobsterEnvelope { + if (envelope.ok) { + return { + ok: true, + status: envelope.status ?? "ok", + output: Array.isArray(envelope.output) ? envelope.output : [], + requiresApproval: envelope.requiresApproval + ? { + type: "approval_request", + prompt: envelope.requiresApproval.prompt, + items: envelope.requiresApproval.items, + ...(envelope.requiresApproval.resumeToken + ? { resumeToken: envelope.requiresApproval.resumeToken } + : {}), + } + : null, + }; + } + return { + ok: false, + error: { + type: envelope.error?.type, + message: envelope.error?.message ?? "lobster runtime failed", + }, + }; +} + +function throwOnErrorEnvelope(envelope: LobsterEnvelope): Extract { + if (envelope.ok) { + return envelope; + } + throw new Error(envelope.error.message); +} + +function asApprovalRequestItem(item: unknown): ApprovalRequestItem | null { + if (!item || typeof item !== "object") { + return null; + } + const candidate = item as Partial; + if (candidate.type !== "approval_request") { + return null; + } + if (typeof candidate.prompt !== "string" || !Array.isArray(candidate.items)) { + return null; + } + return candidate as ApprovalRequestItem; +} + +async function resolveWorkflowFile(candidate: string, cwd: string) { + const { stat } = await import("node:fs/promises"); + const resolved = path.isAbsolute(candidate) ? candidate : path.resolve(cwd, candidate); + const fileStat = await stat(resolved); + if (!fileStat.isFile()) { + throw new Error("Workflow path is not a file"); + } + const ext = path.extname(resolved).toLowerCase(); + if (![".lobster", ".yaml", ".yml", ".json"].includes(ext)) { + throw new Error("Workflow file must end in .lobster, .yaml, .yml, or .json"); + } + return resolved; +} + +async function detectWorkflowFile(candidate: string, cwd: string) { + const trimmed = candidate.trim(); + if (!trimmed || trimmed.includes("|")) { + return null; + } + try { + return await resolveWorkflowFile(trimmed, cwd); + } catch { + return null; + } +} + +function parseWorkflowArgs(argsJson: string) { + return JSON.parse(argsJson) as Record; +} + +function createEmbeddedToolContext( + params: LobsterRunnerParams, + signal?: AbortSignal, +): EmbeddedToolContext { + const env = { ...process.env } as Record; + return { + cwd: params.cwd, + env, + mode: "tool", + stdin: Readable.from([]), + stdout: createLimitedSink(Math.max(1024, params.maxStdoutBytes), "stdout"), + stderr: createLimitedSink(Math.max(1024, params.maxStdoutBytes), "stderr"), + signal, + }; +} + +async function withTimeout( + timeoutMs: number, + fn: (signal?: AbortSignal) => Promise, +): Promise { + const timeout = Math.max(200, timeoutMs); + const controller = new AbortController(); + return await new Promise((resolve, reject) => { + const onTimeout = () => { + const error = new Error("lobster runtime timed out"); + controller.abort(error); + reject(error); + }; + + const timer = setTimeout(onTimeout, timeout); + void fn(controller.signal).then( + (value) => { + clearTimeout(timer); + resolve(value); + }, + (error) => { + clearTimeout(timer); + reject(error); + }, + ); + }); +} + +function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolRuntime { + const createToolContext = (ctx: EmbeddedToolContext = {}) => ({ + cwd: ctx.cwd ?? process.cwd(), + env: { ...process.env, ...ctx.env }, + mode: "tool" as const, + stdin: ctx.stdin ?? Readable.from([]), + stdout: ctx.stdout ?? createLimitedSink(512_000, "stdout"), + stderr: ctx.stderr ?? createLimitedSink(512_000, "stderr"), + signal: ctx.signal, + registry: ctx.registry ?? deps.createDefaultRegistry(), + llmAdapters: ctx.llmAdapters, + }); + + const okEnvelope = ( + status: "ok" | "needs_approval" | "cancelled", + output: unknown[], + requiresApproval: EmbeddedToolEnvelope["requiresApproval"], + ): EmbeddedToolEnvelope => ({ + protocolVersion: 1, + ok: true, + status, + output, + requiresApproval, + }); + + const errorEnvelope = (type: string, message: string): EmbeddedToolEnvelope => ({ + protocolVersion: 1, + ok: false, + error: { type, message }, + }); + + const streamFromItems = (items: unknown[]) => + (async function* () { + for (const item of items) { + yield item; + } + })(); + + const savePipelineResumeState = async ( + env: Record, + state: PipelineResumeState, + ) => { + const stateKey = `pipeline_resume_${randomUUID()}`; + await deps.writeStateJson({ env, key: stateKey, value: state }); + return stateKey; + }; + + const loadPipelineResumeState = async ( + env: Record, + stateKey: string, + ) => { + const stored = await deps.readStateJson({ env, key: stateKey }); + if (!stored || typeof stored !== "object") { + throw new Error("Pipeline resume state not found"); + } + const data = stored as Partial; + if (!Array.isArray(data.pipeline)) { + throw new Error("Invalid pipeline resume state"); + } + if (typeof data.resumeAtIndex !== "number") { + throw new Error("Invalid pipeline resume state"); + } + if (!Array.isArray(data.items)) { + throw new Error("Invalid pipeline resume state"); + } + return data as PipelineResumeState; + }; + + return { + async runToolRequest({ pipeline, filePath, args, ctx = {} }) { + const runtime = createToolContext(ctx); + const hasPipeline = typeof pipeline === "string" && pipeline.trim().length > 0; + const hasFile = typeof filePath === "string" && filePath.trim().length > 0; + + if (!hasPipeline && !hasFile) { + return errorEnvelope("parse_error", "run requires either pipeline or filePath"); + } + if (hasPipeline && hasFile) { + return errorEnvelope("parse_error", "run accepts either pipeline or filePath, not both"); + } + + if (hasFile) { + try { + const output = await deps.runWorkflowFile({ + filePath: filePath!, + args, + ctx: runtime, + }); + + if (output.status === "needs_approval") { + return okEnvelope("needs_approval", [], output.requiresApproval ?? null); + } + if (output.status === "cancelled") { + return okEnvelope("cancelled", [], null); + } + return okEnvelope("ok", output.output, null); + } catch (error) { + return errorEnvelope( + "runtime_error", + error instanceof Error ? error.message : String(error), + ); + } + } + + let parsed; + try { + parsed = deps.parsePipeline(String(pipeline)); + } catch (error) { + return errorEnvelope("parse_error", error instanceof Error ? error.message : String(error)); + } + + try { + const output = await deps.runPipeline({ + pipeline: parsed, + registry: runtime.registry, + input: [], + stdin: runtime.stdin!, + stdout: runtime.stdout!, + stderr: runtime.stderr!, + env: runtime.env, + mode: "tool", + cwd: runtime.cwd, + llmAdapters: runtime.llmAdapters, + signal: runtime.signal, + }); + + const approval = + output.halted && output.items.length === 1 + ? asApprovalRequestItem(output.items[0]) + : null; + + if (approval) { + const stateKey = await savePipelineResumeState(runtime.env, { + pipeline: parsed, + resumeAtIndex: (output.haltedAt?.index ?? -1) + 1, + items: approval.items, + prompt: approval.prompt, + createdAt: new Date().toISOString(), + }); + + const resumeToken = deps.encodeToken({ + protocolVersion: 1, + v: 1, + kind: "pipeline-resume", + stateKey, + }); + + return okEnvelope("needs_approval", [], { + type: "approval_request", + prompt: approval.prompt, + items: approval.items, + resumeToken, + }); + } + + return okEnvelope("ok", output.items, null); + } catch (error) { + return errorEnvelope( + "runtime_error", + error instanceof Error ? error.message : String(error), + ); + } + }, + + async resumeToolRequest({ token, approved, ctx = {} }) { + const runtime = createToolContext(ctx); + let payload: { kind?: string; stateKey?: string; filePath?: string }; + + try { + payload = deps.decodeResumeToken(token); + } catch (error) { + return errorEnvelope("parse_error", error instanceof Error ? error.message : String(error)); + } + + if (!approved) { + if (payload.kind === "workflow-file" && payload.stateKey) { + await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey }); + } + if (payload.kind === "pipeline-resume" && payload.stateKey) { + await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey }); + } + return okEnvelope("cancelled", [], null); + } + + if (payload.kind === "workflow-file" && payload.filePath) { + try { + const output = await deps.runWorkflowFile({ + filePath: payload.filePath, + ctx: runtime, + resume: payload as Record, + approved: true, + }); + + if (output.status === "needs_approval") { + return okEnvelope("needs_approval", [], output.requiresApproval ?? null); + } + if (output.status === "cancelled") { + return okEnvelope("cancelled", [], null); + } + return okEnvelope("ok", output.output, null); + } catch (error) { + return errorEnvelope( + "runtime_error", + error instanceof Error ? error.message : String(error), + ); + } + } + + try { + const resumeState = await loadPipelineResumeState(runtime.env, payload.stateKey ?? ""); + const remaining = resumeState.pipeline.slice(resumeState.resumeAtIndex); + + const output = await deps.runPipeline({ + pipeline: remaining, + registry: runtime.registry, + input: streamFromItems(resumeState.items), + stdin: runtime.stdin!, + stdout: runtime.stdout!, + stderr: runtime.stderr!, + env: runtime.env, + mode: "tool", + cwd: runtime.cwd, + llmAdapters: runtime.llmAdapters, + signal: runtime.signal, + }); + + const approval = + output.halted && output.items.length === 1 + ? asApprovalRequestItem(output.items[0]) + : null; + + if (approval) { + const nextStateKey = await savePipelineResumeState(runtime.env, { + pipeline: remaining, + resumeAtIndex: (output.haltedAt?.index ?? -1) + 1, + items: approval.items, + prompt: approval.prompt, + createdAt: new Date().toISOString(), + }); + if (payload.stateKey) { + await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey }); + } + + const resumeToken = deps.encodeToken({ + protocolVersion: 1, + v: 1, + kind: "pipeline-resume", + stateKey: nextStateKey, + }); + + return okEnvelope("needs_approval", [], { + type: "approval_request", + prompt: approval.prompt, + items: approval.items, + resumeToken, + }); + } + + if (payload.stateKey) { + await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey }); + } + return okEnvelope("ok", output.items, null); + } catch (error) { + return errorEnvelope( + "runtime_error", + error instanceof Error ? error.message : String(error), + ); + } + }, + }; +} + +async function importInstalledLobsterModule( + lobsterRoot: string, + relativePath: string, +): Promise { + const target = path.join(lobsterRoot, relativePath); + return (await import(pathToFileURL(target).href)) as T; +} + +function resolveInstalledLobsterRoot() { + const require = createRequire(import.meta.url); + const sdkEntry = require.resolve("@clawdbot/lobster"); + return path.resolve(path.dirname(sdkEntry), "../../.."); +} + +async function loadEmbeddedToolRuntimeFromPackage(): Promise { + const lobsterRoot = resolveInstalledLobsterRoot(); + const coreIndexPath = path.join(lobsterRoot, "dist/src/core/index.js"); + + try { + const core = await import(pathToFileURL(coreIndexPath).href); + if (typeof core.runToolRequest === "function" && typeof core.resumeToolRequest === "function") { + return { + runToolRequest: core.runToolRequest as EmbeddedToolRuntime["runToolRequest"], + resumeToolRequest: core.resumeToolRequest as EmbeddedToolRuntime["resumeToolRequest"], + }; + } + } catch { + // The current published npm package does not export/ship ./core yet. + } + + const [ + registryModule, + parserModule, + resumeModule, + tokenModule, + runtimeModule, + workflowModule, + storeModule, + ] = await Promise.all([ + importInstalledLobsterModule<{ + createDefaultRegistry: ToolRuntimeDeps["createDefaultRegistry"]; + }>(lobsterRoot, "dist/src/commands/registry.js"), + importInstalledLobsterModule<{ parsePipeline: ToolRuntimeDeps["parsePipeline"] }>( + lobsterRoot, + "dist/src/parser.js", + ), + importInstalledLobsterModule<{ decodeResumeToken: ToolRuntimeDeps["decodeResumeToken"] }>( + lobsterRoot, + "dist/src/resume.js", + ), + importInstalledLobsterModule<{ encodeToken: ToolRuntimeDeps["encodeToken"] }>( + lobsterRoot, + "dist/src/token.js", + ), + importInstalledLobsterModule<{ runPipeline: ToolRuntimeDeps["runPipeline"] }>( + lobsterRoot, + "dist/src/runtime.js", + ), + importInstalledLobsterModule<{ runWorkflowFile: ToolRuntimeDeps["runWorkflowFile"] }>( + lobsterRoot, + "dist/src/workflows/file.js", + ), + importInstalledLobsterModule<{ + readStateJson: ToolRuntimeDeps["readStateJson"]; + writeStateJson: ToolRuntimeDeps["writeStateJson"]; + deleteStateJson: ToolRuntimeDeps["deleteStateJson"]; + }>(lobsterRoot, "dist/src/state/store.js"), + ]); + + return createFallbackEmbeddedToolRuntime({ + createDefaultRegistry: registryModule.createDefaultRegistry, + parsePipeline: parserModule.parsePipeline, + decodeResumeToken: resumeModule.decodeResumeToken, + encodeToken: tokenModule.encodeToken, + runPipeline: runtimeModule.runPipeline, + runWorkflowFile: workflowModule.runWorkflowFile, + readStateJson: storeModule.readStateJson, + writeStateJson: storeModule.writeStateJson, + deleteStateJson: storeModule.deleteStateJson, + }); +} + +export function createEmbeddedLobsterRunner(options?: { + loadRuntime?: LoadEmbeddedToolRuntime; +}): LobsterRunner { + const loadRuntime = options?.loadRuntime ?? loadEmbeddedToolRuntimeFromPackage; + return { + async run(params) { + const runtime = await loadRuntime(); + return await withTimeout(params.timeoutMs, async (signal) => { + const ctx = createEmbeddedToolContext(params, signal); + + if (params.action === "run") { + const pipeline = params.pipeline?.trim() ?? ""; + if (!pipeline) { + throw new Error("pipeline required"); + } + + const filePath = await detectWorkflowFile(pipeline, params.cwd); + if (filePath) { + const parsedArgsJson = params.argsJson?.trim() ?? ""; + let args: Record | undefined; + if (parsedArgsJson) { + try { + args = parseWorkflowArgs(parsedArgsJson); + } catch { + throw new Error("run --args-json must be valid JSON"); + } + } + return throwOnErrorEnvelope( + normalizeEnvelope(await runtime.runToolRequest({ filePath, args, ctx })), + ); + } + + return throwOnErrorEnvelope( + normalizeEnvelope(await runtime.runToolRequest({ pipeline, ctx })), + ); + } + + const token = params.token?.trim() ?? ""; + if (!token) { + throw new Error("token required"); + } + if (typeof params.approve !== "boolean") { + throw new Error("approve required"); + } + + return throwOnErrorEnvelope( + normalizeEnvelope( + await runtime.resumeToolRequest({ + token, + approved: params.approve, + ctx, + }), + ), + ); + }); + }, + }; +} diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts index 31b58a6bb02..7e56b57ec7f 100644 --- a/extensions/lobster/src/lobster-tool.test.ts +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -1,31 +1,6 @@ -import { EventEmitter } from "node:events"; -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { PassThrough } from "node:stream"; -import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { createTestPluginApi } from "../../../test/helpers/plugins/plugin-api.js"; import type { OpenClawPluginApi, OpenClawPluginToolContext } from "../runtime-api.js"; -import { - createWindowsCmdShimFixture, - restorePlatformPathEnv, - setProcessPlatform, - snapshotPlatformPathEnv, -} from "./test-helpers.js"; -import { resolveWindowsLobsterSpawn } from "./windows-spawn.js"; - -const spawnState = vi.hoisted(() => ({ - queue: [] as Array<{ stdout: string; stderr?: string; exitCode?: number }>, - spawn: vi.fn(), -})); - -vi.mock("node:child_process", async () => { - const actual = await vi.importActual("node:child_process"); - return { - ...actual, - spawn: (...args: unknown[]) => spawnState.spawn(...args), - }; -}); let createLobsterTool: typeof import("./lobster-tool.js").createLobsterTool; @@ -55,154 +30,124 @@ function fakeCtx(overrides: Partial = {}): OpenClawPl }; } -async function expectUnwrappedShim(params: { - scriptPath: string; - shimPath: string; - shimLine: string; -}) { - await createWindowsCmdShimFixture(params); - - const target = resolveWindowsLobsterSpawn(params.shimPath, ["run", "noop"], process.env); - expect(target.command).toBe(process.execPath); - expect(target.argv).toEqual([params.scriptPath, "run", "noop"]); - expect(target.windowsHide).toBe(true); -} - describe("lobster plugin tool", () => { - let tempDir = ""; - const originalProcessState = snapshotPlatformPathEnv(); - - beforeAll(async () => { + it("returns the Lobster envelope in details", async () => { ({ createLobsterTool } = await import("./lobster-tool.js")); - tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lobster-plugin-")); - }); - - afterEach(() => { - restorePlatformPathEnv(originalProcessState); - }); - - afterAll(async () => { - if (!tempDir) { - return; - } - if (process.platform === "win32") { - await fs.rm(tempDir, { recursive: true, force: true, maxRetries: 10, retryDelay: 50 }); - } else { - await fs.rm(tempDir, { recursive: true, force: true }); - } - }); - - beforeEach(() => { - spawnState.queue.length = 0; - spawnState.spawn.mockReset(); - spawnState.spawn.mockImplementation(() => { - const next = spawnState.queue.shift() ?? { stdout: "" }; - const stdout = new PassThrough(); - const stderr = new PassThrough(); - const child = new EventEmitter() as EventEmitter & { - stdout: PassThrough; - stderr: PassThrough; - kill: (signal?: string) => boolean; - }; - child.stdout = stdout; - child.stderr = stderr; - child.kill = () => true; - - setImmediate(() => { - if (next.stderr) { - stderr.end(next.stderr); - } else { - stderr.end(); - } - stdout.end(next.stdout); - child.emit("exit", next.exitCode ?? 0); - }); - - return child; - }); - }); - - const queueSuccessfulEnvelope = (hello = "world") => { - spawnState.queue.push({ - stdout: JSON.stringify({ - ok: true, - status: "ok", - output: [{ hello }], - requiresApproval: null, - }), - }); - }; - - it("runs lobster and returns parsed envelope in details", async () => { - spawnState.queue.push({ - stdout: JSON.stringify({ + const runner = { + run: vi.fn().mockResolvedValue({ ok: true, status: "ok", output: [{ hello: "world" }], requiresApproval: null, }), - }); + }; - const tool = createLobsterTool(fakeApi()); + const tool = createLobsterTool(fakeApi(), { runner }); const res = await tool.execute("call1", { action: "run", pipeline: "noop", timeoutMs: 1000, }); - expect(spawnState.spawn).toHaveBeenCalled(); - expect(res.details).toMatchObject({ ok: true, status: "ok" }); - }); - - it("tolerates noisy stdout before the JSON envelope", async () => { - const payload = { ok: true, status: "ok", output: [], requiresApproval: null }; - spawnState.queue.push({ - stdout: `noise before json\n${JSON.stringify(payload)}`, - }); - - const tool = createLobsterTool(fakeApi()); - const res = await tool.execute("call-noisy", { + expect(runner.run).toHaveBeenCalledWith({ action: "run", pipeline: "noop", + cwd: process.cwd(), timeoutMs: 1000, + maxStdoutBytes: 512_000, + }); + expect(res.details).toMatchObject({ + ok: true, + status: "ok", + output: [{ hello: "world" }], + requiresApproval: null, + }); + }); + + it("supports approval envelopes without changing the tool contract", async () => { + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const runner = { + run: vi.fn().mockResolvedValue({ + ok: true, + status: "needs_approval", + output: [], + requiresApproval: { + type: "approval_request", + prompt: "Send these alerts?", + items: [{ id: "alert-1" }], + resumeToken: "resume-token-1", + }, + }), + }; + + const tool = createLobsterTool(fakeApi(), { runner }); + const res = await tool.execute("call-injected-runner", { + action: "run", + pipeline: "noop", + argsJson: '{"since_hours":1}', + timeoutMs: 1500, + maxStdoutBytes: 4096, }); - expect(res.details).toMatchObject({ ok: true, status: "ok" }); + expect(runner.run).toHaveBeenCalledWith({ + action: "run", + pipeline: "noop", + argsJson: '{"since_hours":1}', + cwd: process.cwd(), + timeoutMs: 1500, + maxStdoutBytes: 4096, + }); + expect(res.details).toMatchObject({ + ok: true, + status: "needs_approval", + requiresApproval: { + type: "approval_request", + prompt: "Send these alerts?", + resumeToken: "resume-token-1", + }, + }); + }); + + it("throws when the runner returns an error envelope", async () => { + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const tool = createLobsterTool(fakeApi(), { + runner: { + run: vi.fn().mockResolvedValue({ + ok: false, + error: { + type: "runtime_error", + message: "boom", + }, + }), + }, + }); + + await expect( + tool.execute("call-runner-error", { + action: "run", + pipeline: "noop", + }), + ).rejects.toThrow("boom"); }); it("requires action", async () => { - const tool = createLobsterTool(fakeApi()); + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const tool = createLobsterTool(fakeApi(), { + runner: { run: vi.fn() }, + }); await expect(tool.execute("call-action-missing", {})).rejects.toThrow(/action required/); }); - it("requires pipeline for run action", async () => { - const tool = createLobsterTool(fakeApi()); - await expect( - tool.execute("call-pipeline-missing", { - action: "run", - }), - ).rejects.toThrow(/pipeline required/); - }); - - it("requires token and approve for resume action", async () => { - const tool = createLobsterTool(fakeApi()); - await expect( - tool.execute("call-resume-token-missing", { - action: "resume", - approve: true, - }), - ).rejects.toThrow(/token required/); - await expect( - tool.execute("call-resume-approve-missing", { - action: "resume", - token: "resume-token", - }), - ).rejects.toThrow(/approve required/); - }); - it("rejects unknown action", async () => { - const tool = createLobsterTool(fakeApi()); + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const tool = createLobsterTool(fakeApi(), { + runner: { run: vi.fn() }, + }); await expect( tool.execute("call-action-unknown", { action: "explode", @@ -211,9 +156,13 @@ describe("lobster plugin tool", () => { }); it("rejects absolute cwd", async () => { - const tool = createLobsterTool(fakeApi()); + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const tool = createLobsterTool(fakeApi(), { + runner: { run: vi.fn() }, + }); await expect( - tool.execute("call2c", { + tool.execute("call-absolute-cwd", { action: "run", pipeline: "noop", cwd: "/tmp", @@ -222,9 +171,13 @@ describe("lobster plugin tool", () => { }); it("rejects cwd that escapes the gateway working directory", async () => { - const tool = createLobsterTool(fakeApi()); + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const tool = createLobsterTool(fakeApi(), { + runner: { run: vi.fn() }, + }); await expect( - tool.execute("call2d", { + tool.execute("call-escape-cwd", { action: "run", pipeline: "noop", cwd: "../../etc", @@ -232,175 +185,20 @@ describe("lobster plugin tool", () => { ).rejects.toThrow(/must stay within/); }); - it("rejects invalid JSON from lobster", async () => { - spawnState.queue.push({ stdout: "nope" }); - - const tool = createLobsterTool(fakeApi()); - await expect( - tool.execute("call3", { - action: "run", - pipeline: "noop", - }), - ).rejects.toThrow(/invalid JSON/); - }); - - it("runs Windows cmd shims through Node without enabling shell", async () => { - setProcessPlatform("win32"); - const shimScriptPath = path.join(tempDir, "shim-dist", "lobster-cli.cjs"); - const shimPath = path.join(tempDir, "shim-bin", "lobster.cmd"); - await createWindowsCmdShimFixture({ - shimPath, - scriptPath: shimScriptPath, - shimLine: `"%dp0%\\..\\shim-dist\\lobster-cli.cjs" %*`, - }); - process.env.PATHEXT = ".CMD;.EXE"; - process.env.PATH = `${path.dirname(shimPath)};${process.env.PATH ?? ""}`; - queueSuccessfulEnvelope(); - - const tool = createLobsterTool(fakeApi()); - await tool.execute("call-win-shim", { - action: "run", - pipeline: "noop", - }); - - const [command, argv, options] = spawnState.spawn.mock.calls[0] ?? []; - expect(command).toBe(process.execPath); - expect(argv).toEqual([shimScriptPath, "run", "--mode", "tool", "noop"]); - expect(options).toMatchObject({ windowsHide: true }); - expect(options).not.toHaveProperty("shell"); - }); - - it("does not retry a failed Windows spawn with shell fallback", async () => { - setProcessPlatform("win32"); - spawnState.spawn.mockReset(); - spawnState.spawn.mockImplementationOnce(() => { - const child = new EventEmitter() as EventEmitter & { - stdout: PassThrough; - stderr: PassThrough; - kill: (signal?: string) => boolean; - }; - child.stdout = new PassThrough(); - child.stderr = new PassThrough(); - child.kill = () => true; - const err = Object.assign(new Error("spawn failed"), { code: "ENOENT" }); - setImmediate(() => child.emit("error", err)); - return child; - }); - - const tool = createLobsterTool(fakeApi()); - await expect( - tool.execute("call-win-no-retry", { - action: "run", - pipeline: "noop", - }), - ).rejects.toThrow(/spawn failed/); - expect(spawnState.spawn).toHaveBeenCalledTimes(1); - }); - it("can be gated off in sandboxed contexts", async () => { + ({ createLobsterTool } = await import("./lobster-tool.js")); + const api = fakeApi(); const factoryTool = (ctx: OpenClawPluginToolContext) => { if (ctx.sandboxed) { return null; } - return createLobsterTool(api); + return createLobsterTool(api, { + runner: { run: vi.fn() }, + }); }; expect(factoryTool(fakeCtx({ sandboxed: true }))).toBeNull(); expect(factoryTool(fakeCtx({ sandboxed: false }))?.name).toBe("lobster"); }); }); - -describe("resolveWindowsLobsterSpawn", () => { - let tempDir = ""; - const originalProcessState = snapshotPlatformPathEnv(); - - beforeEach(async () => { - tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lobster-win-spawn-")); - setProcessPlatform("win32"); - }); - - afterEach(async () => { - restorePlatformPathEnv(originalProcessState); - if (tempDir) { - await fs.rm(tempDir, { recursive: true, force: true }); - tempDir = ""; - } - }); - - it("unwraps cmd shim with %dp0% token", async () => { - const scriptPath = path.join(tempDir, "shim-dist", "lobster-cli.cjs"); - const shimPath = path.join(tempDir, "shim", "lobster.cmd"); - await expectUnwrappedShim({ - shimPath, - scriptPath, - shimLine: `"%dp0%\\..\\shim-dist\\lobster-cli.cjs" %*`, - }); - }); - - it("unwraps cmd shim with %~dp0% token", async () => { - const scriptPath = path.join(tempDir, "shim-dist", "lobster-cli.cjs"); - const shimPath = path.join(tempDir, "shim", "lobster.cmd"); - await expectUnwrappedShim({ - shimPath, - scriptPath, - shimLine: `"%~dp0%\\..\\shim-dist\\lobster-cli.cjs" %*`, - }); - }); - - it("ignores node.exe shim entries and picks lobster script", async () => { - const shimDir = path.join(tempDir, "shim-with-node"); - const scriptPath = path.join(tempDir, "shim-dist-node", "lobster-cli.cjs"); - const shimPath = path.join(shimDir, "lobster.cmd"); - await fs.mkdir(path.dirname(scriptPath), { recursive: true }); - await fs.mkdir(shimDir, { recursive: true }); - await fs.writeFile(path.join(shimDir, "node.exe"), "", "utf8"); - await fs.writeFile(scriptPath, "module.exports = {};\n", "utf8"); - await fs.writeFile( - shimPath, - `@echo off\r\n"%~dp0%\\node.exe" "%~dp0%\\..\\shim-dist-node\\lobster-cli.cjs" %*\r\n`, - "utf8", - ); - - const target = resolveWindowsLobsterSpawn(shimPath, ["run", "noop"], process.env); - expect(target.command).toBe(process.execPath); - expect(target.argv).toEqual([scriptPath, "run", "noop"]); - expect(target.windowsHide).toBe(true); - }); - - it("resolves lobster.cmd from PATH and unwraps npm layout shim", async () => { - const binDir = path.join(tempDir, "node_modules", ".bin"); - const packageDir = path.join(tempDir, "node_modules", "lobster"); - const scriptPath = path.join(packageDir, "dist", "cli.js"); - const shimPath = path.join(binDir, "lobster.cmd"); - await fs.mkdir(path.dirname(scriptPath), { recursive: true }); - await fs.mkdir(binDir, { recursive: true }); - await fs.writeFile(shimPath, "@echo off\r\n", "utf8"); - await fs.writeFile( - path.join(packageDir, "package.json"), - JSON.stringify({ name: "lobster", version: "0.0.0", bin: { lobster: "dist/cli.js" } }), - "utf8", - ); - await fs.writeFile(scriptPath, "module.exports = {};\n", "utf8"); - - const env = { - ...process.env, - PATH: `${binDir};${process.env.PATH ?? ""}`, - PATHEXT: ".CMD;.EXE", - }; - const target = resolveWindowsLobsterSpawn("lobster", ["run", "noop"], env); - expect(target.command).toBe(process.execPath); - expect(target.argv).toEqual([scriptPath, "run", "noop"]); - expect(target.windowsHide).toBe(true); - }); - - it("fails fast when wrapper cannot be resolved without shell execution", async () => { - const badShimPath = path.join(tempDir, "bad-shim", "lobster.cmd"); - await fs.mkdir(path.dirname(badShimPath), { recursive: true }); - await fs.writeFile(badShimPath, "@echo off\r\nREM no entrypoint\r\n", "utf8"); - - expect(() => resolveWindowsLobsterSpawn(badShimPath, ["run", "noop"], process.env)).toThrow( - /without shell execution/, - ); - }); -}); diff --git a/extensions/lobster/src/lobster-tool.ts b/extensions/lobster/src/lobster-tool.ts index fa3994bb45d..aff2d66aeb1 100644 --- a/extensions/lobster/src/lobster-tool.ts +++ b/extensions/lobster/src/lobster-tool.ts @@ -1,213 +1,14 @@ -import { spawn } from "node:child_process"; -import path from "node:path"; import { Type } from "@sinclair/typebox"; import type { OpenClawPluginApi } from "../runtime-api.js"; -import { resolveWindowsLobsterSpawn } from "./windows-spawn.js"; +import { + createEmbeddedLobsterRunner, + resolveLobsterCwd, + type LobsterRunner, + type LobsterRunnerParams, +} from "./lobster-runner.js"; -type LobsterEnvelope = - | { - ok: true; - status: "ok" | "needs_approval" | "cancelled"; - output: unknown[]; - requiresApproval: null | { - type: "approval_request"; - prompt: string; - items: unknown[]; - resumeToken?: string; - }; - } - | { - ok: false; - error: { type?: string; message: string }; - }; - -function normalizeForCwdSandbox(p: string): string { - const normalized = path.normalize(p); - return process.platform === "win32" ? normalized.toLowerCase() : normalized; -} - -function resolveCwd(cwdRaw: unknown): string { - if (typeof cwdRaw !== "string" || !cwdRaw.trim()) { - return process.cwd(); - } - const cwd = cwdRaw.trim(); - if (path.isAbsolute(cwd)) { - throw new Error("cwd must be a relative path"); - } - const base = process.cwd(); - const resolved = path.resolve(base, cwd); - - const rel = path.relative(normalizeForCwdSandbox(base), normalizeForCwdSandbox(resolved)); - if (rel === "" || rel === ".") { - return resolved; - } - if (rel.startsWith("..") || path.isAbsolute(rel)) { - throw new Error("cwd must stay within the gateway working directory"); - } - return resolved; -} - -async function runLobsterSubprocessOnce(params: { - execPath: string; - argv: string[]; - cwd: string; - timeoutMs: number; - maxStdoutBytes: number; -}) { - const { execPath, argv, cwd } = params; - const timeoutMs = Math.max(200, params.timeoutMs); - const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes); - - const env = { ...process.env, LOBSTER_MODE: "tool" } as Record; - const nodeOptions = env.NODE_OPTIONS ?? ""; - if (nodeOptions.includes("--inspect")) { - delete env.NODE_OPTIONS; - } - const spawnTarget = - process.platform === "win32" - ? resolveWindowsLobsterSpawn(execPath, argv, env) - : { command: execPath, argv }; - - return await new Promise<{ stdout: string }>((resolve, reject) => { - const child = spawn(spawnTarget.command, spawnTarget.argv, { - cwd, - stdio: ["ignore", "pipe", "pipe"], - env, - windowsHide: spawnTarget.windowsHide, - }); - - let stdout = ""; - let stdoutBytes = 0; - let stderr = ""; - let settled = false; - - const settle = ( - result: { ok: true; value: { stdout: string } } | { ok: false; error: Error }, - ) => { - if (settled) { - return; - } - settled = true; - clearTimeout(timer); - if (result.ok) { - resolve(result.value); - } else { - reject(result.error); - } - }; - - const failAndTerminate = (message: string) => { - try { - child.kill("SIGKILL"); - } finally { - settle({ ok: false, error: new Error(message) }); - } - }; - - child.stdout?.setEncoding("utf8"); - child.stderr?.setEncoding("utf8"); - - child.stdout?.on("data", (chunk) => { - const str = String(chunk); - stdoutBytes += Buffer.byteLength(str, "utf8"); - if (stdoutBytes > maxStdoutBytes) { - failAndTerminate("lobster output exceeded maxStdoutBytes"); - return; - } - stdout += str; - }); - - child.stderr?.on("data", (chunk) => { - stderr += String(chunk); - }); - - const timer = setTimeout(() => { - failAndTerminate("lobster subprocess timed out"); - }, timeoutMs); - - child.once("error", (err) => { - settle({ ok: false, error: err }); - }); - - child.once("exit", (code) => { - if (code !== 0) { - settle({ - ok: false, - error: new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`), - }); - return; - } - settle({ ok: true, value: { stdout } }); - }); - }); -} - -function parseEnvelope(stdout: string): LobsterEnvelope { - const trimmed = stdout.trim(); - - const tryParse = (input: string) => { - try { - return JSON.parse(input) as unknown; - } catch { - return undefined; - } - }; - - let parsed: unknown = tryParse(trimmed); - - // Some environments can leak extra stdout (e.g. warnings/logs) before the - // final JSON envelope. Be tolerant and parse the last JSON-looking suffix. - if (parsed === undefined) { - const suffixMatch = trimmed.match(/({[\s\S]*}|\[[\s\S]*])\s*$/); - if (suffixMatch?.[1]) { - parsed = tryParse(suffixMatch[1]); - } - } - - if (parsed === undefined) { - throw new Error("lobster returned invalid JSON"); - } - - if (!parsed || typeof parsed !== "object") { - throw new Error("lobster returned invalid JSON envelope"); - } - - const ok = (parsed as { ok?: unknown }).ok; - if (ok === true || ok === false) { - return parsed as LobsterEnvelope; - } - - throw new Error("lobster returned invalid JSON envelope"); -} - -function buildLobsterArgv(action: string, params: Record): string[] { - if (action === "run") { - const pipeline = typeof params.pipeline === "string" ? params.pipeline : ""; - if (!pipeline.trim()) { - throw new Error("pipeline required"); - } - const argv = ["run", "--mode", "tool", pipeline]; - const argsJson = typeof params.argsJson === "string" ? params.argsJson : ""; - if (argsJson.trim()) { - argv.push("--args-json", argsJson); - } - return argv; - } - if (action === "resume") { - const token = typeof params.token === "string" ? params.token : ""; - if (!token.trim()) { - throw new Error("token required"); - } - const approve = params.approve; - if (typeof approve !== "boolean") { - throw new Error("approve required"); - } - return ["resume", "--token", token, "--approve", approve ? "yes" : "no"]; - } - throw new Error(`Unknown action: ${action}`); -} - -export function createLobsterTool(api: OpenClawPluginApi) { +export function createLobsterTool(api: OpenClawPluginApi, options?: { runner?: LobsterRunner }) { + const runner = options?.runner ?? createEmbeddedLobsterRunner(); return { name: "lobster", label: "Lobster Workflow", @@ -234,28 +35,33 @@ export function createLobsterTool(api: OpenClawPluginApi) { if (!action) { throw new Error("action required"); } + if (action !== "run" && action !== "resume") { + throw new Error(`Unknown action: ${action}`); + } - const execPath = "lobster"; - const cwd = resolveCwd(params.cwd); + const cwd = resolveLobsterCwd(params.cwd); const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : 20_000; const maxStdoutBytes = typeof params.maxStdoutBytes === "number" ? params.maxStdoutBytes : 512_000; - const argv = buildLobsterArgv(action, params); - if (api.runtime?.version && api.logger?.debug) { api.logger.debug(`lobster plugin runtime=${api.runtime.version}`); } - const { stdout } = await runLobsterSubprocessOnce({ - execPath, - argv, + const runnerParams: LobsterRunnerParams = { + action, + ...(typeof params.pipeline === "string" ? { pipeline: params.pipeline } : {}), + ...(typeof params.argsJson === "string" ? { argsJson: params.argsJson } : {}), + ...(typeof params.token === "string" ? { token: params.token } : {}), + ...(typeof params.approve === "boolean" ? { approve: params.approve } : {}), cwd, timeoutMs, maxStdoutBytes, - }); - - const envelope = parseEnvelope(stdout); + }; + const envelope = await runner.run(runnerParams); + if (!envelope.ok) { + throw new Error(envelope.error.message); + } return { content: [{ type: "text", text: JSON.stringify(envelope, null, 2) }], diff --git a/extensions/lobster/src/windows-spawn.ts b/extensions/lobster/src/windows-spawn.ts deleted file mode 100644 index 22541f866a8..00000000000 --- a/extensions/lobster/src/windows-spawn.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { - applyWindowsSpawnProgramPolicy, - materializeWindowsSpawnProgram, - resolveWindowsSpawnProgramCandidate, -} from "../runtime-api.js"; - -type SpawnTarget = { - command: string; - argv: string[]; - windowsHide?: boolean; -}; - -export function resolveWindowsLobsterSpawn( - execPath: string, - argv: string[], - env: NodeJS.ProcessEnv, -): SpawnTarget { - const candidate = resolveWindowsSpawnProgramCandidate({ - command: execPath, - env, - packageName: "lobster", - }); - const program = applyWindowsSpawnProgramPolicy({ - candidate, - allowShellFallback: false, - }); - const resolved = materializeWindowsSpawnProgram(program, argv); - if (resolved.shell) { - throw new Error("lobster wrapper resolved to shell fallback unexpectedly"); - } - return { - command: resolved.command, - argv: resolved.argv, - windowsHide: resolved.windowsHide, - }; -} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b1c44f61e04..8e7113d9b32 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -447,6 +447,9 @@ importers: extensions/lobster: dependencies: + '@clawdbot/lobster': + specifier: 2026.1.24 + version: 2026.1.24 '@sinclair/typebox': specifier: 0.34.49 version: 0.34.49 @@ -1108,6 +1111,11 @@ packages: '@clack/prompts@1.2.0': resolution: {integrity: sha512-4jmztR9fMqPMjz6H/UZXj0zEmE43ha1euENwkckKKel4XpSfokExPo5AiVStdHSAlHekz4d0CA/r45Ok1E4D3w==} + '@clawdbot/lobster@2026.1.24': + resolution: {integrity: sha512-vHrMy4NErcq6suyGByfQSdalnvaMu4dRd10BJdeMp60V6cYtuHJSR2Ay5l0kb4iSPyk4dZKrXNNpLzeqHRcAfA==} + engines: {node: '>=20'} + hasBin: true + '@cloudflare/workers-types@4.20260120.0': resolution: {integrity: sha512-B8pueG+a5S+mdK3z8oKu1ShcxloZ7qWb68IEyLLaepvdryIbNC7JVPcY0bWsjS56UQVKc5fnyRge3yZIwc9bxw==} @@ -7388,6 +7396,11 @@ snapshots: fast-wrap-ansi: 0.1.6 sisteransi: 1.0.5 + '@clawdbot/lobster@2026.1.24': + dependencies: + ajv: 8.18.0 + yaml: 2.8.3 + '@cloudflare/workers-types@4.20260120.0': optional: true