fix(agents): pause yielded subagent runs

This commit is contained in:
Peter Steinberger
2026-04-28 09:56:28 +01:00
parent 2790825ae5
commit 6f3b5f8666
20 changed files with 498 additions and 8 deletions

View File

@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
- Channels/Mattermost: stop enqueueing regular inbound posts as system events, so Mattermost user messages reach the model only as user-role inbound-envelope content instead of also appearing as `System: Mattermost message...` directives. Fixes #71795. Thanks @juan-flores077.
- Agents/Anthropic: send implicit Anthropic beta headers only to direct public Anthropic endpoints, including OAuth, so custom Anthropic-compatible providers no longer mis-handle unsupported beta flags unless explicitly configured. Refs #73346. Thanks @byBrodowski.
- Skills: require explicit `skills.entries.coding-agent.enabled` before exposing the bundled coding-agent skill, so installs with Codex on PATH but no OpenAI auth do not silently offer Codex delegation. Fixes #73358. Thanks @LaFleurAdvertising and @Sanjays2402.
- Agents/subagents: preserve `sessions_yield` as a paused subagent state and ignore its wait text while freezing completion output, so parent sessions wait for the final post-compaction answer instead of receiving intermediate progress or `(no output)`. Fixes #73413. Thanks @Ask-sola.
- Plugins/startup: precompute bundled runtime mirror fingerprints before taking the mirror lock and keep Docker bundled plugin runtime deps/mirrors in a Docker-managed volume instead of the Windows/WSL config bind mount, so cold starts avoid slow host-volume mirror writes. Fixes #73339. Thanks @1yihui.
- Channels/LINE: persist inbound image, video, audio, and file downloads in `~/.openclaw/media/inbound/` instead of temporary files so agents can still read LINE media after `/tmp` cleanup. Fixes #73370. Thanks @hijirii and @wenxu007.
- CLI/plugins: keep bundled plugin installs out of `plugins.load.paths` while preserving install records, so install/inspect/doctor loops no longer warn about the current bundled plugin directory. Thanks @vincentkoc.

View File

