From 1490e2b1d3777ae3b242e0993a3142724110e2a9 Mon Sep 17 00:00:00 2001 From: Mariano <132747814+mbelinky@users.noreply.github.com> Date: Mon, 13 Apr 2026 20:38:46 +0200 Subject: [PATCH] Lobster: import published core runtime (#64755) * Lobster: import published core runtime * Changelog: add Lobster core runtime note * Lobster: type embedded core runtime * Lobster: keep package-boundary tsconfig narrow --- CHANGELOG.md | 1 + extensions/lobster/README.md | 3 +- extensions/lobster/src/lobster-core.d.ts | 59 ++ extensions/lobster/src/lobster-runner.test.ts | 31 ++ extensions/lobster/src/lobster-runner.ts | 517 ++---------------- 5 files changed, 125 insertions(+), 486 deletions(-) create mode 100644 extensions/lobster/src/lobster-core.d.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 788f38ea892..33b272c73ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -100,6 +100,7 @@ Docs: https://docs.openclaw.ai - CLI/audio providers: report env-authenticated providers as configured in `openclaw infer audio providers --json`, while keeping trusted workspace provider env lookup defaults stable during auth setup. (#65491) - Plugins/install: reinstall bundled runtime packages when the matching platform native optional child is missing, so packaged Windows installs can recover dependencies that were packed on another host OS. - Memory/QMD: preserve explicit `memory.qmd.command` paths, create missing agent workspaces before QMD probes, and keep the current Node binary on QMD subprocess PATH so service and gateway environments do not fall back to builtin search unnecessarily. +- Plugins/Lobster: load the published `@clawdbot/lobster/core` runtime in process so bundled Lobster runs stop depending on private package internals. (#64755) Thanks @mbelinky. ## 2026.4.11 diff --git a/extensions/lobster/README.md b/extensions/lobster/README.md index 03c083e6227..96379b39eae 100644 --- a/extensions/lobster/README.md +++ b/extensions/lobster/README.md @@ -69,7 +69,6 @@ Notes: ## Security -- Runs the `lobster` executable as a local subprocess. +- Runs Lobster in process via the published `@clawdbot/lobster/core` runtime. - Does not manage OAuth/tokens. - Uses timeouts, stdout caps, and strict JSON envelope parsing. -- Ensure `lobster` is available on `PATH` for the gateway process. diff --git a/extensions/lobster/src/lobster-core.d.ts b/extensions/lobster/src/lobster-core.d.ts new file mode 100644 index 00000000000..c7a869dba42 --- /dev/null +++ b/extensions/lobster/src/lobster-core.d.ts @@ -0,0 +1,59 @@ +declare module "@clawdbot/lobster/core" { + type LobsterApprovalRequest = { + type: "approval_request"; + prompt: string; + items: unknown[]; + resumeToken?: string; + } | null; + + type LobsterToolContext = { + cwd?: string; + env?: Record; + stdin?: NodeJS.ReadableStream; + stdout?: NodeJS.WritableStream; + stderr?: NodeJS.WritableStream; + signal?: AbortSignal; + registry?: unknown; + llmAdapters?: Record; + }; + + type LobsterToolEnvelope = + | { + protocolVersion: 1; + ok: true; + status: "ok" | "needs_approval" | "needs_input" | "cancelled"; + output: unknown[]; + requiresApproval: LobsterApprovalRequest; + requiresInput?: { + prompt: string; + schema?: unknown; + items?: unknown[]; + resumeToken?: string; + approvalId?: string; + } | null; + } + | { + protocolVersion: 1; + ok: false; + error: { + type: string; + message: string; + }; + }; + + export function runToolRequest(params: { + pipeline?: string; + filePath?: string; + args?: Record; + ctx?: LobsterToolContext; + }): Promise; + + export function resumeToolRequest(params: { + token?: string; + approvalId?: string; + approved?: boolean; + response?: unknown; + cancel?: boolean; + ctx?: LobsterToolContext; + }): Promise; +} diff --git a/extensions/lobster/src/lobster-runner.test.ts b/extensions/lobster/src/lobster-runner.test.ts index 2d6d6795904..4176de12c14 100644 --- a/extensions/lobster/src/lobster-runner.test.ts +++ b/extensions/lobster/src/lobster-runner.test.ts @@ -163,6 +163,37 @@ describe("createEmbeddedLobsterRunner", () => { ).rejects.toThrow("boom"); }); + it("fails closed when the embedded runtime requests unsupported input", async () => { + const runtime = { + runToolRequest: vi.fn().mockResolvedValue({ + ok: true, + protocolVersion: 1, + status: "needs_input", + output: [], + requiresApproval: null, + requiresInput: { + prompt: "Need more data", + schema: { type: "string" }, + }, + }), + resumeToolRequest: vi.fn(), + }; + + const runner = createEmbeddedLobsterRunner({ + loadRuntime: vi.fn().mockResolvedValue(runtime), + }); + + await expect( + runner.run({ + action: "run", + pipeline: "exec --json=true echo hi", + cwd: process.cwd(), + timeoutMs: 2000, + maxStdoutBytes: 4096, + }), + ).rejects.toThrow("Lobster input requests are not supported by the OpenClaw Lobster tool yet"); + }); + it("routes resume through the embedded runtime", async () => { const runtime = { runToolRequest: vi.fn(), diff --git a/extensions/lobster/src/lobster-runner.ts b/extensions/lobster/src/lobster-runner.ts index 469fbf37797..26b8d6a8acb 100644 --- a/extensions/lobster/src/lobster-runner.ts +++ b/extensions/lobster/src/lobster-runner.ts @@ -1,14 +1,9 @@ -import { randomUUID } from "node:crypto"; -import { existsSync } from "node:fs"; -import { createRequire } from "node:module"; import path from "node:path"; import { Readable, Writable } from "node:stream"; -import { pathToFileURL } from "node:url"; -import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { - lowercasePreservingWhitespace, - normalizeLowercaseStringOrEmpty, -} from "openclaw/plugin-sdk/text-runtime"; + resumeToolRequest as embeddedResumeToolRequest, + runToolRequest as embeddedRunToolRequest, +} from "@clawdbot/lobster/core"; export type LobsterEnvelope = | { @@ -57,7 +52,7 @@ type EmbeddedToolContext = { type EmbeddedToolEnvelope = { protocolVersion?: number; ok: boolean; - status?: "ok" | "needs_approval" | "cancelled"; + status?: "ok" | "needs_approval" | "needs_input" | "cancelled"; output?: unknown[]; requiresApproval?: { type?: "approval_request"; @@ -66,6 +61,13 @@ type EmbeddedToolEnvelope = { preview?: string; resumeToken?: string; } | null; + requiresInput?: { + prompt: string; + schema?: unknown; + items?: unknown[]; + resumeToken?: string; + approvalId?: string; + } | null; error?: { type?: string; message: string; @@ -80,99 +82,20 @@ type EmbeddedToolRuntime = { ctx?: EmbeddedToolContext; }) => Promise; resumeToolRequest: (params: { - token: string; - approved: boolean; + token?: string; + approvalId?: string; + approved?: boolean; + response?: unknown; + cancel?: boolean; ctx?: EmbeddedToolContext; }) => Promise; }; -type ToolRuntimeDeps = { - createDefaultRegistry: () => unknown; - parsePipeline: (pipeline: string) => Array<{ - name: string; - args: Record; - raw: string; - }>; - decodeResumeToken: (token: string) => { - kind?: string; - stateKey?: string; - filePath?: string; - }; - encodeToken: (payload: Record) => string; - runPipeline: (params: { - pipeline: Array<{ name: string; args: Record; raw: string }>; - registry: unknown; - input: AsyncIterable | unknown[]; - stdin: NodeJS.ReadableStream; - stdout: NodeJS.WritableStream; - stderr: NodeJS.WritableStream; - env: Record; - mode: "tool"; - cwd: string; - llmAdapters?: Record; - signal?: AbortSignal; - }) => Promise<{ - items: unknown[]; - halted?: boolean; - haltedAt?: { index?: number }; - }>; - runWorkflowFile: (params: { - filePath: string; - args?: Record; - ctx: EmbeddedToolContext; - resume?: Record; - approved?: boolean; - }) => Promise<{ - status: "ok" | "needs_approval" | "cancelled"; - output: unknown[]; - requiresApproval?: EmbeddedToolEnvelope["requiresApproval"]; - }>; - readStateJson: (params: { - env: Record; - key: string; - }) => Promise; - writeStateJson: (params: { - env: Record; - key: string; - value: unknown; - }) => Promise; - deleteStateJson: (params: { - env: Record; - key: string; - }) => Promise; -}; - -type PipelineResumeState = { - pipeline: Array<{ name: string; args: Record; raw: string }>; - resumeAtIndex: number; - items: unknown[]; - prompt?: string; - createdAt: string; -}; - type LoadEmbeddedToolRuntime = () => Promise; -type ApprovalRequestItem = { - type: "approval_request"; - prompt: string; - items: unknown[]; - resumeToken?: string; -}; - -type PipelineRuntimeContext = { - registry: unknown; - stdin: NodeJS.ReadableStream; - stdout: NodeJS.WritableStream; - stderr: NodeJS.WritableStream; - env: Record; - cwd: string; - llmAdapters?: Record; - signal?: AbortSignal; -}; - function normalizeForCwdSandbox(p: string): string { const normalized = path.normalize(p); - return process.platform === "win32" ? lowercasePreservingWhitespace(normalized) : normalized; + return process.platform === "win32" ? normalized.toLowerCase() : normalized; } export function resolveLobsterCwd(cwdRaw: unknown): string { @@ -212,6 +135,15 @@ function createLimitedSink(maxBytes: number, label: "stdout" | "stderr") { function normalizeEnvelope(envelope: EmbeddedToolEnvelope): LobsterEnvelope { if (envelope.ok) { + if (envelope.status === "needs_input") { + return { + ok: false, + error: { + type: "unsupported_status", + message: "Lobster input requests are not supported by the OpenClaw Lobster tool yet", + }, + }; + } return { ok: true, status: envelope.status ?? "ok", @@ -244,64 +176,6 @@ function throwOnErrorEnvelope(envelope: LobsterEnvelope): Extract; - if (candidate.type !== "approval_request") { - return null; - } - if (typeof candidate.prompt !== "string" || !Array.isArray(candidate.items)) { - return null; - } - return candidate as ApprovalRequestItem; -} - -function normalizeWorkflowOutput( - okEnvelope: ( - status: "ok" | "needs_approval" | "cancelled", - output: unknown[], - requiresApproval: EmbeddedToolEnvelope["requiresApproval"], - ) => EmbeddedToolEnvelope, - output: { - status: "ok" | "needs_approval" | "cancelled"; - output: unknown[]; - requiresApproval?: EmbeddedToolEnvelope["requiresApproval"]; - }, -): EmbeddedToolEnvelope { - if (output.status === "needs_approval") { - return okEnvelope("needs_approval", [], output.requiresApproval ?? null); - } - if (output.status === "cancelled") { - return okEnvelope("cancelled", [], null); - } - return okEnvelope("ok", output.output, null); -} - -async function runPipelineWithRuntime( - deps: ToolRuntimeDeps, - params: { - pipeline: Array<{ name: string; args: Record; raw: string }>; - input: AsyncIterable | unknown[]; - runtime: PipelineRuntimeContext; - }, -) { - return await deps.runPipeline({ - pipeline: params.pipeline, - registry: params.runtime.registry, - input: params.input, - stdin: params.runtime.stdin, - stdout: params.runtime.stdout, - stderr: params.runtime.stderr, - env: params.runtime.env, - mode: "tool", - cwd: params.runtime.cwd, - llmAdapters: params.runtime.llmAdapters, - signal: params.runtime.signal, - }); -} - async function resolveWorkflowFile(candidate: string, cwd: string) { const { stat } = await import("node:fs/promises"); const resolved = path.isAbsolute(candidate) ? candidate : path.resolve(cwd, candidate); @@ -309,7 +183,7 @@ async function resolveWorkflowFile(candidate: string, cwd: string) { if (!fileStat.isFile()) { throw new Error("Workflow path is not a file"); } - const ext = normalizeLowercaseStringOrEmpty(path.extname(resolved)); + const ext = path.extname(resolved).toLowerCase(); if (![".lobster", ".yaml", ".yml", ".json"].includes(ext)) { throw new Error("Workflow file must end in .lobster, .yaml, .yml, or .json"); } @@ -375,328 +249,11 @@ async function withTimeout( }); } -function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolRuntime { - const createToolContext = (ctx: EmbeddedToolContext = {}) => ({ - cwd: ctx.cwd ?? process.cwd(), - env: { ...process.env, ...ctx.env }, - mode: "tool" as const, - stdin: ctx.stdin ?? Readable.from([]), - stdout: ctx.stdout ?? createLimitedSink(512_000, "stdout"), - stderr: ctx.stderr ?? createLimitedSink(512_000, "stderr"), - signal: ctx.signal, - registry: ctx.registry ?? deps.createDefaultRegistry(), - llmAdapters: ctx.llmAdapters, - }); - - const okEnvelope = ( - status: "ok" | "needs_approval" | "cancelled", - output: unknown[], - requiresApproval: EmbeddedToolEnvelope["requiresApproval"], - ): EmbeddedToolEnvelope => ({ - protocolVersion: 1, - ok: true, - status, - output, - requiresApproval, - }); - - const errorEnvelope = (type: string, message: string): EmbeddedToolEnvelope => ({ - protocolVersion: 1, - ok: false, - error: { type, message }, - }); - - const streamFromItems = (items: unknown[]) => - (async function* () { - for (const item of items) { - yield item; - } - })(); - - const savePipelineResumeState = async ( - env: Record, - state: PipelineResumeState, - ) => { - const stateKey = `pipeline_resume_${randomUUID()}`; - await deps.writeStateJson({ env, key: stateKey, value: state }); - return stateKey; - }; - - const loadPipelineResumeState = async ( - env: Record, - stateKey: string, - ) => { - const stored = await deps.readStateJson({ env, key: stateKey }); - if (!stored || typeof stored !== "object") { - throw new Error("Pipeline resume state not found"); - } - const data = stored as Partial; - if (!Array.isArray(data.pipeline)) { - throw new Error("Invalid pipeline resume state"); - } - if (typeof data.resumeAtIndex !== "number") { - throw new Error("Invalid pipeline resume state"); - } - if (!Array.isArray(data.items)) { - throw new Error("Invalid pipeline resume state"); - } - return data as PipelineResumeState; - }; - - return { - async runToolRequest({ pipeline, filePath, args, ctx = {} }) { - const runtime = createToolContext(ctx); - const hasPipeline = typeof pipeline === "string" && pipeline.trim().length > 0; - const hasFile = typeof filePath === "string" && filePath.trim().length > 0; - - if (!hasPipeline && !hasFile) { - return errorEnvelope("parse_error", "run requires either pipeline or filePath"); - } - if (hasPipeline && hasFile) { - return errorEnvelope("parse_error", "run accepts either pipeline or filePath, not both"); - } - - if (hasFile) { - try { - const output = await deps.runWorkflowFile({ - filePath: filePath, - args, - ctx: runtime, - }); - return normalizeWorkflowOutput(okEnvelope, output); - } catch (error) { - return errorEnvelope("runtime_error", formatErrorMessage(error)); - } - } - - let parsed; - try { - parsed = deps.parsePipeline(String(pipeline)); - } catch (error) { - return errorEnvelope("parse_error", formatErrorMessage(error)); - } - - try { - const output = await runPipelineWithRuntime(deps, { - pipeline: parsed, - input: [], - runtime, - }); - - const approval = - output.halted && output.items.length === 1 - ? asApprovalRequestItem(output.items[0]) - : null; - - if (approval) { - const stateKey = await savePipelineResumeState(runtime.env, { - pipeline: parsed, - resumeAtIndex: (output.haltedAt?.index ?? -1) + 1, - items: approval.items, - prompt: approval.prompt, - createdAt: new Date().toISOString(), - }); - - const resumeToken = deps.encodeToken({ - protocolVersion: 1, - v: 1, - kind: "pipeline-resume", - stateKey, - }); - - return okEnvelope("needs_approval", [], { - type: "approval_request", - prompt: approval.prompt, - items: approval.items, - resumeToken, - }); - } - - return okEnvelope("ok", output.items, null); - } catch (error) { - return errorEnvelope("runtime_error", formatErrorMessage(error)); - } - }, - - async resumeToolRequest({ token, approved, ctx = {} }) { - const runtime = createToolContext(ctx); - let payload: { kind?: string; stateKey?: string; filePath?: string }; - - try { - payload = deps.decodeResumeToken(token); - } catch (error) { - return errorEnvelope("parse_error", formatErrorMessage(error)); - } - - if (!approved) { - if (payload.kind === "workflow-file" && payload.stateKey) { - await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey }); - } - if (payload.kind === "pipeline-resume" && payload.stateKey) { - await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey }); - } - return okEnvelope("cancelled", [], null); - } - - if (payload.kind === "workflow-file" && payload.filePath) { - try { - const output = await deps.runWorkflowFile({ - filePath: payload.filePath, - ctx: runtime, - resume: payload as Record, - approved: true, - }); - return normalizeWorkflowOutput(okEnvelope, output); - } catch (error) { - return errorEnvelope("runtime_error", formatErrorMessage(error)); - } - } - - try { - const resumeState = await loadPipelineResumeState(runtime.env, payload.stateKey ?? ""); - const remaining = resumeState.pipeline.slice(resumeState.resumeAtIndex); - - const output = await runPipelineWithRuntime(deps, { - pipeline: remaining, - input: streamFromItems(resumeState.items), - runtime, - }); - - const approval = - output.halted && output.items.length === 1 - ? asApprovalRequestItem(output.items[0]) - : null; - - if (approval) { - const nextStateKey = await savePipelineResumeState(runtime.env, { - pipeline: remaining, - resumeAtIndex: (output.haltedAt?.index ?? -1) + 1, - items: approval.items, - prompt: approval.prompt, - createdAt: new Date().toISOString(), - }); - if (payload.stateKey) { - await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey }); - } - - const resumeToken = deps.encodeToken({ - protocolVersion: 1, - v: 1, - kind: "pipeline-resume", - stateKey: nextStateKey, - }); - - return okEnvelope("needs_approval", [], { - type: "approval_request", - prompt: approval.prompt, - items: approval.items, - resumeToken, - }); - } - - if (payload.stateKey) { - await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey }); - } - return okEnvelope("ok", output.items, null); - } catch (error) { - return errorEnvelope("runtime_error", formatErrorMessage(error)); - } - }, - }; -} - -async function importInstalledLobsterModule( - lobsterRoot: string, - relativePath: string, -): Promise { - const target = path.join(lobsterRoot, relativePath); - return (await import(pathToFileURL(target).href)) as T; -} - -function resolveInstalledLobsterRoot() { - const require = createRequire(import.meta.url); - const sdkEntry = require.resolve("@clawdbot/lobster"); - let currentDir = path.dirname(sdkEntry); - - while (true) { - const packageJsonPath = path.join(currentDir, "package.json"); - if (existsSync(packageJsonPath)) { - return currentDir; - } - const parentDir = path.dirname(currentDir); - if (parentDir === currentDir) { - throw new Error("Unable to resolve the installed @clawdbot/lobster package root"); - } - currentDir = parentDir; - } -} - async function loadEmbeddedToolRuntimeFromPackage(): Promise { - const lobsterRoot = resolveInstalledLobsterRoot(); - const coreIndexPath = path.join(lobsterRoot, "dist/src/core/index.js"); - - try { - const core = await import(pathToFileURL(coreIndexPath).href); - if (typeof core.runToolRequest === "function" && typeof core.resumeToolRequest === "function") { - return { - runToolRequest: core.runToolRequest as EmbeddedToolRuntime["runToolRequest"], - resumeToolRequest: core.resumeToolRequest as EmbeddedToolRuntime["resumeToolRequest"], - }; - } - } catch { - // The current published npm package does not export/ship ./core yet. - } - - const [ - registryModule, - parserModule, - resumeModule, - tokenModule, - runtimeModule, - workflowModule, - storeModule, - ] = await Promise.all([ - importInstalledLobsterModule<{ - createDefaultRegistry: ToolRuntimeDeps["createDefaultRegistry"]; - }>(lobsterRoot, "dist/src/commands/registry.js"), - importInstalledLobsterModule<{ parsePipeline: ToolRuntimeDeps["parsePipeline"] }>( - lobsterRoot, - "dist/src/parser.js", - ), - importInstalledLobsterModule<{ decodeResumeToken: ToolRuntimeDeps["decodeResumeToken"] }>( - lobsterRoot, - "dist/src/resume.js", - ), - importInstalledLobsterModule<{ encodeToken: ToolRuntimeDeps["encodeToken"] }>( - lobsterRoot, - "dist/src/token.js", - ), - importInstalledLobsterModule<{ runPipeline: ToolRuntimeDeps["runPipeline"] }>( - lobsterRoot, - "dist/src/runtime.js", - ), - importInstalledLobsterModule<{ runWorkflowFile: ToolRuntimeDeps["runWorkflowFile"] }>( - lobsterRoot, - "dist/src/workflows/file.js", - ), - importInstalledLobsterModule<{ - readStateJson: ToolRuntimeDeps["readStateJson"]; - writeStateJson: ToolRuntimeDeps["writeStateJson"]; - deleteStateJson: ToolRuntimeDeps["deleteStateJson"]; - }>(lobsterRoot, "dist/src/state/store.js"), - ]); - - return createFallbackEmbeddedToolRuntime({ - createDefaultRegistry: registryModule.createDefaultRegistry, - parsePipeline: parserModule.parsePipeline, - decodeResumeToken: resumeModule.decodeResumeToken, - encodeToken: tokenModule.encodeToken, - runPipeline: runtimeModule.runPipeline, - runWorkflowFile: workflowModule.runWorkflowFile, - readStateJson: storeModule.readStateJson, - writeStateJson: storeModule.writeStateJson, - deleteStateJson: storeModule.deleteStateJson, - }); + return { + runToolRequest: embeddedRunToolRequest, + resumeToolRequest: embeddedResumeToolRequest, + }; } export function createEmbeddedLobsterRunner(options?: { @@ -704,18 +261,10 @@ export function createEmbeddedLobsterRunner(options?: { }): LobsterRunner { const loadRuntime = options?.loadRuntime ?? loadEmbeddedToolRuntimeFromPackage; let runtimePromise: Promise | undefined; - - const getRuntime = () => { - runtimePromise ??= loadRuntime().catch((error) => { - runtimePromise = undefined; - throw error; - }); - return runtimePromise; - }; - return { async run(params) { - const runtime = await getRuntime(); + runtimePromise ??= loadRuntime(); + const runtime = await runtimePromise; return await withTimeout(params.timeoutMs, async (signal) => { const ctx = createEmbeddedToolContext(params, signal);