From 817e6e810b7d66dcba9c3f28cbdff00664d898e0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 2 May 2026 07:20:47 +0100 Subject: [PATCH] fix(sessions): suppress a2a control echoes --- CHANGELOG.md | 1 + src/agents/openclaw-tools.sessions.test.ts | 49 ++++++++------- src/agents/tools/agent-step.test.ts | 35 +++++++++++ src/agents/tools/agent-step.ts | 60 +++++++++++++++++-- src/agents/tools/sessions-send-helpers.ts | 1 + src/agents/tools/sessions-send-tokens.ts | 13 ++++ .../tools/sessions-send-tool.a2a.test.ts | 46 ++++++++++++++ src/agents/tools/sessions-send-tool.a2a.ts | 15 ++++- src/gateway/server-methods/agent.test.ts | 30 ++++++++++ 9 files changed, 224 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12200846710..45c2011d869 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ Docs: https://docs.openclaw.ai - Agents/compaction: submit a non-empty runtime-event marker for pre-compaction memory flush turns, so strict Anthropic providers no longer reject the silent flush as an empty user message. Fixes #75305. Thanks @sableassistant3777-source. - Plugin SDK: re-export `isPrivateIpAddress` from `plugin-sdk/ssrf-runtime`, restoring source-checkout builds for SearXNG and Firecrawl private-network guards. Thanks @vincentkoc. - Discord/message actions: advertise `upload-file` and route it through Discord's send runtime with agent-scoped media reads, so agents can discover and send file attachments. Fixes #60652 and supersedes #60808, #61087, and #61100. Thanks @claw-io, @efe-arv, @joelnishanth, and @sjhddh. +- Sessions: suppress exact inter-session control replies such as `NO_REPLY` and keep agent-to-agent announce bookkeeping out of visible transcripts. Fixes #53145. Thanks @TarahAssistant. - CLI/directory: report unsupported directory operations for installed channel plugins instead of prompting to reinstall the plugin when it lacks a directory adapter. Fixes #75770. Thanks @lawong888. - Web search/SearXNG: show the JSON API `search.formats` prerequisite during SearXNG setup before prompting for the base URL. Supersedes #65592. Thanks @evanpaul14. - Web search/SearXNG: pass through `img_src` image URLs from SearXNG image-category results. Supersedes #61416. Thanks @sghael. diff --git a/src/agents/openclaw-tools.sessions.test.ts b/src/agents/openclaw-tools.sessions.test.ts index 74317996f17..5e226c250b2 100644 --- a/src/agents/openclaw-tools.sessions.test.ts +++ b/src/agents/openclaw-tools.sessions.test.ts @@ -158,6 +158,10 @@ describe("sessions tools", () => { callGatewayMock.mockClear(); installMessagingTestRegistry(); agentStepTesting.setDepsForTest({ + agentCommandFromIngress: async () => ({ + payloads: [{ text: "ANNOUNCE_SKIP", mediaUrl: null }], + meta: { durationMs: 1 }, + }), callGateway: (opts: unknown) => callGatewayMock(opts), }); sessionsResolutionTesting.setDepsForTest({ @@ -860,9 +864,9 @@ describe("sessions tools", () => { runId: "run-1", delivery: { status: "pending", mode: "announce" }, }); - await waitForCalls(() => calls.filter((call) => call.method === "agent").length, 4); - await waitForCalls(() => calls.filter((call) => call.method === "agent.wait").length, 4); - await waitForCalls(() => calls.filter((call) => call.method === "chat.history").length, 4); + await waitForCalls(() => calls.filter((call) => call.method === "agent").length, 3); + await waitForCalls(() => calls.filter((call) => call.method === "agent.wait").length, 3); + await waitForCalls(() => calls.filter((call) => call.method === "chat.history").length, 3); const waitPromise = tool.execute("call6", { sessionKey: "main", @@ -876,14 +880,14 @@ describe("sessions tools", () => { delivery: { status: "pending", mode: "announce" }, }); expect(typeof (waited.details as { runId?: string }).runId).toBe("string"); - await waitForCalls(() => calls.filter((call) => call.method === "agent").length, 8); - await waitForCalls(() => calls.filter((call) => call.method === "agent.wait").length, 8); - await waitForCalls(() => calls.filter((call) => call.method === "chat.history").length, 8); + await waitForCalls(() => calls.filter((call) => call.method === "agent").length, 6); + await waitForCalls(() => calls.filter((call) => call.method === "agent.wait").length, 6); + await waitForCalls(() => calls.filter((call) => call.method === "chat.history").length, 7); const agentCalls = calls.filter((call) => call.method === "agent"); const waitCalls = calls.filter((call) => call.method === "agent.wait"); const historyOnlyCalls = calls.filter((call) => call.method === "chat.history"); - expect(agentCalls).toHaveLength(8); + expect(agentCalls).toHaveLength(6); for (const call of agentCalls) { expect(call.params).toMatchObject({ message: expect.stringContaining("[Inter-session message"), @@ -911,17 +915,8 @@ describe("sessions tools", () => { ), ), ).toBe(true); - expect( - agentCalls.some( - (call) => - typeof (call.params as { extraSystemPrompt?: string })?.extraSystemPrompt === "string" && - (call.params as { extraSystemPrompt?: string })?.extraSystemPrompt?.includes( - "Agent-to-agent announce step", - ), - ), - ).toBe(true); - expect(waitCalls).toHaveLength(8); - expect(historyOnlyCalls).toHaveLength(9); + expect(waitCalls).toHaveLength(6); + expect(historyOnlyCalls).toHaveLength(7); expect(sendCallCount).toBe(0); }); @@ -1038,6 +1033,13 @@ describe("sessions tools", () => { } return {}; }); + agentStepTesting.setDepsForTest({ + agentCommandFromIngress: async () => ({ + payloads: [{ text: "announce now", mediaUrl: null }], + meta: { durationMs: 1 }, + }), + callGateway: (opts: unknown) => callGatewayMock(opts), + }); const tool = createOpenClawTools({ agentSessionKey: requesterKey, @@ -1059,13 +1061,13 @@ describe("sessions tools", () => { }); await vi.waitFor( () => { - expect(calls.filter((call) => call.method === "agent")).toHaveLength(4); + expect(calls.filter((call) => call.method === "agent")).toHaveLength(3); }, { timeout: 2_000, interval: 5 }, ); const agentCalls = calls.filter((call) => call.method === "agent"); - expect(agentCalls).toHaveLength(4); + expect(agentCalls).toHaveLength(3); for (const call of agentCalls) { expect(call.params).toMatchObject({ lane: expect.stringMatching(/^nested(?::|$)/), @@ -1184,6 +1186,13 @@ describe("sessions tools", () => { } return {}; }); + agentStepTesting.setDepsForTest({ + agentCommandFromIngress: async () => ({ + payloads: [{ text: "announce now", mediaUrl: null }], + meta: { durationMs: 1 }, + }), + callGateway: (opts: unknown) => callGatewayMock(opts), + }); const tool = createOpenClawTools({ agentSessionKey: requesterKey, diff --git a/src/agents/tools/agent-step.test.ts b/src/agents/tools/agent-step.test.ts index 002e5f7536c..94d1aa3defe 100644 --- a/src/agents/tools/agent-step.test.ts +++ b/src/agents/tools/agent-step.test.ts @@ -84,4 +84,39 @@ describe("runAgentStep", () => { expect(bundleMcpRuntimeMocks.retireSessionMcpRuntimeForSessionKey).not.toHaveBeenCalled(); }); + + it("forwards explicit transcript bodies for nested bookkeeping turns", async () => { + const gatewayCalls: CallGatewayOptions[] = []; + const agentCommandFromIngress = vi.fn(async () => ({ + payloads: [{ text: "done", mediaUrl: null }], + meta: { durationMs: 1 }, + })); + __testing.setDepsForTest({ + agentCommandFromIngress, + callGateway: async (opts: CallGatewayOptions): Promise => { + gatewayCalls.push(opts); + return { runId: "run-nested" } as T; + }, + }); + runWaitMocks.waitForAgentRunAndReadUpdatedAssistantReply.mockResolvedValue({ + status: "ok", + replyText: "done", + }); + + await runAgentStep({ + sessionKey: "agent:main:subagent:child", + message: "internal announce step", + transcriptMessage: "", + extraSystemPrompt: "announce only", + timeoutMs: 10_000, + }); + + expect(gatewayCalls).toEqual([]); + expect(agentCommandFromIngress).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringContaining("internal announce step"), + transcriptMessage: "", + }), + ); + }); }); diff --git a/src/agents/tools/agent-step.ts b/src/agents/tools/agent-step.ts index 6f4cbc18299..d8d2e2a3544 100644 --- a/src/agents/tools/agent-step.ts +++ b/src/agents/tools/agent-step.ts @@ -9,15 +9,38 @@ import { waitForAgentRunAndReadUpdatedAssistantReply } from "../run-wait.js"; export { readLatestAssistantReply } from "../run-wait.js"; type GatewayCaller = typeof callGateway; +type AgentCommandRunner = typeof import("../../commands/agent.js").agentCommandFromIngress; const defaultAgentStepDeps = { + agentCommandFromIngress: (async (...args) => { + const { agentCommandFromIngress } = await import("../../commands/agent.js"); + return await agentCommandFromIngress(...args); + }) as AgentCommandRunner, callGateway, }; let agentStepDeps: { + agentCommandFromIngress: AgentCommandRunner; callGateway: GatewayCaller; } = defaultAgentStepDeps; +function extractAgentCommandReply(result: unknown): string | undefined { + const payloads = (result as { payloads?: unknown } | undefined)?.payloads; + if (!Array.isArray(payloads)) { + return undefined; + } + const texts = payloads + .map((payload) => + payload && + typeof payload === "object" && + typeof (payload as { text?: unknown }).text === "string" + ? (payload as { text: string }).text + : "", + ) + .filter((text) => text.trim().length > 0); + return texts.length > 0 ? texts.join("\n\n") : undefined; +} + export async function runAgentStep(params: { sessionKey: string; message: string; @@ -25,6 +48,7 @@ export async function runAgentStep(params: { timeoutMs: number; channel?: string; lane?: string; + transcriptMessage?: string; sourceSessionKey?: string; sourceChannel?: string; sourceTool?: string; @@ -36,15 +60,38 @@ export async function runAgentStep(params: { sourceChannel: params.sourceChannel, sourceTool: params.sourceTool ?? "sessions_send", }; + const message = annotateInterSessionPromptText(params.message, inputProvenance); + const lane = params.lane ?? resolveNestedAgentLaneForSession(params.sessionKey); + const channel = params.channel ?? INTERNAL_MESSAGE_CHANNEL; + if (params.transcriptMessage !== undefined) { + const result = await agentStepDeps.agentCommandFromIngress({ + message, + transcriptMessage: params.transcriptMessage, + sessionKey: params.sessionKey, + deliver: false, + channel, + lane, + runId: stepIdem, + extraSystemPrompt: params.extraSystemPrompt, + inputProvenance, + senderIsOwner: false, + allowModelOverride: false, + }); + await retireSessionMcpRuntimeForSessionKey({ + sessionKey: params.sessionKey, + reason: "nested-agent-step-complete", + }); + return extractAgentCommandReply(result); + } const response = await agentStepDeps.callGateway({ method: "agent", params: { - message: annotateInterSessionPromptText(params.message, inputProvenance), + message, sessionKey: params.sessionKey, idempotencyKey: stepIdem, deliver: false, - channel: params.channel ?? INTERNAL_MESSAGE_CHANNEL, - lane: params.lane ?? resolveNestedAgentLaneForSession(params.sessionKey), + channel, + lane, extraSystemPrompt: params.extraSystemPrompt, inputProvenance, }, @@ -71,7 +118,12 @@ export async function runAgentStep(params: { } export const __testing = { - setDepsForTest(overrides?: Partial<{ callGateway: GatewayCaller }>) { + setDepsForTest( + overrides?: Partial<{ + agentCommandFromIngress: AgentCommandRunner; + callGateway: GatewayCaller; + }>, + ) { agentStepDeps = overrides ? { ...defaultAgentStepDeps, diff --git a/src/agents/tools/sessions-send-helpers.ts b/src/agents/tools/sessions-send-helpers.ts index 78a1d6f0ccc..1da897984fb 100644 --- a/src/agents/tools/sessions-send-helpers.ts +++ b/src/agents/tools/sessions-send-helpers.ts @@ -10,6 +10,7 @@ export { ANNOUNCE_SKIP_TOKEN, REPLY_SKIP_TOKEN, isAnnounceSkip, + isNonDeliverableSessionsReply, isReplySkip, } from "./sessions-send-tokens.js"; diff --git a/src/agents/tools/sessions-send-tokens.ts b/src/agents/tools/sessions-send-tokens.ts index 1c333bb8ede..d9c81e59ef6 100644 --- a/src/agents/tools/sessions-send-tokens.ts +++ b/src/agents/tools/sessions-send-tokens.ts @@ -1,6 +1,15 @@ +import { HEARTBEAT_TOKEN, isSilentReplyText, SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; + export const ANNOUNCE_SKIP_TOKEN = "ANNOUNCE_SKIP"; export const REPLY_SKIP_TOKEN = "REPLY_SKIP"; +const NON_DELIVERABLE_REPLY_TOKENS = [ + ANNOUNCE_SKIP_TOKEN, + REPLY_SKIP_TOKEN, + SILENT_REPLY_TOKEN, + HEARTBEAT_TOKEN, +] as const; + export function isAnnounceSkip(text?: string) { return (text ?? "").trim() === ANNOUNCE_SKIP_TOKEN; } @@ -8,3 +17,7 @@ export function isAnnounceSkip(text?: string) { export function isReplySkip(text?: string) { return (text ?? "").trim() === REPLY_SKIP_TOKEN; } + +export function isNonDeliverableSessionsReply(text?: string) { + return NON_DELIVERABLE_REPLY_TOKENS.some((token) => isSilentReplyText(text, token)); +} diff --git a/src/agents/tools/sessions-send-tool.a2a.test.ts b/src/agents/tools/sessions-send-tool.a2a.test.ts index 9d9a9d21048..a700f704766 100644 --- a/src/agents/tools/sessions-send-tool.a2a.test.ts +++ b/src/agents/tools/sessions-send-tool.a2a.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { CallGatewayOptions } from "../../gateway/call.js"; import { setActivePluginRegistry } from "../../plugins/runtime.js"; import { createSessionConversationTestRegistry } from "../../test-utils/session-conversation-registry.js"; +import { runAgentStep } from "./agent-step.js"; import { runSessionsSendA2AFlow, __testing } from "./sessions-send-tool.a2a.js"; vi.mock("../run-wait.js", () => ({ @@ -19,6 +20,8 @@ describe("runSessionsSendA2AFlow announce delivery", () => { beforeEach(() => { setActivePluginRegistry(createSessionConversationTestRegistry()); gatewayCalls = []; + vi.clearAllMocks(); + vi.mocked(runAgentStep).mockResolvedValue("Test announce reply"); __testing.setDepsForTest({ callGateway: async >(opts: CallGatewayOptions) => { gatewayCalls.push(opts); @@ -66,4 +69,47 @@ describe("runSessionsSendA2AFlow announce delivery", () => { expect(sendParams.channel).toBe("discord"); expect(sendParams.threadId).toBeUndefined(); }); + + it.each(["NO_REPLY", "HEARTBEAT_OK", "ANNOUNCE_SKIP", "REPLY_SKIP"])( + "does not re-inject exact control reply %s into agent-to-agent flow", + async (roundOneReply) => { + await runSessionsSendA2AFlow({ + targetSessionKey: "agent:main:discord:group:dev", + displayKey: "agent:main:discord:group:dev", + message: "Test message", + announceTimeoutMs: 10_000, + maxPingPongTurns: 2, + requesterSessionKey: "agent:main:discord:group:req", + requesterChannel: "discord", + roundOneReply, + }); + + expect(runAgentStep).not.toHaveBeenCalled(); + expect(gatewayCalls.find((call) => call.method === "send")).toBeUndefined(); + }, + ); + + it.each(["NO_REPLY", "HEARTBEAT_OK"])( + "suppresses exact announce control reply %s before channel delivery", + async (announceReply) => { + vi.mocked(runAgentStep).mockResolvedValueOnce(announceReply); + + await runSessionsSendA2AFlow({ + targetSessionKey: "agent:main:discord:group:dev", + displayKey: "agent:main:discord:group:dev", + message: "Test message", + announceTimeoutMs: 10_000, + maxPingPongTurns: 0, + roundOneReply: "Worker completed successfully", + }); + + expect(runAgentStep).toHaveBeenCalledWith( + expect.objectContaining({ + message: "Agent-to-agent announce step.", + transcriptMessage: "", + }), + ); + expect(gatewayCalls.find((call) => call.method === "send")).toBeUndefined(); + }, + ); }); diff --git a/src/agents/tools/sessions-send-tool.a2a.ts b/src/agents/tools/sessions-send-tool.a2a.ts index 62e82134e30..90a2700762e 100644 --- a/src/agents/tools/sessions-send-tool.a2a.ts +++ b/src/agents/tools/sessions-send-tool.a2a.ts @@ -11,6 +11,7 @@ import { buildAgentToAgentAnnounceContext, buildAgentToAgentReplyContext, isAnnounceSkip, + isNonDeliverableSessionsReply, isReplySkip, } from "./sessions-send-helpers.js"; @@ -60,6 +61,9 @@ export async function runSessionsSendA2AFlow(params: { if (!latestReply) { return; } + if (isNonDeliverableSessionsReply(latestReply)) { + return; + } const announceTarget = await resolveAnnounceTarget({ sessionKey: params.targetSessionKey, @@ -98,7 +102,7 @@ export async function runSessionsSendA2AFlow(params: { nextSessionKey === params.requesterSessionKey ? params.requesterChannel : targetChannel, sourceTool: "sessions_send", }); - if (!replyText || isReplySkip(replyText)) { + if (!replyText || isReplySkip(replyText) || isNonDeliverableSessionsReply(replyText)) { break; } latestReply = replyText; @@ -124,11 +128,18 @@ export async function runSessionsSendA2AFlow(params: { extraSystemPrompt: announcePrompt, timeoutMs: params.announceTimeoutMs, lane: resolveNestedAgentLaneForSession(params.targetSessionKey), + transcriptMessage: "", sourceSessionKey: params.requesterSessionKey, sourceChannel: params.requesterChannel, sourceTool: "sessions_send", }); - if (announceTarget && announceReply && announceReply.trim() && !isAnnounceSkip(announceReply)) { + if ( + announceTarget && + announceReply && + announceReply.trim() && + !isAnnounceSkip(announceReply) && + !isNonDeliverableSessionsReply(announceReply) + ) { try { await sessionsSendA2ADeps.callGateway({ method: "send", diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 5ba31732b26..3e9ee2f5f53 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -1008,6 +1008,36 @@ describe("gateway agent handler", () => { resetTimeConfig(); }); + it("rejects public transcriptMessage overrides", async () => { + primeMainAgentRun({ cfg: mocks.loadConfigReturn }); + mocks.agentCommand.mockClear(); + + const respond = await invokeAgent( + { + message: "runtime-only announce bookkeeping", + transcriptMessage: "", + agentId: "main", + sessionKey: "agent:main:main", + inputProvenance: { + kind: "inter_session", + sourceSessionKey: "agent:main:discord:source", + sourceTool: "sessions_send", + }, + idempotencyKey: "test-transcript-message", + } as AgentParams, + { reqId: "transcript-message", flushDispatch: false }, + ); + + expect(respond).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ + message: expect.stringContaining("invalid agent params"), + }), + ); + expect(mocks.agentCommand).not.toHaveBeenCalled(); + }); + it("keeps model-run gateway prompts undecorated and forwards raw-run flags", async () => { setupNewYorkTimeConfig("2026-01-29T01:30:00.000Z"); primeMainAgentRun({ cfg: mocks.loadConfigReturn });