Files
openclaw/src/agents/bash-process-registry.ts
Josh Lehman 29142a9d47 fix: preserve Telegram topic routing for exec completions (#64580)
* clawdbot-a2c: pin exec completion delivery context

Regeneration-Prompt: |
  Fix a Telegram forum topic misroute where delayed exec completion or similar async completion text could be delivered into the wrong topic after the session's stored route drifted. Keep the patch surgical. Preserve immutable origin deliveryContext when background exec completion events are queued, thread that context from the exec tool's ambient channel/session defaults into the process session, and ensure the queued system event carries it instead of relying on later heartbeat fallback to mutable session lastTo/lastThreadId data. Add one focused unit assertion that notifyOnExit events keep the original Telegram topic delivery context and one heartbeat regression that proves work started in topic 47 still delivers back to topic 47 even if the session store later points at topic 2175.

* fix: note Telegram exec topic routing

Regeneration-Prompt: |
  Prepare PR #64580 after review-pr with no blocking findings. The only required prep change was the workflow-mandated changelog entry under CHANGELOG.md -> Unreleased -> Fixes. Preserve the review conclusion that the code change is already acceptable, do not widen scope beyond the changelog, and include the PR number plus thanks attribution in the changelog line for the Telegram exec forum-topic completion routing fix.
2026-04-11 15:47:53 -07:00

314 lines
8.5 KiB
TypeScript

import type { ChildProcessWithoutNullStreams } from "node:child_process";
import type { DeliveryContext } from "../utils/delivery-context.js";
import { createSessionSlug as createSessionSlugId } from "./session-slug.js";
// Default and bounds (in milliseconds) for the retention window applied to finished-session
// records; clampTtl enforces them and pruneFinishedSessions uses the resulting value.
const DEFAULT_JOB_TTL_MS = 30 * 60 * 1000; // 30 minutes
const MIN_JOB_TTL_MS = 60 * 1000; // 1 minute
const MAX_JOB_TTL_MS = 3 * 60 * 60 * 1000; // 3 hours
// Fallback cap (in characters) on undrained output per stream, used by appendOutput when a
// session does not set pendingMaxOutputChars.
const DEFAULT_PENDING_OUTPUT_CHARS = 30_000;
/**
 * Normalises a requested job TTL (milliseconds) into the supported range.
 *
 * Any falsy input (undefined, 0, NaN) yields DEFAULT_JOB_TTL_MS; every other
 * value is clamped to the inclusive range [MIN_JOB_TTL_MS, MAX_JOB_TTL_MS].
 *
 * @param value - Requested TTL in milliseconds, if any.
 * @returns The TTL to use, in milliseconds.
 */
function clampTtl(value: number | undefined) {
  if (value && !Number.isNaN(value)) {
    // Apply the lower bound first, then the upper bound.
    const lowerBounded = Math.max(value, MIN_JOB_TTL_MS);
    return Math.min(lowerBounded, MAX_JOB_TTL_MS);
  }
  return DEFAULT_JOB_TTL_MS;
}
// Current retention window for finished sessions. Seeded from the PI_BASH_JOB_TTL_MS
// environment variable (parsed as a base-10 integer; unset/unparseable falls back to the
// default via clampTtl) and replaceable at runtime through setJobTtlMs.
let jobTtlMs = clampTtl(Number.parseInt(process.env.PI_BASH_JOB_TTL_MS ?? "", 10));
/** Status label for a process session; the value passed to markExited is stored on the FinishedSession record. */
export type ProcessStatus = "running" | "completed" | "failed" | "killed";
/**
 * Minimal writable handle for a session's standard input.
 *
 * moveToFinished calls `destroy` when present (otherwise `end`) and then
 * attempts to set `destroyed` to true when the session finishes.
 */
export type SessionStdin = {
// Writes `data`; the optional callback receives an error (or null/undefined) when done.
write: (data: string, cb?: (err?: Error | null) => void) => void;
// Signals that no more input will be written.
end: () => void;
// When backed by a real Node stream (child.stdin), this exists; for PTY wrappers it may not.
destroy?: () => void;
// Set to true by moveToFinished after cleanup, when the property is writable.
destroyed?: boolean;
};
/**
 * Mutable state tracked for one process session while it lives in the
 * running-session registry.
 */
export interface ProcessSession {
// Registry key; unique across running and finished sessions.
id: string;
// Command associated with the session; copied onto the finished record.
command: string;
// Copied onto the finished record; its meaning is defined by callers outside this file.
scopeKey?: string;
// Not read in this file. NOTE(review): presumably identifies the owning agent session — confirm.
sessionKey?: string;
// Not read in this file. NOTE(review): per the commit note, presumably the origin delivery
// context pinned when the command started so exit notifications route back to it — confirm.
notifyDeliveryContext?: DeliveryContext;
// Not read in this file. NOTE(review): presumably controls exit notifications — confirm against the exec tool.
notifyOnExit?: boolean;
notifyOnExitEmptySuccess?: boolean;
exitNotified?: boolean;
// Child process handle; its stdio streams are destroyed and the reference deleted on finish.
child?: ChildProcessWithoutNullStreams;
// Input handle; destroyed/ended and the reference deleted on finish.
stdin?: SessionStdin;
pid?: number;
// Copied onto the finished record as the start timestamp.
startedAt: number;
cwd?: string;
// Cap (characters) on `aggregated`; also an upper bound on the per-stream pending cap.
maxOutputChars: number;
// Optional per-stream pending cap; DEFAULT_PENDING_OUTPUT_CHARS is used when unset.
pendingMaxOutputChars?: number;
// Count of every character appended, including characters later trimmed away.
totalOutputChars: number;
// Chunks appended since the last drainSession call, per stream.
pendingStdout: string[];
pendingStderr: string[];
// Cached total length of the corresponding pending chunk arrays.
pendingStdoutChars: number;
pendingStderrChars: number;
// Combined output of both streams, trimmed to the last `maxOutputChars` characters.
aggregated: string;
// Last 2000 characters of `aggregated`.
tail: string;
exitCode?: number | null;
exitSignal?: NodeJS.Signals | number | null;
// Set to true by markExited.
exited: boolean;
// Set once any pending or aggregated output has been dropped to honour a cap.
truncated: boolean;
// Set by markBackgrounded; only backgrounded sessions are listed and kept after finishing.
backgrounded: boolean;
/** PTY cursor key mode: unknown until a PTY reports smkx/rmkx. */
cursorKeyMode: "unknown" | "normal" | "application";
}
/**
 * Snapshot kept for a backgrounded session after it exits. Built by
 * moveToFinished from the live ProcessSession and pruned once `endedAt`
 * is older than the job TTL.
 */
export interface FinishedSession {
id: string;
command: string;
scopeKey?: string;
startedAt: number;
// Date.now() at the moment the session was moved to the finished registry.
endedAt: number;
cwd?: string;
// Status supplied by the caller of markExited.
status: ProcessStatus;
exitCode?: number | null;
exitSignal?: NodeJS.Signals | number | null;
// Combined output as capped on the live session at exit time.
aggregated: string;
// Last 2000 characters of `aggregated`.
tail: string;
// True when output was dropped to honour a cap while the session ran.
truncated: boolean;
// Total characters the process emitted, including trimmed ones.
totalOutputChars: number;
}
// Live sessions keyed by session id; filled by addSession, emptied on finish or deleteSession.
const runningSessions = new Map<string, ProcessSession>();
// Records of exited sessions keyed by session id; only backgrounded sessions are stored here
// (see moveToFinished) and entries are removed by pruneFinishedSessions after the TTL.
const finishedSessions = new Map<string, FinishedSession>();
// Handle of the periodic prune timer; null while no sweeper is scheduled.
let sweeper: NodeJS.Timeout | null = null;
/**
 * Reports whether `id` is already used by a running or a finished session.
 *
 * @param id - Candidate session id.
 * @returns True when either registry contains the id.
 */
function isSessionIdTaken(id: string) {
  if (runningSessions.has(id)) {
    return true;
  }
  return finishedSessions.has(id);
}
/**
 * Generates a session slug via the shared slug generator, handing it
 * isSessionIdTaken as the collision check against this registry.
 *
 * @returns The generated slug.
 */
export function createSessionSlug(): string {
  const slug = createSessionSlugId(isSessionIdTaken);
  return slug;
}
/**
 * Registers a session as running (keyed by its id) and makes sure the
 * finished-session sweeper is scheduled.
 *
 * @param session - Session to track; replaces any running entry with the same id.
 */
export function addSession(session: ProcessSession) {
  const { id } = session;
  runningSessions.set(id, session);
  startSweeper();
}
/**
 * Looks up a running session.
 *
 * @param id - Session id.
 * @returns The running session, or undefined when no running session has that id.
 */
export function getSession(id: string) {
  const running = runningSessions.get(id);
  return running;
}
/**
 * Looks up the record of a finished session.
 *
 * @param id - Session id.
 * @returns The finished-session record, or undefined when none is stored for that id.
 */
export function getFinishedSession(id: string) {
  const finished = finishedSessions.get(id);
  return finished;
}
/**
 * Forgets a session id in both registries (running first, then finished).
 * Only the map entries are removed; nothing else is done to the session.
 *
 * @param id - Session id to remove.
 */
export function deleteSession(id: string) {
  for (const registry of [runningSessions, finishedSessions]) {
    registry.delete(id);
  }
}
/**
 * Records a chunk of process output on the session.
 *
 * The chunk is queued on the stream's pending buffer (capped at the smaller of
 * the session's pending cap and `maxOutputChars`), counted in
 * `totalOutputChars`, and appended to `aggregated` (capped at
 * `maxOutputChars`). `truncated` is raised whenever either cap drops data, and
 * `tail` is refreshed to the last 2000 characters of `aggregated`.
 *
 * @param session - Session to update in place.
 * @param stream - Which stream produced the chunk.
 * @param chunk - Text emitted by the process.
 */
export function appendOutput(session: ProcessSession, stream: "stdout" | "stderr", chunk: string) {
  // Pending buffers and their cached sizes are initialised lazily.
  session.pendingStdout ??= [];
  session.pendingStderr ??= [];
  session.pendingStdoutChars ??= sumPendingChars(session.pendingStdout);
  session.pendingStderrChars ??= sumPendingChars(session.pendingStderr);

  const isStdout = stream === "stdout";
  const pending = isStdout ? session.pendingStdout : session.pendingStderr;
  const sizeBefore = isStdout ? session.pendingStdoutChars : session.pendingStderrChars;
  const limit = Math.min(
    session.pendingMaxOutputChars ?? DEFAULT_PENDING_OUTPUT_CHARS,
    session.maxOutputChars,
  );

  pending.push(chunk);
  let sizeAfter = sizeBefore + chunk.length;
  if (sizeAfter > limit) {
    session.truncated = true;
    sizeAfter = capPendingBuffer(pending, sizeAfter, limit);
  }
  if (isStdout) {
    session.pendingStdoutChars = sizeAfter;
  } else {
    session.pendingStderrChars = sizeAfter;
  }

  session.totalOutputChars += chunk.length;

  // Anything shorter than the untrimmed length means the aggregate cap dropped data.
  const untrimmedLength = session.aggregated.length + chunk.length;
  const nextAggregated = trimWithCap(session.aggregated + chunk, session.maxOutputChars);
  session.truncated ||= nextAggregated.length < untrimmedLength;
  session.aggregated = nextAggregated;
  session.tail = tail(session.aggregated, 2000);
}
/**
 * Returns the output buffered since the previous drain and resets the pending
 * buffers and their cached sizes.
 *
 * appendOutput initialises the pending buffers lazily, so a session that has
 * not received output yet may not have them; such a session drains to empty
 * strings instead of throwing.
 *
 * @param session - Session whose pending output is consumed; mutated in place.
 * @returns The concatenated pending stdout and stderr text.
 */
export function drainSession(session: ProcessSession) {
  // `?? []` mirrors the lazy `??=` initialisation performed by appendOutput.
  const stdout = (session.pendingStdout ?? []).join("");
  const stderr = (session.pendingStderr ?? []).join("");
  session.pendingStdout = [];
  session.pendingStderr = [];
  session.pendingStdoutChars = 0;
  session.pendingStderrChars = 0;
  return { stdout, stderr };
}
/**
 * Records the exit of a session and retires it from the running registry.
 *
 * Stores the exit code and signal, flags the session as exited, refreshes
 * `tail` from the aggregated output, then hands the session to moveToFinished.
 *
 * @param session - Session that exited; mutated in place.
 * @param exitCode - Exit code, or null when there is none.
 * @param exitSignal - Terminating signal, or null when there is none.
 * @param status - Final status to store on the finished record.
 */
export function markExited(
  session: ProcessSession,
  exitCode: number | null,
  exitSignal: NodeJS.Signals | number | null,
  status: ProcessStatus,
) {
  session.exitCode = exitCode;
  session.exitSignal = exitSignal;
  session.exited = true;

  const lastOutput = tail(session.aggregated, 2000);
  session.tail = lastOutput;

  moveToFinished(session, status);
}
/**
 * Flags a session as backgrounded. Backgrounded sessions are the ones returned
 * by listRunningSessions and the only ones kept as finished records.
 *
 * @param session - Session to flag; mutated in place.
 */
export function markBackgrounded(session: ProcessSession): void {
  session.backgrounded = true;
}
/**
 * Retires a session: removes it from the running registry, releases its
 * process handles, and, for backgrounded sessions only, stores a
 * FinishedSession snapshot stamped with the current time.
 *
 * @param session - Session being retired; `child` and `stdin` are removed from it.
 * @param status - Final status to store on the finished record.
 */
function moveToFinished(session: ProcessSession, status: ProcessStatus) {
  runningSessions.delete(session.id);

  const child = session.child;
  if (child) {
    // Destroy each stdio stream so its file descriptor is released.
    child.stdin?.destroy?.();
    child.stdout?.destroy?.();
    child.stderr?.destroy?.();
    // Drop every listener so the child no longer references this module.
    child.removeAllListeners();
    delete session.child;
  }

  const stdin = session.stdin;
  if (stdin) {
    // Prefer destroy(); wrappers without it are ended instead.
    if (typeof stdin.destroy === "function") {
      stdin.destroy();
    } else if (typeof stdin.end === "function") {
      stdin.end();
    }
    try {
      (stdin as { destroyed?: boolean }).destroyed = true;
    } catch {
      // The flag may be read-only on some implementations; nothing to do then.
    }
    delete session.stdin;
  }

  // Non-backgrounded sessions leave no record behind.
  if (session.backgrounded) {
    const record: FinishedSession = {
      id: session.id,
      command: session.command,
      scopeKey: session.scopeKey,
      startedAt: session.startedAt,
      endedAt: Date.now(),
      cwd: session.cwd,
      status,
      exitCode: session.exitCode,
      exitSignal: session.exitSignal,
      aggregated: session.aggregated,
      tail: session.tail,
      truncated: session.truncated,
      totalOutputChars: session.totalOutputChars,
    };
    finishedSessions.set(session.id, record);
  }
}
/**
 * Returns the last `max` characters of `text`.
 *
 * @param text - Source text.
 * @param max - Maximum number of trailing characters to keep (default 2000).
 * @returns `text` unchanged when it already fits, otherwise its final `max` characters.
 */
export function tail(text: string, max = 2000) {
  const excess = text.length - max;
  return excess > 0 ? text.slice(excess) : text;
}
/**
 * Adds up the lengths of all chunks in a pending buffer.
 *
 * @param buffer - Pending output chunks.
 * @returns Total number of characters across the chunks (0 for an empty buffer).
 */
function sumPendingChars(buffer: string[]) {
  return buffer.reduce((total, chunk) => total + chunk.length, 0);
}
/**
 * Trims a pending chunk buffer in place so it holds at most `cap` characters,
 * keeping the most recent output.
 *
 * @param buffer - Pending chunks, oldest first; mutated in place.
 * @param pendingChars - Current total length of the chunks in `buffer`.
 * @param cap - Maximum number of characters to retain.
 * @returns The total length of `buffer` after trimming.
 */
function capPendingBuffer(buffer: string[], pendingChars: number, cap: number) {
  if (pendingChars <= cap) {
    return pendingChars;
  }

  // The newest chunk alone fills the cap: keep only its trailing `cap` characters.
  const newest = buffer[buffer.length - 1];
  if (newest && newest.length >= cap) {
    buffer.splice(0, buffer.length, newest.slice(newest.length - cap));
    return cap;
  }

  // Count the oldest chunks that can be dropped whole while still leaving at least `cap` characters.
  let remaining = pendingChars;
  let dropCount = 0;
  while (dropCount < buffer.length && remaining - buffer[dropCount].length >= cap) {
    remaining -= buffer[dropCount].length;
    dropCount += 1;
  }
  if (dropCount > 0) {
    buffer.splice(0, dropCount);
  }

  // Shave the front of the oldest surviving chunk to land exactly on the cap.
  if (buffer.length > 0 && remaining > cap) {
    buffer[0] = buffer[0].slice(remaining - cap);
    return cap;
  }
  return remaining;
}
/**
 * Limits `text` to its final `max` characters.
 *
 * @param text - Source text.
 * @param max - Maximum number of characters to keep.
 * @returns `text` unchanged when it fits within `max`, otherwise its last `max` characters.
 */
export function trimWithCap(text: string, max: number) {
  const overflows = text.length > max;
  if (!overflows) {
    return text;
  }
  const start = text.length - max;
  return text.substring(start);
}
/**
 * Lists the running sessions that have been backgrounded, in registry
 * insertion order.
 *
 * @returns A new array of backgrounded running sessions.
 */
export function listRunningSessions() {
  const backgrounded: ProcessSession[] = [];
  for (const session of runningSessions.values()) {
    if (session.backgrounded) {
      backgrounded.push(session);
    }
  }
  return backgrounded;
}
/**
 * Lists every stored finished-session record, in registry insertion order.
 *
 * @returns A new array of finished-session records.
 */
export function listFinishedSessions() {
  return [...finishedSessions.values()];
}
/** Discards every finished-session record; running sessions are untouched. */
export function clearFinished(): void {
  finishedSessions.clear();
}
/**
 * Test helper: stops the sweeper and empties both registries.
 * The configured job TTL is left as it is.
 */
export function resetProcessRegistryForTests() {
  stopSweeper();
  for (const registry of [runningSessions, finishedSessions]) {
    registry.clear();
  }
}
/**
 * Updates the finished-session retention window and restarts the sweeper so
 * its interval reflects the new value.
 *
 * @param value - Requested TTL in milliseconds; undefined or NaN leaves the
 *   current setting (and sweeper) untouched. Other values go through clampTtl.
 */
export function setJobTtlMs(value?: number) {
  if (value !== undefined && !Number.isNaN(value)) {
    jobTtlMs = clampTtl(value);
    // The sweep interval is derived from the TTL, so reschedule the timer.
    stopSweeper();
    startSweeper();
  }
}
/**
 * Removes finished-session records whose `endedAt` is older than the current
 * job TTL, measured from now.
 */
function pruneFinishedSessions() {
  const cutoff = Date.now() - jobTtlMs;
  finishedSessions.forEach((record, id) => {
    if (record.endedAt < cutoff) {
      finishedSessions.delete(id);
    }
  });
}
/**
 * Schedules the periodic prune of finished sessions unless one is already
 * scheduled. The interval is one sixth of the job TTL, but never less than
 * 30 seconds. `unref` is called on the timer when available.
 */
function startSweeper() {
  if (!sweeper) {
    const intervalMs = Math.max(30_000, jobTtlMs / 6);
    sweeper = setInterval(pruneFinishedSessions, intervalMs);
    sweeper.unref?.();
  }
}
/** Cancels the periodic prune timer, if one is scheduled, and clears its handle. */
function stopSweeper() {
  if (sweeper) {
    clearInterval(sweeper);
    sweeper = null;
  }
}