From b0a6543838acc099e79f6b46d153a97ed9a4294b Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sat, 2 May 2026 23:08:10 -0700 Subject: [PATCH] fix(agents): keep delayed sessions_send replies alive (#76484) --- CHANGELOG.md | 1 + src/agents/openclaw-tools.sessions.test.ts | 168 ++++++++++++++++++ .../tools/sessions-send-tool.a2a.test.ts | 46 ++++- src/agents/tools/sessions-send-tool.a2a.ts | 16 +- src/agents/tools/sessions-send-tool.ts | 15 ++ 5 files changed, 243 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 430439bd83e..3099e069c91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents/sessions: keep delayed `sessions_send` A2A replies alive after soft wait-window timeouts, while preserving terminal run timeouts and avoiding stale target replies in requester sessions. Fixes #76443. Thanks @ryswork1993 and @vincentkoc. - Channels/secrets: resolve SecretRef-backed channel credentials through external plugin secret contracts after the plugin split, covering runtime startup, target discovery, webhook auth, disabled-account enumeration, and late-bound web_search config. Fixes #76371. (#76449) Thanks @joshavant and @neeravmakwana. - Docker/Gateway: pass Docker setup `.env` values into gateway and CLI containers and preserve exec SecretRef `passEnv` keys in managed service plans, so 1Password Connect-backed Discord tokens keep resolving after doctor or plugin repair. Thanks @vincentkoc. - Control UI/WebChat: explain compaction boundaries in chat history and link directly to session checkpoint controls so pre-compaction turns no longer look silently lost after refresh. Fixes #76415. Thanks @BunsDev. diff --git a/src/agents/openclaw-tools.sessions.test.ts b/src/agents/openclaw-tools.sessions.test.ts index 9428273f608..7ca3336d429 100644 --- a/src/agents/openclaw-tools.sessions.test.ts +++ b/src/agents/openclaw-tools.sessions.test.ts @@ -1098,6 +1098,174 @@ describe("sessions tools", () => { }); }); + it("sessions_send keeps delayed requester replies alive after a wait timeout", async () => { + const calls: Array<{ method?: string; params?: unknown }> = []; + const requesterKey = "agent:main:main"; + const targetKey = "agent:director1:main"; + let targetWaitCount = 0; + callGatewayMock.mockImplementation(async (opts: unknown) => { + const request = opts as { method?: string; params?: unknown }; + calls.push(request); + if (request.method === "agent") { + const params = request.params as { sessionKey?: string } | undefined; + if (params?.sessionKey === targetKey) { + return { runId: "run-target", status: "accepted", acceptedAt: 2000 }; + } + if (params?.sessionKey === requesterKey) { + return { runId: "run-requester", status: "accepted", acceptedAt: 2001 }; + } + } + if (request.method === "agent.wait") { + const params = request.params as { runId?: string } | undefined; + if (params?.runId === "run-target") { + targetWaitCount += 1; + return targetWaitCount === 1 + ? { runId: "run-target", status: "timeout" } + : { runId: "run-target", status: "ok" }; + } + if (params?.runId === "run-requester") { + return { runId: "run-requester", status: "ok" }; + } + } + if (request.method === "chat.history") { + const params = request.params as { sessionKey?: string } | undefined; + if (params?.sessionKey === targetKey && targetWaitCount > 1) { + return { + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "late director reply" }], + timestamp: 20, + }, + ], + }; + } + if (params?.sessionKey === requesterKey) { + return { + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "requester saw director" }], + timestamp: 21, + }, + ], + }; + } + return { messages: [] }; + } + return {}; + }); + + const tool = createOpenClawTools({ + agentSessionKey: requesterKey, + agentChannel: "discord", + config: { + ...TEST_CONFIG, + session: { + ...TEST_CONFIG.session, + agentToAgent: { maxPingPongTurns: 1 }, + }, + }, + }).find((candidate) => candidate.name === "sessions_send"); + expect(tool).toBeDefined(); + if (!tool) { + throw new Error("missing sessions_send tool"); + } + + const result = await tool.execute("call-delayed", { + sessionKey: targetKey, + message: "ping", + timeoutSeconds: 1, + }); + expect(result.details).toMatchObject({ + status: "accepted", + sessionKey: targetKey, + delivery: { status: "pending", mode: "announce" }, + }); + + await vi.waitFor( + () => { + const requesterReplyCall = calls.find( + (call) => + call.method === "agent" && + (call.params as { sessionKey?: string } | undefined)?.sessionKey === requesterKey, + ); + expect(requesterReplyCall).toBeDefined(); + }, + { timeout: 2_000, interval: 5 }, + ); + + const requesterReplyCall = calls.find( + (call) => + call.method === "agent" && + (call.params as { sessionKey?: string } | undefined)?.sessionKey === requesterKey, + ); + const replyParams = requesterReplyCall?.params as + | { + extraSystemPrompt?: string; + inputProvenance?: { sourceSessionKey?: string }; + message?: string; + sessionKey?: string; + } + | undefined; + expect(replyParams).toMatchObject({ + sessionKey: requesterKey, + inputProvenance: { sourceSessionKey: targetKey }, + }); + expect(replyParams?.message).toContain("late director reply"); + expect(replyParams?.extraSystemPrompt).toContain("Agent-to-agent reply step"); + expect(replyParams?.extraSystemPrompt).toContain("Current agent: Agent 1 (requester)"); + expect(calls.find((call) => call.method === "send")).toBeUndefined(); + }); + + it("sessions_send preserves terminal timeouts without starting A2A", async () => { + const calls: Array<{ method?: string; params?: unknown }> = []; + const requesterKey = "agent:main:main"; + const targetKey = "agent:director1:main"; + callGatewayMock.mockImplementation(async (opts: unknown) => { + const request = opts as { method?: string; params?: unknown }; + calls.push(request); + if (request.method === "agent") { + return { runId: "run-terminal", status: "accepted", acceptedAt: 2000 }; + } + if (request.method === "agent.wait") { + return { + runId: "run-terminal", + status: "timeout", + endedAt: 3000, + stopReason: "timeout", + error: "agent run timed out", + }; + } + if (request.method === "chat.history") { + return { messages: [] }; + } + return {}; + }); + + const tool = createOpenClawTools({ + agentSessionKey: requesterKey, + agentChannel: "discord", + }).find((candidate) => candidate.name === "sessions_send"); + expect(tool).toBeDefined(); + if (!tool) { + throw new Error("missing sessions_send tool"); + } + + const result = await tool.execute("call-terminal", { + sessionKey: targetKey, + message: "ping", + timeoutSeconds: 1, + }); + expect(result.details).toMatchObject({ + status: "timeout", + error: "agent run timed out", + sessionKey: targetKey, + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(calls.filter((call) => call.method === "agent")).toHaveLength(1); + }); + it("sessions_send skips duplicate A2A delivery for waited parent-owned native subagents", async () => { const calls: Array<{ method?: string; params?: unknown }> = []; const requesterKey = "agent:main:discord:direct:parent"; diff --git a/src/agents/tools/sessions-send-tool.a2a.test.ts b/src/agents/tools/sessions-send-tool.a2a.test.ts index 310ffeabf22..209b0dac1ae 100644 --- a/src/agents/tools/sessions-send-tool.a2a.test.ts +++ b/src/agents/tools/sessions-send-tool.a2a.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { CallGatewayOptions } from "../../gateway/call.js"; import { setActivePluginRegistry } from "../../plugins/runtime.js"; import { createSessionConversationTestRegistry } from "../../test-utils/session-conversation-registry.js"; +import { readLatestAssistantReplySnapshot, waitForAgentRun } from "../run-wait.js"; import { runAgentStep } from "./agent-step.js"; import type { SessionListRow } from "./sessions-helpers.js"; import { runSessionsSendA2AFlow, __testing } from "./sessions-send-tool.a2a.js"; @@ -14,7 +15,10 @@ vi.mock("../../gateway/call.js", () => ({ vi.mock("../run-wait.js", () => ({ waitForAgentRun: vi.fn().mockResolvedValue({ status: "ok" }), - readLatestAssistantReply: vi.fn().mockResolvedValue("Test announce reply"), + readLatestAssistantReplySnapshot: vi.fn().mockResolvedValue({ + text: "Test announce reply", + fingerprint: "test-announce-reply", + }), })); vi.mock("./agent-step.js", () => ({ @@ -40,6 +44,11 @@ describe("runSessionsSendA2AFlow announce delivery", () => { callGatewayMock.mockImplementation(callGateway); vi.clearAllMocks(); vi.mocked(runAgentStep).mockResolvedValue("Test announce reply"); + vi.mocked(waitForAgentRun).mockResolvedValue({ status: "ok" }); + vi.mocked(readLatestAssistantReplySnapshot).mockResolvedValue({ + text: "Test announce reply", + fingerprint: "test-announce-reply", + }); __testing.setDepsForTest({ callGateway, }); @@ -153,6 +162,41 @@ describe("runSessionsSendA2AFlow announce delivery", () => { }, ); + it("does not inject a delayed reply that matches the baseline", async () => { + vi.mocked(readLatestAssistantReplySnapshot).mockResolvedValueOnce({ + text: "same reply", + fingerprint: "same-reply", + }); + + await runSessionsSendA2AFlow({ + targetSessionKey: "agent:main:discord:group:dev", + displayKey: "agent:main:discord:group:dev", + message: "Test message", + announceTimeoutMs: 10_000, + maxPingPongTurns: 2, + requesterSessionKey: "agent:main:discord:group:req", + requesterChannel: "discord", + baseline: { + text: "same reply", + fingerprint: "same-reply", + }, + waitRunId: "run-delayed", + }); + + expect(waitForAgentRun).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "run-delayed", + }), + ); + expect(readLatestAssistantReplySnapshot).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:main:discord:group:dev", + }), + ); + expect(runAgentStep).not.toHaveBeenCalled(); + expect(gatewayCalls.find((call) => call.method === "send")).toBeUndefined(); + }); + it.each(["NO_REPLY", "HEARTBEAT_OK"])( "suppresses exact announce control reply %s before channel delivery", async (announceReply) => { diff --git a/src/agents/tools/sessions-send-tool.a2a.ts b/src/agents/tools/sessions-send-tool.a2a.ts index 90a2700762e..220e4e8024f 100644 --- a/src/agents/tools/sessions-send-tool.a2a.ts +++ b/src/agents/tools/sessions-send-tool.a2a.ts @@ -4,7 +4,11 @@ import { formatErrorMessage } from "../../infra/errors.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import type { GatewayMessageChannel } from "../../utils/message-channel.js"; import { resolveNestedAgentLaneForSession } from "../lanes.js"; -import { readLatestAssistantReply, waitForAgentRun } from "../run-wait.js"; +import { + type AssistantReplySnapshot, + readLatestAssistantReplySnapshot, + waitForAgentRun, +} from "../run-wait.js"; import { runAgentStep } from "./agent-step.js"; import { resolveAnnounceTarget } from "./sessions-announce-target.js"; import { @@ -38,6 +42,7 @@ export async function runSessionsSendA2AFlow(params: { maxPingPongTurns: number; requesterSessionKey?: string; requesterChannel?: GatewayMessageChannel; + baseline?: AssistantReplySnapshot; roundOneReply?: string; waitRunId?: string; }) { @@ -52,9 +57,16 @@ export async function runSessionsSendA2AFlow(params: { callGateway: sessionsSendA2ADeps.callGateway, }); if (wait.status === "ok") { - primaryReply = await readLatestAssistantReply({ + const latestSnapshot = await readLatestAssistantReplySnapshot({ sessionKey: params.targetSessionKey, + callGateway: sessionsSendA2ADeps.callGateway, }); + const baselineFingerprint = params.baseline?.fingerprint; + primaryReply = + latestSnapshot.text && + (!baselineFingerprint || latestSnapshot.fingerprint !== baselineFingerprint) + ? latestSnapshot.text + : undefined; latestReply = primaryReply; } } diff --git a/src/agents/tools/sessions-send-tool.ts b/src/agents/tools/sessions-send-tool.ts index 90fcaa93baf..26ed967a54d 100644 --- a/src/agents/tools/sessions-send-tool.ts +++ b/src/agents/tools/sessions-send-tool.ts @@ -20,6 +20,7 @@ import { } from "../../utils/message-channel.js"; import { resolveNestedAgentLaneForSession } from "../lanes.js"; import { + type AgentWaitResult, readLatestAssistantReplySnapshot, waitForAgentRunAndReadUpdatedAssistantReply, } from "../run-wait.js"; @@ -71,6 +72,10 @@ function isRequesterParentOfNativeSubagentSession(params: { return requester === spawnedBy || requester === parentSessionKey; } +function isTerminalAgentWaitTimeout(result: AgentWaitResult): boolean { + return result.endedAt !== undefined || Boolean(result.stopReason || result.livenessState); +} + async function startAgentRun(params: { callGateway: GatewayCaller; runId: string; @@ -376,6 +381,7 @@ export function createSessionsSendTool(opts?: { maxPingPongTurns, requesterSessionKey, requesterChannel, + baseline: baselineReply, roundOneReply, waitRunId, }); @@ -421,6 +427,15 @@ export function createSessionsSendTool(opts?: { }); if (result.status === "timeout") { + if (!isTerminalAgentWaitTimeout(result)) { + startA2AFlow(undefined, runId); + return jsonResult({ + runId, + status: "accepted", + sessionKey: displayKey, + delivery, + }); + } return jsonResult({ runId, status: "timeout",