fix(agents): finalize embedded lifecycle backstop

This commit is contained in:
Peter Steinberger
2026-04-29 22:29:07 +01:00
parent d51af16fab
commit ebff12e84f
3 changed files with 172 additions and 0 deletions

View File

@@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Agents/sessions: emit a terminal lifecycle backstop when embedded timeout/error turns return without `agent_end`, so Gateway sessions no longer stay stuck in `running` after failover surfaces a timeout. Fixes #74607. Thanks @millerc79.
- Agents/Codex: bound embedded-run cleanup, trajectory flushing, and command-lane task timeouts after runtime failures, so Discord and other chat sessions return to idle instead of staying stuck in processing. Thanks @vincentkoc.
- Heartbeat/exec: consume successful metadata-only async exec completions silently so Telegram and other chat surfaces no longer ask users for missing command logs after `No session found`. Fixes #74595. Thanks @gkoch02.
- Web fetch: add a documented `tools.web.fetch.ssrfPolicy.allowIpv6UniqueLocalRange` opt-in and thread it through cache keys and DNS/IP checks so trusted fake-IP proxy stacks using `fc00::/7` can work without broad private-network access. Fixes #74351. Thanks @jeffrey701.

View File

@@ -1194,6 +1194,110 @@ describe("runAgentTurnWithFallback", () => {
});
});
it("emits an embedded lifecycle terminal backstop when the runner returns without one", async () => {
const agentEvents = await import("../../infra/agent-events.js");
const emitAgentEvent = vi.mocked(agentEvents.emitAgentEvent);
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({
stream: "lifecycle",
data: { phase: "start", startedAt: 1_000 },
});
return {
payloads: [{ text: "Request timed out before a response was generated.", isError: true }],
meta: { aborted: true, livenessState: "blocked", replayInvalid: true },
};
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello",
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp",
MessageSid: "msg",
} as unknown as TemplateContext,
opts: { runId: "run-timeout" } as GetReplyOptions,
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true,
shouldEmitToolOutput: () => false,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false,
resetSessionAfterRoleOrderingConflict: async () => false,
isHeartbeat: false,
sessionKey: "main",
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off",
});
expect(result.kind).toBe("success");
expect(emitAgentEvent).toHaveBeenCalledWith({
runId: "run-timeout",
sessionKey: "main",
stream: "lifecycle",
data: {
phase: "end",
startedAt: 1_000,
endedAt: expect.any(Number),
aborted: true,
livenessState: "blocked",
replayInvalid: true,
},
});
});
it("does not duplicate embedded lifecycle terminal events already reported by the runner", async () => {
const agentEvents = await import("../../infra/agent-events.js");
const emitAgentEvent = vi.mocked(agentEvents.emitAgentEvent);
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({
stream: "lifecycle",
data: { phase: "start", startedAt: 1_000 },
});
await params.onAgentEvent?.({
stream: "lifecycle",
data: { phase: "end", endedAt: 1_500 },
});
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello",
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp",
MessageSid: "msg",
} as unknown as TemplateContext,
opts: { runId: "run-complete" } as GetReplyOptions,
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true,
shouldEmitToolOutput: () => false,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false,
resetSessionAfterRoleOrderingConflict: async () => false,
isHeartbeat: false,
sessionKey: "main",
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off",
});
expect(result.kind).toBe("success");
expect(emitAgentEvent).not.toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-complete",
stream: "lifecycle",
}),
);
});
it("trims chatty GPT ack-turn final prose", async () => {
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => ({
result: await params.run("openai", "gpt-5.4"),

View File

@@ -807,6 +807,66 @@ function isReplyOperationRestartAbort(replyOperation?: ReplyOperation): boolean
);
}
function createEmbeddedLifecycleTerminalBackstop(params: { runId: string; sessionKey?: string }) {
let terminalEmitted = false;
let startedAt: number | undefined;
const note = (evt: { stream: string; data: Record<string, unknown> }) => {
if (evt.stream !== "lifecycle") {
return;
}
const phase = readStringValue(evt.data.phase);
if (phase === "start" && typeof evt.data.startedAt === "number") {
startedAt = evt.data.startedAt;
}
if (phase === "end" || phase === "error") {
terminalEmitted = true;
}
};
const emit = (phase: "end" | "error", resultOrError: unknown) => {
if (terminalEmitted) {
return;
}
terminalEmitted = true;
const data: Record<string, unknown> = {
phase,
endedAt: Date.now(),
...(startedAt !== undefined ? { startedAt } : {}),
};
if (phase === "error") {
data.error = formatErrorMessage(resultOrError);
} else {
const meta =
resultOrError && typeof resultOrError === "object" && "meta" in resultOrError
? (resultOrError as { meta?: Record<string, unknown> }).meta
: undefined;
if (meta?.aborted === true) {
data.aborted = true;
}
const stopReason = readStringValue(meta?.stopReason);
if (stopReason) {
data.stopReason = stopReason;
}
const livenessState = readStringValue(meta?.livenessState);
if (livenessState) {
data.livenessState = livenessState;
}
if (meta?.replayInvalid === true) {
data.replayInvalid = true;
}
}
emitAgentEvent({
runId: params.runId,
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
stream: "lifecycle",
data,
});
};
return { emit, note };
}
export async function runAgentTurnWithFallback(params: {
commandBody: string;
transcriptCommandBody?: string;
@@ -1333,6 +1393,10 @@ export async function runAgentTurnWithFallback(params: {
);
return (async () => {
let attemptCompactionCount = 0;
const lifecycleBackstop = createEmbeddedLifecycleTerminalBackstop({
runId,
sessionKey: params.sessionKey,
});
try {
const result = await runEmbeddedPiAgent({
...embeddedContext,
@@ -1405,6 +1469,7 @@ export async function runAgentTurnWithFallback(params: {
: undefined,
onReasoningEnd: params.opts?.onReasoningEnd,
onAgentEvent: async (evt) => {
lifecycleBackstop.note(evt);
if (evt.stream.startsWith("codex_app_server.")) {
emitAgentEvent({
runId,
@@ -1598,6 +1663,7 @@ export async function runAgentTurnWithFallback(params: {
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
lifecycleBackstop.emit("end", result);
const resultCompactionCount = Math.max(
0,
result.meta?.agentMeta?.compactionCount ?? 0,
@@ -1615,6 +1681,7 @@ export async function runAgentTurnWithFallback(params: {
);
}
}
lifecycleBackstop.emit("error", err);
throw err;
} finally {
autoCompactionCount += attemptCompactionCount;