Lobster: run workflows in process (#61523)

* Lobster: run workflows in process

* docs: note in-process lobster runtime

* docs: add lobster changelog attribution
This commit is contained in:
Mariano
2026-04-06 01:30:47 +02:00
committed by GitHub
parent 989ea3e6df
commit 7f97fa6ed5
8 changed files with 1161 additions and 564 deletions

View File

@@ -39,6 +39,7 @@ Docs: https://docs.openclaw.ai
- Matrix/exec approvals: clarify unavailable-approval replies so Matrix no longer claims chat approvals are unsupported when native exec approvals are merely unconfigured. (#61424) Thanks @gumadeiras.
- Docs/IRC: replace public IRC hostname examples with `irc.example.com` and recommend private servers for bot coordination while listing common public networks for intentional use.
- Memory/dreaming: write dreaming trail content to top-level `DREAMS.md` instead of daily memory notes, update `/dreaming` help text to point there, and keep `DREAMS.md` available for explicit reads without pulling it into default recall. Thanks @davemorin.
- Plugins/Lobster: run bundled Lobster workflows in process instead of spawning the external CLI, reducing transport overhead and unblocking native runtime integration. (#61523) Thanks @mbelinky.
### Fixes

View File

@@ -4,6 +4,7 @@
"description": "Lobster workflow tool plugin (typed pipelines + resumable approvals)",
"type": "module",
"dependencies": {
"@clawdbot/lobster": "2026.1.24",
"@sinclair/typebox": "0.34.49"
},
"openclaw": {

View File

@@ -0,0 +1,286 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { createEmbeddedLobsterRunner, resolveLobsterCwd } from "./lobster-runner.js";
describe("resolveLobsterCwd", () => {
it("defaults to the current working directory", () => {
expect(resolveLobsterCwd(undefined)).toBe(process.cwd());
});
it("keeps relative paths inside the repo root", () => {
expect(resolveLobsterCwd("extensions/lobster")).toBe(
path.resolve(process.cwd(), "extensions/lobster"),
);
});
});
describe("createEmbeddedLobsterRunner", () => {
afterEach(() => {
vi.restoreAllMocks();
});
it("runs inline pipelines through the embedded runtime", async () => {
const runtime = {
runToolRequest: vi.fn().mockResolvedValue({
ok: true,
protocolVersion: 1,
status: "ok",
output: [{ hello: "world" }],
requiresApproval: null,
}),
resumeToolRequest: vi.fn(),
};
const runner = createEmbeddedLobsterRunner({
loadRuntime: vi.fn().mockResolvedValue(runtime),
});
const envelope = await runner.run({
action: "run",
pipeline: "exec --json=true echo hi",
cwd: process.cwd(),
timeoutMs: 2000,
maxStdoutBytes: 4096,
});
expect(runtime.runToolRequest).toHaveBeenCalledTimes(1);
expect(runtime.runToolRequest).toHaveBeenCalledWith({
pipeline: "exec --json=true echo hi",
ctx: expect.objectContaining({
cwd: process.cwd(),
mode: "tool",
signal: expect.any(AbortSignal),
}),
});
expect(envelope).toEqual({
ok: true,
status: "ok",
output: [{ hello: "world" }],
requiresApproval: null,
});
});
it("detects workflow files and parses argsJson", async () => {
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lobster-runner-"));
const workflowPath = path.join(tempDir, "workflow.lobster");
await fs.writeFile(workflowPath, "steps: []\n", "utf8");
try {
const runtime = {
runToolRequest: vi.fn().mockResolvedValue({
ok: true,
protocolVersion: 1,
status: "ok",
output: [],
requiresApproval: null,
}),
resumeToolRequest: vi.fn(),
};
const runner = createEmbeddedLobsterRunner({
loadRuntime: vi.fn().mockResolvedValue(runtime),
});
await runner.run({
action: "run",
pipeline: "workflow.lobster",
argsJson: '{"limit":3}',
cwd: tempDir,
timeoutMs: 2000,
maxStdoutBytes: 4096,
});
expect(runtime.runToolRequest).toHaveBeenCalledWith({
filePath: workflowPath,
args: { limit: 3 },
ctx: expect.objectContaining({
cwd: tempDir,
mode: "tool",
}),
});
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
}
});
it("returns a parse error when workflow args are invalid JSON", async () => {
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lobster-runner-"));
const workflowPath = path.join(tempDir, "workflow.lobster");
await fs.writeFile(workflowPath, "steps: []\n", "utf8");
try {
const runtime = {
runToolRequest: vi.fn(),
resumeToolRequest: vi.fn(),
};
const runner = createEmbeddedLobsterRunner({
loadRuntime: vi.fn().mockResolvedValue(runtime),
});
await expect(
runner.run({
action: "run",
pipeline: "workflow.lobster",
argsJson: "{bad",
cwd: tempDir,
timeoutMs: 2000,
maxStdoutBytes: 4096,
}),
).rejects.toThrow("run --args-json must be valid JSON");
expect(runtime.runToolRequest).not.toHaveBeenCalled();
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
}
});
it("throws when the embedded runtime returns an error envelope", async () => {
const runtime = {
runToolRequest: vi.fn().mockResolvedValue({
ok: false,
protocolVersion: 1,
error: {
type: "runtime_error",
message: "boom",
},
}),
resumeToolRequest: vi.fn(),
};
const runner = createEmbeddedLobsterRunner({
loadRuntime: vi.fn().mockResolvedValue(runtime),
});
await expect(
runner.run({
action: "run",
pipeline: "exec --json=true echo hi",
cwd: process.cwd(),
timeoutMs: 2000,
maxStdoutBytes: 4096,
}),
).rejects.toThrow("boom");
});
it("routes resume through the embedded runtime", async () => {
const runtime = {
runToolRequest: vi.fn(),
resumeToolRequest: vi.fn().mockResolvedValue({
ok: true,
protocolVersion: 1,
status: "cancelled",
output: [],
requiresApproval: null,
}),
};
const runner = createEmbeddedLobsterRunner({
loadRuntime: vi.fn().mockResolvedValue(runtime),
});
const envelope = await runner.run({
action: "resume",
token: "resume-token",
approve: false,
cwd: process.cwd(),
timeoutMs: 2000,
maxStdoutBytes: 4096,
});
expect(runtime.resumeToolRequest).toHaveBeenCalledWith({
token: "resume-token",
approved: false,
ctx: expect.objectContaining({
cwd: process.cwd(),
mode: "tool",
signal: expect.any(AbortSignal),
}),
});
expect(envelope).toEqual({
ok: true,
status: "cancelled",
output: [],
requiresApproval: null,
});
});
it("requires a pipeline for run", async () => {
const runner = createEmbeddedLobsterRunner({
loadRuntime: vi.fn().mockResolvedValue({
runToolRequest: vi.fn(),
resumeToolRequest: vi.fn(),
}),
});
await expect(
runner.run({
action: "run",
cwd: process.cwd(),
timeoutMs: 2000,
maxStdoutBytes: 4096,
}),
).rejects.toThrow(/pipeline required/);
});
it("requires token and approve for resume", async () => {
const runner = createEmbeddedLobsterRunner({
loadRuntime: vi.fn().mockResolvedValue({
runToolRequest: vi.fn(),
resumeToolRequest: vi.fn(),
}),
});
await expect(
runner.run({
action: "resume",
approve: true,
cwd: process.cwd(),
timeoutMs: 2000,
maxStdoutBytes: 4096,
}),
).rejects.toThrow(/token required/);
await expect(
runner.run({
action: "resume",
token: "resume-token",
cwd: process.cwd(),
timeoutMs: 2000,
maxStdoutBytes: 4096,
}),
).rejects.toThrow(/approve required/);
});
it("aborts long-running embedded work", async () => {
const runtime = {
runToolRequest: vi.fn(
async ({ ctx }: { ctx?: { signal?: AbortSignal } }) =>
await new Promise((resolve, reject) => {
ctx?.signal?.addEventListener("abort", () => {
reject(ctx.signal?.reason ?? new Error("aborted"));
});
setTimeout(
() => resolve({ ok: true, status: "ok", output: [], requiresApproval: null }),
500,
);
}),
),
resumeToolRequest: vi.fn(),
};
const runner = createEmbeddedLobsterRunner({
loadRuntime: vi.fn().mockResolvedValue(runtime),
});
await expect(
runner.run({
action: "run",
pipeline: "exec --json=true echo hi",
cwd: process.cwd(),
timeoutMs: 200,
maxStdoutBytes: 4096,
}),
).rejects.toThrow(/timed out|aborted/);
});
});

View File

@@ -0,0 +1,728 @@
import { randomUUID } from "node:crypto";
import { createRequire } from "node:module";
import path from "node:path";
import { Readable, Writable } from "node:stream";
import { pathToFileURL } from "node:url";
export type LobsterEnvelope =
| {
ok: true;
status: "ok" | "needs_approval" | "cancelled";
output: unknown[];
requiresApproval: null | {
type: "approval_request";
prompt: string;
items: unknown[];
resumeToken?: string;
};
}
| {
ok: false;
error: { type?: string; message: string };
};
export type LobsterRunnerParams = {
action: "run" | "resume";
pipeline?: string;
argsJson?: string;
token?: string;
approve?: boolean;
cwd: string;
timeoutMs: number;
maxStdoutBytes: number;
};
export type LobsterRunner = {
run: (params: LobsterRunnerParams) => Promise<LobsterEnvelope>;
};
type EmbeddedToolContext = {
cwd?: string;
env?: Record<string, string | undefined>;
mode?: "tool" | "human" | "sdk";
stdin?: NodeJS.ReadableStream;
stdout?: NodeJS.WritableStream;
stderr?: NodeJS.WritableStream;
signal?: AbortSignal;
registry?: unknown;
llmAdapters?: Record<string, unknown>;
};
type EmbeddedToolEnvelope = {
protocolVersion?: number;
ok: boolean;
status?: "ok" | "needs_approval" | "cancelled";
output?: unknown[];
requiresApproval?: {
type?: "approval_request";
prompt: string;
items: unknown[];
preview?: string;
resumeToken?: string;
} | null;
error?: {
type?: string;
message: string;
};
};
type EmbeddedToolRuntime = {
runToolRequest: (params: {
pipeline?: string;
filePath?: string;
args?: Record<string, unknown>;
ctx?: EmbeddedToolContext;
}) => Promise<EmbeddedToolEnvelope>;
resumeToolRequest: (params: {
token: string;
approved: boolean;
ctx?: EmbeddedToolContext;
}) => Promise<EmbeddedToolEnvelope>;
};
type ToolRuntimeDeps = {
createDefaultRegistry: () => unknown;
parsePipeline: (pipeline: string) => Array<{
name: string;
args: Record<string, unknown>;
raw: string;
}>;
decodeResumeToken: (token: string) => {
kind?: string;
stateKey?: string;
filePath?: string;
};
encodeToken: (payload: Record<string, unknown>) => string;
runPipeline: (params: {
pipeline: Array<{ name: string; args: Record<string, unknown>; raw: string }>;
registry: unknown;
input: AsyncIterable<unknown> | unknown[];
stdin: NodeJS.ReadableStream;
stdout: NodeJS.WritableStream;
stderr: NodeJS.WritableStream;
env: Record<string, string | undefined>;
mode: "tool";
cwd: string;
llmAdapters?: Record<string, unknown>;
signal?: AbortSignal;
}) => Promise<{
items: unknown[];
halted?: boolean;
haltedAt?: { index?: number };
}>;
runWorkflowFile: (params: {
filePath: string;
args?: Record<string, unknown>;
ctx: EmbeddedToolContext;
resume?: Record<string, unknown>;
approved?: boolean;
}) => Promise<{
status: "ok" | "needs_approval" | "cancelled";
output: unknown[];
requiresApproval?: EmbeddedToolEnvelope["requiresApproval"];
}>;
readStateJson: (params: {
env: Record<string, string | undefined>;
key: string;
}) => Promise<unknown>;
writeStateJson: (params: {
env: Record<string, string | undefined>;
key: string;
value: unknown;
}) => Promise<void>;
deleteStateJson: (params: {
env: Record<string, string | undefined>;
key: string;
}) => Promise<void>;
};
type PipelineResumeState = {
pipeline: Array<{ name: string; args: Record<string, unknown>; raw: string }>;
resumeAtIndex: number;
items: unknown[];
prompt?: string;
createdAt: string;
};
type LoadEmbeddedToolRuntime = () => Promise<EmbeddedToolRuntime>;
type ApprovalRequestItem = {
type: "approval_request";
prompt: string;
items: unknown[];
resumeToken?: string;
};
function normalizeForCwdSandbox(p: string): string {
const normalized = path.normalize(p);
return process.platform === "win32" ? normalized.toLowerCase() : normalized;
}
export function resolveLobsterCwd(cwdRaw: unknown): string {
if (typeof cwdRaw !== "string" || !cwdRaw.trim()) {
return process.cwd();
}
const cwd = cwdRaw.trim();
if (path.isAbsolute(cwd)) {
throw new Error("cwd must be a relative path");
}
const base = process.cwd();
const resolved = path.resolve(base, cwd);
const rel = path.relative(normalizeForCwdSandbox(base), normalizeForCwdSandbox(resolved));
if (rel === "" || rel === ".") {
return resolved;
}
if (rel.startsWith("..") || path.isAbsolute(rel)) {
throw new Error("cwd must stay within the gateway working directory");
}
return resolved;
}
function createLimitedSink(maxBytes: number, label: "stdout" | "stderr") {
let bytes = 0;
return new Writable({
write(chunk, _encoding, callback) {
bytes += Buffer.byteLength(String(chunk), "utf8");
if (bytes > maxBytes) {
callback(new Error(`lobster ${label} exceeded maxStdoutBytes`));
return;
}
callback();
},
});
}
function normalizeEnvelope(envelope: EmbeddedToolEnvelope): LobsterEnvelope {
if (envelope.ok) {
return {
ok: true,
status: envelope.status ?? "ok",
output: Array.isArray(envelope.output) ? envelope.output : [],
requiresApproval: envelope.requiresApproval
? {
type: "approval_request",
prompt: envelope.requiresApproval.prompt,
items: envelope.requiresApproval.items,
...(envelope.requiresApproval.resumeToken
? { resumeToken: envelope.requiresApproval.resumeToken }
: {}),
}
: null,
};
}
return {
ok: false,
error: {
type: envelope.error?.type,
message: envelope.error?.message ?? "lobster runtime failed",
},
};
}
function throwOnErrorEnvelope(envelope: LobsterEnvelope): Extract<LobsterEnvelope, { ok: true }> {
if (envelope.ok) {
return envelope;
}
throw new Error(envelope.error.message);
}
function asApprovalRequestItem(item: unknown): ApprovalRequestItem | null {
if (!item || typeof item !== "object") {
return null;
}
const candidate = item as Partial<ApprovalRequestItem>;
if (candidate.type !== "approval_request") {
return null;
}
if (typeof candidate.prompt !== "string" || !Array.isArray(candidate.items)) {
return null;
}
return candidate as ApprovalRequestItem;
}
async function resolveWorkflowFile(candidate: string, cwd: string) {
const { stat } = await import("node:fs/promises");
const resolved = path.isAbsolute(candidate) ? candidate : path.resolve(cwd, candidate);
const fileStat = await stat(resolved);
if (!fileStat.isFile()) {
throw new Error("Workflow path is not a file");
}
const ext = path.extname(resolved).toLowerCase();
if (![".lobster", ".yaml", ".yml", ".json"].includes(ext)) {
throw new Error("Workflow file must end in .lobster, .yaml, .yml, or .json");
}
return resolved;
}
async function detectWorkflowFile(candidate: string, cwd: string) {
const trimmed = candidate.trim();
if (!trimmed || trimmed.includes("|")) {
return null;
}
try {
return await resolveWorkflowFile(trimmed, cwd);
} catch {
return null;
}
}
function parseWorkflowArgs(argsJson: string) {
return JSON.parse(argsJson) as Record<string, unknown>;
}
function createEmbeddedToolContext(
params: LobsterRunnerParams,
signal?: AbortSignal,
): EmbeddedToolContext {
const env = { ...process.env } as Record<string, string | undefined>;
return {
cwd: params.cwd,
env,
mode: "tool",
stdin: Readable.from([]),
stdout: createLimitedSink(Math.max(1024, params.maxStdoutBytes), "stdout"),
stderr: createLimitedSink(Math.max(1024, params.maxStdoutBytes), "stderr"),
signal,
};
}
async function withTimeout<T>(
timeoutMs: number,
fn: (signal?: AbortSignal) => Promise<T>,
): Promise<T> {
const timeout = Math.max(200, timeoutMs);
const controller = new AbortController();
return await new Promise<T>((resolve, reject) => {
const onTimeout = () => {
const error = new Error("lobster runtime timed out");
controller.abort(error);
reject(error);
};
const timer = setTimeout(onTimeout, timeout);
void fn(controller.signal).then(
(value) => {
clearTimeout(timer);
resolve(value);
},
(error) => {
clearTimeout(timer);
reject(error);
},
);
});
}
function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolRuntime {
const createToolContext = (ctx: EmbeddedToolContext = {}) => ({
cwd: ctx.cwd ?? process.cwd(),
env: { ...process.env, ...ctx.env },
mode: "tool" as const,
stdin: ctx.stdin ?? Readable.from([]),
stdout: ctx.stdout ?? createLimitedSink(512_000, "stdout"),
stderr: ctx.stderr ?? createLimitedSink(512_000, "stderr"),
signal: ctx.signal,
registry: ctx.registry ?? deps.createDefaultRegistry(),
llmAdapters: ctx.llmAdapters,
});
const okEnvelope = (
status: "ok" | "needs_approval" | "cancelled",
output: unknown[],
requiresApproval: EmbeddedToolEnvelope["requiresApproval"],
): EmbeddedToolEnvelope => ({
protocolVersion: 1,
ok: true,
status,
output,
requiresApproval,
});
const errorEnvelope = (type: string, message: string): EmbeddedToolEnvelope => ({
protocolVersion: 1,
ok: false,
error: { type, message },
});
const streamFromItems = (items: unknown[]) =>
(async function* () {
for (const item of items) {
yield item;
}
})();
const savePipelineResumeState = async (
env: Record<string, string | undefined>,
state: PipelineResumeState,
) => {
const stateKey = `pipeline_resume_${randomUUID()}`;
await deps.writeStateJson({ env, key: stateKey, value: state });
return stateKey;
};
const loadPipelineResumeState = async (
env: Record<string, string | undefined>,
stateKey: string,
) => {
const stored = await deps.readStateJson({ env, key: stateKey });
if (!stored || typeof stored !== "object") {
throw new Error("Pipeline resume state not found");
}
const data = stored as Partial<PipelineResumeState>;
if (!Array.isArray(data.pipeline)) {
throw new Error("Invalid pipeline resume state");
}
if (typeof data.resumeAtIndex !== "number") {
throw new Error("Invalid pipeline resume state");
}
if (!Array.isArray(data.items)) {
throw new Error("Invalid pipeline resume state");
}
return data as PipelineResumeState;
};
return {
async runToolRequest({ pipeline, filePath, args, ctx = {} }) {
const runtime = createToolContext(ctx);
const hasPipeline = typeof pipeline === "string" && pipeline.trim().length > 0;
const hasFile = typeof filePath === "string" && filePath.trim().length > 0;
if (!hasPipeline && !hasFile) {
return errorEnvelope("parse_error", "run requires either pipeline or filePath");
}
if (hasPipeline && hasFile) {
return errorEnvelope("parse_error", "run accepts either pipeline or filePath, not both");
}
if (hasFile) {
try {
const output = await deps.runWorkflowFile({
filePath: filePath!,
args,
ctx: runtime,
});
if (output.status === "needs_approval") {
return okEnvelope("needs_approval", [], output.requiresApproval ?? null);
}
if (output.status === "cancelled") {
return okEnvelope("cancelled", [], null);
}
return okEnvelope("ok", output.output, null);
} catch (error) {
return errorEnvelope(
"runtime_error",
error instanceof Error ? error.message : String(error),
);
}
}
let parsed;
try {
parsed = deps.parsePipeline(String(pipeline));
} catch (error) {
return errorEnvelope("parse_error", error instanceof Error ? error.message : String(error));
}
try {
const output = await deps.runPipeline({
pipeline: parsed,
registry: runtime.registry,
input: [],
stdin: runtime.stdin!,
stdout: runtime.stdout!,
stderr: runtime.stderr!,
env: runtime.env,
mode: "tool",
cwd: runtime.cwd,
llmAdapters: runtime.llmAdapters,
signal: runtime.signal,
});
const approval =
output.halted && output.items.length === 1
? asApprovalRequestItem(output.items[0])
: null;
if (approval) {
const stateKey = await savePipelineResumeState(runtime.env, {
pipeline: parsed,
resumeAtIndex: (output.haltedAt?.index ?? -1) + 1,
items: approval.items,
prompt: approval.prompt,
createdAt: new Date().toISOString(),
});
const resumeToken = deps.encodeToken({
protocolVersion: 1,
v: 1,
kind: "pipeline-resume",
stateKey,
});
return okEnvelope("needs_approval", [], {
type: "approval_request",
prompt: approval.prompt,
items: approval.items,
resumeToken,
});
}
return okEnvelope("ok", output.items, null);
} catch (error) {
return errorEnvelope(
"runtime_error",
error instanceof Error ? error.message : String(error),
);
}
},
async resumeToolRequest({ token, approved, ctx = {} }) {
const runtime = createToolContext(ctx);
let payload: { kind?: string; stateKey?: string; filePath?: string };
try {
payload = deps.decodeResumeToken(token);
} catch (error) {
return errorEnvelope("parse_error", error instanceof Error ? error.message : String(error));
}
if (!approved) {
if (payload.kind === "workflow-file" && payload.stateKey) {
await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey });
}
if (payload.kind === "pipeline-resume" && payload.stateKey) {
await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey });
}
return okEnvelope("cancelled", [], null);
}
if (payload.kind === "workflow-file" && payload.filePath) {
try {
const output = await deps.runWorkflowFile({
filePath: payload.filePath,
ctx: runtime,
resume: payload as Record<string, unknown>,
approved: true,
});
if (output.status === "needs_approval") {
return okEnvelope("needs_approval", [], output.requiresApproval ?? null);
}
if (output.status === "cancelled") {
return okEnvelope("cancelled", [], null);
}
return okEnvelope("ok", output.output, null);
} catch (error) {
return errorEnvelope(
"runtime_error",
error instanceof Error ? error.message : String(error),
);
}
}
try {
const resumeState = await loadPipelineResumeState(runtime.env, payload.stateKey ?? "");
const remaining = resumeState.pipeline.slice(resumeState.resumeAtIndex);
const output = await deps.runPipeline({
pipeline: remaining,
registry: runtime.registry,
input: streamFromItems(resumeState.items),
stdin: runtime.stdin!,
stdout: runtime.stdout!,
stderr: runtime.stderr!,
env: runtime.env,
mode: "tool",
cwd: runtime.cwd,
llmAdapters: runtime.llmAdapters,
signal: runtime.signal,
});
const approval =
output.halted && output.items.length === 1
? asApprovalRequestItem(output.items[0])
: null;
if (approval) {
const nextStateKey = await savePipelineResumeState(runtime.env, {
pipeline: remaining,
resumeAtIndex: (output.haltedAt?.index ?? -1) + 1,
items: approval.items,
prompt: approval.prompt,
createdAt: new Date().toISOString(),
});
if (payload.stateKey) {
await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey });
}
const resumeToken = deps.encodeToken({
protocolVersion: 1,
v: 1,
kind: "pipeline-resume",
stateKey: nextStateKey,
});
return okEnvelope("needs_approval", [], {
type: "approval_request",
prompt: approval.prompt,
items: approval.items,
resumeToken,
});
}
if (payload.stateKey) {
await deps.deleteStateJson({ env: runtime.env, key: payload.stateKey });
}
return okEnvelope("ok", output.items, null);
} catch (error) {
return errorEnvelope(
"runtime_error",
error instanceof Error ? error.message : String(error),
);
}
},
};
}
async function importInstalledLobsterModule<T>(
lobsterRoot: string,
relativePath: string,
): Promise<T> {
const target = path.join(lobsterRoot, relativePath);
return (await import(pathToFileURL(target).href)) as T;
}
function resolveInstalledLobsterRoot() {
const require = createRequire(import.meta.url);
const sdkEntry = require.resolve("@clawdbot/lobster");
return path.resolve(path.dirname(sdkEntry), "../../..");
}
async function loadEmbeddedToolRuntimeFromPackage(): Promise<EmbeddedToolRuntime> {
const lobsterRoot = resolveInstalledLobsterRoot();
const coreIndexPath = path.join(lobsterRoot, "dist/src/core/index.js");
try {
const core = await import(pathToFileURL(coreIndexPath).href);
if (typeof core.runToolRequest === "function" && typeof core.resumeToolRequest === "function") {
return {
runToolRequest: core.runToolRequest as EmbeddedToolRuntime["runToolRequest"],
resumeToolRequest: core.resumeToolRequest as EmbeddedToolRuntime["resumeToolRequest"],
};
}
} catch {
// The current published npm package does not export/ship ./core yet.
}
const [
registryModule,
parserModule,
resumeModule,
tokenModule,
runtimeModule,
workflowModule,
storeModule,
] = await Promise.all([
importInstalledLobsterModule<{
createDefaultRegistry: ToolRuntimeDeps["createDefaultRegistry"];
}>(lobsterRoot, "dist/src/commands/registry.js"),
importInstalledLobsterModule<{ parsePipeline: ToolRuntimeDeps["parsePipeline"] }>(
lobsterRoot,
"dist/src/parser.js",
),
importInstalledLobsterModule<{ decodeResumeToken: ToolRuntimeDeps["decodeResumeToken"] }>(
lobsterRoot,
"dist/src/resume.js",
),
importInstalledLobsterModule<{ encodeToken: ToolRuntimeDeps["encodeToken"] }>(
lobsterRoot,
"dist/src/token.js",
),
importInstalledLobsterModule<{ runPipeline: ToolRuntimeDeps["runPipeline"] }>(
lobsterRoot,
"dist/src/runtime.js",
),
importInstalledLobsterModule<{ runWorkflowFile: ToolRuntimeDeps["runWorkflowFile"] }>(
lobsterRoot,
"dist/src/workflows/file.js",
),
importInstalledLobsterModule<{
readStateJson: ToolRuntimeDeps["readStateJson"];
writeStateJson: ToolRuntimeDeps["writeStateJson"];
deleteStateJson: ToolRuntimeDeps["deleteStateJson"];
}>(lobsterRoot, "dist/src/state/store.js"),
]);
return createFallbackEmbeddedToolRuntime({
createDefaultRegistry: registryModule.createDefaultRegistry,
parsePipeline: parserModule.parsePipeline,
decodeResumeToken: resumeModule.decodeResumeToken,
encodeToken: tokenModule.encodeToken,
runPipeline: runtimeModule.runPipeline,
runWorkflowFile: workflowModule.runWorkflowFile,
readStateJson: storeModule.readStateJson,
writeStateJson: storeModule.writeStateJson,
deleteStateJson: storeModule.deleteStateJson,
});
}
export function createEmbeddedLobsterRunner(options?: {
loadRuntime?: LoadEmbeddedToolRuntime;
}): LobsterRunner {
const loadRuntime = options?.loadRuntime ?? loadEmbeddedToolRuntimeFromPackage;
return {
async run(params) {
const runtime = await loadRuntime();
return await withTimeout(params.timeoutMs, async (signal) => {
const ctx = createEmbeddedToolContext(params, signal);
if (params.action === "run") {
const pipeline = params.pipeline?.trim() ?? "";
if (!pipeline) {
throw new Error("pipeline required");
}
const filePath = await detectWorkflowFile(pipeline, params.cwd);
if (filePath) {
const parsedArgsJson = params.argsJson?.trim() ?? "";
let args: Record<string, unknown> | undefined;
if (parsedArgsJson) {
try {
args = parseWorkflowArgs(parsedArgsJson);
} catch {
throw new Error("run --args-json must be valid JSON");
}
}
return throwOnErrorEnvelope(
normalizeEnvelope(await runtime.runToolRequest({ filePath, args, ctx })),
);
}
return throwOnErrorEnvelope(
normalizeEnvelope(await runtime.runToolRequest({ pipeline, ctx })),
);
}
const token = params.token?.trim() ?? "";
if (!token) {
throw new Error("token required");
}
if (typeof params.approve !== "boolean") {
throw new Error("approve required");
}
return throwOnErrorEnvelope(
normalizeEnvelope(
await runtime.resumeToolRequest({
token,
approved: params.approve,
ctx,
}),
),
);
});
},
};
}

