diff --git a/CHANGELOG.md b/CHANGELOG.md index 13a0c323b4d..910676c8004 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents/sessions: emit a terminal lifecycle backstop when embedded timeout/error turns return without `agent_end`, so Gateway sessions no longer stay stuck in `running` after failover surfaces a timeout. Fixes #74607. Thanks @millerc79. - Agents/Codex: bound embedded-run cleanup, trajectory flushing, and command-lane task timeouts after runtime failures, so Discord and other chat sessions return to idle instead of staying stuck in processing. Thanks @vincentkoc. - Heartbeat/exec: consume successful metadata-only async exec completions silently so Telegram and other chat surfaces no longer ask users for missing command logs after `No session found`. Fixes #74595. Thanks @gkoch02. - Web fetch: add a documented `tools.web.fetch.ssrfPolicy.allowIpv6UniqueLocalRange` opt-in and thread it through cache keys and DNS/IP checks so trusted fake-IP proxy stacks using `fc00::/7` can work without broad private-network access. Fixes #74351. Thanks @jeffrey701. diff --git a/src/auto-reply/reply/agent-runner-execution.test.ts b/src/auto-reply/reply/agent-runner-execution.test.ts index 63ef7029f00..f4343983a7d 100644 --- a/src/auto-reply/reply/agent-runner-execution.test.ts +++ b/src/auto-reply/reply/agent-runner-execution.test.ts @@ -1194,6 +1194,110 @@ describe("runAgentTurnWithFallback", () => { }); }); + it("emits an embedded lifecycle terminal backstop when the runner returns without one", async () => { + const agentEvents = await import("../../infra/agent-events.js"); + const emitAgentEvent = vi.mocked(agentEvents.emitAgentEvent); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { + await params.onAgentEvent?.({ + stream: "lifecycle", + data: { phase: "start", startedAt: 1_000 }, + }); + return { + payloads: [{ text: "Request timed out before a response was generated.", isError: true }], + meta: { aborted: true, livenessState: "blocked", replayInvalid: true }, + }; + }); + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + commandBody: "hello", + followupRun: createFollowupRun(), + sessionCtx: { + Provider: "whatsapp", + MessageSid: "msg", + } as unknown as TemplateContext, + opts: { runId: "run-timeout" } as GetReplyOptions, + typingSignals: createMockTypingSignaler(), + blockReplyPipeline: null, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + applyReplyToMode: (payload) => payload, + shouldEmitToolResult: () => true, + shouldEmitToolOutput: () => false, + pendingToolTasks: new Set(), + resetSessionAfterCompactionFailure: async () => false, + resetSessionAfterRoleOrderingConflict: async () => false, + isHeartbeat: false, + sessionKey: "main", + getActiveSessionEntry: () => undefined, + resolvedVerboseLevel: "off", + }); + + expect(result.kind).toBe("success"); + expect(emitAgentEvent).toHaveBeenCalledWith({ + runId: "run-timeout", + sessionKey: "main", + stream: "lifecycle", + data: { + phase: "end", + startedAt: 1_000, + endedAt: expect.any(Number), + aborted: true, + livenessState: "blocked", + replayInvalid: true, + }, + }); + }); + + it("does not duplicate embedded lifecycle terminal events already reported by the runner", async () => { + const agentEvents = await import("../../infra/agent-events.js"); + const emitAgentEvent = vi.mocked(agentEvents.emitAgentEvent); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { + await params.onAgentEvent?.({ + stream: "lifecycle", + data: { phase: "start", startedAt: 1_000 }, + }); + await params.onAgentEvent?.({ + stream: "lifecycle", + data: { phase: "end", endedAt: 1_500 }, + }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + commandBody: "hello", + followupRun: createFollowupRun(), + sessionCtx: { + Provider: "whatsapp", + MessageSid: "msg", + } as unknown as TemplateContext, + opts: { runId: "run-complete" } as GetReplyOptions, + typingSignals: createMockTypingSignaler(), + blockReplyPipeline: null, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + applyReplyToMode: (payload) => payload, + shouldEmitToolResult: () => true, + shouldEmitToolOutput: () => false, + pendingToolTasks: new Set(), + resetSessionAfterCompactionFailure: async () => false, + resetSessionAfterRoleOrderingConflict: async () => false, + isHeartbeat: false, + sessionKey: "main", + getActiveSessionEntry: () => undefined, + resolvedVerboseLevel: "off", + }); + + expect(result.kind).toBe("success"); + expect(emitAgentEvent).not.toHaveBeenCalledWith( + expect.objectContaining({ + runId: "run-complete", + stream: "lifecycle", + }), + ); + }); + it("trims chatty GPT ack-turn final prose", async () => { state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => ({ result: await params.run("openai", "gpt-5.4"), diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 3b8be3344c5..498d1d8cd57 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -807,6 +807,66 @@ function isReplyOperationRestartAbort(replyOperation?: ReplyOperation): boolean ); } +function createEmbeddedLifecycleTerminalBackstop(params: { runId: string; sessionKey?: string }) { + let terminalEmitted = false; + let startedAt: number | undefined; + + const note = (evt: { stream: string; data: Record }) => { + if (evt.stream !== "lifecycle") { + return; + } + const phase = readStringValue(evt.data.phase); + if (phase === "start" && typeof evt.data.startedAt === "number") { + startedAt = evt.data.startedAt; + } + if (phase === "end" || phase === "error") { + terminalEmitted = true; + } + }; + + const emit = (phase: "end" | "error", resultOrError: unknown) => { + if (terminalEmitted) { + return; + } + terminalEmitted = true; + const data: Record = { + phase, + endedAt: Date.now(), + ...(startedAt !== undefined ? { startedAt } : {}), + }; + if (phase === "error") { + data.error = formatErrorMessage(resultOrError); + } else { + const meta = + resultOrError && typeof resultOrError === "object" && "meta" in resultOrError + ? (resultOrError as { meta?: Record }).meta + : undefined; + if (meta?.aborted === true) { + data.aborted = true; + } + const stopReason = readStringValue(meta?.stopReason); + if (stopReason) { + data.stopReason = stopReason; + } + const livenessState = readStringValue(meta?.livenessState); + if (livenessState) { + data.livenessState = livenessState; + } + if (meta?.replayInvalid === true) { + data.replayInvalid = true; + } + } + emitAgentEvent({ + runId: params.runId, + ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + stream: "lifecycle", + data, + }); + }; + + return { emit, note }; +} + export async function runAgentTurnWithFallback(params: { commandBody: string; transcriptCommandBody?: string; @@ -1333,6 +1393,10 @@ export async function runAgentTurnWithFallback(params: { ); return (async () => { let attemptCompactionCount = 0; + const lifecycleBackstop = createEmbeddedLifecycleTerminalBackstop({ + runId, + sessionKey: params.sessionKey, + }); try { const result = await runEmbeddedPiAgent({ ...embeddedContext, @@ -1405,6 +1469,7 @@ export async function runAgentTurnWithFallback(params: { : undefined, onReasoningEnd: params.opts?.onReasoningEnd, onAgentEvent: async (evt) => { + lifecycleBackstop.note(evt); if (evt.stream.startsWith("codex_app_server.")) { emitAgentEvent({ runId, @@ -1598,6 +1663,7 @@ export async function runAgentTurnWithFallback(params: { bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( result.meta?.systemPromptReport, ); + lifecycleBackstop.emit("end", result); const resultCompactionCount = Math.max( 0, result.meta?.agentMeta?.compactionCount ?? 0, @@ -1615,6 +1681,7 @@ export async function runAgentTurnWithFallback(params: { ); } } + lifecycleBackstop.emit("error", err); throw err; } finally { autoCompactionCount += attemptCompactionCount;