Lobster: import published core runtime (#64755)

* Lobster: import published core runtime

* Changelog: add Lobster core runtime note

* Lobster: type embedded core runtime

* Lobster: keep package-boundary tsconfig narrow
This commit is contained in:
Mariano
2026-04-13 20:38:46 +02:00
committed by GitHub
parent 66f57a6e1b
commit 1490e2b1d3
5 changed files with 125 additions and 486 deletions

View File

@@ -100,6 +100,7 @@ Docs: https://docs.openclaw.ai
- CLI/audio providers: report env-authenticated providers as configured in `openclaw infer audio providers --json`, while keeping trusted workspace provider env lookup defaults stable during auth setup. (#65491)
- Plugins/install: reinstall bundled runtime packages when the matching platform native optional child is missing, so packaged Windows installs can recover dependencies that were packed on another host OS.
- Memory/QMD: preserve explicit `memory.qmd.command` paths, create missing agent workspaces before QMD probes, and keep the current Node binary on QMD subprocess PATH so service and gateway environments do not fall back to builtin search unnecessarily.
- Plugins/Lobster: load the published `@clawdbot/lobster/core` runtime in process so bundled Lobster runs stop depending on private package internals. (#64755) Thanks @mbelinky.
## 2026.4.11

View File

@@ -69,7 +69,6 @@ Notes:
## Security
- Runs the `lobster` executable as a local subprocess.
- Runs Lobster in process via the published `@clawdbot/lobster/core` runtime.
- Does not manage OAuth/tokens.
- Uses timeouts, stdout caps, and strict JSON envelope parsing.
- Ensure `lobster` is available on `PATH` for the gateway process.

View File

@@ -0,0 +1,59 @@
declare module "@clawdbot/lobster/core" {
type LobsterApprovalRequest = {
type: "approval_request";
prompt: string;
items: unknown[];
resumeToken?: string;
} | null;
type LobsterToolContext = {
cwd?: string;
env?: Record<string, string | undefined>;
stdin?: NodeJS.ReadableStream;
stdout?: NodeJS.WritableStream;
stderr?: NodeJS.WritableStream;
signal?: AbortSignal;
registry?: unknown;
llmAdapters?: Record<string, unknown>;
};
type LobsterToolEnvelope =
| {
protocolVersion: 1;
ok: true;
status: "ok" | "needs_approval" | "needs_input" | "cancelled";
output: unknown[];
requiresApproval: LobsterApprovalRequest;
requiresInput?: {
prompt: string;
schema?: unknown;
items?: unknown[];
resumeToken?: string;
approvalId?: string;
} | null;
}
| {
protocolVersion: 1;
ok: false;
error: {
type: string;
message: string;
};
};
export function runToolRequest(params: {
pipeline?: string;
filePath?: string;
args?: Record<string, unknown>;
ctx?: LobsterToolContext;
}): Promise<LobsterToolEnvelope>;
export function resumeToolRequest(params: {
token?: string;
approvalId?: string;
approved?: boolean;
response?: unknown;
cancel?: boolean;
ctx?: LobsterToolContext;
}): Promise<LobsterToolEnvelope>;
}

View File

@@ -163,6 +163,37 @@ describe("createEmbeddedLobsterRunner", () => {
).rejects.toThrow("boom");
});
it("fails closed when the embedded runtime requests unsupported input", async () => {
const runtime = {
runToolRequest: vi.fn().mockResolvedValue({
ok: true,
protocolVersion: 1,
status: "needs_input",
output: [],
requiresApproval: null,
requiresInput: {
prompt: "Need more data",
schema: { type: "string" },
},
}),
resumeToolRequest: vi.fn(),
};
const runner = createEmbeddedLobsterRunner({
loadRuntime: vi.fn().mockResolvedValue(runtime),
});
await expect(
runner.run({
action: "run",
pipeline: "exec --json=true echo hi",
cwd: process.cwd(),
timeoutMs: 2000,
maxStdoutBytes: 4096,
}),
).rejects.toThrow("Lobster input requests are not supported by the OpenClaw Lobster tool yet");
});
it("routes resume through the embedded runtime", async () => {
const runtime = {
runToolRequest: vi.fn(),

View File

@@ -1,14 +1,9 @@
import { randomUUID } from "node:crypto";
import { existsSync } from "node:fs";
import { createRequire } from "node:module";
import path from "node:path";
import { Readable, Writable } from "node:stream";
import { pathToFileURL } from "node:url";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import {
lowercasePreservingWhitespace,
normalizeLowercaseStringOrEmpty,
} from "openclaw/plugin-sdk/text-runtime";
resumeToolRequest as embeddedResumeToolRequest,
runToolRequest as embeddedRunToolRequest,
} from "@clawdbot/lobster/core";
export type LobsterEnvelope =
| {
@@ -57,7 +52,7 @@ type EmbeddedToolContext = {
type EmbeddedToolEnvelope = {
protocolVersion?: number;
ok: boolean;
status?: "ok" | "needs_approval" | "cancelled";
status?: "ok" | "needs_approval" | "needs_input" | "cancelled";
output?: unknown[];
requiresApproval?: {
type?: "approval_request";
@@ -66,6 +61,13 @@ type EmbeddedToolEnvelope = {
preview?: string;
resumeToken?: string;
} | null;
requiresInput?: {
prompt: string;
schema?: unknown;
items?: unknown[];
resumeToken?: string;
approvalId?: string;
} | null;
error?: {
type?: string;
message: string;
@@ -80,99 +82,20 @@ type EmbeddedToolRuntime = {
ctx?: EmbeddedToolContext;
}) => Promise<EmbeddedToolEnvelope>;
resumeToolRequest: (params: {
token: string;
approved: boolean;
token?: string;
approvalId?: string;
approved?: boolean;
response?: unknown;
cancel?: boolean;
ctx?: EmbeddedToolContext;
}) => Promise<EmbeddedToolEnvelope>;
};
type ToolRuntimeDeps = {
createDefaultRegistry: () => unknown;
parsePipeline: (pipeline: string) => Array<{
name: string;
args: Record<string, unknown>;
raw: string;
}>;
decodeResumeToken: (token: string) => {
kind?: string;
stateKey?: string;
filePath?: string;
};
encodeToken: (payload: Record<string, unknown>) => string;
runPipeline: (params: {
pipeline: Array<{ name: string; args: Record<string, unknown>; raw: string }>;
registry: unknown;
input: AsyncIterable<unknown> | unknown[];
stdin: NodeJS.ReadableStream;
stdout: NodeJS.WritableStream;
stderr: NodeJS.WritableStream;
env: Record<string, string | undefined>;
mode: "tool";
cwd: string;
llmAdapters?: Record<string, unknown>;
signal?: AbortSignal;
}) => Promise<{
items: unknown[];
halted?: boolean;
haltedAt?: { index?: number };
}>;
runWorkflowFile: (params: {
filePath: string;
args?: Record<string, unknown>;
ctx: EmbeddedToolContext;
resume?: Record<string, unknown>;
approved?: boolean;
}) => Promise<{
status: "ok" | "needs_approval" | "cancelled";
output: unknown[];
requiresApproval?: EmbeddedToolEnvelope["requiresApproval"];
}>;
readStateJson: (params: {
env: Record<string, string | undefined>;
key: string;
}) => Promise<unknown>;
writeStateJson: (params: {
env: Record<string, string | undefined>;
key: string;
value: unknown;
}) => Promise<void>;
deleteStateJson: (params: {
env: Record<string, string | undefined>;
key: string;
}) => Promise<void>;
};
type PipelineResumeState = {
pipeline: Array<{ name: string; args: Record<string, unknown>; raw: string }>;
resumeAtIndex: number;
items: unknown[];
prompt?: string;
createdAt: string;
};
type LoadEmbeddedToolRuntime = () => Promise<EmbeddedToolRuntime>;
type ApprovalRequestItem = {
type: "approval_request";
prompt: string;
items: unknown[];
resumeToken?: string;
};
type PipelineRuntimeContext = {
registry: unknown;
stdin: NodeJS.ReadableStream;
stdout: NodeJS.WritableStream;
stderr: NodeJS.WritableStream;
env: Record<string, string | undefined>;
cwd: string;
llmAdapters?: Record<string, unknown>;
signal?: AbortSignal;
};
function normalizeForCwdSandbox(p: string): string {
const normalized = path.normalize(p);
return process.platform === "win32" ? lowercasePreservingWhitespace(normalized) : normalized;
return process.platform === "win32" ? normalized.toLowerCase() : normalized;
}
export function resolveLobsterCwd(cwdRaw: unknown): string {
@@ -212,6 +135,15 @@ function createLimitedSink(maxBytes: number, label: "stdout" | "stderr") {
function normalizeEnvelope(envelope: EmbeddedToolEnvelope): LobsterEnvelope {
if (envelope.ok) {
if (envelope.status === "needs_input") {
return {
ok: false,
error: {
type: "unsupported_status",
message: "Lobster input requests are not supported by the OpenClaw Lobster tool yet",
},
};
}
return {
ok: true,
status: envelope.status ?? "ok",
@@ -244,64 +176,6 @@ function throwOnErrorEnvelope(envelope: LobsterEnvelope): Extract<LobsterEnvelop
throw new Error(envelope.error.message);
}
function asApprovalRequestItem(item: unknown): ApprovalRequestItem | null {
if (!item || typeof item !== "object") {
return null;
}
const candidate = item as Partial<ApprovalRequestItem>;
if (candidate.type !== "approval_request") {
return null;
}
if (typeof candidate.prompt !== "string" || !Array.isArray(candidate.items)) {
return null;
}
return candidate as ApprovalRequestItem;
}
function normalizeWorkflowOutput(
okEnvelope: (
status: "ok" | "needs_approval" | "cancelled",
output: unknown[],
requiresApproval: EmbeddedToolEnvelope["requiresApproval"],
) => EmbeddedToolEnvelope,
output: {
status: "ok" | "needs_approval" | "cancelled";
output: unknown[];
requiresApproval?: EmbeddedToolEnvelope["requiresApproval"];
},
): EmbeddedToolEnvelope {
if (output.status === "needs_approval") {
return okEnvelope("needs_approval", [], output.requiresApproval ?? null);
}
if (output.status === "cancelled") {
return okEnvelope("cancelled", [], null);
}
return okEnvelope("ok", output.output, null);
}
async function runPipelineWithRuntime(
deps: ToolRuntimeDeps,
params: {
pipeline: Array<{ name: string; args: Record<string, unknown>; raw: string }>;
input: AsyncIterable<unknown> | unknown[];
runtime: PipelineRuntimeContext;
},
) {
return await deps.runPipeline({
pipeline: params.pipeline,
registry: params.runtime.registry,
input: params.input,
stdin: params.runtime.stdin,
stdout: params.runtime.stdout,
stderr: params.runtime.stderr,
env: params.runtime.env,
mode: "tool",
cwd: params.runtime.cwd,
llmAdapters: params.runtime.llmAdapters,
signal: params.runtime.signal,
});
}
async function resolveWorkflowFile(candidate: string, cwd: string) {
const { stat } = await import("node:fs/promises");
const resolved = path.isAbsolute(candidate) ? candidate : path.resolve(cwd, candidate);
@@ -309,7 +183,7 @@ async function resolveWorkflowFile(candidate: string, cwd: string) {
if (!fileStat.isFile()) {
throw new Error("Workflow path is not a file");
}
const ext = normalizeLowercaseStringOrEmpty(path.extname(resolved));
const ext = path.extname(resolved).toLowerCase();
if (![".lobster", ".yaml", ".yml", ".json"].includes(ext)) {
throw new Error("Workflow file must end in .lobster, .yaml, .yml, or .json");
}
@@ -375,328 +249,11 @@ async function withTimeout<T>(
});
}
function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolRuntime {
const createToolContext = (ctx: EmbeddedToolContext = {}) => ({
cwd: ctx.cwd ?? process.cwd(),
env: { ...process.env, ...ctx.env },
mode: "tool" as const,
stdin: ctx.stdin ?? Readable.from([]),
stdout: ctx.stdout ?? createLimitedSink(512_000, "stdout"),
stderr: ctx.stderr ?? createLimitedSink(512_000, "stderr"),
signal: ctx.signal,
registry: ctx.registry ?? deps.createDefaultRegistry(),
llmAdapters: ctx.llmAdapters,
});
const okEnvelope = (
status: "ok" | "needs_approval" | "cancelled",
output: unknown[],
requiresApproval: EmbeddedToolEnvelope["requiresApproval"],
): EmbeddedToolEnvelope => ({
protocolVersion: 1,
ok: true,
status,
output,
requiresApproval,
});
const errorEnvelope = (type: string, message: string): EmbeddedToolEnvelope => ({
protocolVersion: 1,
ok: false,
error: { type, message },
});
const streamFromItems = (items: unknown[]) =>
(async function* () {
for (const item of items) {
yield item;
}
})();
const savePipelineResumeState = async (
env: Record<string, string | undefined>,
state: PipelineResumeState,
) => {
const stateKey = `pipeline_resume_${randomUUID()}`;
await deps.writeStateJson({ env, key: stateKey, value: state });
return stateKey;
};
const loadPipelineResumeState = async (
env: Record<string, string | undefined>,
stateKey: string,
) => {
const stored = await deps.readStateJson({ env, key: stateKey });
if (!stored || typeof stored !== "object") {
throw new Error("Pipeline resume state not found");
}
const data = stored as Partial<PipelineResumeState>;
if (!Array.isArray(data.pipeline)) {
throw new Error("Invalid pipeline resume state");
}
if (typeof data.resumeAtIndex !== "number") {
throw new Error("Invalid pipeline resume state");
}
if (!Array.isArray(data.items)) {
throw new Error("Invalid pipeline resume state");
}
return data as PipelineResumeState;
};
return {
async runToolRequest({ pipeline, filePath, args, ctx = {} }) {
const runtime = createToolContext(ctx);
const hasPipeline = typeof pipeline === "string" && pipeline.trim().length > 0;
const hasFile = typeof filePath === "string" && filePath.trim().length > 0;
if (!hasPipeline && !hasFile) {
return errorEnvelope("parse_error", "run requires either pipeline or filePath");
}
if (hasPipeline && hasFile) {
return errorEnvelope("parse_error", "run accepts either pipeline or filePath, not both");
}
if (hasFile) {
try {
const output = await deps.runWorkflowFile({
filePath: filePath,
args,
ctx: runtime,
});
return normalizeWorkflowOutput(okEnvelope, output);
} catch (error) {
return errorEnvelope("runtime_error", formatErrorMessage(error));
}
}
let parsed;
try {
parsed = deps.parsePipeline(String(pipeline));
} catch (error) {
return errorEnvelope("parse_error", formatErrorMessage(error));
}
try {
const output = await runPipelineWithRuntime(deps, {
pipeline: parsed,
input: [],
runtime,
});
const approval =
output.halted && output.items.length === 1
? asApprovalRequestItem(output.items[0])
: null;
if (approval) {
const stateKey = await savePipelineResumeState(runtime.env, {
pipeline: parsed,
resumeAtIndex: (output.haltedAt?.index ?? -1) + 1,
items: approval.items,
prompt: approval.prompt,
createdAt: new Date().toISOString(),
});
const resumeToken = deps.encodeToken({
protocolVersion: 1,
v: 1,
kind: "pipeline-resume",
stateKey,
});
return okEnvelope("needs_approval", [], {
type: "approval_request",
prompt: approval.prompt,
items: approval.items,
resumeToken,
});
}
return okEnvelope("ok", output.items, null);
} catch (error) {
return errorEnvelope("runtime_error", formatErrorMessage(error));
}
},
async resumeToolRequest({ token, approved, ctx = {} }) {
const runtime = createToolContext(ctx);
let payload: { kind?: string; stateKey?: string; filePath?: string };
try {
payload = deps.decodeResumeToken(token);
} catch (error) {
return errorEnvelope("parse_error", formatErrorMessage(error));
}
if (!approved) {
if (payload.kind === "workflow-file" && payload.stateKey) {
await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey });
}
if (payload.kind === "pipeline-resume" && payload.stateKey) {
await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey });
}
return okEnvelope("cancelled", [], null);
}
if (payload.kind === "workflow-file" && payload.filePath) {
try {
const output = await deps.runWorkflowFile({
filePath: payload.filePath,
ctx: runtime,
resume: payload as Record<string, unknown>,
approved: true,
});
return normalizeWorkflowOutput(okEnvelope, output);
} catch (error) {
return errorEnvelope("runtime_error", formatErrorMessage(error));
}
}
try {
const resumeState = await loadPipelineResumeState(runtime.env, payload.stateKey ?? "");
const remaining = resumeState.pipeline.slice(resumeState.resumeAtIndex);
const output = await runPipelineWithRuntime(deps, {
pipeline: remaining,
input: streamFromItems(resumeState.items),
runtime,
});
const approval =
output.halted && output.items.length === 1
? asApprovalRequestItem(output.items[0])
: null;
if (approval) {
const nextStateKey = await savePipelineResumeState(runtime.env, {
pipeline: remaining,
resumeAtIndex: (output.haltedAt?.index ?? -1) + 1,
items: approval.items,
prompt: approval.prompt,
createdAt: new Date().toISOString(),
});
if (payload.stateKey) {
await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey });
}
const resumeToken = deps.encodeToken({
protocolVersion: 1,
v: 1,
kind: "pipeline-resume",
stateKey: nextStateKey,
});
return okEnvelope("needs_approval", [], {
type: "approval_request",
prompt: approval.prompt,
items: approval.items,
resumeToken,
});
}
if (payload.stateKey) {
await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey });
}
return okEnvelope("ok", output.items, null);
} catch (error) {
return errorEnvelope("runtime_error", formatErrorMessage(error));
}
},
};
}
async function importInstalledLobsterModule<T>(
lobsterRoot: string,
relativePath: string,
): Promise<T> {
const target = path.join(lobsterRoot, relativePath);
return (await import(pathToFileURL(target).href)) as T;
}
function resolveInstalledLobsterRoot() {
const require = createRequire(import.meta.url);
const sdkEntry = require.resolve("@clawdbot/lobster");
let currentDir = path.dirname(sdkEntry);
while (true) {
const packageJsonPath = path.join(currentDir, "package.json");
if (existsSync(packageJsonPath)) {
return currentDir;
}
const parentDir = path.dirname(currentDir);
if (parentDir === currentDir) {
throw new Error("Unable to resolve the installed @clawdbot/lobster package root");
}
currentDir = parentDir;
}
}
async function loadEmbeddedToolRuntimeFromPackage(): Promise<EmbeddedToolRuntime> {
const lobsterRoot = resolveInstalledLobsterRoot();
const coreIndexPath = path.join(lobsterRoot, "dist/src/core/index.js");
try {
const core = await import(pathToFileURL(coreIndexPath).href);
if (typeof core.runToolRequest === "function" && typeof core.resumeToolRequest === "function") {
return {
runToolRequest: core.runToolRequest as EmbeddedToolRuntime["runToolRequest"],
resumeToolRequest: core.resumeToolRequest as EmbeddedToolRuntime["resumeToolRequest"],
};
}
} catch {
// The current published npm package does not export/ship ./core yet.
}
const [
registryModule,
parserModule,
resumeModule,
tokenModule,
runtimeModule,
workflowModule,
storeModule,
] = await Promise.all([
importInstalledLobsterModule<{
createDefaultRegistry: ToolRuntimeDeps["createDefaultRegistry"];
}>(lobsterRoot, "dist/src/commands/registry.js"),
importInstalledLobsterModule<{ parsePipeline: ToolRuntimeDeps["parsePipeline"] }>(
lobsterRoot,
"dist/src/parser.js",
),
importInstalledLobsterModule<{ decodeResumeToken: ToolRuntimeDeps["decodeResumeToken"] }>(
lobsterRoot,
"dist/src/resume.js",
),
importInstalledLobsterModule<{ encodeToken: ToolRuntimeDeps["encodeToken"] }>(
lobsterRoot,
"dist/src/token.js",
),
importInstalledLobsterModule<{ runPipeline: ToolRuntimeDeps["runPipeline"] }>(
lobsterRoot,
"dist/src/runtime.js",
),
importInstalledLobsterModule<{ runWorkflowFile: ToolRuntimeDeps["runWorkflowFile"] }>(
lobsterRoot,
"dist/src/workflows/file.js",
),
importInstalledLobsterModule<{
readStateJson: ToolRuntimeDeps["readStateJson"];
writeStateJson: ToolRuntimeDeps["writeStateJson"];
deleteStateJson: ToolRuntimeDeps["deleteStateJson"];
}>(lobsterRoot, "dist/src/state/store.js"),
]);
return createFallbackEmbeddedToolRuntime({
createDefaultRegistry: registryModule.createDefaultRegistry,
parsePipeline: parserModule.parsePipeline,
decodeResumeToken: resumeModule.decodeResumeToken,
encodeToken: tokenModule.encodeToken,
runPipeline: runtimeModule.runPipeline,
runWorkflowFile: workflowModule.runWorkflowFile,
readStateJson: storeModule.readStateJson,
writeStateJson: storeModule.writeStateJson,
deleteStateJson: storeModule.deleteStateJson,
});
return {
runToolRequest: embeddedRunToolRequest,
resumeToolRequest: embeddedResumeToolRequest,
};
}
export function createEmbeddedLobsterRunner(options?: {
@@ -704,18 +261,10 @@ export function createEmbeddedLobsterRunner(options?: {
}): LobsterRunner {
const loadRuntime = options?.loadRuntime ?? loadEmbeddedToolRuntimeFromPackage;
let runtimePromise: Promise<EmbeddedToolRuntime> | undefined;
const getRuntime = () => {
runtimePromise ??= loadRuntime().catch((error) => {
runtimePromise = undefined;
throw error;
});
return runtimePromise;
};
return {
async run(params) {
const runtime = await getRuntime();
runtimePromise ??= loadRuntime();
const runtime = await runtimePromise;
return await withTimeout(params.timeoutMs, async (signal) => {
const ctx = createEmbeddedToolContext(params, signal);