fix(sessions): suppress a2a control echoes

This commit is contained in:
Peter Steinberger
2026-05-02 07:20:47 +01:00
parent fc4da581b3
commit 817e6e810b
9 changed files with 224 additions and 26 deletions

View File

@@ -30,6 +30,7 @@ Docs: https://docs.openclaw.ai
- Agents/compaction: submit a non-empty runtime-event marker for pre-compaction memory flush turns, so strict Anthropic providers no longer reject the silent flush as an empty user message. Fixes #75305. Thanks @sableassistant3777-source.
- Plugin SDK: re-export `isPrivateIpAddress` from `plugin-sdk/ssrf-runtime`, restoring source-checkout builds for SearXNG and Firecrawl private-network guards. Thanks @vincentkoc.
- Discord/message actions: advertise `upload-file` and route it through Discord's send runtime with agent-scoped media reads, so agents can discover and send file attachments. Fixes #60652 and supersedes #60808, #61087, and #61100. Thanks @claw-io, @efe-arv, @joelnishanth, and @sjhddh.
- Sessions: suppress exact inter-session control replies such as `NO_REPLY` and keep agent-to-agent announce bookkeeping out of visible transcripts. Fixes #53145. Thanks @TarahAssistant.
- CLI/directory: report unsupported directory operations for installed channel plugins instead of prompting to reinstall the plugin when it lacks a directory adapter. Fixes #75770. Thanks @lawong888.
- Web search/SearXNG: show the JSON API `search.formats` prerequisite during SearXNG setup before prompting for the base URL. Supersedes #65592. Thanks @evanpaul14.
- Web search/SearXNG: pass through `img_src` image URLs from SearXNG image-category results. Supersedes #61416. Thanks @sghael.

View File

