refactor(agents): share run wait reply helpers

This commit is contained in:
Peter Steinberger
2026-04-04 20:06:28 +09:00
parent bbb0b574c4
commit 7b6334b0f4
6 changed files with 325 additions and 173 deletions

View File

@@ -40,11 +40,10 @@ import {
type SubagentRunRecord,
} from "./subagent-registry.js";
import {
extractAssistantText,
resolveInternalSessionKey,
resolveMainSessionAlias,
stripToolMessages,
} from "./tools/sessions-helpers.js";
readLatestAssistantReplySnapshot,
waitForAgentRunAndReadUpdatedAssistantReply,
} from "./tools/agent-step.js";
import { resolveInternalSessionKey, resolveMainSessionAlias } from "./tools/sessions-helpers.js";
export const DEFAULT_RECENT_MINUTES = 30;
export const MAX_RECENT_MINUTES = 24 * 60;
@@ -197,27 +196,6 @@ export function isActiveSubagentRun(
return !entry.endedAt || pendingDescendantCount(entry.childSessionKey) > 0;
}
function resolveLatestAssistantReplySnapshot(messages: unknown[]): {
text?: string;
fingerprint?: string;
} {
for (let index = messages.length - 1; index >= 0; index -= 1) {
const message = messages[index];
const text = extractAssistantText(message);
if (!text) {
continue;
}
let fingerprint: string | undefined;
try {
fingerprint = JSON.stringify(message);
} catch {
fingerprint = text;
}
return { text, fingerprint };
}
return {};
}
function resolveRunStatus(entry: SubagentRunRecord, options?: { pendingDescendants?: number }) {
const pendingDescendants = Math.max(0, options?.pendingDescendants ?? 0);
if (pendingDescendants > 0) {
@@ -880,13 +858,11 @@ export async function sendControlledSubagentMessage(params: {
const idempotencyKey = crypto.randomUUID();
let runId: string = idempotencyKey;
try {
const historyBefore = await subagentControlDeps.callGateway<{ messages: Array<unknown> }>({
method: "chat.history",
params: { sessionKey: targetSessionKey, limit: SUBAGENT_REPLY_HISTORY_LIMIT },
const baselineReply = await readLatestAssistantReplySnapshot({
sessionKey: targetSessionKey,
limit: SUBAGENT_REPLY_HISTORY_LIMIT,
callGateway: subagentControlDeps.callGateway,
});
const baselineReply = resolveLatestAssistantReplySnapshot(
stripToolMessages(Array.isArray(historyBefore?.messages) ? historyBefore.messages : []),
);
const response = await subagentControlDeps.callGateway<{ runId: string }>({
method: "agent",
@@ -907,32 +883,25 @@ export async function sendControlledSubagentMessage(params: {
runId = responseRunId;
}
const waitMs = 30_000;
const wait = await subagentControlDeps.callGateway<{ status?: string; error?: string }>({
method: "agent.wait",
params: { runId, timeoutMs: waitMs },
timeoutMs: waitMs + 2_000,
const result = await waitForAgentRunAndReadUpdatedAssistantReply({
runId,
sessionKey: targetSessionKey,
timeoutMs: 30_000,
limit: SUBAGENT_REPLY_HISTORY_LIMIT,
baseline: baselineReply,
callGateway: subagentControlDeps.callGateway,
});
if (wait?.status === "timeout") {
if (result.status === "timeout") {
return { status: "timeout" as const, runId };
}
if (wait?.status === "error") {
const waitError = typeof wait.error === "string" ? wait.error : "unknown error";
return { status: "error" as const, runId, error: waitError };
if (result.status === "error") {
return {
status: "error" as const,
runId,
error: result.error ?? "unknown error",
};
}
const history = await subagentControlDeps.callGateway<{ messages: Array<unknown> }>({
method: "chat.history",
params: { sessionKey: targetSessionKey, limit: SUBAGENT_REPLY_HISTORY_LIMIT },
});
const latestReply = resolveLatestAssistantReplySnapshot(
stripToolMessages(Array.isArray(history?.messages) ? history.messages : []),
);
const replyText =
latestReply.text && latestReply.fingerprint !== baselineReply.fingerprint
? latestReply.text
: undefined;
return { status: "ok" as const, runId, replyText };
return { status: "ok" as const, runId, replyText: result.replyText };
} catch (err) {
const error = err instanceof Error ? err.message : String(err);
return { status: "error" as const, runId, error };

View File

@@ -21,6 +21,7 @@ import {
safeRemoveAttachmentsDir,
} from "./subagent-registry-helpers.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
import { waitForAgentRun } from "./tools/agent-step.js";
const log = createSubsystemLogger("agents/subagent-registry");
@@ -74,23 +75,11 @@ export function createSubagentRunManager(params: {
}) {
const waitForSubagentCompletion = async (runId: string, waitTimeoutMs: number) => {
try {
const timeoutMs = Math.max(1, Math.floor(waitTimeoutMs));
const wait = await params.callGateway<{
status?: string;
startedAt?: number;
endedAt?: number;
error?: string;
}>({
method: "agent.wait",
params: {
runId,
timeoutMs,
},
timeoutMs: timeoutMs + 10_000,
const wait = await waitForAgentRun({
runId,
timeoutMs: Math.max(1, Math.floor(waitTimeoutMs)),
callGateway: params.callGateway,
});
if (wait?.status !== "ok" && wait?.status !== "error" && wait?.status !== "timeout") {
return;
}
const entry = params.runs.get(runId);
if (!entry) {
return;

View File

@@ -5,7 +5,13 @@ vi.mock("../../gateway/call.js", () => ({
callGateway: (opts: unknown) => callGatewayMock(opts),
}));
import { __testing, readLatestAssistantReply } from "./agent-step.js";
import {
__testing,
readLatestAssistantReply,
readLatestAssistantReplySnapshot,
waitForAgentRun,
waitForAgentRunAndReadUpdatedAssistantReply,
} from "./agent-step.js";
describe("readLatestAssistantReply", () => {
beforeEach(() => {
@@ -49,4 +55,127 @@ describe("readLatestAssistantReply", () => {
expect(result).toBe("older output");
});
it("returns assistant fingerprints for delta comparisons", async () => {
callGatewayMock.mockResolvedValue({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "new output" }],
timestamp: 42,
},
],
});
const result = await readLatestAssistantReplySnapshot({ sessionKey: "agent:main:child" });
expect(result.text).toBe("new output");
expect(result.fingerprint).toContain('"timestamp":42');
});
});
describe("waitForAgentRun", () => {
beforeEach(() => {
callGatewayMock.mockClear();
__testing.setDepsForTest({
callGateway: async (opts) => await callGatewayMock(opts),
});
});
it("maps gateway timeouts to timeout status", async () => {
callGatewayMock.mockRejectedValue(new Error("gateway timeout while waiting"));
const result = await waitForAgentRun({ runId: "run-1", timeoutMs: 500 });
expect(result).toEqual({
status: "timeout",
error: "gateway timeout while waiting",
});
});
it("preserves timing metadata from agent.wait", async () => {
callGatewayMock.mockResolvedValue({
status: "ok",
startedAt: 100,
endedAt: 200,
});
const result = await waitForAgentRun({ runId: "run-2", timeoutMs: 500 });
expect(result).toEqual({
status: "ok",
startedAt: 100,
endedAt: 200,
});
});
});
describe("waitForAgentRunAndReadUpdatedAssistantReply", () => {
beforeEach(() => {
callGatewayMock.mockClear();
__testing.setDepsForTest({
callGateway: async (opts) => await callGatewayMock(opts),
});
});
it("returns undefined when the latest assistant fingerprint matches the baseline", async () => {
const assistantMessage = {
role: "assistant",
content: [{ type: "text", text: "same reply" }],
timestamp: 42,
};
callGatewayMock
.mockResolvedValueOnce({
status: "ok",
})
.mockResolvedValueOnce({
messages: [assistantMessage],
});
const result = await waitForAgentRunAndReadUpdatedAssistantReply({
runId: "run-1",
sessionKey: "agent:main:child",
timeoutMs: 1_000,
baseline: {
text: "same reply",
fingerprint: JSON.stringify(assistantMessage),
},
});
expect(result).toEqual({
status: "ok",
replyText: undefined,
});
});
it("returns the new assistant text when the fingerprint changes", async () => {
callGatewayMock
.mockResolvedValueOnce({
status: "ok",
})
.mockResolvedValueOnce({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "fresh reply" }],
timestamp: 99,
},
],
});
const result = await waitForAgentRunAndReadUpdatedAssistantReply({
runId: "run-2",
sessionKey: "agent:main:child",
timeoutMs: 1_000,
baseline: {
text: "older reply",
fingerprint: "old-fingerprint",
},
});
expect(result).toEqual({
status: "ok",
replyText: "fresh reply",
});
});
});

View File

@@ -14,17 +14,40 @@ let agentStepDeps: {
callGateway: GatewayCaller;
} = defaultAgentStepDeps;
export async function readLatestAssistantReply(params: {
sessionKey: string;
limit?: number;
}): Promise<string | undefined> {
const history = await agentStepDeps.callGateway<{ messages: Array<unknown> }>({
method: "chat.history",
params: { sessionKey: params.sessionKey, limit: params.limit ?? 50 },
});
const filtered = stripToolMessages(Array.isArray(history?.messages) ? history.messages : []);
for (let i = filtered.length - 1; i >= 0; i -= 1) {
const candidate = filtered[i];
export type AssistantReplySnapshot = {
text?: string;
fingerprint?: string;
};
export type AgentWaitResult = {
status: "ok" | "timeout" | "error";
error?: string;
startedAt?: number;
endedAt?: number;
};
type RawAgentWaitResponse = {
status?: string;
error?: string;
startedAt?: unknown;
endedAt?: unknown;
};
function normalizeAgentWaitResult(
status: AgentWaitResult["status"],
wait?: RawAgentWaitResponse,
): AgentWaitResult {
return {
status,
error: typeof wait?.error === "string" ? wait.error : undefined,
startedAt: typeof wait?.startedAt === "number" ? wait.startedAt : undefined,
endedAt: typeof wait?.endedAt === "number" ? wait.endedAt : undefined,
};
}
function resolveLatestAssistantReplySnapshot(messages: unknown[]): AssistantReplySnapshot {
for (let i = messages.length - 1; i >= 0; i -= 1) {
const candidate = messages[i];
if (!candidate || typeof candidate !== "object") {
continue;
}
@@ -35,9 +58,107 @@ export async function readLatestAssistantReply(params: {
if (!text?.trim()) {
continue;
}
return text;
let fingerprint: string | undefined;
try {
fingerprint = JSON.stringify(candidate);
} catch {
fingerprint = text;
}
return { text, fingerprint };
}
return undefined;
return {};
}
export async function readLatestAssistantReplySnapshot(params: {
sessionKey: string;
limit?: number;
callGateway?: GatewayCaller;
}): Promise<AssistantReplySnapshot> {
const history = await (params.callGateway ?? agentStepDeps.callGateway)<{
messages: Array<unknown>;
}>({
method: "chat.history",
params: { sessionKey: params.sessionKey, limit: params.limit ?? 50 },
});
return resolveLatestAssistantReplySnapshot(
stripToolMessages(Array.isArray(history?.messages) ? history.messages : []),
);
}
export async function readLatestAssistantReply(params: {
sessionKey: string;
limit?: number;
}): Promise<string | undefined> {
return (
await readLatestAssistantReplySnapshot({
sessionKey: params.sessionKey,
limit: params.limit,
})
).text;
}
export async function waitForAgentRun(params: {
runId: string;
timeoutMs: number;
callGateway?: GatewayCaller;
}): Promise<AgentWaitResult> {
const timeoutMs = Math.max(1, Math.floor(params.timeoutMs));
try {
const wait = await (params.callGateway ?? agentStepDeps.callGateway)<RawAgentWaitResponse>({
method: "agent.wait",
params: {
runId: params.runId,
timeoutMs,
},
timeoutMs: timeoutMs + 2000,
});
if (wait?.status === "timeout") {
return normalizeAgentWaitResult("timeout", wait);
}
if (wait?.status === "error") {
return normalizeAgentWaitResult("error", wait);
}
return normalizeAgentWaitResult("ok", wait);
} catch (err) {
const error = err instanceof Error ? err.message : String(err);
return {
status: error.includes("gateway timeout") ? "timeout" : "error",
error,
};
}
}
export async function waitForAgentRunAndReadUpdatedAssistantReply(params: {
runId: string;
sessionKey: string;
timeoutMs: number;
limit?: number;
baseline?: AssistantReplySnapshot;
callGateway?: GatewayCaller;
}): Promise<AgentWaitResult & { replyText?: string }> {
const wait = await waitForAgentRun({
runId: params.runId,
timeoutMs: params.timeoutMs,
callGateway: params.callGateway,
});
if (wait.status !== "ok") {
return wait;
}
const latestReply = await readLatestAssistantReplySnapshot({
sessionKey: params.sessionKey,
limit: params.limit,
callGateway: params.callGateway,
});
const baselineFingerprint = params.baseline?.fingerprint;
const replyText =
latestReply.text && (!baselineFingerprint || latestReply.fingerprint !== baselineFingerprint)
? latestReply.text
: undefined;
return {
status: "ok",
replyText,
};
}
export async function runAgentStep(params: {
@@ -74,19 +195,15 @@ export async function runAgentStep(params: {
const stepRunId = typeof response?.runId === "string" && response.runId ? response.runId : "";
const resolvedRunId = stepRunId || stepIdem;
const stepWaitMs = Math.min(params.timeoutMs, 60_000);
const wait = await agentStepDeps.callGateway<{ status?: string }>({
method: "agent.wait",
params: {
runId: resolvedRunId,
timeoutMs: stepWaitMs,
},
timeoutMs: stepWaitMs + 2000,
const result = await waitForAgentRunAndReadUpdatedAssistantReply({
runId: resolvedRunId,
sessionKey: params.sessionKey,
timeoutMs: Math.min(params.timeoutMs, 60_000),
});
if (wait?.status !== "ok") {
if (result.status !== "ok") {
return undefined;
}
return await readLatestAssistantReply({ sessionKey: params.sessionKey });
return result.replyText;
}
export const __testing = {

View File

@@ -4,7 +4,7 @@ import { formatErrorMessage } from "../../infra/errors.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import type { GatewayMessageChannel } from "../../utils/message-channel.js";
import { AGENT_LANE_NESTED } from "../lanes.js";
import { readLatestAssistantReply, runAgentStep } from "./agent-step.js";
import { readLatestAssistantReply, runAgentStep, waitForAgentRun } from "./agent-step.js";
import { resolveAnnounceTarget } from "./sessions-announce-target.js";
import {
buildAgentToAgentAnnounceContext,
@@ -41,16 +41,12 @@ export async function runSessionsSendA2AFlow(params: {
let primaryReply = params.roundOneReply;
let latestReply = params.roundOneReply;
if (!primaryReply && params.waitRunId) {
const waitMs = Math.min(params.announceTimeoutMs, 60_000);
const wait = await sessionsSendA2ADeps.callGateway<{ status: string }>({
method: "agent.wait",
params: {
runId: params.waitRunId,
timeoutMs: waitMs,
},
timeoutMs: waitMs + 2000,
const wait = await waitForAgentRun({
runId: params.waitRunId,
timeoutMs: Math.min(params.announceTimeoutMs, 60_000),
callGateway: sessionsSendA2ADeps.callGateway,
});
if (wait?.status === "ok") {
if (wait.status === "ok") {
primaryReply = await readLatestAssistantReply({
sessionKey: params.targetSessionKey,
});

View File

@@ -9,17 +9,19 @@ import {
INTERNAL_MESSAGE_CHANNEL,
} from "../../utils/message-channel.js";
import { AGENT_LANE_NESTED } from "../lanes.js";
import {
readLatestAssistantReplySnapshot,
waitForAgentRunAndReadUpdatedAssistantReply,
} from "./agent-step.js";
import type { AnyAgentTool } from "./common.js";
import { jsonResult, readStringParam } from "./common.js";
import {
createSessionVisibilityGuard,
createAgentToAgentPolicy,
extractAssistantText,
resolveEffectiveSessionToolsVisibility,
resolveSessionReference,
resolveSessionToolContext,
resolveVisibleSessionReference,
stripToolMessages,
} from "./sessions-helpers.js";
import { buildAgentToAgentMessageContext, resolvePingPongTurns } from "./sessions-send-helpers.js";
import { runSessionsSendA2AFlow } from "./sessions-send-tool.a2a.js";
@@ -35,27 +37,6 @@ const SessionsSendToolSchema = Type.Object({
type GatewayCaller = typeof callGateway;
const SESSIONS_SEND_REPLY_HISTORY_LIMIT = 50;
function resolveLatestAssistantReplySnapshot(messages: unknown[]): {
text?: string;
fingerprint?: string;
} {
for (let index = messages.length - 1; index >= 0; index -= 1) {
const message = messages[index];
const text = extractAssistantText(message);
if (!text) {
continue;
}
let fingerprint: string | undefined;
try {
fingerprint = JSON.stringify(message);
} catch {
fingerprint = text;
}
return { text, fingerprint };
}
return {};
}
async function startAgentRun(params: {
callGateway: GatewayCaller;
runId: string;
@@ -334,66 +315,37 @@ export function createSessionsSendTool(opts?: {
}
runId = start.runId;
const historyBefore = await gatewayCall<{ messages: Array<unknown> }>({
method: "chat.history",
params: { sessionKey: resolvedKey, limit: SESSIONS_SEND_REPLY_HISTORY_LIMIT },
const baselineReply = await readLatestAssistantReplySnapshot({
sessionKey: resolvedKey,
limit: SESSIONS_SEND_REPLY_HISTORY_LIMIT,
callGateway: gatewayCall,
});
const result = await waitForAgentRunAndReadUpdatedAssistantReply({
runId,
sessionKey: resolvedKey,
timeoutMs,
limit: SESSIONS_SEND_REPLY_HISTORY_LIMIT,
baseline: baselineReply,
callGateway: gatewayCall,
});
const baselineReply = resolveLatestAssistantReplySnapshot(
stripToolMessages(Array.isArray(historyBefore?.messages) ? historyBefore.messages : []),
);
let waitStatus: string | undefined;
let waitError: string | undefined;
try {
const wait = await gatewayCall<{ status?: string; error?: string }>({
method: "agent.wait",
params: {
runId,
timeoutMs,
},
timeoutMs: timeoutMs + 2000,
});
waitStatus = typeof wait?.status === "string" ? wait.status : undefined;
waitError = typeof wait?.error === "string" ? wait.error : undefined;
} catch (err) {
const messageText =
err instanceof Error ? err.message : typeof err === "string" ? err : "error";
return jsonResult({
runId,
status: messageText.includes("gateway timeout") ? "timeout" : "error",
error: messageText,
sessionKey: displayKey,
});
}
if (waitStatus === "timeout") {
if (result.status === "timeout") {
return jsonResult({
runId,
status: "timeout",
error: waitError,
error: result.error,
sessionKey: displayKey,
});
}
if (waitStatus === "error") {
if (result.status === "error") {
return jsonResult({
runId,
status: "error",
error: waitError ?? "agent error",
error: result.error ?? "agent error",
sessionKey: displayKey,
});
}
const history = await gatewayCall<{ messages: Array<unknown> }>({
method: "chat.history",
params: { sessionKey: resolvedKey, limit: SESSIONS_SEND_REPLY_HISTORY_LIMIT },
});
const latestReply = resolveLatestAssistantReplySnapshot(
stripToolMessages(Array.isArray(history?.messages) ? history.messages : []),
);
const reply =
latestReply.text && latestReply.fingerprint !== baselineReply.fingerprint
? latestReply.text
: undefined;
const reply = result.replyText;
startA2AFlow(reply ?? undefined);
return jsonResult({