From 81e7e8ef24dee045fe2bce7d0b50c169e718a699 Mon Sep 17 00:00:00 2001 From: Andy Ye <35905412+TurboTheTurtle@users.noreply.github.com> Date: Tue, 26 May 2026 22:30:56 -0700 Subject: [PATCH] fix: handle sessions_send active fallback failures (#86638) Fix run-scoped sessions_send active-run fallback handling. - surface active queue rejection plus durable fallback admission failures instead of returning accepted too early - return fallback run/session metadata so normal A2A announcement waits on the fallback run - retry active steering without transcript-commit waiting when the active runtime does not support it Thanks @TurboTheTurtle. Verification: - node scripts/run-vitest.mjs src/agents/openclaw-tools.sessions.test.ts - pnpm check:test-types - git diff --check - .agents/skills/autoreview/scripts/autoreview --mode branch --base origin/main --- src/agents/openclaw-tools.sessions.test.ts | 207 +++++++++++++++++++++ src/agents/tools/sessions-send-tool.ts | 97 +++++++++- 2 files changed, 299 insertions(+), 5 deletions(-) diff --git a/src/agents/openclaw-tools.sessions.test.ts b/src/agents/openclaw-tools.sessions.test.ts index c836628db26..633f52556cb 100644 --- a/src/agents/openclaw-tools.sessions.test.ts +++ b/src/agents/openclaw-tools.sessions.test.ts @@ -33,6 +33,7 @@ vi.mock("../config/config.js", () => ({ import "./test-helpers/fast-openclaw-tools-sessions.js"; import { setActivePluginRegistry } from "../plugins/runtime.js"; +import { testing as embeddedRunsTesting, setActiveEmbeddedRun } from "./pi-embedded-runner/runs.js"; import { testing as agentStepTesting } from "./tools/agent-step.js"; import { createSessionsHistoryTool } from "./tools/sessions-history-tool.js"; import { createSessionsListTool } from "./tools/sessions-list-tool.js"; @@ -224,6 +225,7 @@ function sessionsSendDetails(details: unknown): SessionsSendDetails { describe("sessions tools", () => { beforeEach(() => { callGatewayMock.mockClear(); + embeddedRunsTesting.resetActiveEmbeddedRuns(); 
loadSessionEntryByKeyMock.mockReset(); loadSessionEntryByKeyMock.mockReturnValue(undefined); installMessagingTestRegistry(); @@ -1296,6 +1298,211 @@ describe("sessions tools", () => { expect(calls.find((call) => call.method === "send")).toBeUndefined(); }); + it("sessions_send reroutes run-scoped active deliveries when transcript steering is rejected", async () => { + const calls: Array<{ method?: string; params?: unknown }> = []; + const requesterKey = "agent:re-portal:main"; + const runScopedCallerKey = "agent:leasing-ops:cron:monthly-utility:run:run-fast"; + const durableCallerKey = "agent:leasing-ops:cron:monthly-utility"; + const queueMessage = vi.fn(async (_text: string, _options?: unknown) => { + throw new Error("active session ended before queued steering message was committed"); + }); + setActiveEmbeddedRun( + "caller-active-session", + { + queueMessage, + isStreaming: () => true, + isCompacting: () => false, + supportsTranscriptCommitWait: true, + abort: () => {}, + }, + runScopedCallerKey, + ); + callGatewayMock.mockImplementation(async (opts: unknown) => { + const request = opts as { method?: string; params?: unknown }; + calls.push(request); + if (request.method === "agent") { + return { runId: "fallback-run", status: "accepted", acceptedAt: 2000 }; + } + if (request.method === "agent.wait") { + const params = request.params as { runId?: string } | undefined; + return { runId: params?.runId ?? 
"fallback-run", status: "ok" }; + } + if (request.method === "chat.history") { + return { messages: [] }; + } + return {}; + }); + + const tool = createOpenClawTools({ + agentSessionKey: requesterKey, + agentChannel: "telegram", + config: { + ...TEST_CONFIG, + session: { + ...TEST_CONFIG.session, + agentToAgent: { maxPingPongTurns: 0 }, + }, + }, + }).find((candidate) => candidate.name === "sessions_send"); + if (!tool) { + throw new Error("missing sessions_send tool"); + } + + const result = await tool.execute("call-run-scoped-caller", { + sessionKey: runScopedCallerKey, + message: "[TASK-COMPLETE] re-portal occupancy ready", + timeoutSeconds: 0, + }); + const details = sessionsSendDetails(result.details); + expect(details.status).toBe("accepted"); + expect(details.sessionKey).toBe(runScopedCallerKey); + expect(details.delivery?.status).toBe("pending"); + const queuedText = queueMessage.mock.calls[0]?.[0]; + expect(queuedText).toContain("[Inter-session message]"); + expect(queuedText).toContain("[TASK-COMPLETE] re-portal occupancy ready"); + expect(queueMessage).toHaveBeenCalledWith(queuedText, { + steeringMode: "all", + debounceMs: 0, + deliveryTimeoutMs: 30_000, + waitForTranscriptCommit: true, + }); + + await vi.waitFor(() => { + const fallbackCall = calls.find( + (call) => + call.method === "agent" && + (call.params as { sessionKey?: string } | undefined)?.sessionKey === durableCallerKey, + ); + expect(fallbackCall).toBeDefined(); + }); + + const agentCalls = calls.filter((call) => call.method === "agent"); + expect( + agentCalls.some( + (call) => + (call.params as { sessionKey?: string } | undefined)?.sessionKey === runScopedCallerKey, + ), + ).toBe(false); + const fallbackParams = agentCalls.find( + (call) => + (call.params as { sessionKey?: string } | undefined)?.sessionKey === durableCallerKey, + )?.params as { inputProvenance?: { sourceSessionKey?: string }; message?: string } | undefined; + expect(fallbackParams?.message).toContain("[Inter-session 
message]"); + expect(fallbackParams?.message).toContain("[TASK-COMPLETE] re-portal occupancy ready"); + expect(fallbackParams?.inputProvenance?.sourceSessionKey).toBe(requesterKey); + + await vi.waitFor(() => { + const waitCall = calls.find( + (call) => + call.method === "agent.wait" && + (call.params as { runId?: string } | undefined)?.runId === "fallback-run", + ); + expect(waitCall).toBeDefined(); + }); + await vi.waitFor(() => { + const historyCall = calls.find( + (call) => + call.method === "chat.history" && + (call.params as { sessionKey?: string } | undefined)?.sessionKey === durableCallerKey, + ); + expect(historyCall).toBeDefined(); + }); + }); + + it("sessions_send preserves active delivery when transcript commit wait is unsupported", async () => { + const calls: Array<{ method?: string }> = []; + const runScopedCallerKey = "agent:leasing-ops:cron:monthly-utility:run:run-fast"; + const queueMessage = vi.fn(async () => {}); + setActiveEmbeddedRun( + "caller-active-session", + { + queueMessage, + isStreaming: () => true, + isCompacting: () => false, + abort: () => {}, + }, + runScopedCallerKey, + ); + callGatewayMock.mockImplementation(async (opts: unknown) => { + const request = opts as { method?: string }; + calls.push(request); + if (request.method === "agent") { + throw new Error("fallback agent should not start"); + } + return {}; + }); + + const tool = createOpenClawTools({ + agentSessionKey: "agent:re-portal:main", + agentChannel: "telegram", + }).find((candidate) => candidate.name === "sessions_send"); + if (!tool) { + throw new Error("missing sessions_send tool"); + } + + const result = await tool.execute("call-run-scoped-caller", { + sessionKey: runScopedCallerKey, + message: "[TASK-COMPLETE] re-portal occupancy ready", + timeoutSeconds: 0, + }); + + const details = sessionsSendDetails(result.details); + expect(details.status).toBe("accepted"); + expect(details.sessionKey).toBe(runScopedCallerKey); + expect(queueMessage).toHaveBeenCalledOnce(); + 
expect(queueMessage).toHaveBeenCalledWith(expect.stringContaining("[Inter-session message]"), { + steeringMode: "all", + debounceMs: 0, + deliveryTimeoutMs: 30_000, + }); + expect(calls.some((call) => call.method === "agent")).toBe(false); + }); + + it("sessions_send reports run-scoped fallback admission failures", async () => { + const runScopedCallerKey = "agent:leasing-ops:cron:monthly-utility:run:run-fast"; + const queueMessage = vi.fn(async () => { + throw new Error("active session ended before queued steering message was committed"); + }); + setActiveEmbeddedRun( + "caller-active-session", + { + queueMessage, + isStreaming: () => true, + isCompacting: () => false, + supportsTranscriptCommitWait: true, + abort: () => {}, + }, + runScopedCallerKey, + ); + callGatewayMock.mockImplementation(async (opts: unknown) => { + const request = opts as { method?: string; params?: unknown }; + if (request.method === "agent") { + throw new Error("gateway request timeout for agent"); + } + return {}; + }); + + const tool = createOpenClawTools({ + agentSessionKey: "agent:re-portal:main", + agentChannel: "telegram", + }).find((candidate) => candidate.name === "sessions_send"); + if (!tool) { + throw new Error("missing sessions_send tool"); + } + + const result = await tool.execute("call-run-scoped-caller", { + sessionKey: runScopedCallerKey, + message: "[TASK-COMPLETE] re-portal occupancy ready", + timeoutSeconds: 0, + }); + + const details = sessionsSendDetails(result.details); + expect(details.status).toBe("error"); + expect(details.sessionKey).toBe(runScopedCallerKey); + expect(details.error).toContain("queue_message_failed reason=runtime_rejected"); + expect(details.error).toContain("fallback_failed error=gateway request timeout for agent"); + }); + it("sessions_send preserves terminal timeouts without starting A2A", async () => { const calls: Array<{ method?: string; params?: unknown }> = []; const requesterKey = "agent:main:main"; diff --git 
a/src/agents/tools/sessions-send-tool.ts b/src/agents/tools/sessions-send-tool.ts index e9c9f830e8b..d34a013fc1a 100644 --- a/src/agents/tools/sessions-send-tool.ts +++ b/src/agents/tools/sessions-send-tool.ts @@ -21,6 +21,12 @@ import { } from "../../utils/message-channel.js"; import { listAgentIds } from "../agent-scope.js"; import { resolveNestedAgentLaneForSession } from "../lanes.js"; +import { + type EmbeddedPiQueueMessageOptions, + formatEmbeddedPiQueueFailureSummary, + queueEmbeddedPiMessageWithOutcomeAsync, + resolveActiveEmbeddedRunSessionId, +} from "../pi-embedded-runner/runs.js"; import { type AgentWaitResult, readLatestAssistantReplySnapshot, @@ -55,6 +61,11 @@ const SessionsSendToolSchema = Type.Object({ type GatewayCaller = typeof callGateway; const SESSIONS_SEND_REPLY_HISTORY_LIMIT = 50; +function resolveRunScopedFallbackSessionKey(sessionKey: string): string | undefined { + const match = /^(agent:[^:]+:.+):run:[^:]+$/.exec(sessionKey.trim()); + return match?.[1]; +} + function resolveConfiguredAgentMainSessionKey(params: { cfg: OpenClawConfig; agentId: string; @@ -155,8 +166,74 @@ async function startAgentRun(params: { runId: string; sendParams: Record<string, unknown>; sessionKey: string; -}): Promise<{ ok: true; runId: string } | { ok: false; result: ReturnType<typeof jsonResult> }> { + deliveryTimeoutMs?: number; + allowActiveRunQueueFallback?: boolean; +}): Promise< + | { + ok: true; + runId: string; + activeRunQueue?: boolean; + a2aSessionKey?: string; + a2aDisplayKey?: string; + } + | { ok: false; result: ReturnType<typeof jsonResult> } +> { try { + const activeRunSessionId = params.allowActiveRunQueueFallback + ? resolveActiveEmbeddedRunSessionId(params.sessionKey) + : undefined; + const fallbackSessionKey = activeRunSessionId + ? resolveRunScopedFallbackSessionKey(params.sessionKey) + : undefined; + const messageText = + typeof params.sendParams.message === "string" ? 
params.sendParams.message : undefined; + if (activeRunSessionId && fallbackSessionKey && messageText) { + const queueOptions: EmbeddedPiQueueMessageOptions = { + steeringMode: "all", + debounceMs: 0, + deliveryTimeoutMs: params.deliveryTimeoutMs, + waitForTranscriptCommit: true, + }; + let queueOutcome = await queueEmbeddedPiMessageWithOutcomeAsync( + activeRunSessionId, + messageText, + queueOptions, + ); + if (!queueOutcome.queued && queueOutcome.reason === "transcript_commit_wait_unsupported") { + const bestEffortQueueOptions = { ...queueOptions }; + delete bestEffortQueueOptions.waitForTranscriptCommit; + queueOutcome = await queueEmbeddedPiMessageWithOutcomeAsync( + activeRunSessionId, + messageText, + bestEffortQueueOptions, + ); + } + if (queueOutcome.queued) { + return { ok: true, runId: params.runId, activeRunQueue: true }; + } + try { + const response = await params.callGateway<{ runId: string }>({ + method: "agent", + params: { + ...params.sendParams, + sessionKey: fallbackSessionKey, + idempotencyKey: crypto.randomUUID(), + }, + timeoutMs: 10_000, + }); + return { + ok: true, + runId: + typeof response?.runId === "string" && response.runId ? response.runId : params.runId, + a2aSessionKey: fallbackSessionKey, + a2aDisplayKey: fallbackSessionKey, + }; + } catch (err) { + const queueSummary = + formatEmbeddedPiQueueFailureSummary(queueOutcome) ?? "active run queue rejected"; + throw new Error(`${queueSummary}; fallback_failed error=${formatErrorMessage(err)}`); + } + } const response = await params.callGateway<{ runId: string }>({ method: "agent", params: params.sendParams, @@ -473,13 +550,18 @@ export function createSessionsSendTool(opts?: { ? 
({ status: "skipped", mode: "announce" } as const) : ({ status: "pending", mode: "announce" } as const); - const startA2AFlow = (roundOneReply?: string, waitRunId?: string) => { + const startA2AFlow = ( + roundOneReply?: string, + waitRunId?: string, + flowTargetSessionKey = resolvedKey, + flowDisplayKey = displayKey, + ) => { if (skipA2AFlow) { return; } void runSessionsSendA2AFlow({ - targetSessionKey: resolvedKey, - displayKey, + targetSessionKey: flowTargetSessionKey, + displayKey: flowDisplayKey, message, announceTimeoutMs, maxPingPongTurns, @@ -497,12 +579,16 @@ export function createSessionsSendTool(opts?: { runId, sendParams, sessionKey: displayKey, + deliveryTimeoutMs: announceTimeoutMs, + allowActiveRunQueueFallback: true, }); if (!start.ok) { return start.result; } runId = start.runId; - startA2AFlow(undefined, runId); + if (!start.activeRunQueue) { + startA2AFlow(undefined, runId, start.a2aSessionKey, start.a2aDisplayKey); + } return jsonResult({ runId, status: "accepted", @@ -516,6 +602,7 @@ export function createSessionsSendTool(opts?: { runId, sendParams, sessionKey: displayKey, + deliveryTimeoutMs: announceTimeoutMs, }); if (!start.ok) { return start.result;