fix: handle sessions_send active fallback failures (#86638)

Fix run-scoped sessions_send active-run fallback handling.

- surface active queue rejection plus durable fallback admission failures instead of returning accepted too early
- return fallback run/session metadata so normal A2A announcement waits on the fallback run
- retry active steering without transcript-commit waiting when the active runtime does not support it

Thanks @TurboTheTurtle.

Verification:
- node scripts/run-vitest.mjs src/agents/openclaw-tools.sessions.test.ts
- pnpm check:test-types
- git diff --check
- .agents/skills/autoreview/scripts/autoreview --mode branch --base origin/main
This commit is contained in:
Andy Ye
2026-05-26 22:30:56 -07:00
committed by GitHub
parent ae972fe1fe
commit 81e7e8ef24
2 changed files with 299 additions and 5 deletions

View File

@@ -33,6 +33,7 @@ vi.mock("../config/config.js", () => ({
import "./test-helpers/fast-openclaw-tools-sessions.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { testing as embeddedRunsTesting, setActiveEmbeddedRun } from "./pi-embedded-runner/runs.js";
import { testing as agentStepTesting } from "./tools/agent-step.js";
import { createSessionsHistoryTool } from "./tools/sessions-history-tool.js";
import { createSessionsListTool } from "./tools/sessions-list-tool.js";
@@ -224,6 +225,7 @@ function sessionsSendDetails(details: unknown): SessionsSendDetails {
describe("sessions tools", () => {
beforeEach(() => {
callGatewayMock.mockClear();
embeddedRunsTesting.resetActiveEmbeddedRuns();
loadSessionEntryByKeyMock.mockReset();
loadSessionEntryByKeyMock.mockReturnValue(undefined);
installMessagingTestRegistry();
@@ -1296,6 +1298,211 @@ describe("sessions tools", () => {
expect(calls.find((call) => call.method === "send")).toBeUndefined();
});
it("sessions_send reroutes run-scoped active deliveries when transcript steering is rejected", async () => {
const calls: Array<{ method?: string; params?: unknown }> = [];
const requesterKey = "agent:re-portal:main";
const runScopedCallerKey = "agent:leasing-ops:cron:monthly-utility:run:run-fast";
const durableCallerKey = "agent:leasing-ops:cron:monthly-utility";
const queueMessage = vi.fn(async (_text: string, _options?: unknown) => {
throw new Error("active session ended before queued steering message was committed");
});
setActiveEmbeddedRun(
"caller-active-session",
{
queueMessage,
isStreaming: () => true,
isCompacting: () => false,
supportsTranscriptCommitWait: true,
abort: () => {},
},
runScopedCallerKey,
);
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: unknown };
calls.push(request);
if (request.method === "agent") {
return { runId: "fallback-run", status: "accepted", acceptedAt: 2000 };
}
if (request.method === "agent.wait") {
const params = request.params as { runId?: string } | undefined;
return { runId: params?.runId ?? "fallback-run", status: "ok" };
}
if (request.method === "chat.history") {
return { messages: [] };
}
return {};
});
const tool = createOpenClawTools({
agentSessionKey: requesterKey,
agentChannel: "telegram",
config: {
...TEST_CONFIG,
session: {
...TEST_CONFIG.session,
agentToAgent: { maxPingPongTurns: 0 },
},
},
}).find((candidate) => candidate.name === "sessions_send");
if (!tool) {
throw new Error("missing sessions_send tool");
}
const result = await tool.execute("call-run-scoped-caller", {
sessionKey: runScopedCallerKey,
message: "[TASK-COMPLETE] re-portal occupancy ready",
timeoutSeconds: 0,
});
const details = sessionsSendDetails(result.details);
expect(details.status).toBe("accepted");
expect(details.sessionKey).toBe(runScopedCallerKey);
expect(details.delivery?.status).toBe("pending");
const queuedText = queueMessage.mock.calls[0]?.[0];
expect(queuedText).toContain("[Inter-session message]");
expect(queuedText).toContain("[TASK-COMPLETE] re-portal occupancy ready");
expect(queueMessage).toHaveBeenCalledWith(queuedText, {
steeringMode: "all",
debounceMs: 0,
deliveryTimeoutMs: 30_000,
waitForTranscriptCommit: true,
});
await vi.waitFor(() => {
const fallbackCall = calls.find(
(call) =>
call.method === "agent" &&
(call.params as { sessionKey?: string } | undefined)?.sessionKey === durableCallerKey,
);
expect(fallbackCall).toBeDefined();
});
const agentCalls = calls.filter((call) => call.method === "agent");
expect(
agentCalls.some(
(call) =>
(call.params as { sessionKey?: string } | undefined)?.sessionKey === runScopedCallerKey,
),
).toBe(false);
const fallbackParams = agentCalls.find(
(call) =>
(call.params as { sessionKey?: string } | undefined)?.sessionKey === durableCallerKey,
)?.params as { inputProvenance?: { sourceSessionKey?: string }; message?: string } | undefined;
expect(fallbackParams?.message).toContain("[Inter-session message]");
expect(fallbackParams?.message).toContain("[TASK-COMPLETE] re-portal occupancy ready");
expect(fallbackParams?.inputProvenance?.sourceSessionKey).toBe(requesterKey);
await vi.waitFor(() => {
const waitCall = calls.find(
(call) =>
call.method === "agent.wait" &&
(call.params as { runId?: string } | undefined)?.runId === "fallback-run",
);
expect(waitCall).toBeDefined();
});
await vi.waitFor(() => {
const historyCall = calls.find(
(call) =>
call.method === "chat.history" &&
(call.params as { sessionKey?: string } | undefined)?.sessionKey === durableCallerKey,
);
expect(historyCall).toBeDefined();
});
});
it("sessions_send preserves active delivery when transcript commit wait is unsupported", async () => {
const calls: Array<{ method?: string }> = [];
const runScopedCallerKey = "agent:leasing-ops:cron:monthly-utility:run:run-fast";
const queueMessage = vi.fn(async () => {});
setActiveEmbeddedRun(
"caller-active-session",
{
queueMessage,
isStreaming: () => true,
isCompacting: () => false,
abort: () => {},
},
runScopedCallerKey,
);
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string };
calls.push(request);
if (request.method === "agent") {
throw new Error("fallback agent should not start");
}
return {};
});
const tool = createOpenClawTools({
agentSessionKey: "agent:re-portal:main",
agentChannel: "telegram",
}).find((candidate) => candidate.name === "sessions_send");
if (!tool) {
throw new Error("missing sessions_send tool");
}
const result = await tool.execute("call-run-scoped-caller", {
sessionKey: runScopedCallerKey,
message: "[TASK-COMPLETE] re-portal occupancy ready",
timeoutSeconds: 0,
});
const details = sessionsSendDetails(result.details);
expect(details.status).toBe("accepted");
expect(details.sessionKey).toBe(runScopedCallerKey);
expect(queueMessage).toHaveBeenCalledOnce();
expect(queueMessage).toHaveBeenCalledWith(expect.stringContaining("[Inter-session message]"), {
steeringMode: "all",
debounceMs: 0,
deliveryTimeoutMs: 30_000,
});
expect(calls.some((call) => call.method === "agent")).toBe(false);
});
it("sessions_send reports run-scoped fallback admission failures", async () => {
const runScopedCallerKey = "agent:leasing-ops:cron:monthly-utility:run:run-fast";
const queueMessage = vi.fn(async () => {
throw new Error("active session ended before queued steering message was committed");
});
setActiveEmbeddedRun(
"caller-active-session",
{
queueMessage,
isStreaming: () => true,
isCompacting: () => false,
supportsTranscriptCommitWait: true,
abort: () => {},
},
runScopedCallerKey,
);
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: unknown };
if (request.method === "agent") {
throw new Error("gateway request timeout for agent");
}
return {};
});
const tool = createOpenClawTools({
agentSessionKey: "agent:re-portal:main",
agentChannel: "telegram",
}).find((candidate) => candidate.name === "sessions_send");
if (!tool) {
throw new Error("missing sessions_send tool");
}
const result = await tool.execute("call-run-scoped-caller", {
sessionKey: runScopedCallerKey,
message: "[TASK-COMPLETE] re-portal occupancy ready",
timeoutSeconds: 0,
});
const details = sessionsSendDetails(result.details);
expect(details.status).toBe("error");
expect(details.sessionKey).toBe(runScopedCallerKey);
expect(details.error).toContain("queue_message_failed reason=runtime_rejected");
expect(details.error).toContain("fallback_failed error=gateway request timeout for agent");
});
it("sessions_send preserves terminal timeouts without starting A2A", async () => {
const calls: Array<{ method?: string; params?: unknown }> = [];
const requesterKey = "agent:main:main";

View File

@@ -21,6 +21,12 @@ import {
} from "../../utils/message-channel.js";
import { listAgentIds } from "../agent-scope.js";
import { resolveNestedAgentLaneForSession } from "../lanes.js";
import {
type EmbeddedPiQueueMessageOptions,
formatEmbeddedPiQueueFailureSummary,
queueEmbeddedPiMessageWithOutcomeAsync,
resolveActiveEmbeddedRunSessionId,
} from "../pi-embedded-runner/runs.js";
import {
type AgentWaitResult,
readLatestAssistantReplySnapshot,
@@ -55,6 +61,11 @@ const SessionsSendToolSchema = Type.Object({
type GatewayCaller = typeof callGateway;
const SESSIONS_SEND_REPLY_HISTORY_LIMIT = 50;
function resolveRunScopedFallbackSessionKey(sessionKey: string): string | undefined {
const match = /^(agent:[^:]+:.+):run:[^:]+$/.exec(sessionKey.trim());
return match?.[1];
}
function resolveConfiguredAgentMainSessionKey(params: {
cfg: OpenClawConfig;
agentId: string;
@@ -155,8 +166,74 @@ async function startAgentRun(params: {
runId: string;
sendParams: Record<string, unknown>;
sessionKey: string;
}): Promise<{ ok: true; runId: string } | { ok: false; result: ReturnType<typeof jsonResult> }> {
deliveryTimeoutMs?: number;
allowActiveRunQueueFallback?: boolean;
}): Promise<
| {
ok: true;
runId: string;
activeRunQueue?: boolean;
a2aSessionKey?: string;
a2aDisplayKey?: string;
}
| { ok: false; result: ReturnType<typeof jsonResult> }
> {
try {
const activeRunSessionId = params.allowActiveRunQueueFallback
? resolveActiveEmbeddedRunSessionId(params.sessionKey)
: undefined;
const fallbackSessionKey = activeRunSessionId
? resolveRunScopedFallbackSessionKey(params.sessionKey)
: undefined;
const messageText =
typeof params.sendParams.message === "string" ? params.sendParams.message : undefined;
if (activeRunSessionId && fallbackSessionKey && messageText) {
const queueOptions: EmbeddedPiQueueMessageOptions = {
steeringMode: "all",
debounceMs: 0,
deliveryTimeoutMs: params.deliveryTimeoutMs,
waitForTranscriptCommit: true,
};
let queueOutcome = await queueEmbeddedPiMessageWithOutcomeAsync(
activeRunSessionId,
messageText,
queueOptions,
);
if (!queueOutcome.queued && queueOutcome.reason === "transcript_commit_wait_unsupported") {
const bestEffortQueueOptions = { ...queueOptions };
delete bestEffortQueueOptions.waitForTranscriptCommit;
queueOutcome = await queueEmbeddedPiMessageWithOutcomeAsync(
activeRunSessionId,
messageText,
bestEffortQueueOptions,
);
}
if (queueOutcome.queued) {
return { ok: true, runId: params.runId, activeRunQueue: true };
}
try {
const response = await params.callGateway<{ runId: string }>({
method: "agent",
params: {
...params.sendParams,
sessionKey: fallbackSessionKey,
idempotencyKey: crypto.randomUUID(),
},
timeoutMs: 10_000,
});
return {
ok: true,
runId:
typeof response?.runId === "string" && response.runId ? response.runId : params.runId,
a2aSessionKey: fallbackSessionKey,
a2aDisplayKey: fallbackSessionKey,
};
} catch (err) {
const queueSummary =
formatEmbeddedPiQueueFailureSummary(queueOutcome) ?? "active run queue rejected";
throw new Error(`${queueSummary}; fallback_failed error=${formatErrorMessage(err)}`);
}
}
const response = await params.callGateway<{ runId: string }>({
method: "agent",
params: params.sendParams,
@@ -473,13 +550,18 @@ export function createSessionsSendTool(opts?: {
? ({ status: "skipped", mode: "announce" } as const)
: ({ status: "pending", mode: "announce" } as const);
const startA2AFlow = (roundOneReply?: string, waitRunId?: string) => {
const startA2AFlow = (
roundOneReply?: string,
waitRunId?: string,
flowTargetSessionKey = resolvedKey,
flowDisplayKey = displayKey,
) => {
if (skipA2AFlow) {
return;
}
void runSessionsSendA2AFlow({
targetSessionKey: resolvedKey,
displayKey,
targetSessionKey: flowTargetSessionKey,
displayKey: flowDisplayKey,
message,
announceTimeoutMs,
maxPingPongTurns,
@@ -497,12 +579,16 @@ export function createSessionsSendTool(opts?: {
runId,
sendParams,
sessionKey: displayKey,
deliveryTimeoutMs: announceTimeoutMs,
allowActiveRunQueueFallback: true,
});
if (!start.ok) {
return start.result;
}
runId = start.runId;
startA2AFlow(undefined, runId);
if (!start.activeRunQueue) {
startA2AFlow(undefined, runId, start.a2aSessionKey, start.a2aDisplayKey);
}
return jsonResult({
runId,
status: "accepted",
@@ -516,6 +602,7 @@ export function createSessionsSendTool(opts?: {
runId,
sendParams,
sessionKey: displayKey,
deliveryTimeoutMs: announceTimeoutMs,
});
if (!start.ok) {
return start.result;