Avoid stale subagent reply carryover

This commit is contained in:
Tak Hoffman
2026-03-26 22:27:49 -05:00
parent d81b5fc792
commit 6ef6e80abe
2 changed files with 115 additions and 10 deletions

View File

@@ -172,15 +172,15 @@ describe("sendControlledSubagentMessage", () => {
__testing.setDepsForTest({
callGateway: async <T = Record<string, unknown>>(request: CallGatewayOptions) => {
if (request.method === "chat.history") {
return { messages: [] } as T;
}
if (request.method === "agent") {
return { runId: "run-followup-send" } as T;
}
if (request.method === "agent.wait") {
return { status: "done" } as T;
}
if (request.method === "chat.history") {
return { messages: [] } as T;
}
throw new Error(`unexpected method: ${request.method}`);
},
});
@@ -247,15 +247,15 @@ describe("sendControlledSubagentMessage", () => {
__testing.setDepsForTest({
callGateway: async <T = Record<string, unknown>>(request: CallGatewayOptions) => {
if (request.method === "chat.history") {
return { messages: [] } as T;
}
if (request.method === "agent") {
return { runId: "run-followup-stale-send" } as T;
}
if (request.method === "agent.wait") {
return { status: "done" } as T;
}
if (request.method === "chat.history") {
return { messages: [] } as T;
}
throw new Error(`unexpected method: ${request.method}`);
},
});
@@ -292,6 +292,77 @@ describe("sendControlledSubagentMessage", () => {
replyText: undefined,
});
});
it("does not return the previous assistant reply when no new assistant message appears", async () => {
addSubagentRunForTests({
runId: "run-owned-stale-reply",
childSessionKey: "agent:main:subagent:owned-stale-reply",
controllerSessionKey: "agent:main:main",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "continue work",
cleanup: "keep",
createdAt: Date.now() - 5_000,
startedAt: Date.now() - 4_000,
endedAt: Date.now() - 1_000,
outcome: { status: "ok" },
});
let historyCalls = 0;
const staleAssistantMessage = {
role: "assistant",
content: [{ type: "text", text: "older reply from a previous run" }],
};
__testing.setDepsForTest({
callGateway: async <T = Record<string, unknown>>(request: CallGatewayOptions) => {
if (request.method === "chat.history") {
historyCalls += 1;
return { messages: [staleAssistantMessage] } as T;
}
if (request.method === "agent") {
return { runId: "run-followup-stale-reply" } as T;
}
if (request.method === "agent.wait") {
return { status: "done" } as T;
}
throw new Error(`unexpected method: ${request.method}`);
},
});
const result = await sendControlledSubagentMessage({
cfg: {
channels: { whatsapp: { allowFrom: ["*"] } },
} as OpenClawConfig,
controller: {
controllerSessionKey: "agent:main:main",
callerSessionKey: "agent:main:main",
callerIsSubagent: false,
controlScope: "children",
},
entry: {
runId: "run-owned-stale-reply",
childSessionKey: "agent:main:subagent:owned-stale-reply",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
controllerSessionKey: "agent:main:main",
task: "continue work",
cleanup: "keep",
createdAt: Date.now() - 5_000,
startedAt: Date.now() - 4_000,
endedAt: Date.now() - 1_000,
outcome: { status: "ok" },
},
message: "continue",
});
expect(historyCalls).toBe(2);
expect(result).toEqual({
status: "ok",
runId: "run-followup-stale-reply",
replyText: undefined,
});
});
});
describe("killSubagentRunAdmin", () => {

View File

@@ -50,6 +50,7 @@ export const MAX_RECENT_MINUTES = 24 * 60;
export const MAX_STEER_MESSAGE_CHARS = 4_000;
export const STEER_RATE_LIMIT_MS = 2_000;
export const STEER_ABORT_SETTLE_TIMEOUT_MS = 5_000;
const SUBAGENT_REPLY_HISTORY_LIMIT = 50;
const steerRateLimit = new Map<string, number>();
@@ -195,6 +196,27 @@ export function isActiveSubagentRun(
return !entry.endedAt || pendingDescendantCount(entry.childSessionKey) > 0;
}
function resolveLatestAssistantReplySnapshot(messages: unknown[]): {
text?: string;
fingerprint?: string;
} {
for (let index = messages.length - 1; index >= 0; index -= 1) {
const message = messages[index];
const text = extractAssistantText(message);
if (!text) {
continue;
}
let fingerprint: string | undefined;
try {
fingerprint = JSON.stringify(message);
} catch {
fingerprint = text;
}
return { text, fingerprint };
}
return {};
}
function resolveRunStatus(entry: SubagentRunRecord, options?: { pendingDescendants?: number }) {
const pendingDescendants = Math.max(0, options?.pendingDescendants ?? 0);
if (pendingDescendants > 0) {
@@ -879,6 +901,14 @@ export async function sendControlledSubagentMessage(params: {
const idempotencyKey = crypto.randomUUID();
let runId: string = idempotencyKey;
try {
const historyBefore = await subagentControlDeps.callGateway<{ messages: Array<unknown> }>({
method: "chat.history",
params: { sessionKey: targetSessionKey, limit: SUBAGENT_REPLY_HISTORY_LIMIT },
});
const baselineReply = resolveLatestAssistantReplySnapshot(
stripToolMessages(Array.isArray(historyBefore?.messages) ? historyBefore.messages : []),
);
const response = await subagentControlDeps.callGateway<{ runId: string }>({
method: "agent",
params: {
@@ -914,11 +944,15 @@ export async function sendControlledSubagentMessage(params: {
const history = await subagentControlDeps.callGateway<{ messages: Array<unknown> }>({
method: "chat.history",
params: { sessionKey: targetSessionKey, limit: 50 },
params: { sessionKey: targetSessionKey, limit: SUBAGENT_REPLY_HISTORY_LIMIT },
});
const filtered = stripToolMessages(Array.isArray(history?.messages) ? history.messages : []);
const last = filtered.length > 0 ? filtered[filtered.length - 1] : undefined;
const replyText = last ? extractAssistantText(last) : undefined;
const latestReply = resolveLatestAssistantReplySnapshot(
stripToolMessages(Array.isArray(history?.messages) ? history.messages : []),
);
const replyText =
latestReply.text && latestReply.fingerprint !== baselineReply.fingerprint
? latestReply.text
: undefined;
return { status: "ok" as const, runId, replyText };
} catch (err) {
const error = err instanceof Error ? err.message : String(err);