fix(agents): suppress abandoned requester completion handoff (#87541)

This commit is contained in:
Peter Steinberger
2026-05-28 07:10:17 +01:00
committed by GitHub
parent 50a708c5f9
commit ea48ac7da8
7 changed files with 300 additions and 1 deletions

View File

@@ -43,6 +43,14 @@ export type EmbeddedRunWaiter = {
timer: NodeJS.Timeout;
};
/**
 * Bookkeeping record for an embedded run that was given up on instead of
 * finishing normally. The only abandonment reason modelled so far is a timeout.
 */
export type AbandonedEmbeddedRun = {
// Id of the session whose run was abandoned.
sessionId: string;
// Session key associated with the abandoned run, when one was supplied.
sessionKey?: string;
// Session file associated with the abandoned run, when one was supplied.
sessionFile?: string;
// Timestamp of the abandonment in milliseconds.
abandonedAtMs: number;
// Why the run was abandoned; "timeout" is the only value currently allowed.
reason: "timeout";
};
const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState");
const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({
@@ -50,6 +58,9 @@ const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({
snapshots: new Map<string, ActiveEmbeddedRunSnapshot>(),
sessionIdsByKey: new Map<string, string>(),
sessionIdsByFile: new Map<string, string>(),
abandonedRunsBySessionId: new Map<string, AbandonedEmbeddedRun>(),
abandonedRunSessionIdsByKey: new Map<string, string>(),
abandonedRunSessionIdsByFile: new Map<string, string>(),
waiters: new Map<string, Set<EmbeddedRunWaiter>>(),
modelSwitchRequests: new Map<string, EmbeddedRunModelSwitchRequest>(),
}));
@@ -66,6 +77,15 @@ export const ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY =
export const ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_FILE =
embeddedRunState.sessionIdsByFile ??
(embeddedRunState.sessionIdsByFile = new Map<string, string>());
// Abandonment records keyed by session id. The `??` fallback creates the map on
// the shared state object when it is missing.
// NOTE(review): presumably this covers a singleton created by an older copy of
// this module that predates the abandonment maps — confirm.
export const ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID =
embeddedRunState.abandonedRunsBySessionId ??
(embeddedRunState.abandonedRunsBySessionId = new Map<string, AbandonedEmbeddedRun>());
// Secondary index: string key -> string value, created lazily in the same way.
// NOTE(review): by its name this maps a session key to the session id of its
// abandonment record — confirm against the code that writes to it.
export const ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY =
embeddedRunState.abandonedRunSessionIdsByKey ??
(embeddedRunState.abandonedRunSessionIdsByKey = new Map<string, string>());
// Secondary index: string key -> string value, created lazily in the same way.
// NOTE(review): by its name this maps a session file to the session id of its
// abandonment record — confirm the exact key format with the writer.
export const ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE =
embeddedRunState.abandonedRunSessionIdsByFile ??
(embeddedRunState.abandonedRunSessionIdsByFile = new Map<string, string>());
export const EMBEDDED_RUN_WAITERS =
embeddedRunState.waiters ??
(embeddedRunState.waiters = new Map<string, Set<EmbeddedRunWaiter>>());

View File

@@ -232,6 +232,7 @@ import { createEmbeddedAgentResourceLoader } from "../resource-loader.js";
import {
clearActiveEmbeddedRun,
type EmbeddedAgentQueueHandle,
markActiveEmbeddedRunAbandoned,
setActiveEmbeddedRun,
updateActiveEmbeddedRunSessionFile,
updateActiveEmbeddedRunSnapshot,
@@ -2892,6 +2893,7 @@ export async function runEmbeddedAttempt(
}
}
};
let queueHandleForAbandonment: EmbeddedAgentQueueHandle | undefined;
const abortRun = (isTimeout = false, reason?: unknown) => {
aborted = true;
if (isTimeout) {
@@ -2907,7 +2909,14 @@ export async function runEmbeddedAttempt(
}
abortCompaction();
void abortActiveSession();
if (isTimeout) {
if (isTimeout && queueHandleForAbandonment) {
markActiveEmbeddedRunAbandoned({
sessionId: params.sessionId,
handle: queueHandleForAbandonment,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
reason: "timeout",
});
void sessionLockController.releaseHeldLockForAbort().catch((err) => {
log.warn(
`failed to release session lock on timeout abort: runId=${params.runId} ${String(err)}`,
@@ -3092,6 +3101,7 @@ export async function runEmbeddedAttempt(
if (params.replyOperation) {
params.replyOperation.attachBackend(queueHandle);
}
queueHandleForAbandonment = queueHandle;
setActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey, params.sessionFile);
let abortWarnTimer: NodeJS.Timeout | undefined;

View File

@@ -18,10 +18,14 @@ import {
abortAndDrainEmbeddedAgentRun,
abortEmbeddedAgentRun,
clearActiveEmbeddedRun,
clearEmbeddedRunAbandonment,
consumeEmbeddedRunModelSwitch,
getActiveEmbeddedRunSnapshot,
isEmbeddedAgentRunHandleActive,
isEmbeddedRunAbandoned,
formatEmbeddedAgentQueueFailureSummary,
markActiveEmbeddedRunAbandoned,
markEmbeddedRunAbandoned,
queueEmbeddedAgentMessageWithOutcome,
queueEmbeddedAgentMessageWithOutcomeAsync,
requestEmbeddedRunModelSwitch,
@@ -412,6 +416,56 @@ describe("embedded-agent runner run registry", () => {
expect(resolveActiveEmbeddedRunHandleSessionId("agent:main:main")).toBeUndefined();
});
// Verifies the abandonment lifecycle: a marked run is visible through all three
// lookups, a new run for the same key/file clears it, and an explicit clear by
// session id also removes the key lookup.
it("tracks timeout abandonment by session id, key, and file until a new run starts", () => {
const sessionFile = "/tmp/openclaw-abandoned-session.jsonl";
const handle = createRunHandle();
// Mark directly (no active-handle check) so all three indexes get populated.
markEmbeddedRunAbandoned({
sessionId: "session-timeout",
sessionKey: "agent:main:main",
sessionFile,
reason: "timeout",
});
expect(isEmbeddedRunAbandoned({ sessionId: "session-timeout" })).toBe(true);
expect(isEmbeddedRunAbandoned({ sessionKey: "agent:main:main" })).toBe(true);
expect(isEmbeddedRunAbandoned({ sessionFile })).toBe(true);
// A different session id starting under the same key and file must clear the
// old record, including its lookup by the original session id.
setActiveEmbeddedRun("session-next", handle, "agent:main:main", sessionFile);
expect(isEmbeddedRunAbandoned({ sessionId: "session-timeout" })).toBe(false);
expect(isEmbeddedRunAbandoned({ sessionKey: "agent:main:main" })).toBe(false);
expect(isEmbeddedRunAbandoned({ sessionFile })).toBe(false);
// Re-mark without a session file, then clear by session id only; the key
// lookup must be gone as well.
markEmbeddedRunAbandoned({
sessionId: "session-next",
sessionKey: "agent:main:main",
reason: "timeout",
});
clearEmbeddedRunAbandonment({ sessionId: "session-next" });
expect(isEmbeddedRunAbandoned({ sessionKey: "agent:main:main" })).toBe(false);
});
// Verifies that a handle which has been replaced as the active run for its
// session cannot mark that session abandoned.
it("ignores timeout abandonment from a stale replaced handle", () => {
const oldHandle = createRunHandle();
const newHandle = createRunHandle();
// Register the old handle, then replace it under the same session id.
setActiveEmbeddedRun("session-replaced", oldHandle, "agent:main:main");
setActiveEmbeddedRun("session-replaced", newHandle, "agent:main:main");
// The stale handle's attempt is rejected (returns false) ...
expect(
markActiveEmbeddedRunAbandoned({
sessionId: "session-replaced",
handle: oldHandle,
sessionKey: "agent:main:main",
reason: "timeout",
}),
).toBe(false);
// ... and leaves no abandonment record behind.
expect(isEmbeddedRunAbandoned({ sessionKey: "agent:main:main" })).toBe(false);
});
it("treats repeated clears for a completed run handle as idempotent", () => {
const debugSpy = vi.spyOn(diagnosticLogger, "debug").mockImplementation(() => undefined);
const handle = createRunHandle();

View File

@@ -24,10 +24,14 @@ import {
ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_FILE,
ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY,
ACTIVE_EMBEDDED_RUN_SNAPSHOTS,
ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID,
ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE,
ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY,
EMBEDDED_RUN_MODEL_SWITCH_REQUESTS,
EMBEDDED_RUN_WAITERS,
getActiveEmbeddedRunCount,
type ActiveEmbeddedRunSnapshot,
type AbandonedEmbeddedRun,
type EmbeddedAgentQueueHandle,
type EmbeddedAgentQueueMessageOptions,
type EmbeddedRunModelSwitchRequest,
@@ -136,6 +140,135 @@ function setActiveRunSessionFile(sessionFile: string | undefined, sessionId: str
);
}
/**
 * Drops the abandonment record stored under `sessionId` along with the
 * session-key and session-file index entries that still point at that id.
 * Index entries that point at a different session id are left alone.
 * No-op when no record exists for the id.
 */
function clearEmbeddedRunAbandonmentBySessionId(sessionId: string): void {
  const record = ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID.get(sessionId);
  if (!record) {
    return;
  }
  ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID.delete(sessionId);

  // Only remove the key index entry while it still refers to this session.
  const recordedKey = record.sessionKey?.trim();
  if (recordedKey) {
    if (ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.get(recordedKey) === sessionId) {
      ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.delete(recordedKey);
    }
  }

  // Same ownership check for the file index, which is keyed by the resolved file key.
  const recordedFile = record.sessionFile?.trim();
  if (!recordedFile) {
    return;
  }
  const fileIndexKey = resolveEmbeddedSessionFileKey(recordedFile);
  if (ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.get(fileIndexKey) === sessionId) {
    ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.delete(fileIndexKey);
  }
}
/**
 * Clears the abandonment record that the session-key index associates with
 * `sessionKey`. Blank or missing keys, and keys with no index entry, are ignored.
 */
function clearEmbeddedRunAbandonmentBySessionKey(sessionKey: string | undefined): void {
  const trimmedKey = sessionKey?.trim();
  const indexedSessionId = trimmedKey
    ? ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.get(trimmedKey)
    : undefined;
  if (indexedSessionId) {
    clearEmbeddedRunAbandonmentBySessionId(indexedSessionId);
  }
}
/**
 * Clears the abandonment record that the session-file index associates with
 * `sessionFile`. Blank or missing paths, and paths with no index entry, are ignored.
 */
function clearEmbeddedRunAbandonmentBySessionFile(sessionFile: string | undefined): void {
  const trimmedFile = sessionFile?.trim();
  // The file index is keyed by the resolved file key, not the raw path.
  const indexedSessionId = trimmedFile
    ? ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.get(resolveEmbeddedSessionFileKey(trimmedFile))
    : undefined;
  if (indexedSessionId) {
    clearEmbeddedRunAbandonmentBySessionId(indexedSessionId);
  }
}
/**
 * Clears every abandonment record reachable from the given identifiers.
 *
 * Each identifier is optional and is tried independently: first the session id
 * (after trimming), then the session key, then the session file.
 *
 * @param params - Any combination of session id, session key and session file.
 */
export function clearEmbeddedRunAbandonment(params: {
  sessionId?: string;
  sessionKey?: string;
  sessionFile?: string;
}): void {
  const trimmedSessionId = params.sessionId?.trim();
  if (trimmedSessionId) {
    clearEmbeddedRunAbandonmentBySessionId(trimmedSessionId);
  }
  // The key/file helpers do their own trimming and blank checks.
  clearEmbeddedRunAbandonmentBySessionKey(params.sessionKey);
  clearEmbeddedRunAbandonmentBySessionFile(params.sessionFile);
}
/**
 * Records that the run for `sessionId` was abandoned and indexes the record by
 * session key and session file when those are provided.
 *
 * Any existing abandonment record reachable through the same id, key or file is
 * cleared first, so each index ends up pointing at the new record only.
 * A blank session id is ignored.
 *
 * @param params - Session identifiers (trimmed before use) and the abandonment reason.
 */
export function markEmbeddedRunAbandoned(params: {
  sessionId: string;
  sessionKey?: string;
  sessionFile?: string;
  reason: AbandonedEmbeddedRun["reason"];
}): void {
  const sessionId = params.sessionId.trim();
  if (sessionId.length === 0) {
    return;
  }
  const trimmedKey = params.sessionKey?.trim();
  const trimmedFile = params.sessionFile?.trim();

  clearEmbeddedRunAbandonment({
    sessionId,
    sessionKey: trimmedKey,
    sessionFile: trimmedFile,
  });

  // Optional fields are only attached when non-blank, so the stored record
  // never carries empty-string identifiers.
  const record: AbandonedEmbeddedRun = {
    sessionId,
    abandonedAtMs: Date.now(),
    reason: params.reason,
  };
  if (trimmedKey) {
    record.sessionKey = trimmedKey;
  }
  if (trimmedFile) {
    record.sessionFile = trimmedFile;
  }

  ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID.set(sessionId, record);
  if (record.sessionKey) {
    ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.set(record.sessionKey, sessionId);
  }
  if (record.sessionFile) {
    const fileIndexKey = resolveEmbeddedSessionFileKey(record.sessionFile);
    ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.set(fileIndexKey, sessionId);
  }
}
/**
 * Marks a run abandoned only if `params.handle` is still the handle registered
 * as the active run for the (trimmed) session id.
 *
 * This guards against a stale handle, already replaced by a newer run, marking
 * the newer run's session as abandoned.
 *
 * @returns `true` when the abandonment was recorded, `false` when the session
 *   id is blank or the handle is not the currently active one.
 */
export function markActiveEmbeddedRunAbandoned(params: {
  sessionId: string;
  handle: EmbeddedAgentQueueHandle;
  sessionKey?: string;
  sessionFile?: string;
  reason: AbandonedEmbeddedRun["reason"];
}): boolean {
  const sessionId = params.sessionId.trim();
  const ownsActiveRun =
    sessionId !== "" && ACTIVE_EMBEDDED_RUNS.get(sessionId) === params.handle;
  if (ownsActiveRun) {
    markEmbeddedRunAbandoned(params);
  }
  return ownsActiveRun;
}
/**
 * Reports whether an abandonment record exists for any of the supplied
 * identifiers. The session id, session key and session file are checked in
 * that order and the first match wins; blank or missing identifiers are skipped.
 *
 * @returns `true` if at least one identifier resolves to an abandonment record.
 */
export function isEmbeddedRunAbandoned(params: {
  sessionId?: string;
  sessionKey?: string;
  sessionFile?: string;
}): boolean {
  const trimmedId = params.sessionId?.trim();
  if (trimmedId && ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID.has(trimmedId)) {
    return true;
  }

  const trimmedKey = params.sessionKey?.trim();
  if (trimmedKey && ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.has(trimmedKey)) {
    return true;
  }

  const trimmedFile = params.sessionFile?.trim();
  if (!trimmedFile) {
    return false;
  }
  // The file index is keyed by the resolved file key, not the raw path.
  return ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.has(
    resolveEmbeddedSessionFileKey(trimmedFile),
  );
}
function clearActiveRunSessionFiles(sessionId: string, sessionFile?: string): void {
const normalizedSessionFile = sessionFile?.trim();
if (normalizedSessionFile) {
@@ -586,6 +719,7 @@ export function setActiveEmbeddedRun(
sessionFile?: string,
) {
const wasActive = ACTIVE_EMBEDDED_RUNS.has(sessionId);
clearEmbeddedRunAbandonment({ sessionId, sessionKey, sessionFile });
ACTIVE_EMBEDDED_RUNS.set(sessionId, handle);
setActiveRunSessionKey(sessionKey, sessionId);
clearActiveRunSessionFiles(sessionId);
@@ -692,6 +826,9 @@ export const testing = {
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.clear();
ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY.clear();
ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_FILE.clear();
ABANDONED_EMBEDDED_RUNS_BY_SESSION_ID.clear();
ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_KEY.clear();
ABANDONED_EMBEDDED_RUN_SESSION_IDS_BY_FILE.clear();
EMBEDDED_RUN_MODEL_SWITCH_REQUESTS.clear();
},
};

View File

@@ -15,6 +15,7 @@ export { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
export {
formatEmbeddedAgentQueueFailureSummary,
isEmbeddedAgentRunActive,
isEmbeddedRunAbandoned,
queueEmbeddedAgentMessageWithOutcomeAsync,
resolveActiveEmbeddedRunSessionId,
} from "./embedded-agent-runner/runs.js";

View File

@@ -195,6 +195,7 @@ async function deliverSlackThreadAnnouncement(params: {
sendMessage?: typeof runtimeSendMessage;
internalEvents?: AgentInternalEvent[];
sourceTool?: string;
requesterAbandoned?: boolean;
}) {
testing.setDepsForTest({
callGateway: params.callGateway,
@@ -202,6 +203,7 @@ async function deliverSlackThreadAnnouncement(params: {
sessionId: params.sessionId,
isActive: params.isActive,
}),
isRequesterSessionAbandoned: () => params.requesterAbandoned === true,
getRuntimeConfig: () => ({}) as never,
sendMessage: params.sendMessage ?? runtimeSendMessage,
...(params.queueEmbeddedAgentMessageWithOutcome
@@ -276,6 +278,7 @@ async function deliverTelegramDirectMessageCompletion(params: {
requesterSessionKey?: string;
sourceTool?: string;
runtimeConfig?: Record<string, unknown>;
requesterAbandoned?: boolean;
origin?: {
channel: "telegram";
to: string;
@@ -298,6 +301,7 @@ async function deliverTelegramDirectMessageCompletion(params: {
: (params.requesterSessionId ?? "requester-session-telegram"),
isActive: params.isActive === true,
}),
isRequesterSessionAbandoned: () => params.requesterAbandoned === true,
getRuntimeConfig: () => (params.runtimeConfig ?? {}) as never,
sendMessage: params.sendMessage ?? runtimeSendMessage,
...(params.queueEmbeddedAgentMessageWithOutcome
@@ -1987,6 +1991,59 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
expect(sendMessage).not.toHaveBeenCalled();
});
// Verifies that a late subagent completion is not delivered to a requester
// session flagged as abandoned: the result reports the abandonment error and
// none of the delivery mocks (gateway, direct send, queue) are invoked.
it("does not restart an abandoned requester session for late completion delivery", async () => {
const callGateway = createGatewayMock({
result: {
payloads: [{ text: "child completion output" }],
},
});
const sendMessage = createSendMessageMock();
const queueEmbeddedAgentMessageWithOutcome = createQueueOutcomeMock(true);
// requesterAbandoned drives the injected isRequesterSessionAbandoned dep;
// isActive is false, i.e. the requester has no active run at delivery time.
const result = await deliverTelegramDirectMessageCompletion({
callGateway,
sendMessage,
requesterAbandoned: true,
isActive: false,
queueEmbeddedAgentMessageWithOutcome,
internalEvents: [
{
type: "task_completion",
source: "subagent",
childSessionKey: "agent:worker:subagent:child",
childSessionId: "child-session-id",
announceType: "subagent task",
taskLabel: "telegram late completion",
status: "ok",
statusLabel: "completed successfully",
result: "child completion output",
replyInstruction: "Summarize the result.",
},
],
});
// Overall outcome: nothing delivered, with the abandonment error surfaced.
expectRecordFields(result, {
delivered: false,
path: "none",
error: "requester session abandoned after timeout",
});
// Both phases are recorded as undelivered; only the direct-primary phase
// carries the abandonment error.
expect(result.phases).toEqual([
expect.objectContaining({
phase: "direct-primary",
delivered: false,
path: "none",
error: "requester session abandoned after timeout",
}),
expect.objectContaining({
phase: "steer-fallback",
delivered: false,
path: "none",
}),
]);
// No delivery channel may be touched for an abandoned requester.
expect(callGateway).not.toHaveBeenCalled();
expect(sendMessage).not.toHaveBeenCalled();
expect(queueEmbeddedAgentMessageWithOutcome).not.toHaveBeenCalled();
});
it("uses steer fallback when a completion handoff has no visible output", async () => {
const callGateway = createGatewayMock({
result: {

View File

@@ -43,6 +43,7 @@ import {
dispatchGatewayMethodInProcess,
getGlobalHookRunner,
isEmbeddedAgentRunActive,
isEmbeddedRunAbandoned,
getRuntimeConfig,
formatEmbeddedAgentQueueFailureSummary,
loadSessionStore,
@@ -73,6 +74,7 @@ type SubagentAnnounceDeliveryDeps = {
sessionId?: string;
isActive: boolean;
};
isRequesterSessionAbandoned: (requesterSessionKey: string, sessionId?: string) => boolean;
queueEmbeddedAgentMessageWithOutcome: (
sessionId: string,
text: string,
@@ -93,6 +95,8 @@ const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
isActive: Boolean(sessionId && isEmbeddedAgentRunActive(sessionId)),
};
},
isRequesterSessionAbandoned: (requesterSessionKey, sessionId) =>
isEmbeddedRunAbandoned({ sessionKey: requesterSessionKey, sessionId }),
queueEmbeddedAgentMessageWithOutcome: queueEmbeddedAgentMessageWithOutcomeAsync,
sendMessage,
};
@@ -569,6 +573,9 @@ async function maybeSteerSubagentAnnounce(params: {
const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey);
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
const { sessionId, isActive } = resolveRequesterSessionActivity(canonicalKey);
if (subagentAnnounceDeliveryDeps.isRequesterSessionAbandoned(canonicalKey, sessionId)) {
return { status: "none" };
}
if (!sessionId || !isActive) {
return { status: "none" };
}
@@ -1055,6 +1062,19 @@ async function sendSubagentAnnounceDirectly(params: {
completionRouteRequiresMessageToolDelivery ||
(agentMediatedCompletion && expectedMediaUrls.length > 0);
const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey);
if (
params.expectsCompletionMessage &&
subagentAnnounceDeliveryDeps.isRequesterSessionAbandoned(
canonicalRequesterSessionKey,
requesterActivity.sessionId,
)
) {
return {
delivered: false,
path: "none",
error: "requester session abandoned after timeout",
};
}
let activeRequesterWakeFailed = false;
const tryGeneratedMediaDirectDelivery = async (announceResponse?: unknown) => {
if (requesterActivity.isActive && !activeRequesterWakeFailed) {