fix(agents): keep delayed sessions_send replies alive (#76484)

This commit is contained in:
Vincent Koc
2026-05-02 23:08:10 -07:00
committed by GitHub
parent 207aa18c40
commit b0a6543838
5 changed files with 243 additions and 3 deletions

View File

@@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Agents/sessions: keep delayed `sessions_send` A2A replies alive after soft wait-window timeouts, while preserving terminal run timeouts and avoiding stale target replies in requester sessions. Fixes #76443. Thanks @ryswork1993 and @vincentkoc.
- Channels/secrets: resolve SecretRef-backed channel credentials through external plugin secret contracts after the plugin split, covering runtime startup, target discovery, webhook auth, disabled-account enumeration, and late-bound web_search config. Fixes #76371. (#76449) Thanks @joshavant and @neeravmakwana.
- Docker/Gateway: pass Docker setup `.env` values into gateway and CLI containers and preserve exec SecretRef `passEnv` keys in managed service plans, so 1Password Connect-backed Discord tokens keep resolving after doctor or plugin repair. Thanks @vincentkoc.
- Control UI/WebChat: explain compaction boundaries in chat history and link directly to session checkpoint controls so pre-compaction turns no longer look silently lost after refresh. Fixes #76415. Thanks @BunsDev.

View File

@@ -1098,6 +1098,174 @@ describe("sessions tools", () => {
});
});
it("sessions_send keeps delayed requester replies alive after a wait timeout", async () => {
const calls: Array<{ method?: string; params?: unknown }> = [];
const requesterKey = "agent:main:main";
const targetKey = "agent:director1:main";
let targetWaitCount = 0;
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: unknown };
calls.push(request);
if (request.method === "agent") {
const params = request.params as { sessionKey?: string } | undefined;
if (params?.sessionKey === targetKey) {
return { runId: "run-target", status: "accepted", acceptedAt: 2000 };
}
if (params?.sessionKey === requesterKey) {
return { runId: "run-requester", status: "accepted", acceptedAt: 2001 };
}
}
if (request.method === "agent.wait") {
const params = request.params as { runId?: string } | undefined;
if (params?.runId === "run-target") {
targetWaitCount += 1;
return targetWaitCount === 1
? { runId: "run-target", status: "timeout" }
: { runId: "run-target", status: "ok" };
}
if (params?.runId === "run-requester") {
return { runId: "run-requester", status: "ok" };
}
}
if (request.method === "chat.history") {
const params = request.params as { sessionKey?: string } | undefined;
if (params?.sessionKey === targetKey && targetWaitCount > 1) {
return {
messages: [
{
role: "assistant",
content: [{ type: "text", text: "late director reply" }],
timestamp: 20,
},
],
};
}
if (params?.sessionKey === requesterKey) {
return {
messages: [
{
role: "assistant",
content: [{ type: "text", text: "requester saw director" }],
timestamp: 21,
},
],
};
}
return { messages: [] };
}
return {};
});
const tool = createOpenClawTools({
agentSessionKey: requesterKey,
agentChannel: "discord",
config: {
...TEST_CONFIG,
session: {
...TEST_CONFIG.session,
agentToAgent: { maxPingPongTurns: 1 },
},
},
}).find((candidate) => candidate.name === "sessions_send");
expect(tool).toBeDefined();
if (!tool) {
throw new Error("missing sessions_send tool");
}
const result = await tool.execute("call-delayed", {
sessionKey: targetKey,
message: "ping",
timeoutSeconds: 1,
});
expect(result.details).toMatchObject({
status: "accepted",
sessionKey: targetKey,
delivery: { status: "pending", mode: "announce" },
});
await vi.waitFor(
() => {
const requesterReplyCall = calls.find(
(call) =>
call.method === "agent" &&
(call.params as { sessionKey?: string } | undefined)?.sessionKey === requesterKey,
);
expect(requesterReplyCall).toBeDefined();
},
{ timeout: 2_000, interval: 5 },
);
const requesterReplyCall = calls.find(
(call) =>
call.method === "agent" &&
(call.params as { sessionKey?: string } | undefined)?.sessionKey === requesterKey,
);
const replyParams = requesterReplyCall?.params as
| {
extraSystemPrompt?: string;
inputProvenance?: { sourceSessionKey?: string };
message?: string;
sessionKey?: string;
}
| undefined;
expect(replyParams).toMatchObject({
sessionKey: requesterKey,
inputProvenance: { sourceSessionKey: targetKey },
});
expect(replyParams?.message).toContain("late director reply");
expect(replyParams?.extraSystemPrompt).toContain("Agent-to-agent reply step");
expect(replyParams?.extraSystemPrompt).toContain("Current agent: Agent 1 (requester)");
expect(calls.find((call) => call.method === "send")).toBeUndefined();
});
it("sessions_send preserves terminal timeouts without starting A2A", async () => {
const calls: Array<{ method?: string; params?: unknown }> = [];
const requesterKey = "agent:main:main";
const targetKey = "agent:director1:main";
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: unknown };
calls.push(request);
if (request.method === "agent") {
return { runId: "run-terminal", status: "accepted", acceptedAt: 2000 };
}
if (request.method === "agent.wait") {
return {
runId: "run-terminal",
status: "timeout",
endedAt: 3000,
stopReason: "timeout",
error: "agent run timed out",
};
}
if (request.method === "chat.history") {
return { messages: [] };
}
return {};
});
const tool = createOpenClawTools({
agentSessionKey: requesterKey,
agentChannel: "discord",
}).find((candidate) => candidate.name === "sessions_send");
expect(tool).toBeDefined();
if (!tool) {
throw new Error("missing sessions_send tool");
}
const result = await tool.execute("call-terminal", {
sessionKey: targetKey,
message: "ping",
timeoutSeconds: 1,
});
expect(result.details).toMatchObject({
status: "timeout",
error: "agent run timed out",
sessionKey: targetKey,
});
await new Promise((resolve) => setTimeout(resolve, 0));
expect(calls.filter((call) => call.method === "agent")).toHaveLength(1);
});
it("sessions_send skips duplicate A2A delivery for waited parent-owned native subagents", async () => {
const calls: Array<{ method?: string; params?: unknown }> = [];
const requesterKey = "agent:main:discord:direct:parent";

View File

@@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { CallGatewayOptions } from "../../gateway/call.js";
import { setActivePluginRegistry } from "../../plugins/runtime.js";
import { createSessionConversationTestRegistry } from "../../test-utils/session-conversation-registry.js";
import { readLatestAssistantReplySnapshot, waitForAgentRun } from "../run-wait.js";
import { runAgentStep } from "./agent-step.js";
import type { SessionListRow } from "./sessions-helpers.js";
import { runSessionsSendA2AFlow, __testing } from "./sessions-send-tool.a2a.js";
@@ -14,7 +15,10 @@ vi.mock("../../gateway/call.js", () => ({
vi.mock("../run-wait.js", () => ({
waitForAgentRun: vi.fn().mockResolvedValue({ status: "ok" }),
readLatestAssistantReply: vi.fn().mockResolvedValue("Test announce reply"),
readLatestAssistantReplySnapshot: vi.fn().mockResolvedValue({
text: "Test announce reply",
fingerprint: "test-announce-reply",
}),
}));
vi.mock("./agent-step.js", () => ({
@@ -40,6 +44,11 @@ describe("runSessionsSendA2AFlow announce delivery", () => {
callGatewayMock.mockImplementation(callGateway);
vi.clearAllMocks();
vi.mocked(runAgentStep).mockResolvedValue("Test announce reply");
vi.mocked(waitForAgentRun).mockResolvedValue({ status: "ok" });
vi.mocked(readLatestAssistantReplySnapshot).mockResolvedValue({
text: "Test announce reply",
fingerprint: "test-announce-reply",
});
__testing.setDepsForTest({
callGateway,
});
@@ -153,6 +162,41 @@ describe("runSessionsSendA2AFlow announce delivery", () => {
},
);
it("does not inject a delayed reply that matches the baseline", async () => {
vi.mocked(readLatestAssistantReplySnapshot).mockResolvedValueOnce({
text: "same reply",
fingerprint: "same-reply",
});
await runSessionsSendA2AFlow({
targetSessionKey: "agent:main:discord:group:dev",
displayKey: "agent:main:discord:group:dev",
message: "Test message",
announceTimeoutMs: 10_000,
maxPingPongTurns: 2,
requesterSessionKey: "agent:main:discord:group:req",
requesterChannel: "discord",
baseline: {
text: "same reply",
fingerprint: "same-reply",
},
waitRunId: "run-delayed",
});
expect(waitForAgentRun).toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-delayed",
}),
);
expect(readLatestAssistantReplySnapshot).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:main:discord:group:dev",
}),
);
expect(runAgentStep).not.toHaveBeenCalled();
expect(gatewayCalls.find((call) => call.method === "send")).toBeUndefined();
});
it.each(["NO_REPLY", "HEARTBEAT_OK"])(
"suppresses exact announce control reply %s before channel delivery",
async (announceReply) => {

View File

@@ -4,7 +4,11 @@ import { formatErrorMessage } from "../../infra/errors.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import type { GatewayMessageChannel } from "../../utils/message-channel.js";
import { resolveNestedAgentLaneForSession } from "../lanes.js";
import { readLatestAssistantReply, waitForAgentRun } from "../run-wait.js";
import {
type AssistantReplySnapshot,
readLatestAssistantReplySnapshot,
waitForAgentRun,
} from "../run-wait.js";
import { runAgentStep } from "./agent-step.js";
import { resolveAnnounceTarget } from "./sessions-announce-target.js";
import {
@@ -38,6 +42,7 @@ export async function runSessionsSendA2AFlow(params: {
maxPingPongTurns: number;
requesterSessionKey?: string;
requesterChannel?: GatewayMessageChannel;
baseline?: AssistantReplySnapshot;
roundOneReply?: string;
waitRunId?: string;
}) {
@@ -52,9 +57,16 @@ export async function runSessionsSendA2AFlow(params: {
callGateway: sessionsSendA2ADeps.callGateway,
});
if (wait.status === "ok") {
primaryReply = await readLatestAssistantReply({
const latestSnapshot = await readLatestAssistantReplySnapshot({
sessionKey: params.targetSessionKey,
callGateway: sessionsSendA2ADeps.callGateway,
});
const baselineFingerprint = params.baseline?.fingerprint;
primaryReply =
latestSnapshot.text &&
(!baselineFingerprint || latestSnapshot.fingerprint !== baselineFingerprint)
? latestSnapshot.text
: undefined;
latestReply = primaryReply;
}
}

View File

@@ -20,6 +20,7 @@ import {
} from "../../utils/message-channel.js";
import { resolveNestedAgentLaneForSession } from "../lanes.js";
import {
type AgentWaitResult,
readLatestAssistantReplySnapshot,
waitForAgentRunAndReadUpdatedAssistantReply,
} from "../run-wait.js";
@@ -71,6 +72,10 @@ function isRequesterParentOfNativeSubagentSession(params: {
return requester === spawnedBy || requester === parentSessionKey;
}
function isTerminalAgentWaitTimeout(result: AgentWaitResult): boolean {
return result.endedAt !== undefined || Boolean(result.stopReason || result.livenessState);
}
async function startAgentRun(params: {
callGateway: GatewayCaller;
runId: string;
@@ -376,6 +381,7 @@ export function createSessionsSendTool(opts?: {
maxPingPongTurns,
requesterSessionKey,
requesterChannel,
baseline: baselineReply,
roundOneReply,
waitRunId,
});
@@ -421,6 +427,15 @@ export function createSessionsSendTool(opts?: {
});
if (result.status === "timeout") {
if (!isTerminalAgentWaitTimeout(result)) {
startA2AFlow(undefined, runId);
return jsonResult({
runId,
status: "accepted",
sessionKey: displayKey,
delivery,
});
}
return jsonResult({
runId,
status: "timeout",