mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-24 23:51:48 +00:00
* Gateway: resolve agent.wait for chat.send runs * Discord: harden ACP thread binding + listener timeout * ACPX: handle already-exited child wait * Gateway/Discord: address PR review findings * Discord: keep ACP error-state thread bindings on startup * gateway: make agent.wait dedupe bridge event-driven * discord: harden ACP probe classification and cap startup fan-out * discord: add cooperative timeout cancellation * discord: fix startup probe concurrency helper typing * plugin-sdk: avoid Windows root-alias shard timeout * plugin-sdk: keep root alias reflection path non-blocking * discord+gateway: resolve remaining PR review findings * gateway+discord: fix codex review regressions * Discord/Gateway: address Codex review findings * Gateway: keep agent.wait lifecycle active with shared run IDs * Discord: clean up status reactions on aborted runs * fix: add changelog note for ACP/Discord startup hardening (#33699) (thanks @dutifulbob) --------- Co-authored-by: Onur <2453968+osolmaz@users.noreply.github.com>
253 lines
6.9 KiB
TypeScript
253 lines
6.9 KiB
TypeScript
import { onAgentEvent } from "../../infra/agent-events.js";
|
|
|
|
const AGENT_RUN_CACHE_TTL_MS = 10 * 60_000;
|
|
/**
|
|
* Embedded runs can emit transient lifecycle `error` events while auth/model
|
|
* failover is still in progress. Give errors a short grace window so a
|
|
* subsequent `start` event can cancel premature terminal snapshots.
|
|
*/
|
|
const AGENT_RUN_ERROR_RETRY_GRACE_MS = 15_000;
|
|
|
|
const agentRunCache = new Map<string, AgentRunSnapshot>();
|
|
const agentRunStarts = new Map<string, number>();
|
|
const pendingAgentRunErrors = new Map<string, PendingAgentRunError>();
|
|
let agentRunListenerStarted = false;
|
|
|
|
type AgentRunSnapshot = {
|
|
runId: string;
|
|
status: "ok" | "error" | "timeout";
|
|
startedAt?: number;
|
|
endedAt?: number;
|
|
error?: string;
|
|
ts: number;
|
|
};
|
|
|
|
type PendingAgentRunError = {
|
|
snapshot: AgentRunSnapshot;
|
|
dueAt: number;
|
|
timer: NodeJS.Timeout;
|
|
};
|
|
|
|
function pruneAgentRunCache(now = Date.now()) {
|
|
for (const [runId, entry] of agentRunCache) {
|
|
if (now - entry.ts > AGENT_RUN_CACHE_TTL_MS) {
|
|
agentRunCache.delete(runId);
|
|
}
|
|
}
|
|
}
|
|
|
|
function recordAgentRunSnapshot(entry: AgentRunSnapshot) {
|
|
pruneAgentRunCache(entry.ts);
|
|
agentRunCache.set(entry.runId, entry);
|
|
}
|
|
|
|
function clearPendingAgentRunError(runId: string) {
|
|
const pending = pendingAgentRunErrors.get(runId);
|
|
if (!pending) {
|
|
return;
|
|
}
|
|
clearTimeout(pending.timer);
|
|
pendingAgentRunErrors.delete(runId);
|
|
}
|
|
|
|
function schedulePendingAgentRunError(snapshot: AgentRunSnapshot) {
|
|
clearPendingAgentRunError(snapshot.runId);
|
|
const dueAt = Date.now() + AGENT_RUN_ERROR_RETRY_GRACE_MS;
|
|
const timer = setTimeout(() => {
|
|
const pending = pendingAgentRunErrors.get(snapshot.runId);
|
|
if (!pending) {
|
|
return;
|
|
}
|
|
pendingAgentRunErrors.delete(snapshot.runId);
|
|
recordAgentRunSnapshot(pending.snapshot);
|
|
}, AGENT_RUN_ERROR_RETRY_GRACE_MS);
|
|
timer.unref?.();
|
|
pendingAgentRunErrors.set(snapshot.runId, { snapshot, dueAt, timer });
|
|
}
|
|
|
|
function getPendingAgentRunError(runId: string) {
|
|
const pending = pendingAgentRunErrors.get(runId);
|
|
if (!pending) {
|
|
return undefined;
|
|
}
|
|
return {
|
|
snapshot: pending.snapshot,
|
|
dueAt: pending.dueAt,
|
|
};
|
|
}
|
|
|
|
function createSnapshotFromLifecycleEvent(params: {
|
|
runId: string;
|
|
phase: "end" | "error";
|
|
data?: Record<string, unknown>;
|
|
}): AgentRunSnapshot {
|
|
const { runId, phase, data } = params;
|
|
const startedAt =
|
|
typeof data?.startedAt === "number" ? data.startedAt : agentRunStarts.get(runId);
|
|
const endedAt = typeof data?.endedAt === "number" ? data.endedAt : undefined;
|
|
const error = typeof data?.error === "string" ? data.error : undefined;
|
|
return {
|
|
runId,
|
|
status: phase === "error" ? "error" : data?.aborted ? "timeout" : "ok",
|
|
startedAt,
|
|
endedAt,
|
|
error,
|
|
ts: Date.now(),
|
|
};
|
|
}
|
|
|
|
function ensureAgentRunListener() {
|
|
if (agentRunListenerStarted) {
|
|
return;
|
|
}
|
|
agentRunListenerStarted = true;
|
|
onAgentEvent((evt) => {
|
|
if (!evt) {
|
|
return;
|
|
}
|
|
if (evt.stream !== "lifecycle") {
|
|
return;
|
|
}
|
|
const phase = evt.data?.phase;
|
|
if (phase === "start") {
|
|
const startedAt = typeof evt.data?.startedAt === "number" ? evt.data.startedAt : undefined;
|
|
agentRunStarts.set(evt.runId, startedAt ?? Date.now());
|
|
clearPendingAgentRunError(evt.runId);
|
|
// A new start means this run is active again (or retried). Drop stale
|
|
// terminal snapshots so waiters don't resolve from old state.
|
|
agentRunCache.delete(evt.runId);
|
|
return;
|
|
}
|
|
if (phase !== "end" && phase !== "error") {
|
|
return;
|
|
}
|
|
const snapshot = createSnapshotFromLifecycleEvent({
|
|
runId: evt.runId,
|
|
phase,
|
|
data: evt.data,
|
|
});
|
|
agentRunStarts.delete(evt.runId);
|
|
if (phase === "error") {
|
|
schedulePendingAgentRunError(snapshot);
|
|
return;
|
|
}
|
|
clearPendingAgentRunError(evt.runId);
|
|
recordAgentRunSnapshot(snapshot);
|
|
});
|
|
}
|
|
|
|
function getCachedAgentRun(runId: string) {
|
|
pruneAgentRunCache();
|
|
return agentRunCache.get(runId);
|
|
}
|
|
|
|
export async function waitForAgentJob(params: {
|
|
runId: string;
|
|
timeoutMs: number;
|
|
signal?: AbortSignal;
|
|
ignoreCachedSnapshot?: boolean;
|
|
}): Promise<AgentRunSnapshot | null> {
|
|
const { runId, timeoutMs, signal, ignoreCachedSnapshot = false } = params;
|
|
ensureAgentRunListener();
|
|
const cached = ignoreCachedSnapshot ? undefined : getCachedAgentRun(runId);
|
|
if (cached) {
|
|
return cached;
|
|
}
|
|
if (timeoutMs <= 0 || signal?.aborted) {
|
|
return null;
|
|
}
|
|
|
|
return await new Promise((resolve) => {
|
|
let settled = false;
|
|
let pendingErrorTimer: NodeJS.Timeout | undefined;
|
|
let onAbort: (() => void) | undefined;
|
|
|
|
const clearPendingErrorTimer = () => {
|
|
if (!pendingErrorTimer) {
|
|
return;
|
|
}
|
|
clearTimeout(pendingErrorTimer);
|
|
pendingErrorTimer = undefined;
|
|
};
|
|
|
|
const finish = (entry: AgentRunSnapshot | null) => {
|
|
if (settled) {
|
|
return;
|
|
}
|
|
settled = true;
|
|
clearTimeout(timer);
|
|
clearPendingErrorTimer();
|
|
unsubscribe();
|
|
if (onAbort) {
|
|
signal?.removeEventListener("abort", onAbort);
|
|
}
|
|
resolve(entry);
|
|
};
|
|
|
|
const scheduleErrorFinish = (
|
|
snapshot: AgentRunSnapshot,
|
|
delayMs = AGENT_RUN_ERROR_RETRY_GRACE_MS,
|
|
) => {
|
|
clearPendingErrorTimer();
|
|
const effectiveDelay = Math.max(1, Math.min(Math.floor(delayMs), 2_147_483_647));
|
|
pendingErrorTimer = setTimeout(() => {
|
|
const latest = ignoreCachedSnapshot ? undefined : getCachedAgentRun(runId);
|
|
if (latest) {
|
|
finish(latest);
|
|
return;
|
|
}
|
|
recordAgentRunSnapshot(snapshot);
|
|
finish(snapshot);
|
|
}, effectiveDelay);
|
|
pendingErrorTimer.unref?.();
|
|
};
|
|
|
|
if (!ignoreCachedSnapshot) {
|
|
const pending = getPendingAgentRunError(runId);
|
|
if (pending) {
|
|
scheduleErrorFinish(pending.snapshot, pending.dueAt - Date.now());
|
|
}
|
|
}
|
|
|
|
const unsubscribe = onAgentEvent((evt) => {
|
|
if (!evt || evt.stream !== "lifecycle") {
|
|
return;
|
|
}
|
|
if (evt.runId !== runId) {
|
|
return;
|
|
}
|
|
const phase = evt.data?.phase;
|
|
if (phase === "start") {
|
|
clearPendingErrorTimer();
|
|
return;
|
|
}
|
|
if (phase !== "end" && phase !== "error") {
|
|
return;
|
|
}
|
|
const latest = ignoreCachedSnapshot ? undefined : getCachedAgentRun(runId);
|
|
if (latest) {
|
|
finish(latest);
|
|
return;
|
|
}
|
|
const snapshot = createSnapshotFromLifecycleEvent({
|
|
runId: evt.runId,
|
|
phase,
|
|
data: evt.data,
|
|
});
|
|
if (phase === "error") {
|
|
scheduleErrorFinish(snapshot);
|
|
return;
|
|
}
|
|
recordAgentRunSnapshot(snapshot);
|
|
finish(snapshot);
|
|
});
|
|
|
|
const timerDelayMs = Math.max(1, Math.min(Math.floor(timeoutMs), 2_147_483_647));
|
|
const timer = setTimeout(() => finish(null), timerDelayMs);
|
|
onAbort = () => finish(null);
|
|
signal?.addEventListener("abort", onAbort, { once: true });
|
|
});
|
|
}
|
|
|
|
ensureAgentRunListener();
|