@@ -158,6 +158,10 @@ describe("sessions tools", () => {
callGatewayMock.mockClear();
installMessagingTestRegistry();
agentStepTesting.setDepsForTest({
agentCommandFromIngress: async () => ({
payloads: [{ text: "ANNOUNCE_SKIP", mediaUrl: null }],
meta: { durationMs: 1 },
}),
callGateway: (opts: unknown) => callGatewayMock(opts),
});
sessionsResolutionTesting.setDepsForTest({
@@ -860,9 +864,9 @@ describe("sessions tools", () => {
runId: "run-1",
delivery: { status: "pending", mode: "announce" },
});
await waitForCalls(() => calls.filter((call) => call.method === "agent").length, 4);
await waitForCalls(() => calls.filter((call) => call.method === "agent.wait").length, 4);
await waitForCalls(() => calls.filter((call) => call.method === "chat.history").length, 4);
await waitForCalls(() => calls.filter((call) => call.method === "agent").length, 3);
await waitForCalls(() => calls.filter((call) => call.method === "agent.wait").length, 3);
await waitForCalls(() => calls.filter((call) => call.method === "chat.history").length, 3);
const waitPromise = tool.execute("call6", {
sessionKey: "main",
@@ -876,14 +880,14 @@ describe("sessions tools", () => {
delivery: { status: "pending", mode: "announce" },
});
expect(typeof (waited.details as { runId?: string }).runId).toBe("string");
await waitForCalls(() => calls.filter((call) => call.method === "agent").length, 8);
await waitForCalls(() => calls.filter((call) => call.method === "agent.wait").length, 8);
await waitForCalls(() => calls.filter((call) => call.method === "chat.history").length, 8);
await waitForCalls(() => calls.filter((call) => call.method === "agent").length, 6);
await waitForCalls(() => calls.filter((call) => call.method === "agent.wait").length, 6);
await waitForCalls(() => calls.filter((call) => call.method === "chat.history").length, 7);
const agentCalls = calls.filter((call) => call.method === "agent");
const waitCalls = calls.filter((call) => call.method === "agent.wait");
const historyOnlyCalls = calls.filter((call) => call.method === "chat.history");
expect(agentCalls).toHaveLength(8);
expect(agentCalls).toHaveLength(6);
for (const call of agentCalls) {
expect(call.params).toMatchObject({
message: expect.stringContaining("[Inter-session message"),
@@ -911,17 +915,8 @@ describe("sessions tools", () => {
),
),
).toBe(true);
expect(
agentCalls.some(
(call) =>
typeof (call.params as { extraSystemPrompt?: string })?.extraSystemPrompt === "string" &&
(call.params as { extraSystemPrompt?: string })?.extraSystemPrompt?.includes(
"Agent-to-agent announce step",
),
),
).toBe(true);
expect(waitCalls).toHaveLength(8);
expect(historyOnlyCalls).toHaveLength(9);
expect(waitCalls).toHaveLength(6);
expect(historyOnlyCalls).toHaveLength(7);
expect(sendCallCount).toBe(0);
});
@@ -1038,6 +1033,13 @@ describe("sessions tools", () => {
}
return {};
});
agentStepTesting.setDepsForTest({
agentCommandFromIngress: async () => ({
payloads: [{ text: "announce now", mediaUrl: null }],
meta: { durationMs: 1 },
}),
callGateway: (opts: unknown) => callGatewayMock(opts),
});
const tool = createOpenClawTools({
agentSessionKey: requesterKey,
@@ -1059,13 +1061,13 @@ describe("sessions tools", () => {
});
await vi.waitFor(
() => {
expect(calls.filter((call) => call.method === "agent")).toHaveLength(4);
expect(calls.filter((call) => call.method === "agent")).toHaveLength(3);
},
{ timeout: 2_000, interval: 5 },
);
const agentCalls = calls.filter((call) => call.method === "agent");
expect(agentCalls).toHaveLength(4);
expect(agentCalls).toHaveLength(3);
for (const call of agentCalls) {
expect(call.params).toMatchObject({
lane: expect.stringMatching(/^nested(?::|$)/),
@@ -1184,6 +1186,13 @@ describe("sessions tools", () => {
}
return {};
});
agentStepTesting.setDepsForTest({
agentCommandFromIngress: async () => ({
payloads: [{ text: "announce now", mediaUrl: null }],
meta: { durationMs: 1 },
}),
callGateway: (opts: unknown) => callGatewayMock(opts),
});
const tool = createOpenClawTools({
agentSessionKey: requesterKey,

View File

@@ -84,4 +84,39 @@ describe("runAgentStep", () => {
expect(bundleMcpRuntimeMocks.retireSessionMcpRuntimeForSessionKey).not.toHaveBeenCalled();
});
it("forwards explicit transcript bodies for nested bookkeeping turns", async () => {
const gatewayCalls: CallGatewayOptions[] = [];
const agentCommandFromIngress = vi.fn(async () => ({
payloads: [{ text: "done", mediaUrl: null }],
meta: { durationMs: 1 },
}));
__testing.setDepsForTest({
agentCommandFromIngress,
callGateway: async <T = unknown>(opts: CallGatewayOptions): Promise<T> => {
gatewayCalls.push(opts);
return { runId: "run-nested" } as T;
},
});
runWaitMocks.waitForAgentRunAndReadUpdatedAssistantReply.mockResolvedValue({
status: "ok",
replyText: "done",
});
await runAgentStep({
sessionKey: "agent:main:subagent:child",
message: "internal announce step",
transcriptMessage: "",
extraSystemPrompt: "announce only",
timeoutMs: 10_000,
});
expect(gatewayCalls).toEqual([]);
expect(agentCommandFromIngress).toHaveBeenCalledWith(
expect.objectContaining({
message: expect.stringContaining("internal announce step"),
transcriptMessage: "",
}),
);
});
});

View File

@@ -9,15 +9,38 @@ import { waitForAgentRunAndReadUpdatedAssistantReply } from "../run-wait.js";
export { readLatestAssistantReply } from "../run-wait.js";
type GatewayCaller = typeof callGateway;
type AgentCommandRunner = typeof import("../../commands/agent.js").agentCommandFromIngress;
const defaultAgentStepDeps = {
agentCommandFromIngress: (async (...args) => {
const { agentCommandFromIngress } = await import("../../commands/agent.js");
return await agentCommandFromIngress(...args);
}) as AgentCommandRunner,
callGateway,
};
let agentStepDeps: {
agentCommandFromIngress: AgentCommandRunner;
callGateway: GatewayCaller;
} = defaultAgentStepDeps;
function extractAgentCommandReply(result: unknown): string | undefined {
const payloads = (result as { payloads?: unknown } | undefined)?.payloads;
if (!Array.isArray(payloads)) {
return undefined;
}
const texts = payloads
.map((payload) =>
payload &&
typeof payload === "object" &&
typeof (payload as { text?: unknown }).text === "string"
? (payload as { text: string }).text
: "",
)
.filter((text) => text.trim().length > 0);
return texts.length > 0 ? texts.join("\n\n") : undefined;
}
export async function runAgentStep(params: {
sessionKey: string;
message: string;
@@ -25,6 +48,7 @@ export async function runAgentStep(params: {
timeoutMs: number;
channel?: string;
lane?: string;
transcriptMessage?: string;
sourceSessionKey?: string;
sourceChannel?: string;
sourceTool?: string;
@@ -36,15 +60,38 @@ export async function runAgentStep(params: {
sourceChannel: params.sourceChannel,
sourceTool: params.sourceTool ?? "sessions_send",
};
const message = annotateInterSessionPromptText(params.message, inputProvenance);
const lane = params.lane ?? resolveNestedAgentLaneForSession(params.sessionKey);
const channel = params.channel ?? INTERNAL_MESSAGE_CHANNEL;
if (params.transcriptMessage !== undefined) {
const result = await agentStepDeps.agentCommandFromIngress({
message,
transcriptMessage: params.transcriptMessage,
sessionKey: params.sessionKey,
deliver: false,
channel,
lane,
runId: stepIdem,
extraSystemPrompt: params.extraSystemPrompt,
inputProvenance,
senderIsOwner: false,
allowModelOverride: false,
});
await retireSessionMcpRuntimeForSessionKey({
sessionKey: params.sessionKey,
reason: "nested-agent-step-complete",
});
return extractAgentCommandReply(result);
}
const response = await agentStepDeps.callGateway({
method: "agent",
params: {
message: annotateInterSessionPromptText(params.message, inputProvenance),
message,
sessionKey: params.sessionKey,
idempotencyKey: stepIdem,
deliver: false,
channel: params.channel ?? INTERNAL_MESSAGE_CHANNEL,
lane: params.lane ?? resolveNestedAgentLaneForSession(params.sessionKey),
channel,
lane,
extraSystemPrompt: params.extraSystemPrompt,
inputProvenance,
},
@@ -71,7 +118,12 @@ export async function runAgentStep(params: {
}
export const __testing = {
setDepsForTest(overrides?: Partial<{ callGateway: GatewayCaller }>) {
setDepsForTest(
overrides?: Partial<{
agentCommandFromIngress: AgentCommandRunner;
callGateway: GatewayCaller;
}>,
) {
agentStepDeps = overrides
? {
...defaultAgentStepDeps,

View File

@@ -10,6 +10,7 @@ export {
ANNOUNCE_SKIP_TOKEN,
REPLY_SKIP_TOKEN,
isAnnounceSkip,
isNonDeliverableSessionsReply,
isReplySkip,
} from "./sessions-send-tokens.js";

View File

@@ -1,6 +1,15 @@
import { HEARTBEAT_TOKEN, isSilentReplyText, SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
export const ANNOUNCE_SKIP_TOKEN = "ANNOUNCE_SKIP";
export const REPLY_SKIP_TOKEN = "REPLY_SKIP";
const NON_DELIVERABLE_REPLY_TOKENS = [
ANNOUNCE_SKIP_TOKEN,
REPLY_SKIP_TOKEN,
SILENT_REPLY_TOKEN,
HEARTBEAT_TOKEN,
] as const;
export function isAnnounceSkip(text?: string) {
return (text ?? "").trim() === ANNOUNCE_SKIP_TOKEN;
}
@@ -8,3 +17,7 @@ export function isAnnounceSkip(text?: string) {
export function isReplySkip(text?: string) {
return (text ?? "").trim() === REPLY_SKIP_TOKEN;
}
export function isNonDeliverableSessionsReply(text?: string) {
return NON_DELIVERABLE_REPLY_TOKENS.some((token) => isSilentReplyText(text, token));
}

View File

@@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { CallGatewayOptions } from "../../gateway/call.js";
import { setActivePluginRegistry } from "../../plugins/runtime.js";
import { createSessionConversationTestRegistry } from "../../test-utils/session-conversation-registry.js";
import { runAgentStep } from "./agent-step.js";
import { runSessionsSendA2AFlow, __testing } from "./sessions-send-tool.a2a.js";
vi.mock("../run-wait.js", () => ({
@@ -19,6 +20,8 @@ describe("runSessionsSendA2AFlow announce delivery", () => {
beforeEach(() => {
setActivePluginRegistry(createSessionConversationTestRegistry());
gatewayCalls = [];
vi.clearAllMocks();
vi.mocked(runAgentStep).mockResolvedValue("Test announce reply");
__testing.setDepsForTest({
callGateway: async <T = Record<string, unknown>>(opts: CallGatewayOptions) => {
gatewayCalls.push(opts);
@@ -66,4 +69,47 @@ describe("runSessionsSendA2AFlow announce delivery", () => {
expect(sendParams.channel).toBe("discord");
expect(sendParams.threadId).toBeUndefined();
});
it.each(["NO_REPLY", "HEARTBEAT_OK", "ANNOUNCE_SKIP", "REPLY_SKIP"])(
"does not re-inject exact control reply %s into agent-to-agent flow",
async (roundOneReply) => {
await runSessionsSendA2AFlow({
targetSessionKey: "agent:main:discord:group:dev",
displayKey: "agent:main:discord:group:dev",
message: "Test message",
announceTimeoutMs: 10_000,
maxPingPongTurns: 2,
requesterSessionKey: "agent:main:discord:group:req",
requesterChannel: "discord",
roundOneReply,
});
expect(runAgentStep).not.toHaveBeenCalled();
expect(gatewayCalls.find((call) => call.method === "send")).toBeUndefined();
},
);
it.each(["NO_REPLY", "HEARTBEAT_OK"])(
"suppresses exact announce control reply %s before channel delivery",
async (announceReply) => {
vi.mocked(runAgentStep).mockResolvedValueOnce(announceReply);
await runSessionsSendA2AFlow({
targetSessionKey: "agent:main:discord:group:dev",
displayKey: "agent:main:discord:group:dev",
message: "Test message",
announceTimeoutMs: 10_000,
maxPingPongTurns: 0,
roundOneReply: "Worker completed successfully",
});
expect(runAgentStep).toHaveBeenCalledWith(
expect.objectContaining({
message: "Agent-to-agent announce step.",
transcriptMessage: "",
}),
);
expect(gatewayCalls.find((call) => call.method === "send")).toBeUndefined();
},
);
});

View File

@@ -11,6 +11,7 @@ import {
buildAgentToAgentAnnounceContext,
buildAgentToAgentReplyContext,
isAnnounceSkip,
isNonDeliverableSessionsReply,
isReplySkip,
} from "./sessions-send-helpers.js";
@@ -60,6 +61,9 @@ export async function runSessionsSendA2AFlow(params: {
if (!latestReply) {
return;
}
if (isNonDeliverableSessionsReply(latestReply)) {
return;
}
const announceTarget = await resolveAnnounceTarget({
sessionKey: params.targetSessionKey,
@@ -98,7 +102,7 @@ export async function runSessionsSendA2AFlow(params: {
nextSessionKey === params.requesterSessionKey ? params.requesterChannel : targetChannel,
sourceTool: "sessions_send",
});
if (!replyText || isReplySkip(replyText)) {
if (!replyText || isReplySkip(replyText) || isNonDeliverableSessionsReply(replyText)) {
break;
}
latestReply = replyText;
@@ -124,11 +128,18 @@ export async function runSessionsSendA2AFlow(params: {
extraSystemPrompt: announcePrompt,
timeoutMs: params.announceTimeoutMs,
lane: resolveNestedAgentLaneForSession(params.targetSessionKey),
transcriptMessage: "",
sourceSessionKey: params.requesterSessionKey,
sourceChannel: params.requesterChannel,
sourceTool: "sessions_send",
});
if (announceTarget && announceReply && announceReply.trim() && !isAnnounceSkip(announceReply)) {
if (
announceTarget &&
announceReply &&
announceReply.trim() &&
!isAnnounceSkip(announceReply) &&
!isNonDeliverableSessionsReply(announceReply)
) {
try {
await sessionsSendA2ADeps.callGateway({
method: "send",

View File

@@ -1008,6 +1008,36 @@ describe("gateway agent handler", () => {
resetTimeConfig();
});
it("rejects public transcriptMessage overrides", async () => {
primeMainAgentRun({ cfg: mocks.loadConfigReturn });
mocks.agentCommand.mockClear();
const respond = await invokeAgent(
{
message: "runtime-only announce bookkeeping",
transcriptMessage: "",
agentId: "main",
sessionKey: "agent:main:main",
inputProvenance: {
kind: "inter_session",
sourceSessionKey: "agent:main:discord:source",
sourceTool: "sessions_send",
},
idempotencyKey: "test-transcript-message",
} as AgentParams,
{ reqId: "transcript-message", flushDispatch: false },
);
expect(respond).toHaveBeenCalledWith(
false,
undefined,
expect.objectContaining({
message: expect.stringContaining("invalid agent params"),
}),
);
expect(mocks.agentCommand).not.toHaveBeenCalled();
});
it("keeps model-run gateway prompts undecorated and forwards raw-run flags", async () => {
setupNewYorkTimeConfig("2026-01-29T01:30:00.000Z");
primeMainAgentRun({ cfg: mocks.loadConfigReturn });