diff --git a/src/agents/embedded-agent-runner/run-state.ts b/src/agents/embedded-agent-runner/run-state.ts index 1c29d865927..c3345659e77 100644 --- a/src/agents/embedded-agent-runner/run-state.ts +++ b/src/agents/embedded-agent-runner/run-state.ts @@ -43,6 +43,14 @@ export type EmbeddedRunWaiter = { timer: NodeJS.Timeout; }; +export type AbandonedEmbeddedRun = { + sessionId: string; + sessionKey?: string; + sessionFile?: string; + abandonedAtMs: number; + reason: "timeout"; +}; + const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState"); const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({ @@ -50,6 +58,9 @@ const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({ snapshots: new Map(), sessionIdsByKey: new Map(), sessionIdsByFile: new Map(), + abandonedRunsBySessionId: new Map(), + abandonedRunSessionIdsByKey: new Map(), + abandonedRunSessionIdsByFile: new Map(), waiters: new Map>(), modelSwitchRequests: new Map(), })); @@ -66,6 +77,15 @@ export const ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY = export const ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_FILE = embeddedRunState.sessionIdsByFile ?? (embeddedRunState.sessionIdsByFile = new Map()); +export const ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID = + embeddedRunState.abandonedRunsBySessionId ?? + (embeddedRunState.abandonedRunsBySessionId = new Map()); +export const ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY = + embeddedRunState.abandonedRunSessionIdsByKey ?? + (embeddedRunState.abandonedRunSessionIdsByKey = new Map()); +export const ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE = + embeddedRunState.abandonedRunSessionIdsByFile ?? + (embeddedRunState.abandonedRunSessionIdsByFile = new Map()); export const EMBEDDED_RUN_WAITERS = embeddedRunState.waiters ?? (embeddedRunState.waiters = new Map>()); diff --git a/src/agents/embedded-agent-runner/run/attempt.ts b/src/agents/embedded-agent-runner/run/attempt.ts index 37fb36b7ad4..b172d2eb684 100644 --- a/src/agents/embedded-agent-runner/run/attempt.ts +++ b/src/agents/embedded-agent-runner/run/attempt.ts @@ -232,6 +232,7 @@ import { createEmbeddedAgentResourceLoader } from "../resource-loader.js"; import { clearActiveEmbeddedRun, type EmbeddedAgentQueueHandle, + markActiveEmbeddedRunAbandoned, setActiveEmbeddedRun, updateActiveEmbeddedRunSessionFile, updateActiveEmbeddedRunSnapshot, @@ -2892,6 +2893,7 @@ export async function runEmbeddedAttempt( } } }; + let queueHandleForAbandonment: EmbeddedAgentQueueHandle | undefined; const abortRun = (isTimeout = false, reason?: unknown) => { aborted = true; if (isTimeout) { @@ -2907,7 +2909,14 @@ export async function runEmbeddedAttempt( } abortCompaction(); void abortActiveSession(); - if (isTimeout) { + if (isTimeout && queueHandleForAbandonment) { + markActiveEmbeddedRunAbandoned({ + sessionId: params.sessionId, + handle: queueHandleForAbandonment, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + reason: "timeout", + }); void sessionLockController.releaseHeldLockForAbort().catch((err) => { log.warn( `failed to release session lock on timeout abort: runId=${params.runId} ${String(err)}`, @@ -3092,6 +3101,7 @@ export async function runEmbeddedAttempt( if (params.replyOperation) { params.replyOperation.attachBackend(queueHandle); } + queueHandleForAbandonment = queueHandle; setActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey, params.sessionFile); let abortWarnTimer: NodeJS.Timeout | undefined; diff --git a/src/agents/embedded-agent-runner/runs.test.ts b/src/agents/embedded-agent-runner/runs.test.ts index 33a09393819..1a200d10230 100644 --- a/src/agents/embedded-agent-runner/runs.test.ts +++ b/src/agents/embedded-agent-runner/runs.test.ts @@ -18,10 +18,14 @@ import { abortAndDrainEmbeddedAgentRun, abortEmbeddedAgentRun, clearActiveEmbeddedRun, + clearEmbeddedRunAbandonment, consumeEmbeddedRunModelSwitch, getActiveEmbeddedRunSnapshot, isEmbeddedAgentRunHandleActive, + isEmbeddedRunAbandoned, formatEmbeddedAgentQueueFailureSummary, + markActiveEmbeddedRunAbandoned, + markEmbeddedRunAbandoned, queueEmbeddedAgentMessageWithOutcome, queueEmbeddedAgentMessageWithOutcomeAsync, requestEmbeddedRunModelSwitch, @@ -412,6 +416,56 @@ describe("embedded-agent runner run registry", () => { expect(resolveActiveEmbeddedRunHandleSessionId("agent:main:main")).toBeUndefined(); }); + it("tracks timeout abandonment by session id, key, and file until a new run starts", () => { + const sessionFile = "/tmp/openclaw-abandoned-session.jsonl"; + const handle = createRunHandle(); + + markEmbeddedRunAbandoned({ + sessionId: "session-timeout", + sessionKey: "agent:main:main", + sessionFile, + reason: "timeout", + }); + + expect(isEmbeddedRunAbandoned({ sessionId: "session-timeout" })).toBe(true); + expect(isEmbeddedRunAbandoned({ sessionKey: "agent:main:main" })).toBe(true); + expect(isEmbeddedRunAbandoned({ sessionFile })).toBe(true); + + setActiveEmbeddedRun("session-next", handle, "agent:main:main", sessionFile); + + expect(isEmbeddedRunAbandoned({ sessionId: "session-timeout" })).toBe(false); + expect(isEmbeddedRunAbandoned({ sessionKey: "agent:main:main" })).toBe(false); + expect(isEmbeddedRunAbandoned({ sessionFile })).toBe(false); + + markEmbeddedRunAbandoned({ + sessionId: "session-next", + sessionKey: "agent:main:main", + reason: "timeout", + }); + clearEmbeddedRunAbandonment({ sessionId: "session-next" }); + + expect(isEmbeddedRunAbandoned({ sessionKey: "agent:main:main" })).toBe(false); + }); + + it("ignores timeout abandonment from a stale replaced handle", () => { + const oldHandle = createRunHandle(); + const newHandle = createRunHandle(); + + setActiveEmbeddedRun("session-replaced", oldHandle, "agent:main:main"); + setActiveEmbeddedRun("session-replaced", newHandle, "agent:main:main"); + + expect( + markActiveEmbeddedRunAbandoned({ + sessionId: "session-replaced", + handle: oldHandle, + sessionKey: "agent:main:main", + reason: "timeout", + }), + ).toBe(false); + + expect(isEmbeddedRunAbandoned({ sessionKey: "agent:main:main" })).toBe(false); + }); + it("treats repeated clears for a completed run handle as idempotent", () => { const debugSpy = vi.spyOn(diagnosticLogger, "debug").mockImplementation(() => undefined); const handle = createRunHandle(); diff --git a/src/agents/embedded-agent-runner/runs.ts b/src/agents/embedded-agent-runner/runs.ts index 54e354f47c3..cfe9f884d47 100644 --- a/src/agents/embedded-agent-runner/runs.ts +++ b/src/agents/embedded-agent-runner/runs.ts @@ -24,10 +24,14 @@ import { ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_FILE, ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY, ACTIVE_EMBEDDED_RUN_SNAPSHOTS, + ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID, + ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE, + ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY, EMBEDDED_RUN_MODEL_SWITCH_REQUESTS, EMBEDDED_RUN_WAITERS, getActiveEmbeddedRunCount, type ActiveEmbeddedRunSnapshot, + type AbandonedEmbeddedRun, type EmbeddedAgentQueueHandle, type EmbeddedAgentQueueMessageOptions, type EmbeddedRunModelSwitchRequest, @@ -136,6 +140,135 @@ function setActiveRunSessionFile(sessionFile: string | undefined, sessionId: str ); } +function clearEmbeddedRunAbandonmentBySessionId(sessionId: string): void { + const abandonedRun = ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID.get(sessionId); + if (!abandonedRun) { + return; + } + ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID.delete(sessionId); + const normalizedSessionKey = abandonedRun.sessionKey?.trim(); + if ( + normalizedSessionKey && + ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.get(normalizedSessionKey) === sessionId + ) { + ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.delete(normalizedSessionKey); + } + const normalizedSessionFile = abandonedRun.sessionFile?.trim(); + if (normalizedSessionFile) { + const sessionFileKey = resolveEmbeddedSessionFileKey(normalizedSessionFile); + if (ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.get(sessionFileKey) === sessionId) { + ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.delete(sessionFileKey); + } + } +} + +function clearEmbeddedRunAbandonmentBySessionKey(sessionKey: string | undefined): void { + const normalizedSessionKey = sessionKey?.trim(); + if (!normalizedSessionKey) { + return; + } + const sessionId = ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.get(normalizedSessionKey); + if (sessionId) { + clearEmbeddedRunAbandonmentBySessionId(sessionId); + } +} + +function clearEmbeddedRunAbandonmentBySessionFile(sessionFile: string | undefined): void { + const normalizedSessionFile = sessionFile?.trim(); + if (!normalizedSessionFile) { + return; + } + const sessionFileKey = resolveEmbeddedSessionFileKey(normalizedSessionFile); + const sessionId = ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.get(sessionFileKey); + if (sessionId) { + clearEmbeddedRunAbandonmentBySessionId(sessionId); + } +} + +export function clearEmbeddedRunAbandonment(params: { + sessionId?: string; + sessionKey?: string; + sessionFile?: string; +}): void { + const normalizedSessionId = params.sessionId?.trim(); + if (normalizedSessionId) { + clearEmbeddedRunAbandonmentBySessionId(normalizedSessionId); + } + clearEmbeddedRunAbandonmentBySessionKey(params.sessionKey); + clearEmbeddedRunAbandonmentBySessionFile(params.sessionFile); +} + +export function markEmbeddedRunAbandoned(params: { + sessionId: string; + sessionKey?: string; + sessionFile?: string; + reason: AbandonedEmbeddedRun["reason"]; +}): void { + const sessionId = params.sessionId.trim(); + if (!sessionId) { + return; + } + clearEmbeddedRunAbandonment({ + sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + }); + const abandonedRun: AbandonedEmbeddedRun = { + sessionId, + abandonedAtMs: Date.now(), + reason: params.reason, + ...(params.sessionKey?.trim() ? { sessionKey: params.sessionKey.trim() } : {}), + ...(params.sessionFile?.trim() ? { sessionFile: params.sessionFile.trim() } : {}), + }; + ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID.set(sessionId, abandonedRun); + if (abandonedRun.sessionKey) { + ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.set(abandonedRun.sessionKey, sessionId); + } + if (abandonedRun.sessionFile) { + ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.set( + resolveEmbeddedSessionFileKey(abandonedRun.sessionFile), + sessionId, + ); + } +} + +export function markActiveEmbeddedRunAbandoned(params: { + sessionId: string; + handle: EmbeddedAgentQueueHandle; + sessionKey?: string; + sessionFile?: string; + reason: AbandonedEmbeddedRun["reason"]; +}): boolean { + const sessionId = params.sessionId.trim(); + if (!sessionId || ACTIVE_EMBEDDED_RUNS.get(sessionId) !== params.handle) { + return false; + } + markEmbeddedRunAbandoned(params); + return true; +} + +export function isEmbeddedRunAbandoned(params: { + sessionId?: string; + sessionKey?: string; + sessionFile?: string; +}): boolean { + const normalizedSessionId = params.sessionId?.trim(); + if (normalizedSessionId && ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID.has(normalizedSessionId)) { + return true; + } + const normalizedSessionKey = params.sessionKey?.trim(); + if (normalizedSessionKey && ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.has(normalizedSessionKey)) { + return true; + } + const normalizedSessionFile = params.sessionFile?.trim(); + return Boolean( + normalizedSessionFile && + ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.has( + resolveEmbeddedSessionFileKey(normalizedSessionFile), + ), + ); +} + function clearActiveRunSessionFiles(sessionId: string, sessionFile?: string): void { const normalizedSessionFile = sessionFile?.trim(); if (normalizedSessionFile) { @@ -586,6 +719,7 @@ export function setActiveEmbeddedRun( sessionFile?: string, ) { const wasActive = ACTIVE_EMBEDDED_RUNS.has(sessionId); + clearEmbeddedRunAbandonment({ sessionId, sessionKey, sessionFile }); ACTIVE_EMBEDDED_RUNS.set(sessionId, handle); setActiveRunSessionKey(sessionKey, sessionId); clearActiveRunSessionFiles(sessionId); @@ -692,6 +826,9 @@ export const testing = { ACTIVE_EMBEDDED_RUN_SNAPSHOTS.clear(); ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY.clear(); ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_FILE.clear(); + ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID.clear(); + ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.clear(); + ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.clear(); EMBEDDED_RUN_MODEL_SWITCH_REQUESTS.clear(); }, }; diff --git a/src/agents/subagent-announce-delivery.runtime.ts b/src/agents/subagent-announce-delivery.runtime.ts index fe7df3c2044..e4ecc4167e7 100644 --- a/src/agents/subagent-announce-delivery.runtime.ts +++ b/src/agents/subagent-announce-delivery.runtime.ts @@ -15,6 +15,7 @@ export { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; export { formatEmbeddedAgentQueueFailureSummary, isEmbeddedAgentRunActive, + isEmbeddedRunAbandoned, queueEmbeddedAgentMessageWithOutcomeAsync, resolveActiveEmbeddedRunSessionId, } from "./embedded-agent-runner/runs.js"; diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts index 81142b2059d..087e0c16a6e 100644 --- a/src/agents/subagent-announce-delivery.test.ts +++ b/src/agents/subagent-announce-delivery.test.ts @@ -195,6 +195,7 @@ async function deliverSlackThreadAnnouncement(params: { sendMessage?: typeof runtimeSendMessage; internalEvents?: AgentInternalEvent[]; sourceTool?: string; + requesterAbandoned?: boolean; }) { testing.setDepsForTest({ callGateway: params.callGateway, @@ -202,6 +203,7 @@ async function deliverSlackThreadAnnouncement(params: { sessionId: params.sessionId, isActive: params.isActive, }), + isRequesterSessionAbandoned: () => params.requesterAbandoned === true, getRuntimeConfig: () => ({}) as never, sendMessage: params.sendMessage ?? runtimeSendMessage, ...(params.queueEmbeddedAgentMessageWithOutcome @@ -276,6 +278,7 @@ async function deliverTelegramDirectMessageCompletion(params: { requesterSessionKey?: string; sourceTool?: string; runtimeConfig?: Record; + requesterAbandoned?: boolean; origin?: { channel: "telegram"; to: string; @@ -298,6 +301,7 @@ async function deliverTelegramDirectMessageCompletion(params: { : (params.requesterSessionId ?? "requester-session-telegram"), isActive: params.isActive === true, }), + isRequesterSessionAbandoned: () => params.requesterAbandoned === true, getRuntimeConfig: () => (params.runtimeConfig ?? {}) as never, sendMessage: params.sendMessage ?? runtimeSendMessage, ...(params.queueEmbeddedAgentMessageWithOutcome @@ -1987,6 +1991,59 @@ describe("deliverSubagentAnnouncement completion delivery", () => { expect(sendMessage).not.toHaveBeenCalled(); }); + it("does not restart an abandoned requester session for late completion delivery", async () => { + const callGateway = createGatewayMock({ + result: { + payloads: [{ text: "child completion output" }], + }, + }); + const sendMessage = createSendMessageMock(); + const queueEmbeddedAgentMessageWithOutcome = createQueueOutcomeMock(true); + const result = await deliverTelegramDirectMessageCompletion({ + callGateway, + sendMessage, + requesterAbandoned: true, + isActive: false, + queueEmbeddedAgentMessageWithOutcome, + internalEvents: [ + { + type: "task_completion", + source: "subagent", + childSessionKey: "agent:worker:subagent:child", + childSessionId: "child-session-id", + announceType: "subagent task", + taskLabel: "telegram late completion", + status: "ok", + statusLabel: "completed successfully", + result: "child completion output", + replyInstruction: "Summarize the result.", + }, + ], + }); + + expectRecordFields(result, { + delivered: false, + path: "none", + error: "requester session abandoned after timeout", + }); + expect(result.phases).toEqual([ + expect.objectContaining({ + phase: "direct-primary", + delivered: false, + path: "none", + error: "requester session abandoned after timeout", + }), + expect.objectContaining({ + phase: "steer-fallback", + delivered: false, + path: "none", + }), + ]); + expect(callGateway).not.toHaveBeenCalled(); + expect(sendMessage).not.toHaveBeenCalled(); + expect(queueEmbeddedAgentMessageWithOutcome).not.toHaveBeenCalled(); + }); + it("uses steer fallback when a completion handoff has no visible output", async () => { const callGateway = createGatewayMock({ result: { diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 02e0e3f74e2..a72e4d54c78 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -43,6 +43,7 @@ import { dispatchGatewayMethodInProcess, getGlobalHookRunner, isEmbeddedAgentRunActive, + isEmbeddedRunAbandoned, getRuntimeConfig, formatEmbeddedAgentQueueFailureSummary, loadSessionStore, @@ -73,6 +74,7 @@ type SubagentAnnounceDeliveryDeps = { sessionId?: string; isActive: boolean; }; + isRequesterSessionAbandoned: (requesterSessionKey: string, sessionId?: string) => boolean; queueEmbeddedAgentMessageWithOutcome: ( sessionId: string, text: string, @@ -93,6 +95,8 @@ const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = { isActive: Boolean(sessionId && isEmbeddedAgentRunActive(sessionId)), }; }, + isRequesterSessionAbandoned: (requesterSessionKey, sessionId) => + isEmbeddedRunAbandoned({ sessionKey: requesterSessionKey, sessionId }), queueEmbeddedAgentMessageWithOutcome: queueEmbeddedAgentMessageWithOutcomeAsync, sendMessage, }; @@ -569,6 +573,9 @@ async function maybeSteerSubagentAnnounce(params: { const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey); const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey); const { sessionId, isActive } = resolveRequesterSessionActivity(canonicalKey); + if (subagentAnnounceDeliveryDeps.isRequesterSessionAbandoned(canonicalKey, sessionId)) { + return { status: "none" }; + } if (!sessionId || !isActive) { return { status: "none" }; } @@ -1055,6 +1062,19 @@ async function sendSubagentAnnounceDirectly(params: { completionRouteRequiresMessageToolDelivery || (agentMediatedCompletion && expectedMediaUrls.length > 0); const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey); + if ( + params.expectsCompletionMessage && + subagentAnnounceDeliveryDeps.isRequesterSessionAbandoned( + canonicalRequesterSessionKey, + requesterActivity.sessionId, + ) + ) { + return { + delivered: false, + path: "none", + error: "requester session abandoned after timeout", + }; + } let activeRequesterWakeFailed = false; const tryGeneratedMediaDirectDelivery = async (announceResponse?: unknown) => { if (requesterActivity.isActive && !activeRequesterWakeFailed) {