mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 00:20:22 +00:00
fix(subagents): recheck timed-out announce waits (#53127)
Recheck timed-out subagent announce waits against the latest runtime snapshot before announcing timeout, and keep that recheck best-effort so transient gateway failures do not suppress the announcement. Co-authored-by: Val Alexander <68980965+BunsDev@users.noreply.github.com>
This commit is contained in:
@@ -383,6 +383,63 @@ describe("subagent announce formatting", () => {
|
||||
expect(msg).toContain("completed successfully");
|
||||
});
|
||||
|
||||
it("rechecks timed-out waits before announcing timeout when the run finishes immediately after", async () => {
  // Responses handed out to successive `agent.wait` calls, in order:
  // the first reports a timeout, the second reports a finished run.
  const queuedWaitResults = [
    { status: "timeout", startedAt: 10, endedAt: 20 },
    { status: "ok", startedAt: 10, endedAt: 30 },
  ];

  // Route every gateway method the flow may call to a canned response.
  callGatewaySpy.mockImplementation(async (req: unknown) => {
    const request = req as { method?: string; params?: { sessionKey?: string } };
    switch (request.method) {
      case "agent":
        return await agentSpy(request);
      case "send":
        return await sendSpy(request);
      case "agent.wait":
        // Once the queue is drained, keep answering with the finished state.
        return queuedWaitResults.shift() ?? { status: "ok", startedAt: 10, endedAt: 30 };
      case "chat.history":
        return {
          messages: [
            {
              role: "assistant",
              content: [{ type: "text", text: "Worker executed successfully" }],
            },
          ],
        };
      case "sessions.patch":
      case "sessions.delete":
        return {};
      default:
        return {};
    }
  });
  readLatestAssistantReplyMock.mockResolvedValue("Worker executed successfully");

  await runSubagentAnnounceFlow({
    childSessionKey: "agent:main:subagent:test",
    childRunId: "run-timeout-race",
    requesterSessionKey: "agent:main:main",
    requesterDisplayKey: "main",
    task: "do thing",
    timeoutMs: 1000,
    cleanup: "keep",
    waitForCompletion: true,
    startedAt: 10,
    endedAt: 20,
  });

  // Inspect the first internal event of the first `agent` request that was sent.
  const firstAgentRequest = agentSpy.mock.calls[0]?.[0] as {
    params?: {
      message?: string;
      internalEvents?: Array<{ status?: string; statusLabel?: string; result?: string }>;
    };
  };
  const announcedEvent = firstAgentRequest?.params?.internalEvents?.[0];

  // The announcement must reflect the rechecked "ok" state, not the initial timeout.
  expect(announcedEvent?.status).toBe("ok");
  expect(announcedEvent?.statusLabel).toBe("completed successfully");
  expect(announcedEvent?.result).toContain("Worker executed successfully");
});
|
||||
|
||||
it("uses child-run announce identity for direct idempotency", async () => {
|
||||
await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:worker",
|
||||
|
||||
@@ -81,6 +81,13 @@ type SubagentOutputSnapshot = {
|
||||
toolCallCount: number;
|
||||
};
|
||||
|
||||
/**
 * Shape of the response returned by the gateway's `agent.wait` method,
 * as consumed in this file. Every field is optional.
 */
type AgentWaitResult = {
  // Run status; this file reacts to "timeout", "error" and "ok".
  status?: string;
  // Run start time reported by the gateway (presumably epoch ms — TODO confirm).
  startedAt?: number;
  // Run end time reported by the gateway (presumably epoch ms — TODO confirm).
  endedAt?: number;
  // Error text; read by this file when `status` is "error".
  error?: string;
};
|
||||
|
||||
function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType<typeof loadConfig>): number {
|
||||
const configured = cfg.agents?.defaults?.subagents?.announceTimeoutMs;
|
||||
if (typeof configured !== "number" || !Number.isFinite(configured)) {
|
||||
@@ -418,6 +425,49 @@ async function readLatestSubagentOutputWithRetry(params: {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
 * Asks the gateway for the outcome of a subagent run via `agent.wait`.
 *
 * Rejections from `callGateway` are not caught here and propagate to the caller.
 *
 * @param runId - Run identifier forwarded as `params.runId`.
 * @param timeoutMs - Requested wait budget; floored to an integer and clamped
 *   to be non-negative before it is sent.
 * @returns The `agent.wait` response from the gateway.
 */
async function waitForSubagentRunOutcome(
  runId: string,
  timeoutMs: number,
): Promise<AgentWaitResult> {
  const requestedWaitMs = Math.max(0, Math.floor(timeoutMs));
  // The gateway call itself is allowed 2000 ms more than the requested wait
  // (presumably so the request does not expire before `agent.wait` answers).
  const callTimeoutMs = requestedWaitMs + 2000;

  const waitResult = await callGateway<AgentWaitResult>({
    method: "agent.wait",
    params: { runId, timeoutMs: requestedWaitMs },
    timeoutMs: callTimeoutMs,
  });
  return waitResult;
}
|
||||
|
||||
/**
 * Merges an `agent.wait` result into the run state known so far and returns
 * the merged state as a new object; `params` itself is not modified.
 *
 * - A wait status of "timeout", "error" or "ok" replaces the outcome; any
 *   other (or missing) status leaves the incoming outcome untouched.
 * - `startedAt` / `endedAt` from the wait result are only used when the
 *   incoming value is falsy (missing or 0) and the wait value is a number.
 *
 * @param params.wait - Result of the `agent.wait` call, if any.
 * @param params.outcome - Outcome known before this wait result.
 * @param params.startedAt - Start timestamp known before this wait result.
 * @param params.endedAt - End timestamp known before this wait result.
 * @returns `{ outcome, startedAt, endedAt }` after applying the wait result.
 */
function applySubagentWaitOutcome(params: {
  wait: AgentWaitResult | undefined;
  outcome: SubagentRunOutcome | undefined;
  startedAt?: number;
  endedAt?: number;
}) {
  const { wait } = params;
  let { outcome, startedAt, endedAt } = params;

  // Only a string error is carried over; anything else becomes undefined.
  const errorText = typeof wait?.error === "string" ? wait.error : undefined;

  switch (wait?.status) {
    case "timeout":
      outcome = { status: "timeout" };
      break;
    case "error":
      outcome = { status: "error", error: errorText };
      break;
    case "ok":
      outcome = { status: "ok" };
      break;
    default:
      // Unrecognized or absent status: keep the outcome we were given.
      break;
  }

  // Already-known timestamps win; the wait result only fills gaps.
  if (!startedAt && typeof wait?.startedAt === "number") {
    startedAt = wait.startedAt;
  }
  if (!endedAt && typeof wait?.endedAt === "number") {
    endedAt = wait.endedAt;
  }

  return { outcome, startedAt, endedAt };
}
|
||||
|
||||
export async function captureSubagentCompletionReply(
|
||||
sessionKey: string,
|
||||
): Promise<string | undefined> {
|
||||
@@ -1294,34 +1344,16 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
}
|
||||
|
||||
if (!reply && params.waitForCompletion !== false) {
|
||||
const waitMs = settleTimeoutMs;
|
||||
const wait = await callGateway<{
|
||||
status?: string;
|
||||
startedAt?: number;
|
||||
endedAt?: number;
|
||||
error?: string;
|
||||
}>({
|
||||
method: "agent.wait",
|
||||
params: {
|
||||
runId: params.childRunId,
|
||||
timeoutMs: waitMs,
|
||||
},
|
||||
timeoutMs: waitMs + 2000,
|
||||
const wait = await waitForSubagentRunOutcome(params.childRunId, settleTimeoutMs);
|
||||
const applied = applySubagentWaitOutcome({
|
||||
wait,
|
||||
outcome,
|
||||
startedAt: params.startedAt,
|
||||
endedAt: params.endedAt,
|
||||
});
|
||||
const waitError = typeof wait?.error === "string" ? wait.error : undefined;
|
||||
if (wait?.status === "timeout") {
|
||||
outcome = { status: "timeout" };
|
||||
} else if (wait?.status === "error") {
|
||||
outcome = { status: "error", error: waitError };
|
||||
} else if (wait?.status === "ok") {
|
||||
outcome = { status: "ok" };
|
||||
}
|
||||
if (typeof wait?.startedAt === "number" && !params.startedAt) {
|
||||
params.startedAt = wait.startedAt;
|
||||
}
|
||||
if (typeof wait?.endedAt === "number" && !params.endedAt) {
|
||||
params.endedAt = wait.endedAt;
|
||||
}
|
||||
outcome = applied.outcome;
|
||||
params.startedAt = applied.startedAt;
|
||||
params.endedAt = applied.endedAt;
|
||||
}
|
||||
|
||||
if (!outcome) {
|
||||
@@ -1422,6 +1454,28 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
reply = fallbackReply;
|
||||
}
|
||||
|
||||
// A worker can finish just after the first wait request timed out.
|
||||
// If we already have real completion content, do one cached recheck so
|
||||
// the final completion event prefers the authoritative terminal state.
|
||||
// This is best-effort; if the recheck fails, keep the known timeout
|
||||
// outcome instead of dropping the announcement entirely.
|
||||
if (outcome?.status === "timeout" && reply?.trim() && params.waitForCompletion !== false) {
|
||||
try {
|
||||
const rechecked = await waitForSubagentRunOutcome(params.childRunId, 0);
|
||||
const applied = applySubagentWaitOutcome({
|
||||
wait: rechecked,
|
||||
outcome,
|
||||
startedAt: params.startedAt,
|
||||
endedAt: params.endedAt,
|
||||
});
|
||||
outcome = applied.outcome;
|
||||
params.startedAt = applied.startedAt;
|
||||
params.endedAt = applied.endedAt;
|
||||
} catch {
|
||||
// Best-effort recheck; keep the existing timeout outcome on failure.
|
||||
}
|
||||
}
|
||||
|
||||
if (isAnnounceSkip(reply) || isSilentReplyText(reply, SILENT_REPLY_TOKEN)) {
|
||||
if (fallbackReply && !fallbackIsSilent) {
|
||||
reply = fallbackReply;
|
||||
@@ -1431,6 +1485,10 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
}
|
||||
}
|
||||
|
||||
if (!outcome) {
|
||||
outcome = { status: "unknown" };
|
||||
}
|
||||
|
||||
// Build status label
|
||||
const statusLabel =
|
||||
outcome.status === "ok"
|
||||
|
||||
Reference in New Issue
Block a user