View File

@@ -1,31 +1,6 @@
import { EventEmitter } from "node:events";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { PassThrough } from "node:stream";
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { createTestPluginApi } from "../../../test/helpers/plugins/plugin-api.js";
import type { OpenClawPluginApi, OpenClawPluginToolContext } from "../runtime-api.js";
import {
createWindowsCmdShimFixture,
restorePlatformPathEnv,
setProcessPlatform,
snapshotPlatformPathEnv,
} from "./test-helpers.js";
import { resolveWindowsLobsterSpawn } from "./windows-spawn.js";
const spawnState = vi.hoisted(() => ({
queue: [] as Array<{ stdout: string; stderr?: string; exitCode?: number }>,
spawn: vi.fn(),
}));
vi.mock("node:child_process", async () => {
const actual = await vi.importActual<typeof import("node:child_process")>("node:child_process");
return {
...actual,
spawn: (...args: unknown[]) => spawnState.spawn(...args),
};
});
let createLobsterTool: typeof import("./lobster-tool.js").createLobsterTool;
@@ -55,154 +30,124 @@ function fakeCtx(overrides: Partial<OpenClawPluginToolContext> = {}): OpenClawPl
};
}
async function expectUnwrappedShim(params: {
scriptPath: string;
shimPath: string;
shimLine: string;
}) {
await createWindowsCmdShimFixture(params);
const target = resolveWindowsLobsterSpawn(params.shimPath, ["run", "noop"], process.env);
expect(target.command).toBe(process.execPath);
expect(target.argv).toEqual([params.scriptPath, "run", "noop"]);
expect(target.windowsHide).toBe(true);
}
describe("lobster plugin tool", () => {
let tempDir = "";
const originalProcessState = snapshotPlatformPathEnv();
beforeAll(async () => {
it("returns the Lobster envelope in details", async () => {
({ createLobsterTool } = await import("./lobster-tool.js"));
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lobster-plugin-"));
});
afterEach(() => {
restorePlatformPathEnv(originalProcessState);
});
afterAll(async () => {
if (!tempDir) {
return;
}
if (process.platform === "win32") {
await fs.rm(tempDir, { recursive: true, force: true, maxRetries: 10, retryDelay: 50 });
} else {
await fs.rm(tempDir, { recursive: true, force: true });
}
});
beforeEach(() => {
spawnState.queue.length = 0;
spawnState.spawn.mockReset();
spawnState.spawn.mockImplementation(() => {
const next = spawnState.queue.shift() ?? { stdout: "" };
const stdout = new PassThrough();
const stderr = new PassThrough();
const child = new EventEmitter() as EventEmitter & {
stdout: PassThrough;
stderr: PassThrough;
kill: (signal?: string) => boolean;
};
child.stdout = stdout;
child.stderr = stderr;
child.kill = () => true;
setImmediate(() => {
if (next.stderr) {
stderr.end(next.stderr);
} else {
stderr.end();
}
stdout.end(next.stdout);
child.emit("exit", next.exitCode ?? 0);
});
return child;
});
});
const queueSuccessfulEnvelope = (hello = "world") => {
spawnState.queue.push({
stdout: JSON.stringify({
ok: true,
status: "ok",
output: [{ hello }],
requiresApproval: null,
}),
});
};
it("runs lobster and returns parsed envelope in details", async () => {
spawnState.queue.push({
stdout: JSON.stringify({
const runner = {
run: vi.fn().mockResolvedValue({
ok: true,
status: "ok",
output: [{ hello: "world" }],
requiresApproval: null,
}),
});
};
const tool = createLobsterTool(fakeApi());
const tool = createLobsterTool(fakeApi(), { runner });
const res = await tool.execute("call1", {
action: "run",
pipeline: "noop",
timeoutMs: 1000,
});
expect(spawnState.spawn).toHaveBeenCalled();
expect(res.details).toMatchObject({ ok: true, status: "ok" });
});
it("tolerates noisy stdout before the JSON envelope", async () => {
const payload = { ok: true, status: "ok", output: [], requiresApproval: null };
spawnState.queue.push({
stdout: `noise before json\n${JSON.stringify(payload)}`,
});
const tool = createLobsterTool(fakeApi());
const res = await tool.execute("call-noisy", {
expect(runner.run).toHaveBeenCalledWith({
action: "run",
pipeline: "noop",
cwd: process.cwd(),
timeoutMs: 1000,
maxStdoutBytes: 512_000,
});
expect(res.details).toMatchObject({
ok: true,
status: "ok",
output: [{ hello: "world" }],
requiresApproval: null,
});
});
it("supports approval envelopes without changing the tool contract", async () => {
({ createLobsterTool } = await import("./lobster-tool.js"));
const runner = {
run: vi.fn().mockResolvedValue({
ok: true,
status: "needs_approval",
output: [],
requiresApproval: {
type: "approval_request",
prompt: "Send these alerts?",
items: [{ id: "alert-1" }],
resumeToken: "resume-token-1",
},
}),
};
const tool = createLobsterTool(fakeApi(), { runner });
const res = await tool.execute("call-injected-runner", {
action: "run",
pipeline: "noop",
argsJson: '{"since_hours":1}',
timeoutMs: 1500,
maxStdoutBytes: 4096,
});
expect(res.details).toMatchObject({ ok: true, status: "ok" });
expect(runner.run).toHaveBeenCalledWith({
action: "run",
pipeline: "noop",
argsJson: '{"since_hours":1}',
cwd: process.cwd(),
timeoutMs: 1500,
maxStdoutBytes: 4096,
});
expect(res.details).toMatchObject({
ok: true,
status: "needs_approval",
requiresApproval: {
type: "approval_request",
prompt: "Send these alerts?",
resumeToken: "resume-token-1",
},
});
});
it("throws when the runner returns an error envelope", async () => {
({ createLobsterTool } = await import("./lobster-tool.js"));
const tool = createLobsterTool(fakeApi(), {
runner: {
run: vi.fn().mockResolvedValue({
ok: false,
error: {
type: "runtime_error",
message: "boom",
},
}),
},
});
await expect(
tool.execute("call-runner-error", {
action: "run",
pipeline: "noop",
}),
).rejects.toThrow("boom");
});
it("requires action", async () => {
const tool = createLobsterTool(fakeApi());
({ createLobsterTool } = await import("./lobster-tool.js"));
const tool = createLobsterTool(fakeApi(), {
runner: { run: vi.fn() },
});
await expect(tool.execute("call-action-missing", {})).rejects.toThrow(/action required/);
});
it("requires pipeline for run action", async () => {
const tool = createLobsterTool(fakeApi());
await expect(
tool.execute("call-pipeline-missing", {
action: "run",
}),
).rejects.toThrow(/pipeline required/);
});
it("requires token and approve for resume action", async () => {
const tool = createLobsterTool(fakeApi());
await expect(
tool.execute("call-resume-token-missing", {
action: "resume",
approve: true,
}),
).rejects.toThrow(/token required/);
await expect(
tool.execute("call-resume-approve-missing", {
action: "resume",
token: "resume-token",
}),
).rejects.toThrow(/approve required/);
});
it("rejects unknown action", async () => {
const tool = createLobsterTool(fakeApi());
({ createLobsterTool } = await import("./lobster-tool.js"));
const tool = createLobsterTool(fakeApi(), {
runner: { run: vi.fn() },
});
await expect(
tool.execute("call-action-unknown", {
action: "explode",
@@ -211,9 +156,13 @@ describe("lobster plugin tool", () => {
});
it("rejects absolute cwd", async () => {
const tool = createLobsterTool(fakeApi());
({ createLobsterTool } = await import("./lobster-tool.js"));
const tool = createLobsterTool(fakeApi(), {
runner: { run: vi.fn() },
});
await expect(
tool.execute("call2c", {
tool.execute("call-absolute-cwd", {
action: "run",
pipeline: "noop",
cwd: "/tmp",
@@ -222,9 +171,13 @@ describe("lobster plugin tool", () => {
});
it("rejects cwd that escapes the gateway working directory", async () => {
const tool = createLobsterTool(fakeApi());
({ createLobsterTool } = await import("./lobster-tool.js"));
const tool = createLobsterTool(fakeApi(), {
runner: { run: vi.fn() },
});
await expect(
tool.execute("call2d", {
tool.execute("call-escape-cwd", {
action: "run",
pipeline: "noop",
cwd: "../../etc",
@@ -232,175 +185,20 @@ describe("lobster plugin tool", () => {
).rejects.toThrow(/must stay within/);
});
it("rejects invalid JSON from lobster", async () => {
spawnState.queue.push({ stdout: "nope" });
const tool = createLobsterTool(fakeApi());
await expect(
tool.execute("call3", {
action: "run",
pipeline: "noop",
}),
).rejects.toThrow(/invalid JSON/);
});
it("runs Windows cmd shims through Node without enabling shell", async () => {
setProcessPlatform("win32");
const shimScriptPath = path.join(tempDir, "shim-dist", "lobster-cli.cjs");
const shimPath = path.join(tempDir, "shim-bin", "lobster.cmd");
await createWindowsCmdShimFixture({
shimPath,
scriptPath: shimScriptPath,
shimLine: `"%dp0%\\..\\shim-dist\\lobster-cli.cjs" %*`,
});
process.env.PATHEXT = ".CMD;.EXE";
process.env.PATH = `${path.dirname(shimPath)};${process.env.PATH ?? ""}`;
queueSuccessfulEnvelope();
const tool = createLobsterTool(fakeApi());
await tool.execute("call-win-shim", {
action: "run",
pipeline: "noop",
});
const [command, argv, options] = spawnState.spawn.mock.calls[0] ?? [];
expect(command).toBe(process.execPath);
expect(argv).toEqual([shimScriptPath, "run", "--mode", "tool", "noop"]);
expect(options).toMatchObject({ windowsHide: true });
expect(options).not.toHaveProperty("shell");
});
it("does not retry a failed Windows spawn with shell fallback", async () => {
setProcessPlatform("win32");
spawnState.spawn.mockReset();
spawnState.spawn.mockImplementationOnce(() => {
const child = new EventEmitter() as EventEmitter & {
stdout: PassThrough;
stderr: PassThrough;
kill: (signal?: string) => boolean;
};
child.stdout = new PassThrough();
child.stderr = new PassThrough();
child.kill = () => true;
const err = Object.assign(new Error("spawn failed"), { code: "ENOENT" });
setImmediate(() => child.emit("error", err));
return child;
});
const tool = createLobsterTool(fakeApi());
await expect(
tool.execute("call-win-no-retry", {
action: "run",
pipeline: "noop",
}),
).rejects.toThrow(/spawn failed/);
expect(spawnState.spawn).toHaveBeenCalledTimes(1);
});
it("can be gated off in sandboxed contexts", async () => {
({ createLobsterTool } = await import("./lobster-tool.js"));
const api = fakeApi();
const factoryTool = (ctx: OpenClawPluginToolContext) => {
if (ctx.sandboxed) {
return null;
}
return createLobsterTool(api);
return createLobsterTool(api, {
runner: { run: vi.fn() },
});
};
expect(factoryTool(fakeCtx({ sandboxed: true }))).toBeNull();
expect(factoryTool(fakeCtx({ sandboxed: false }))?.name).toBe("lobster");
});
});
describe("resolveWindowsLobsterSpawn", () => {
let tempDir = "";
const originalProcessState = snapshotPlatformPathEnv();
beforeEach(async () => {
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lobster-win-spawn-"));
setProcessPlatform("win32");
});
afterEach(async () => {
restorePlatformPathEnv(originalProcessState);
if (tempDir) {
await fs.rm(tempDir, { recursive: true, force: true });
tempDir = "";
}
});
it("unwraps cmd shim with %dp0% token", async () => {
const scriptPath = path.join(tempDir, "shim-dist", "lobster-cli.cjs");
const shimPath = path.join(tempDir, "shim", "lobster.cmd");
await expectUnwrappedShim({
shimPath,
scriptPath,
shimLine: `"%dp0%\\..\\shim-dist\\lobster-cli.cjs" %*`,
});
});
it("unwraps cmd shim with %~dp0% token", async () => {
const scriptPath = path.join(tempDir, "shim-dist", "lobster-cli.cjs");
const shimPath = path.join(tempDir, "shim", "lobster.cmd");
await expectUnwrappedShim({
shimPath,
scriptPath,
shimLine: `"%~dp0%\\..\\shim-dist\\lobster-cli.cjs" %*`,
});
});
it("ignores node.exe shim entries and picks lobster script", async () => {
const shimDir = path.join(tempDir, "shim-with-node");
const scriptPath = path.join(tempDir, "shim-dist-node", "lobster-cli.cjs");
const shimPath = path.join(shimDir, "lobster.cmd");
await fs.mkdir(path.dirname(scriptPath), { recursive: true });
await fs.mkdir(shimDir, { recursive: true });
await fs.writeFile(path.join(shimDir, "node.exe"), "", "utf8");
await fs.writeFile(scriptPath, "module.exports = {};\n", "utf8");
await fs.writeFile(
shimPath,
`@echo off\r\n"%~dp0%\\node.exe" "%~dp0%\\..\\shim-dist-node\\lobster-cli.cjs" %*\r\n`,
"utf8",
);
const target = resolveWindowsLobsterSpawn(shimPath, ["run", "noop"], process.env);
expect(target.command).toBe(process.execPath);
expect(target.argv).toEqual([scriptPath, "run", "noop"]);
expect(target.windowsHide).toBe(true);
});
it("resolves lobster.cmd from PATH and unwraps npm layout shim", async () => {
const binDir = path.join(tempDir, "node_modules", ".bin");
const packageDir = path.join(tempDir, "node_modules", "lobster");
const scriptPath = path.join(packageDir, "dist", "cli.js");
const shimPath = path.join(binDir, "lobster.cmd");
await fs.mkdir(path.dirname(scriptPath), { recursive: true });
await fs.mkdir(binDir, { recursive: true });
await fs.writeFile(shimPath, "@echo off\r\n", "utf8");
await fs.writeFile(
path.join(packageDir, "package.json"),
JSON.stringify({ name: "lobster", version: "0.0.0", bin: { lobster: "dist/cli.js" } }),
"utf8",
);
await fs.writeFile(scriptPath, "module.exports = {};\n", "utf8");
const env = {
...process.env,
PATH: `${binDir};${process.env.PATH ?? ""}`,
PATHEXT: ".CMD;.EXE",
};
const target = resolveWindowsLobsterSpawn("lobster", ["run", "noop"], env);
expect(target.command).toBe(process.execPath);
expect(target.argv).toEqual([scriptPath, "run", "noop"]);
expect(target.windowsHide).toBe(true);
});
it("fails fast when wrapper cannot be resolved without shell execution", async () => {
const badShimPath = path.join(tempDir, "bad-shim", "lobster.cmd");
await fs.mkdir(path.dirname(badShimPath), { recursive: true });
await fs.writeFile(badShimPath, "@echo off\r\nREM no entrypoint\r\n", "utf8");
expect(() => resolveWindowsLobsterSpawn(badShimPath, ["run", "noop"], process.env)).toThrow(
/without shell execution/,
);
});
});

View File

@@ -1,213 +1,14 @@
import { spawn } from "node:child_process";
import path from "node:path";
import { Type } from "@sinclair/typebox";
import type { OpenClawPluginApi } from "../runtime-api.js";
import { resolveWindowsLobsterSpawn } from "./windows-spawn.js";
import {
createEmbeddedLobsterRunner,
resolveLobsterCwd,
type LobsterRunner,
type LobsterRunnerParams,
} from "./lobster-runner.js";
type LobsterEnvelope =
| {
ok: true;
status: "ok" | "needs_approval" | "cancelled";
output: unknown[];
requiresApproval: null | {
type: "approval_request";
prompt: string;
items: unknown[];
resumeToken?: string;
};
}
| {
ok: false;
error: { type?: string; message: string };
};
function normalizeForCwdSandbox(p: string): string {
const normalized = path.normalize(p);
return process.platform === "win32" ? normalized.toLowerCase() : normalized;
}
function resolveCwd(cwdRaw: unknown): string {
if (typeof cwdRaw !== "string" || !cwdRaw.trim()) {
return process.cwd();
}
const cwd = cwdRaw.trim();
if (path.isAbsolute(cwd)) {
throw new Error("cwd must be a relative path");
}
const base = process.cwd();
const resolved = path.resolve(base, cwd);
const rel = path.relative(normalizeForCwdSandbox(base), normalizeForCwdSandbox(resolved));
if (rel === "" || rel === ".") {
return resolved;
}
if (rel.startsWith("..") || path.isAbsolute(rel)) {
throw new Error("cwd must stay within the gateway working directory");
}
return resolved;
}
async function runLobsterSubprocessOnce(params: {
execPath: string;
argv: string[];
cwd: string;
timeoutMs: number;
maxStdoutBytes: number;
}) {
const { execPath, argv, cwd } = params;
const timeoutMs = Math.max(200, params.timeoutMs);
const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes);
const env = { ...process.env, LOBSTER_MODE: "tool" } as Record<string, string | undefined>;
const nodeOptions = env.NODE_OPTIONS ?? "";
if (nodeOptions.includes("--inspect")) {
delete env.NODE_OPTIONS;
}
const spawnTarget =
process.platform === "win32"
? resolveWindowsLobsterSpawn(execPath, argv, env)
: { command: execPath, argv };
return await new Promise<{ stdout: string }>((resolve, reject) => {
const child = spawn(spawnTarget.command, spawnTarget.argv, {
cwd,
stdio: ["ignore", "pipe", "pipe"],
env,
windowsHide: spawnTarget.windowsHide,
});
let stdout = "";
let stdoutBytes = 0;
let stderr = "";
let settled = false;
const settle = (
result: { ok: true; value: { stdout: string } } | { ok: false; error: Error },
) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
if (result.ok) {
resolve(result.value);
} else {
reject(result.error);
}
};
const failAndTerminate = (message: string) => {
try {
child.kill("SIGKILL");
} finally {
settle({ ok: false, error: new Error(message) });
}
};
child.stdout?.setEncoding("utf8");
child.stderr?.setEncoding("utf8");
child.stdout?.on("data", (chunk) => {
const str = String(chunk);
stdoutBytes += Buffer.byteLength(str, "utf8");
if (stdoutBytes > maxStdoutBytes) {
failAndTerminate("lobster output exceeded maxStdoutBytes");
return;
}
stdout += str;
});
child.stderr?.on("data", (chunk) => {
stderr += String(chunk);
});
const timer = setTimeout(() => {
failAndTerminate("lobster subprocess timed out");
}, timeoutMs);
child.once("error", (err) => {
settle({ ok: false, error: err });
});
child.once("exit", (code) => {
if (code !== 0) {
settle({
ok: false,
error: new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`),
});
return;
}
settle({ ok: true, value: { stdout } });
});
});
}
function parseEnvelope(stdout: string): LobsterEnvelope {
const trimmed = stdout.trim();
const tryParse = (input: string) => {
try {
return JSON.parse(input) as unknown;
} catch {
return undefined;
}
};
let parsed: unknown = tryParse(trimmed);
// Some environments can leak extra stdout (e.g. warnings/logs) before the
// final JSON envelope. Be tolerant and parse the last JSON-looking suffix.
if (parsed === undefined) {
const suffixMatch = trimmed.match(/({[\s\S]*}|\[[\s\S]*])\s*$/);
if (suffixMatch?.[1]) {
parsed = tryParse(suffixMatch[1]);
}
}
if (parsed === undefined) {
throw new Error("lobster returned invalid JSON");
}
if (!parsed || typeof parsed !== "object") {
throw new Error("lobster returned invalid JSON envelope");
}
const ok = (parsed as { ok?: unknown }).ok;
if (ok === true || ok === false) {
return parsed as LobsterEnvelope;
}
throw new Error("lobster returned invalid JSON envelope");
}
function buildLobsterArgv(action: string, params: Record<string, unknown>): string[] {
if (action === "run") {
const pipeline = typeof params.pipeline === "string" ? params.pipeline : "";
if (!pipeline.trim()) {
throw new Error("pipeline required");
}
const argv = ["run", "--mode", "tool", pipeline];
const argsJson = typeof params.argsJson === "string" ? params.argsJson : "";
if (argsJson.trim()) {
argv.push("--args-json", argsJson);
}
return argv;
}
if (action === "resume") {
const token = typeof params.token === "string" ? params.token : "";
if (!token.trim()) {
throw new Error("token required");
}
const approve = params.approve;
if (typeof approve !== "boolean") {
throw new Error("approve required");
}
return ["resume", "--token", token, "--approve", approve ? "yes" : "no"];
}
throw new Error(`Unknown action: ${action}`);
}
export function createLobsterTool(api: OpenClawPluginApi) {
export function createLobsterTool(api: OpenClawPluginApi, options?: { runner?: LobsterRunner }) {
const runner = options?.runner ?? createEmbeddedLobsterRunner();
return {
name: "lobster",
label: "Lobster Workflow",
@@ -234,28 +35,33 @@ export function createLobsterTool(api: OpenClawPluginApi) {
if (!action) {
throw new Error("action required");
}
if (action !== "run" && action !== "resume") {
throw new Error(`Unknown action: ${action}`);
}
const execPath = "lobster";
const cwd = resolveCwd(params.cwd);
const cwd = resolveLobsterCwd(params.cwd);
const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : 20_000;
const maxStdoutBytes =
typeof params.maxStdoutBytes === "number" ? params.maxStdoutBytes : 512_000;
const argv = buildLobsterArgv(action, params);
if (api.runtime?.version && api.logger?.debug) {
api.logger.debug(`lobster plugin runtime=${api.runtime.version}`);
}
const { stdout } = await runLobsterSubprocessOnce({
execPath,
argv,
const runnerParams: LobsterRunnerParams = {
action,
...(typeof params.pipeline === "string" ? { pipeline: params.pipeline } : {}),
...(typeof params.argsJson === "string" ? { argsJson: params.argsJson } : {}),
...(typeof params.token === "string" ? { token: params.token } : {}),
...(typeof params.approve === "boolean" ? { approve: params.approve } : {}),
cwd,
timeoutMs,
maxStdoutBytes,
});
const envelope = parseEnvelope(stdout);
};
const envelope = await runner.run(runnerParams);
if (!envelope.ok) {
throw new Error(envelope.error.message);
}
return {
content: [{ type: "text", text: JSON.stringify(envelope, null, 2) }],

View File

@@ -1,36 +0,0 @@
import {
applyWindowsSpawnProgramPolicy,
materializeWindowsSpawnProgram,
resolveWindowsSpawnProgramCandidate,
} from "../runtime-api.js";
type SpawnTarget = {
command: string;
argv: string[];
windowsHide?: boolean;
};
export function resolveWindowsLobsterSpawn(
execPath: string,
argv: string[],
env: NodeJS.ProcessEnv,
): SpawnTarget {
const candidate = resolveWindowsSpawnProgramCandidate({
command: execPath,
env,
packageName: "lobster",
});
const program = applyWindowsSpawnProgramPolicy({
candidate,
allowShellFallback: false,
});
const resolved = materializeWindowsSpawnProgram(program, argv);
if (resolved.shell) {
throw new Error("lobster wrapper resolved to shell fallback unexpectedly");
}
return {
command: resolved.command,
argv: resolved.argv,
windowsHide: resolved.windowsHide,
};
}

13
pnpm-lock.yaml generated
View File

@@ -447,6 +447,9 @@ importers:
extensions/lobster:
dependencies:
'@clawdbot/lobster':
specifier: 2026.1.24
version: 2026.1.24
'@sinclair/typebox':
specifier: 0.34.49
version: 0.34.49
@@ -1108,6 +1111,11 @@ packages:
'@clack/prompts@1.2.0':
resolution: {integrity: sha512-4jmztR9fMqPMjz6H/UZXj0zEmE43ha1euENwkckKKel4XpSfokExPo5AiVStdHSAlHekz4d0CA/r45Ok1E4D3w==}
'@clawdbot/lobster@2026.1.24':
resolution: {integrity: sha512-vHrMy4NErcq6suyGByfQSdalnvaMu4dRd10BJdeMp60V6cYtuHJSR2Ay5l0kb4iSPyk4dZKrXNNpLzeqHRcAfA==}
engines: {node: '>=20'}
hasBin: true
'@cloudflare/workers-types@4.20260120.0':
resolution: {integrity: sha512-B8pueG+a5S+mdK3z8oKu1ShcxloZ7qWb68IEyLLaepvdryIbNC7JVPcY0bWsjS56UQVKc5fnyRge3yZIwc9bxw==}
@@ -7388,6 +7396,11 @@ snapshots:
fast-wrap-ansi: 0.1.6
sisteransi: 1.0.5
'@clawdbot/lobster@2026.1.24':
dependencies:
ajv: 8.18.0
yaml: 2.8.3
'@cloudflare/workers-types@4.20260120.0':
optional: true