mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-13 11:00:50 +00:00
`Math.min(250, deadline - Date.now())` could return a negative value if the deadline expired between the while-condition check and the setTimeout call. Wrap with `Math.max(0, ...)` to ensure the sleep is never negative. Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
656 lines
22 KiB
TypeScript
656 lines
22 KiB
TypeScript
import type { AgentTool, AgentToolResult } from "@mariozechner/pi-agent-core";
|
|
import { Type } from "@sinclair/typebox";
|
|
import { formatDurationCompact } from "../infra/format-time/format-duration.ts";
|
|
import { getDiagnosticSessionState } from "../logging/diagnostic-session-state.js";
|
|
import { killProcessTree } from "../process/kill-tree.js";
|
|
import { getProcessSupervisor } from "../process/supervisor/index.js";
|
|
import {
|
|
type ProcessSession,
|
|
deleteSession,
|
|
drainSession,
|
|
getFinishedSession,
|
|
getSession,
|
|
listFinishedSessions,
|
|
listRunningSessions,
|
|
markExited,
|
|
setJobTtlMs,
|
|
} from "./bash-process-registry.js";
|
|
import { deriveSessionName, pad, sliceLogLines, truncateMiddle } from "./bash-tools.shared.js";
|
|
import { recordCommandPoll, resetCommandPollCount } from "./command-poll-backoff.js";
|
|
import { encodeKeySequence, encodePaste } from "./pty-keys.js";
|
|
|
|
export type ProcessToolDefaults = {
|
|
cleanupMs?: number;
|
|
scopeKey?: string;
|
|
};
|
|
|
|
type WritableStdin = {
|
|
write: (data: string, cb?: (err?: Error | null) => void) => void;
|
|
end: () => void;
|
|
destroyed?: boolean;
|
|
};
|
|
const DEFAULT_LOG_TAIL_LINES = 200;
|
|
|
|
function resolveLogSliceWindow(offset?: number, limit?: number) {
|
|
const usingDefaultTail = offset === undefined && limit === undefined;
|
|
const effectiveLimit =
|
|
typeof limit === "number" && Number.isFinite(limit)
|
|
? limit
|
|
: usingDefaultTail
|
|
? DEFAULT_LOG_TAIL_LINES
|
|
: undefined;
|
|
return { effectiveOffset: offset, effectiveLimit, usingDefaultTail };
|
|
}
|
|
|
|
function defaultTailNote(totalLines: number, usingDefaultTail: boolean) {
|
|
if (!usingDefaultTail || totalLines <= DEFAULT_LOG_TAIL_LINES) {
|
|
return "";
|
|
}
|
|
return `\n\n[showing last ${DEFAULT_LOG_TAIL_LINES} of ${totalLines} lines; pass offset/limit to page]`;
|
|
}
|
|
|
|
const processSchema = Type.Object({
|
|
action: Type.String({ description: "Process action" }),
|
|
sessionId: Type.Optional(Type.String({ description: "Session id for actions other than list" })),
|
|
data: Type.Optional(Type.String({ description: "Data to write for write" })),
|
|
keys: Type.Optional(
|
|
Type.Array(Type.String(), { description: "Key tokens to send for send-keys" }),
|
|
),
|
|
hex: Type.Optional(Type.Array(Type.String(), { description: "Hex bytes to send for send-keys" })),
|
|
literal: Type.Optional(Type.String({ description: "Literal string for send-keys" })),
|
|
text: Type.Optional(Type.String({ description: "Text to paste for paste" })),
|
|
bracketed: Type.Optional(Type.Boolean({ description: "Wrap paste in bracketed mode" })),
|
|
eof: Type.Optional(Type.Boolean({ description: "Close stdin after write" })),
|
|
offset: Type.Optional(Type.Number({ description: "Log offset" })),
|
|
limit: Type.Optional(Type.Number({ description: "Log length" })),
|
|
timeout: Type.Optional(
|
|
Type.Number({
|
|
description: "For poll: wait up to this many milliseconds before returning",
|
|
minimum: 0,
|
|
}),
|
|
),
|
|
});
|
|
|
|
const MAX_POLL_WAIT_MS = 120_000;
|
|
|
|
function resolvePollWaitMs(value: unknown) {
|
|
if (typeof value === "number" && Number.isFinite(value)) {
|
|
return Math.max(0, Math.min(MAX_POLL_WAIT_MS, Math.floor(value)));
|
|
}
|
|
if (typeof value === "string") {
|
|
const parsed = Number.parseInt(value.trim(), 10);
|
|
if (Number.isFinite(parsed)) {
|
|
return Math.max(0, Math.min(MAX_POLL_WAIT_MS, parsed));
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
function failText(text: string): AgentToolResult<unknown> {
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text,
|
|
},
|
|
],
|
|
details: { status: "failed" },
|
|
};
|
|
}
|
|
|
|
function recordPollRetrySuggestion(sessionId: string, hasNewOutput: boolean): number | undefined {
|
|
try {
|
|
const sessionState = getDiagnosticSessionState({ sessionId });
|
|
return recordCommandPoll(sessionState, sessionId, hasNewOutput);
|
|
} catch {
|
|
return undefined;
|
|
}
|
|
}
|
|
|
|
function resetPollRetrySuggestion(sessionId: string): void {
|
|
try {
|
|
const sessionState = getDiagnosticSessionState({ sessionId });
|
|
resetCommandPollCount(sessionState, sessionId);
|
|
} catch {
|
|
// Ignore diagnostics state failures for process tool behavior.
|
|
}
|
|
}
|
|
|
|
export function createProcessTool(
|
|
defaults?: ProcessToolDefaults,
|
|
// oxlint-disable-next-line typescript/no-explicit-any
|
|
): AgentTool<any, unknown> {
|
|
if (defaults?.cleanupMs !== undefined) {
|
|
setJobTtlMs(defaults.cleanupMs);
|
|
}
|
|
const scopeKey = defaults?.scopeKey;
|
|
const supervisor = getProcessSupervisor();
|
|
const isInScope = (session?: { scopeKey?: string } | null) =>
|
|
!scopeKey || session?.scopeKey === scopeKey;
|
|
|
|
const cancelManagedSession = (sessionId: string) => {
|
|
const record = supervisor.getRecord(sessionId);
|
|
if (!record || record.state === "exited") {
|
|
return false;
|
|
}
|
|
supervisor.cancel(sessionId, "manual-cancel");
|
|
return true;
|
|
};
|
|
|
|
const terminateSessionFallback = (session: ProcessSession) => {
|
|
const pid = session.pid ?? session.child?.pid;
|
|
if (typeof pid !== "number" || !Number.isFinite(pid) || pid <= 0) {
|
|
return false;
|
|
}
|
|
killProcessTree(pid);
|
|
return true;
|
|
};
|
|
|
|
return {
|
|
name: "process",
|
|
label: "process",
|
|
description:
|
|
"Manage running exec sessions: list, poll, log, write, send-keys, submit, paste, kill.",
|
|
parameters: processSchema,
|
|
execute: async (_toolCallId, args, _signal, _onUpdate): Promise<AgentToolResult<unknown>> => {
|
|
const params = args as {
|
|
action:
|
|
| "list"
|
|
| "poll"
|
|
| "log"
|
|
| "write"
|
|
| "send-keys"
|
|
| "submit"
|
|
| "paste"
|
|
| "kill"
|
|
| "clear"
|
|
| "remove";
|
|
sessionId?: string;
|
|
data?: string;
|
|
keys?: string[];
|
|
hex?: string[];
|
|
literal?: string;
|
|
text?: string;
|
|
bracketed?: boolean;
|
|
eof?: boolean;
|
|
offset?: number;
|
|
limit?: number;
|
|
timeout?: unknown;
|
|
};
|
|
|
|
if (params.action === "list") {
|
|
const running = listRunningSessions()
|
|
.filter((s) => isInScope(s))
|
|
.map((s) => ({
|
|
sessionId: s.id,
|
|
status: "running",
|
|
pid: s.pid ?? undefined,
|
|
startedAt: s.startedAt,
|
|
runtimeMs: Date.now() - s.startedAt,
|
|
cwd: s.cwd,
|
|
command: s.command,
|
|
name: deriveSessionName(s.command),
|
|
tail: s.tail,
|
|
truncated: s.truncated,
|
|
}));
|
|
const finished = listFinishedSessions()
|
|
.filter((s) => isInScope(s))
|
|
.map((s) => ({
|
|
sessionId: s.id,
|
|
status: s.status,
|
|
startedAt: s.startedAt,
|
|
endedAt: s.endedAt,
|
|
runtimeMs: s.endedAt - s.startedAt,
|
|
cwd: s.cwd,
|
|
command: s.command,
|
|
name: deriveSessionName(s.command),
|
|
tail: s.tail,
|
|
truncated: s.truncated,
|
|
exitCode: s.exitCode ?? undefined,
|
|
exitSignal: s.exitSignal ?? undefined,
|
|
}));
|
|
const lines = [...running, ...finished]
|
|
.toSorted((a, b) => b.startedAt - a.startedAt)
|
|
.map((s) => {
|
|
const label = s.name ? truncateMiddle(s.name, 80) : truncateMiddle(s.command, 120);
|
|
return `${s.sessionId} ${pad(s.status, 9)} ${formatDurationCompact(s.runtimeMs) ?? "n/a"} :: ${label}`;
|
|
});
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text: lines.join("\n") || "No running or recent sessions.",
|
|
},
|
|
],
|
|
details: { status: "completed", sessions: [...running, ...finished] },
|
|
};
|
|
}
|
|
|
|
if (!params.sessionId) {
|
|
return {
|
|
content: [{ type: "text", text: "sessionId is required for this action." }],
|
|
details: { status: "failed" },
|
|
};
|
|
}
|
|
|
|
const session = getSession(params.sessionId);
|
|
const finished = getFinishedSession(params.sessionId);
|
|
const scopedSession = isInScope(session) ? session : undefined;
|
|
const scopedFinished = isInScope(finished) ? finished : undefined;
|
|
|
|
const failedResult = (text: string): AgentToolResult<unknown> => ({
|
|
content: [{ type: "text", text }],
|
|
details: { status: "failed" },
|
|
});
|
|
|
|
const resolveBackgroundedWritableStdin = () => {
|
|
if (!scopedSession) {
|
|
return {
|
|
ok: false as const,
|
|
result: failedResult(`No active session found for ${params.sessionId}`),
|
|
};
|
|
}
|
|
if (!scopedSession.backgrounded) {
|
|
return {
|
|
ok: false as const,
|
|
result: failedResult(`Session ${params.sessionId} is not backgrounded.`),
|
|
};
|
|
}
|
|
const stdin = scopedSession.stdin ?? scopedSession.child?.stdin;
|
|
if (!stdin || stdin.destroyed) {
|
|
return {
|
|
ok: false as const,
|
|
result: failedResult(`Session ${params.sessionId} stdin is not writable.`),
|
|
};
|
|
}
|
|
return { ok: true as const, session: scopedSession, stdin: stdin as WritableStdin };
|
|
};
|
|
|
|
const writeToStdin = async (stdin: WritableStdin, data: string) => {
|
|
await new Promise<void>((resolve, reject) => {
|
|
stdin.write(data, (err) => {
|
|
if (err) {
|
|
reject(err);
|
|
} else {
|
|
resolve();
|
|
}
|
|
});
|
|
});
|
|
};
|
|
|
|
const runningSessionResult = (
|
|
session: ProcessSession,
|
|
text: string,
|
|
): AgentToolResult<unknown> => ({
|
|
content: [{ type: "text", text }],
|
|
details: {
|
|
status: "running",
|
|
sessionId: params.sessionId,
|
|
name: deriveSessionName(session.command),
|
|
},
|
|
});
|
|
|
|
switch (params.action) {
|
|
case "poll": {
|
|
if (!scopedSession) {
|
|
if (scopedFinished) {
|
|
resetPollRetrySuggestion(params.sessionId);
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text:
|
|
(scopedFinished.tail ||
|
|
`(no output recorded${
|
|
scopedFinished.truncated ? " — truncated to cap" : ""
|
|
})`) +
|
|
`\n\nProcess exited with ${
|
|
scopedFinished.exitSignal
|
|
? `signal ${scopedFinished.exitSignal}`
|
|
: `code ${scopedFinished.exitCode ?? 0}`
|
|
}.`,
|
|
},
|
|
],
|
|
details: {
|
|
status: scopedFinished.status === "completed" ? "completed" : "failed",
|
|
sessionId: params.sessionId,
|
|
exitCode: scopedFinished.exitCode ?? undefined,
|
|
aggregated: scopedFinished.aggregated,
|
|
name: deriveSessionName(scopedFinished.command),
|
|
},
|
|
};
|
|
}
|
|
resetPollRetrySuggestion(params.sessionId);
|
|
return failText(`No session found for ${params.sessionId}`);
|
|
}
|
|
if (!scopedSession.backgrounded) {
|
|
return failText(`Session ${params.sessionId} is not backgrounded.`);
|
|
}
|
|
const pollWaitMs = resolvePollWaitMs(params.timeout);
|
|
if (pollWaitMs > 0 && !scopedSession.exited) {
|
|
const deadline = Date.now() + pollWaitMs;
|
|
while (!scopedSession.exited && Date.now() < deadline) {
|
|
await new Promise((resolve) =>
|
|
setTimeout(resolve, Math.max(0, Math.min(250, deadline - Date.now()))),
|
|
);
|
|
}
|
|
}
|
|
const { stdout, stderr } = drainSession(scopedSession);
|
|
const exited = scopedSession.exited;
|
|
const exitCode = scopedSession.exitCode ?? 0;
|
|
const exitSignal = scopedSession.exitSignal ?? undefined;
|
|
if (exited) {
|
|
const status = exitCode === 0 && exitSignal == null ? "completed" : "failed";
|
|
markExited(
|
|
scopedSession,
|
|
scopedSession.exitCode ?? null,
|
|
scopedSession.exitSignal ?? null,
|
|
status,
|
|
);
|
|
}
|
|
const status = exited
|
|
? exitCode === 0 && exitSignal == null
|
|
? "completed"
|
|
: "failed"
|
|
: "running";
|
|
const output = [stdout.trimEnd(), stderr.trimEnd()].filter(Boolean).join("\n").trim();
|
|
const hasNewOutput = output.length > 0;
|
|
const retryInMs = exited
|
|
? undefined
|
|
: recordPollRetrySuggestion(params.sessionId, hasNewOutput);
|
|
if (exited) {
|
|
resetPollRetrySuggestion(params.sessionId);
|
|
}
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text:
|
|
(output || "(no new output)") +
|
|
(exited
|
|
? `\n\nProcess exited with ${
|
|
exitSignal ? `signal ${exitSignal}` : `code ${exitCode}`
|
|
}.`
|
|
: "\n\nProcess still running."),
|
|
},
|
|
],
|
|
details: {
|
|
status,
|
|
sessionId: params.sessionId,
|
|
exitCode: exited ? exitCode : undefined,
|
|
aggregated: scopedSession.aggregated,
|
|
name: deriveSessionName(scopedSession.command),
|
|
...(typeof retryInMs === "number" ? { retryInMs } : {}),
|
|
},
|
|
};
|
|
}
|
|
|
|
case "log": {
|
|
if (scopedSession) {
|
|
if (!scopedSession.backgrounded) {
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text: `Session ${params.sessionId} is not backgrounded.`,
|
|
},
|
|
],
|
|
details: { status: "failed" },
|
|
};
|
|
}
|
|
const window = resolveLogSliceWindow(params.offset, params.limit);
|
|
const { slice, totalLines, totalChars } = sliceLogLines(
|
|
scopedSession.aggregated,
|
|
window.effectiveOffset,
|
|
window.effectiveLimit,
|
|
);
|
|
const logDefaultTailNote = defaultTailNote(totalLines, window.usingDefaultTail);
|
|
return {
|
|
content: [{ type: "text", text: (slice || "(no output yet)") + logDefaultTailNote }],
|
|
details: {
|
|
status: scopedSession.exited ? "completed" : "running",
|
|
sessionId: params.sessionId,
|
|
total: totalLines,
|
|
totalLines,
|
|
totalChars,
|
|
truncated: scopedSession.truncated,
|
|
name: deriveSessionName(scopedSession.command),
|
|
},
|
|
};
|
|
}
|
|
if (scopedFinished) {
|
|
const window = resolveLogSliceWindow(params.offset, params.limit);
|
|
const { slice, totalLines, totalChars } = sliceLogLines(
|
|
scopedFinished.aggregated,
|
|
window.effectiveOffset,
|
|
window.effectiveLimit,
|
|
);
|
|
const status = scopedFinished.status === "completed" ? "completed" : "failed";
|
|
const logDefaultTailNote = defaultTailNote(totalLines, window.usingDefaultTail);
|
|
return {
|
|
content: [
|
|
{ type: "text", text: (slice || "(no output recorded)") + logDefaultTailNote },
|
|
],
|
|
details: {
|
|
status,
|
|
sessionId: params.sessionId,
|
|
total: totalLines,
|
|
totalLines,
|
|
totalChars,
|
|
truncated: scopedFinished.truncated,
|
|
exitCode: scopedFinished.exitCode ?? undefined,
|
|
exitSignal: scopedFinished.exitSignal ?? undefined,
|
|
name: deriveSessionName(scopedFinished.command),
|
|
},
|
|
};
|
|
}
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text: `No session found for ${params.sessionId}`,
|
|
},
|
|
],
|
|
details: { status: "failed" },
|
|
};
|
|
}
|
|
|
|
case "write": {
|
|
const resolved = resolveBackgroundedWritableStdin();
|
|
if (!resolved.ok) {
|
|
return resolved.result;
|
|
}
|
|
await writeToStdin(resolved.stdin, params.data ?? "");
|
|
if (params.eof) {
|
|
resolved.stdin.end();
|
|
}
|
|
return runningSessionResult(
|
|
resolved.session,
|
|
`Wrote ${(params.data ?? "").length} bytes to session ${params.sessionId}${
|
|
params.eof ? " (stdin closed)" : ""
|
|
}.`,
|
|
);
|
|
}
|
|
|
|
case "send-keys": {
|
|
const resolved = resolveBackgroundedWritableStdin();
|
|
if (!resolved.ok) {
|
|
return resolved.result;
|
|
}
|
|
const { data, warnings } = encodeKeySequence({
|
|
keys: params.keys,
|
|
hex: params.hex,
|
|
literal: params.literal,
|
|
});
|
|
if (!data) {
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text: "No key data provided.",
|
|
},
|
|
],
|
|
details: { status: "failed" },
|
|
};
|
|
}
|
|
await writeToStdin(resolved.stdin, data);
|
|
return runningSessionResult(
|
|
resolved.session,
|
|
`Sent ${data.length} bytes to session ${params.sessionId}.` +
|
|
(warnings.length ? `\nWarnings:\n- ${warnings.join("\n- ")}` : ""),
|
|
);
|
|
}
|
|
|
|
case "submit": {
|
|
const resolved = resolveBackgroundedWritableStdin();
|
|
if (!resolved.ok) {
|
|
return resolved.result;
|
|
}
|
|
await writeToStdin(resolved.stdin, "\r");
|
|
return runningSessionResult(
|
|
resolved.session,
|
|
`Submitted session ${params.sessionId} (sent CR).`,
|
|
);
|
|
}
|
|
|
|
case "paste": {
|
|
const resolved = resolveBackgroundedWritableStdin();
|
|
if (!resolved.ok) {
|
|
return resolved.result;
|
|
}
|
|
const payload = encodePaste(params.text ?? "", params.bracketed !== false);
|
|
if (!payload) {
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text: "No paste text provided.",
|
|
},
|
|
],
|
|
details: { status: "failed" },
|
|
};
|
|
}
|
|
await writeToStdin(resolved.stdin, payload);
|
|
return runningSessionResult(
|
|
resolved.session,
|
|
`Pasted ${params.text?.length ?? 0} chars to session ${params.sessionId}.`,
|
|
);
|
|
}
|
|
|
|
case "kill": {
|
|
if (!scopedSession) {
|
|
return failText(`No active session found for ${params.sessionId}`);
|
|
}
|
|
if (!scopedSession.backgrounded) {
|
|
return failText(`Session ${params.sessionId} is not backgrounded.`);
|
|
}
|
|
const canceled = cancelManagedSession(scopedSession.id);
|
|
if (!canceled) {
|
|
const terminated = terminateSessionFallback(scopedSession);
|
|
if (!terminated) {
|
|
return failText(
|
|
`Unable to terminate session ${params.sessionId}: no active supervisor run or process id.`,
|
|
);
|
|
}
|
|
markExited(scopedSession, null, "SIGKILL", "failed");
|
|
}
|
|
resetPollRetrySuggestion(params.sessionId);
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text: canceled
|
|
? `Termination requested for session ${params.sessionId}.`
|
|
: `Killed session ${params.sessionId}.`,
|
|
},
|
|
],
|
|
details: {
|
|
status: "failed",
|
|
name: scopedSession ? deriveSessionName(scopedSession.command) : undefined,
|
|
},
|
|
};
|
|
}
|
|
|
|
case "clear": {
|
|
if (scopedFinished) {
|
|
resetPollRetrySuggestion(params.sessionId);
|
|
deleteSession(params.sessionId);
|
|
return {
|
|
content: [{ type: "text", text: `Cleared session ${params.sessionId}.` }],
|
|
details: { status: "completed" },
|
|
};
|
|
}
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text: `No finished session found for ${params.sessionId}`,
|
|
},
|
|
],
|
|
details: { status: "failed" },
|
|
};
|
|
}
|
|
|
|
case "remove": {
|
|
if (scopedSession) {
|
|
const canceled = cancelManagedSession(scopedSession.id);
|
|
if (canceled) {
|
|
// Keep remove semantics deterministic: drop from process registry now.
|
|
scopedSession.backgrounded = false;
|
|
deleteSession(params.sessionId);
|
|
} else {
|
|
const terminated = terminateSessionFallback(scopedSession);
|
|
if (!terminated) {
|
|
return failText(
|
|
`Unable to remove session ${params.sessionId}: no active supervisor run or process id.`,
|
|
);
|
|
}
|
|
markExited(scopedSession, null, "SIGKILL", "failed");
|
|
deleteSession(params.sessionId);
|
|
}
|
|
resetPollRetrySuggestion(params.sessionId);
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text: canceled
|
|
? `Removed session ${params.sessionId} (termination requested).`
|
|
: `Removed session ${params.sessionId}.`,
|
|
},
|
|
],
|
|
details: {
|
|
status: "failed",
|
|
name: scopedSession ? deriveSessionName(scopedSession.command) : undefined,
|
|
},
|
|
};
|
|
}
|
|
if (scopedFinished) {
|
|
resetPollRetrySuggestion(params.sessionId);
|
|
deleteSession(params.sessionId);
|
|
return {
|
|
content: [{ type: "text", text: `Removed session ${params.sessionId}.` }],
|
|
details: { status: "completed" },
|
|
};
|
|
}
|
|
return {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text: `No session found for ${params.sessionId}`,
|
|
},
|
|
],
|
|
details: { status: "failed" },
|
|
};
|
|
}
|
|
}
|
|
|
|
return {
|
|
content: [{ type: "text", text: `Unknown action ${params.action as string}` }],
|
|
details: { status: "failed" },
|
|
};
|
|
},
|
|
};
|
|
}
|
|
|
|
export const processTool = createProcessTool();
|