@@ -2341,13 +2341,15 @@ export async function runEmbeddedPiAgent(
});
}
const replayInvalid = resolveReplayInvalidForAttempt(null);
const livenessState = resolveRunLivenessState({
payloadCount,
aborted,
timedOut,
attempt,
incompleteTurnText: null,
});
const livenessState = attempt.yieldDetected
? "paused"
: resolveRunLivenessState({
payloadCount,
aborted,
timedOut,
attempt,
incompleteTurnText: null,
});
const stopReason = attempt.clientToolCall
? "tool_calls"
: attempt.yieldDetected
@@ -2359,6 +2361,8 @@ export async function runEmbeddedPiAgent(
attempt.setTerminalLifecycleMeta?.({
replayInvalid,
livenessState,
stopReason,
yielded: attempt.yieldDetected === true,
});
return {
payloads: terminalPayloads?.length ? terminalPayloads : undefined,
@@ -2376,6 +2380,7 @@ export async function runEmbeddedPiAgent(
replayInvalid,
livenessState,
agentHarnessResultClassification: attempt.agentHarnessResultClassification,
...(attempt.yieldDetected ? { yielded: true } : {}),
...(emptyAssistantReplyIsSilent
? { terminalReplyKind: "silent-empty" as const }
: {}),

View File

@@ -116,5 +116,7 @@ export type EmbeddedRunAttemptResult = {
setTerminalLifecycleMeta?: (meta: {
replayInvalid?: boolean;
livenessState?: EmbeddedRunLivenessState;
stopReason?: string;
yielded?: boolean;
}) => void;
};

View File

@@ -130,6 +130,7 @@ export type EmbeddedPiRunMeta = {
livenessState?: EmbeddedRunLivenessState;
agentHarnessResultClassification?: "empty" | "reasoning-only" | "planning-only";
terminalReplyKind?: "silent-empty";
yielded?: boolean;
error?: {
kind:
| "context_overflow"

View File

@@ -111,6 +111,10 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
}
const emitLifecycleTerminal = () => {
const terminalMeta = {
...(ctx.state.terminalStopReason ? { stopReason: ctx.state.terminalStopReason } : {}),
...(ctx.state.yielded === true ? { yielded: true } : {}),
};
if (isError) {
emitAgentEvent({
runId: ctx.params.runId,
@@ -118,6 +122,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
data: {
phase: "error",
error: lifecycleErrorText ?? "LLM request failed.",
...terminalMeta,
...(livenessState ? { livenessState } : {}),
...(replayInvalid ? { replayInvalid } : {}),
endedAt: Date.now(),
@@ -128,6 +133,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
data: {
phase: "error",
error: lifecycleErrorText ?? "LLM request failed.",
...terminalMeta,
...(livenessState ? { livenessState } : {}),
...(replayInvalid ? { replayInvalid } : {}),
},
@@ -139,6 +145,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
stream: "lifecycle",
data: {
phase: "end",
...terminalMeta,
...(livenessState ? { livenessState } : {}),
...(replayInvalid ? { replayInvalid } : {}),
endedAt: Date.now(),
@@ -148,6 +155,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
stream: "lifecycle",
data: {
phase: "end",
...terminalMeta,
...(livenessState ? { livenessState } : {}),
...(replayInvalid ? { replayInvalid } : {}),
},

View File

@@ -72,6 +72,8 @@ export type EmbeddedPiSubscribeState = {
unsubscribed: boolean;
replayState: EmbeddedRunReplayState;
livenessState?: EmbeddedRunLivenessState;
terminalStopReason?: string;
yielded?: boolean;
hadDeterministicSideEffect?: boolean;
messagingToolSentTexts: string[];

View File

@@ -881,6 +881,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
setTerminalLifecycleMeta: (meta: {
replayInvalid?: boolean;
livenessState?: EmbeddedRunLivenessState;
stopReason?: string;
yielded?: boolean;
}) => {
if (typeof meta.replayInvalid === "boolean") {
state.replayState = { ...state.replayState, replayInvalid: meta.replayInvalid };
@@ -888,6 +890,12 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
if (meta.livenessState) {
state.livenessState = meta.livenessState;
}
if (typeof meta.stopReason === "string") {
state.terminalStopReason = meta.stopReason;
}
if (typeof meta.yielded === "boolean") {
state.yielded = meta.yielded;
}
},
isCompacting: () => state.compactionInFlight || state.pendingCompactionRetry > 0,
isCompactionInFlight: () => state.compactionInFlight,

View File

@@ -22,6 +22,9 @@ export type AgentWaitResult = {
error?: string;
startedAt?: number;
endedAt?: number;
stopReason?: string;
livenessState?: string;
yielded?: boolean;
};
export type AgentRunsDrainResult = {
@@ -35,6 +38,9 @@ type RawAgentWaitResponse = {
error?: string;
startedAt?: unknown;
endedAt?: unknown;
stopReason?: unknown;
livenessState?: unknown;
yielded?: unknown;
};
function normalizeAgentWaitResult(
@@ -46,6 +52,9 @@ function normalizeAgentWaitResult(
error: typeof wait?.error === "string" ? wait.error : undefined,
startedAt: typeof wait?.startedAt === "number" ? wait.startedAt : undefined,
endedAt: typeof wait?.endedAt === "number" ? wait.endedAt : undefined,
stopReason: typeof wait?.stopReason === "string" ? wait.stopReason : undefined,
livenessState: typeof wait?.livenessState === "string" ? wait.livenessState : undefined,
yielded: wait?.yielded === true ? true : undefined,
};
}

View File

@@ -0,0 +1,103 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { __testing, readSubagentOutput } from "./subagent-announce-output.js";
type CallGateway = typeof import("../gateway/call.js").callGateway;
type ReadLatestAssistantReply = typeof import("./tools/agent-step.js").readLatestAssistantReply;
function installOutputDeps(params: { messages: Array<unknown>; latestAssistantReply?: string }) {
const callGateway = vi.fn(async () => ({ messages: params.messages }));
const readLatestAssistantReply = vi.fn(async () => params.latestAssistantReply);
__testing.setDepsForTest({
callGateway: callGateway as unknown as CallGateway,
readLatestAssistantReply: readLatestAssistantReply as unknown as ReadLatestAssistantReply,
});
return { callGateway, readLatestAssistantReply };
}
function sessionsYieldTurn(message = "Waiting for subagent completion.") {
return [
{
role: "assistant",
stopReason: "toolUse",
content: [
{ type: "text", text: message },
{
type: "toolCall",
id: "call-yield",
name: "sessions_yield",
arguments: { message },
},
],
},
{
role: "toolResult",
toolCallId: "call-yield",
toolName: "sessions_yield",
content: [
{
type: "text",
text: JSON.stringify({ status: "yielded", message }, null, 2),
},
],
details: { status: "yielded", message },
},
];
}
describe("readSubagentOutput", () => {
afterEach(() => {
__testing.setDepsForTest();
});
it("does not treat a sessions_yield wait turn as subagent completion output", async () => {
const deps = installOutputDeps({
messages: sessionsYieldTurn(),
latestAssistantReply: "Waiting for subagent completion.",
});
await expect(readSubagentOutput("agent:main:subagent:child")).resolves.toBeUndefined();
expect(deps.readLatestAssistantReply).not.toHaveBeenCalled();
});
it("returns final assistant output that arrives after a sessions_yield wait turn", async () => {
installOutputDeps({
messages: [
...sessionsYieldTurn(),
{
role: "system",
content: [{ type: "text", text: "Compaction" }],
__openclaw: { kind: "compaction" },
},
{
role: "assistant",
stopReason: "stop",
content: [{ type: "text", text: "Created /tmp/final-deck.pptx" }],
},
],
latestAssistantReply: "Waiting for subagent completion.",
});
await expect(readSubagentOutput("agent:main:subagent:child")).resolves.toBe(
"Created /tmp/final-deck.pptx",
);
});
it("keeps normal tool-use assistant output when the tool is not sessions_yield", async () => {
installOutputDeps({
messages: [
{
role: "assistant",
stopReason: "toolUse",
content: [
{ type: "text", text: "Mapped the code path." },
{ type: "toolCall", id: "call-read", name: "read", arguments: {} },
],
},
],
});
await expect(readSubagentOutput("agent:main:subagent:child")).resolves.toBe(
"Mapped the code path.",
);
});
});

View File

@@ -11,6 +11,7 @@ import {
resolveAgentIdFromSessionKey,
resolveStorePath,
} from "./subagent-announce.runtime.js";
import { assistantCallsSessionsYield, isSessionsYieldToolResult } from "./subagent-yield-output.js";
import { readLatestAssistantReply } from "./tools/agent-step.js";
import { extractAssistantText, sanitizeTextContent } from "./tools/session-message-text.js";
import { isAnnounceSkip } from "./tools/sessions-send-tokens.js";
@@ -46,6 +47,7 @@ type SubagentOutputSnapshot = {
latestRawText?: string;
assistantFragments: string[];
toolCallCount: number;
waitingForContinuation?: boolean;
};
export type AgentWaitResult = {
@@ -53,6 +55,9 @@ export type AgentWaitResult = {
startedAt?: number;
endedAt?: number;
error?: string;
stopReason?: string;
livenessState?: string;
yielded?: boolean;
};
export type SubagentRunOutcome = {
@@ -199,6 +204,7 @@ function summarizeSubagentOutputHistory(messages: Array<unknown>): SubagentOutpu
assistantFragments: [],
toolCallCount: 0,
};
let previousAssistantCalledYield = false;
for (const message of messages) {
if (!message || typeof message !== "object") {
continue;
@@ -206,25 +212,50 @@ function summarizeSubagentOutputHistory(messages: Array<unknown>): SubagentOutpu
const role = (message as { role?: unknown }).role;
if (role === "assistant") {
snapshot.toolCallCount += countAssistantToolCalls((message as { content?: unknown }).content);
if (assistantCallsSessionsYield(message)) {
snapshot.latestAssistantText = undefined;
snapshot.latestRawText = undefined;
snapshot.latestSilentText = undefined;
snapshot.assistantFragments = [];
snapshot.waitingForContinuation = true;
previousAssistantCalledYield = true;
continue;
}
const text = extractSubagentOutputText(message).trim();
if (!text) {
previousAssistantCalledYield = false;
continue;
}
if (isAnnounceSkip(text) || isSilentReplyText(text, SILENT_REPLY_TOKEN)) {
snapshot.latestSilentText = text;
snapshot.latestAssistantText = undefined;
snapshot.assistantFragments = [];
snapshot.waitingForContinuation = false;
previousAssistantCalledYield = false;
continue;
}
snapshot.latestSilentText = undefined;
snapshot.latestAssistantText = text;
snapshot.assistantFragments.push(text);
snapshot.waitingForContinuation = false;
previousAssistantCalledYield = false;
continue;
}
if (isSessionsYieldToolResult(message, previousAssistantCalledYield)) {
snapshot.latestAssistantText = undefined;
snapshot.latestRawText = undefined;
snapshot.latestSilentText = undefined;
snapshot.assistantFragments = [];
snapshot.waitingForContinuation = true;
previousAssistantCalledYield = false;
continue;
}
const text = extractSubagentOutputText(message).trim();
if (text) {
snapshot.latestRawText = text;
snapshot.waitingForContinuation = false;
}
previousAssistantCalledYield = false;
}
return snapshot;
}
@@ -256,6 +287,9 @@ function selectSubagentOutputText(
snapshot: SubagentOutputSnapshot,
outcome?: SubagentRunOutcome,
): string | undefined {
if (snapshot.waitingForContinuation) {
return undefined;
}
if (snapshot.latestSilentText) {
return snapshot.latestSilentText;
}
@@ -278,10 +312,14 @@ export async function readSubagentOutput(
params: { sessionKey, limit: 100 },
});
const messages = Array.isArray(history?.messages) ? history.messages : [];
const selected = selectSubagentOutputText(summarizeSubagentOutputHistory(messages), outcome);
const snapshot = summarizeSubagentOutputHistory(messages);
const selected = selectSubagentOutputText(snapshot, outcome);
if (selected?.trim()) {
return selected;
}
if (snapshot.waitingForContinuation) {
return undefined;
}
const latestAssistant = await subagentAnnounceOutputDeps.readLatestAssistantReply({
sessionKey,
limit: 100,

View File

@@ -701,6 +701,10 @@ export function createSubagentRegistryLifecycleController(params: {
entry.endedReason = completeParams.reason;
mutated = true;
}
if (entry.pauseReason !== undefined) {
entry.pauseReason = undefined;
mutated = true;
}
if (await freezeRunResultAtCompletion(entry, outcome)) {
mutated = true;

View File

@@ -36,6 +36,50 @@ function shouldDeleteAttachments(entry: SubagentRunRecord) {
return entry.cleanup === "delete" || !entry.retainAttachmentsOnKeep;
}
export function markSubagentRunPausedAfterYield(params: {
entry: SubagentRunRecord;
startedAt?: number;
endedAt?: number;
now?: number;
}): boolean {
const { entry } = params;
let mutated = false;
if (typeof params.startedAt === "number" && entry.startedAt !== params.startedAt) {
entry.startedAt = params.startedAt;
if (typeof entry.sessionStartedAt !== "number") {
entry.sessionStartedAt = params.startedAt;
}
mutated = true;
}
const endedAt = typeof params.endedAt === "number" ? params.endedAt : (params.now ?? Date.now());
if (entry.endedAt !== endedAt) {
entry.endedAt = endedAt;
mutated = true;
}
if (entry.pauseReason !== "sessions_yield") {
entry.pauseReason = "sessions_yield";
mutated = true;
}
if (entry.outcome !== undefined) {
entry.outcome = undefined;
mutated = true;
}
if (entry.endedReason !== undefined) {
entry.endedReason = undefined;
mutated = true;
}
if (entry.cleanupHandled === true) {
entry.cleanupHandled = false;
mutated = true;
}
if (entry.frozenResultText !== undefined) {
entry.frozenResultText = undefined;
entry.frozenResultCapturedAt = undefined;
mutated = true;
}
return mutated;
}
export type RegisterSubagentRunParams = {
runId: string;
childSessionKey: string;
@@ -116,6 +160,18 @@ export function createSubagentRunManager(params: {
if (wait.status === "pending") {
return;
}
if (wait.yielded === true) {
if (
markSubagentRunPausedAfterYield({
entry,
startedAt: wait.startedAt,
endedAt: wait.endedAt,
})
) {
params.persist();
}
return;
}
if (wait.status === "error" && isRecoverableAgentWaitError(wait.error)) {
log.info("subagent wait interrupted; scheduling recovery", {
runId,
@@ -282,6 +338,7 @@ export function createSubagentRunManager(params: {
accumulatedRuntimeMs,
endedAt: undefined,
endedReason: undefined,
pauseReason: undefined,
endedHookEmittedAt: undefined,
wakeOnDescendantSettle: undefined,
outcome: undefined,

View File

@@ -199,6 +199,54 @@ describe("subagent registry seam flow", () => {
expect(run?.outcome).toBeUndefined();
});
it("keeps sessions_yield-ended subagent runs paused instead of announcing no output", async () => {
mocks.callGateway.mockImplementation(async (request: { method?: string }) => {
if (request.method === "agent.wait") {
return {
status: "ok",
startedAt: 111,
endedAt: 222,
stopReason: "end_turn",
livenessState: "paused",
yielded: true,
};
}
return {};
});
mod.registerSubagentRun({
runId: "run-yield-paused",
childSessionKey: "agent:main:subagent:child",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "wait for child continuation",
cleanup: "keep",
});
await waitForFast(() => {
const run = mod
.listSubagentRunsForRequester("agent:main:main")
.find((entry) => entry.runId === "run-yield-paused");
expect(run?.endedAt).toBe(222);
expect(run?.pauseReason).toBe("sessions_yield");
});
expect(mocks.runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(mod.countPendingDescendantRuns("agent:main:main")).toBe(1);
expect(
mod.replaceSubagentRunAfterSteer({
previousRunId: "run-yield-paused",
nextRunId: "run-yield-continuation",
}),
).toBe(true);
const replacement = mod
.listSubagentRunsForRequester("agent:main:main")
.find((entry) => entry.runId === "run-yield-continuation");
expect(replacement?.runId).toBe("run-yield-continuation");
expect(replacement?.pauseReason).toBeUndefined();
expect(replacement?.endedAt).toBeUndefined();
});
it("reconciles stale active runs from persisted terminal session state during sweep", async () => {
mocks.callGateway.mockImplementation(async (request: { method?: string }) => {
if (request.method === "agent.wait") {

View File

@@ -55,6 +55,7 @@ import {
} from "./subagent-registry-queries.js";
import {
createSubagentRunManager,
markSubagentRunPausedAfterYield,
type RegisterSubagentRunParams,
} from "./subagent-registry-run-manager.js";
import {
@@ -578,6 +579,9 @@ function resumeSubagentRun(runId: string) {
if (entry.cleanupCompletedAt) {
return;
}
if (entry.pauseReason === "sessions_yield") {
return;
}
// Skip entries that have exhausted their retry budget or expired (#18264).
if ((entry.announceRetryCount ?? 0) >= MAX_ANNOUNCE_RETRY_COUNT) {
void finalizeResumedAnnounceGiveUp({
@@ -924,6 +928,19 @@ function ensureListener() {
});
return;
}
if (evt.data?.yielded === true) {
if (
markSubagentRunPausedAfterYield({
entry,
endedAt,
startedAt:
typeof evt.data?.startedAt === "number" ? evt.data.startedAt : entry.startedAt,
})
) {
persistSubagentRuns();
}
return;
}
clearPendingLifecycleError(evt.runId);
clearPendingLifecycleTimeout(evt.runId);
await completeSubagentRun({

View File

@@ -32,6 +32,7 @@ export type SubagentRunRecord = {
lastAnnounceRetryAt?: number;
lastAnnounceDeliveryError?: string;
endedReason?: SubagentLifecycleEndedReason;
pauseReason?: "sessions_yield";
wakeOnDescendantSettle?: boolean;
frozenResultText?: string | null;
frozenResultCapturedAt?: number;

View File

@@ -0,0 +1,106 @@
function asRecord(value: unknown): Record<string, unknown> | undefined {
return value && typeof value === "object" && !Array.isArray(value)
? (value as Record<string, unknown>)
: undefined;
}
function readToolName(value: unknown): string | undefined {
const record = asRecord(value);
if (!record) {
return undefined;
}
for (const key of ["name", "toolName", "tool_name", "functionName", "function_name"]) {
const candidate = record[key];
if (typeof candidate === "string" && candidate.trim()) {
return candidate.trim();
}
}
return undefined;
}
function isToolCallBlock(value: unknown): boolean {
const record = asRecord(value);
if (!record) {
return false;
}
return (
record.type === "toolCall" ||
record.type === "tool_use" ||
record.type === "toolUse" ||
record.type === "functionCall" ||
record.type === "function_call"
);
}
export function assistantCallsSessionsYield(message: unknown): boolean {
const record = asRecord(message);
if (!record || record.role !== "assistant" || !Array.isArray(record.content)) {
return false;
}
return record.content.some(
(block) => isToolCallBlock(block) && readToolName(block) === "sessions_yield",
);
}
function parseJsonObject(text: string): Record<string, unknown> | undefined {
const trimmed = text.trim();
if (!trimmed.startsWith("{")) {
return undefined;
}
try {
return asRecord(JSON.parse(trimmed));
} catch {
return undefined;
}
}
function readStructuredToolPayload(content: unknown): Record<string, unknown> | undefined {
const record = asRecord(content);
if (record) {
return record;
}
if (typeof content === "string") {
return parseJsonObject(content);
}
if (!Array.isArray(content)) {
return undefined;
}
for (const block of content) {
const blockRecord = asRecord(block);
if (!blockRecord) {
continue;
}
const text = blockRecord.text;
if (typeof text !== "string") {
continue;
}
const parsed = parseJsonObject(text);
if (parsed) {
return parsed;
}
}
return undefined;
}
export function isSessionsYieldToolResult(
message: unknown,
previousAssistantCalledYield: boolean,
): boolean {
const record = asRecord(message);
if (!record || (record.role !== "toolResult" && record.role !== "tool")) {
return false;
}
const toolName = readToolName(record);
if (toolName === "sessions_yield") {
return true;
}
if (!previousAssistantCalledYield) {
return false;
}
const details = asRecord(record.details);
if (details?.status === "yielded") {
return true;
}
const payload = readStructuredToolPayload(record.content);
return payload?.status === "yielded";
}

View File

@@ -29,6 +29,9 @@ type AgentRunSnapshot = {
startedAt?: number;
endedAt?: number;
error?: string;
stopReason?: string;
livenessState?: string;
yielded?: boolean;
ts: number;
};
@@ -135,12 +138,17 @@ function createSnapshotFromLifecycleEvent(params: {
typeof data?.startedAt === "number" ? data.startedAt : agentRunStarts.get(runId);
const endedAt = typeof data?.endedAt === "number" ? data.endedAt : undefined;
const error = typeof data?.error === "string" ? data.error : undefined;
const stopReason = typeof data?.stopReason === "string" ? data.stopReason : undefined;
const livenessState = typeof data?.livenessState === "string" ? data.livenessState : undefined;
return {
runId,
status: phase === "error" ? "error" : data?.aborted ? "timeout" : "ok",
startedAt,
endedAt,
error,
stopReason,
livenessState,
...(data?.yielded === true ? { yielded: true } : {}),
ts: Date.now(),
};
}

View File

@@ -70,6 +70,45 @@ describe("agent wait dedupe helper", () => {
expect(__testing.getWaiterCount(runId)).toBe(0);
});
it("preserves structured yield metadata from terminal agent results", () => {
const dedupe = new Map();
const runId = "run-yielded";
setRunEntry({
dedupe,
kind: "agent",
runId,
payload: {
runId,
status: "ok",
startedAt: 100,
endedAt: 200,
result: {
meta: {
stopReason: "end_turn",
livenessState: "paused",
yielded: true,
},
},
},
});
expect(
readTerminalSnapshotFromGatewayDedupe({
dedupe,
runId,
}),
).toEqual({
status: "ok",
startedAt: 100,
endedAt: 200,
error: undefined,
stopReason: "end_turn",
livenessState: "paused",
yielded: true,
});
});
it("keeps stale chat dedupe blocked while agent dedupe is in-flight", async () => {
const dedupe = new Map();
const runId = "run-stale-chat";

View File

@@ -6,6 +6,9 @@ export type AgentWaitTerminalSnapshot = {
startedAt?: number;
endedAt?: number;
error?: string;
stopReason?: string;
livenessState?: string;
yielded?: boolean;
};
const AGENT_WAITERS_BY_RUN_ID = new Map<string, Set<() => void>>();
@@ -24,6 +27,16 @@ function asFiniteNumber(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function asRecord(value: unknown): Record<string, unknown> | undefined {
return value && typeof value === "object" && !Array.isArray(value)
? (value as Record<string, unknown>)
: undefined;
}
function asString(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value : undefined;
}
function removeWaiter(runId: string, waiter: () => void): void {
const waiters = AGENT_WAITERS_BY_RUN_ID.get(runId);
if (!waiters) {
@@ -73,6 +86,10 @@ export function readTerminalSnapshotFromDedupeEntry(
endedAt?: unknown;
error?: unknown;
summary?: unknown;
stopReason?: unknown;
livenessState?: unknown;
yielded?: unknown;
result?: unknown;
}
| undefined;
const status = typeof payload?.status === "string" ? payload.status : undefined;
@@ -82,6 +99,10 @@ export function readTerminalSnapshotFromDedupeEntry(
const startedAt = asFiniteNumber(payload?.startedAt);
const endedAt = asFiniteNumber(payload?.endedAt) ?? entry.ts;
const resultMeta = asRecord(asRecord(payload?.result)?.meta);
const stopReason = asString(payload?.stopReason) ?? asString(resultMeta?.stopReason);
const livenessState = asString(payload?.livenessState) ?? asString(resultMeta?.livenessState);
const yielded = payload?.yielded === true || resultMeta?.yielded === true;
const errorMessage =
typeof payload?.error === "string"
? payload.error
@@ -95,6 +116,9 @@ export function readTerminalSnapshotFromDedupeEntry(
startedAt,
endedAt,
error: status === "timeout" ? errorMessage : undefined,
stopReason,
livenessState,
...(yielded ? { yielded } : {}),
};
}
if (status === "error" || !entry.ok) {
@@ -103,6 +127,9 @@ export function readTerminalSnapshotFromDedupeEntry(
startedAt,
endedAt,
error: errorMessage,
stopReason,
livenessState,
...(yielded ? { yielded } : {}),
};
}
return null;

View File

@@ -1320,6 +1320,9 @@ export const agentHandlers: GatewayRequestHandlers = {
startedAt: cachedGatewaySnapshot.startedAt,
endedAt: cachedGatewaySnapshot.endedAt,
error: cachedGatewaySnapshot.error,
stopReason: cachedGatewaySnapshot.stopReason,
livenessState: cachedGatewaySnapshot.livenessState,
yielded: cachedGatewaySnapshot.yielded,
});
return;
}
@@ -1374,6 +1377,9 @@ export const agentHandlers: GatewayRequestHandlers = {
startedAt: snapshot.startedAt,
endedAt: snapshot.endedAt,
error: snapshot.error,
stopReason: snapshot.stopReason,
livenessState: snapshot.livenessState,
yielded: snapshot.yielded,
});
},
};