diff --git a/CHANGELOG.md b/CHANGELOG.md index 177fc31070a..ef6e5e0b027 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Gateway/Docker: fail closed for non-loopback gateway starts without explicit shared-secret or trusted-proxy auth, and stop the image default command from bypassing config validation. Fixes #82865. (#82866) Thanks @coygeek. +- Agents/followups: route queued followup turns through CLI runtime backends instead of embedded harness lookup, preventing `claude-cli`/`google-gemini-cli` followups from failing before delivery. Fixes #82847. (#82857) Thanks @hclsys. - CLI/sessions: let `openclaw sessions cleanup --fix-missing` prune malformed rows with unresolvable transcript metadata instead of throwing. Fixes #80970. (#82745) Thanks @IWhatsskill. - Gateway/usage: refresh large session usage summaries in the background and reuse durable transcript metadata so `sessions.usage` no longer blocks Gateway requests on full transcript rescans. Fixes #82773. (#82778) Thanks @hclsys. - TUI: restore the submitted draft when chat is busy instead of clearing it or queueing another run. Fixes #45326. (#82774) Thanks @hyspacex. diff --git a/src/auto-reply/reply/agent-runner-cli-dispatch.ts b/src/auto-reply/reply/agent-runner-cli-dispatch.ts new file mode 100644 index 00000000000..eb13041d513 --- /dev/null +++ b/src/auto-reply/reply/agent-runner-cli-dispatch.ts @@ -0,0 +1,164 @@ +import { runCliAgent } from "../../agents/cli-runner.js"; +import type { RunCliAgentParams } from "../../agents/cli-runner/types.js"; +import type { EmbeddedPiRunResult } from "../../agents/pi-embedded.js"; +import { emitAgentEvent, onAgentEvent } from "../../infra/agent-events.js"; +import { + normalizeLowercaseStringOrEmpty, + normalizeOptionalString, +} from "../../shared/string-coerce.js"; + +function shouldBridgeCliAssistantTextToReasoning(provider: string): boolean { + return normalizeLowercaseStringOrEmpty(provider) === "claude-cli"; +} + +function createAssistantTextBridge(params: { + runId: string; + suppressed?: boolean; + deliver?: (text: string) => Promise; +}) { + const deliver = params.deliver; + if (!deliver) { + return { + unsubscribe: () => undefined, + drain: async (): Promise => undefined, + }; + } + let lastText: string | undefined; + let unsubscribed = false; + let delivery = Promise.resolve(); + const rawUnsubscribe = onAgentEvent((evt) => { + if (evt.runId !== params.runId || evt.stream !== "assistant") { + return; + } + if (params.suppressed) { + return; + } + const text = typeof evt.data.text === "string" ? evt.data.text : undefined; + if (text === undefined || text === lastText) { + return; + } + lastText = text; + delivery = delivery.then(() => deliver(text)).catch(() => undefined); + }); + return { + unsubscribe() { + if (unsubscribed) { + return; + } + unsubscribed = true; + rawUnsubscribe(); + }, + async drain(): Promise { + await delivery; + }, + }; +} + +export async function runCliAgentWithLifecycle(params: { + runId: string; + provider: string; + runParams: RunCliAgentParams; + startedAt?: number; + emitLifecycleStart?: boolean; + emitLifecycleTerminal?: boolean; + onAgentRunStart?: () => void; + suppressAssistantBridge?: boolean; + onAssistantText?: (text: string) => Promise; + onReasoningText?: (text: string) => Promise; + onErrorBeforeLifecycle?: (err: unknown) => Promise; + transformResult?: (result: EmbeddedPiRunResult) => EmbeddedPiRunResult; +}): Promise { + const startedAt = params.startedAt ?? Date.now(); + const emitLifecycleStart = params.emitLifecycleStart ?? true; + const emitLifecycleTerminal = params.emitLifecycleTerminal ?? true; + params.onAgentRunStart?.(); + if (emitLifecycleStart) { + emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { + phase: "start", + startedAt, + }, + }); + } + const assistantBridge = createAssistantTextBridge({ + runId: params.runId, + suppressed: params.suppressAssistantBridge, + deliver: params.onAssistantText, + }); + const reasoningBridge = createAssistantTextBridge({ + runId: params.runId, + suppressed: params.suppressAssistantBridge, + deliver: shouldBridgeCliAssistantTextToReasoning(params.provider) + ? params.onReasoningText + : undefined, + }); + let lifecycleTerminalEmitted = false; + try { + const rawResult = await runCliAgent(params.runParams); + const result = params.transformResult?.(rawResult) ?? rawResult; + assistantBridge.unsubscribe(); + reasoningBridge.unsubscribe(); + await assistantBridge.drain(); + await reasoningBridge.drain(); + + const cliText = normalizeOptionalString(result.payloads?.[0]?.text); + if (cliText) { + emitAgentEvent({ + runId: params.runId, + stream: "assistant", + data: { text: cliText }, + }); + } + + if (emitLifecycleTerminal) { + emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { + phase: "end", + startedAt, + endedAt: Date.now(), + }, + }); + lifecycleTerminalEmitted = true; + } + return result; + } catch (err) { + assistantBridge.unsubscribe(); + reasoningBridge.unsubscribe(); + await assistantBridge.drain(); + await reasoningBridge.drain(); + await params.onErrorBeforeLifecycle?.(err); + if (emitLifecycleTerminal) { + emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { + phase: "error", + startedAt, + endedAt: Date.now(), + error: String(err), + }, + }); + lifecycleTerminalEmitted = true; + } + throw err; + } finally { + assistantBridge.unsubscribe(); + reasoningBridge.unsubscribe(); + if (emitLifecycleTerminal && !lifecycleTerminalEmitted) { + emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { + phase: "error", + startedAt, + endedAt: Date.now(), + error: "CLI run completed without lifecycle terminal event", + }, + }); + } + } +} diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 625a6062c3c..37588c5eb54 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -16,7 +16,6 @@ import { classifyOAuthRefreshFailure, } from "../../agents/auth-profiles/oauth-refresh-failure.js"; import { resolveBootstrapWarningSignaturesSeen } from "../../agents/bootstrap-budget.js"; -import { runCliAgent } from "../../agents/cli-runner.js"; import { getCliSessionBinding } from "../../agents/cli-session.js"; import { resolveContextTokensForModel } from "../../agents/context.js"; import { resolveAgentHarnessPolicy } from "../../agents/harness/selection.js"; @@ -56,7 +55,7 @@ import { import { resolveSilentReplyPolicy } from "../../config/silent-reply.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { logVerbose } from "../../globals.js"; -import { emitAgentEvent, onAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js"; +import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js"; import { CommandLane } from "../../process/lanes.js"; @@ -87,6 +86,7 @@ import { } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { resolveRunAuthProfile } from "./agent-runner-auth-profile.js"; +import { runCliAgentWithLifecycle } from "./agent-runner-cli-dispatch.js"; import { GENERIC_EXTERNAL_RUN_FAILURE_TEXT, HEARTBEAT_EXTERNAL_RUN_FAILURE_TEXT, @@ -115,10 +115,6 @@ const GPT_CHAT_BREVITY_ACK_MAX_SENTENCES = 3; const GPT_CHAT_BREVITY_SOFT_MAX_CHARS = 900; const GPT_CHAT_BREVITY_SOFT_MAX_SENTENCES = 6; -function shouldBridgeCliAssistantTextToReasoning(provider: string): boolean { - return normalizeLowercaseStringOrEmpty(provider) === "claude-cli"; -} - function readApprovalScopeValue(value: unknown): "turn" | "session" | undefined { return value === "turn" || value === "session" ? value : undefined; } @@ -1124,7 +1120,7 @@ function emitModelFallbackStepLifecycle(params: { }); } -function resolveSessionRuntimeOverrideForProvider(params: { +export function resolveSessionRuntimeOverrideForProvider(params: { provider: string; entry?: Pick; }): string | undefined { @@ -1747,16 +1743,6 @@ export async function runAgentTurnWithFallback(params: { provider); if (isCliProvider(cliExecutionProvider, runtimeConfig)) { - const startedAt = Date.now(); - notifyAgentRunStart(); - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "start", - startedAt, - }, - }); const isRoomEventCliRun = params.followupRun.currentInboundEventKind === "room_event"; const cliSessionBinding = isRoomEventCliRun ? undefined @@ -1768,195 +1754,99 @@ export async function runAgentTurnWithFallback(params: { originatingChannel: params.followupRun.originatingChannel, provider: params.sessionCtx.Provider, }); - return (async () => { - let lifecycleTerminalEmitted = false; - const createAssistantTextBridge = (deliver: (text: string) => Promise) => { - let lastText: string | undefined; - let unsubscribed = false; - let delivery = Promise.resolve(); - const rawUnsubscribe = onAgentEvent((evt) => { - if (evt.runId !== runId || evt.stream !== "assistant") { - return; - } - if (params.followupRun.run.silentExpected) { - return; - } - const text = typeof evt.data.text === "string" ? evt.data.text : undefined; - if (text === undefined || text === lastText) { - return; - } - lastText = text; - delivery = delivery.then(() => deliver(text)).catch(() => undefined); - }); - return { - unsubscribe() { - if (unsubscribed) { - return; - } - unsubscribed = true; - rawUnsubscribe(); - }, - async drain(): Promise { - await delivery; - }, - }; - }; - const noopBridge = { - unsubscribe: () => undefined, - drain: async (): Promise => undefined, - }; - const assistantBridge = createAssistantTextBridge(async (text) => { + const result = await runCliAgentWithLifecycle({ + runId, + provider: cliExecutionProvider, + onAgentRunStart: notifyAgentRunStart, + suppressAssistantBridge: params.followupRun.run.silentExpected, + onAssistantText: async (text) => { const textForTyping = await handlePartialForTyping({ text } as ReplyPayload); if (textForTyping === undefined || !params.opts?.onPartialReply) { return; } await params.opts.onPartialReply({ text: textForTyping }); - }); - const reasoningBridge = shouldBridgeCliAssistantTextToReasoning(cliExecutionProvider) - ? createAssistantTextBridge(async (text) => { - await params.opts?.onReasoningStream?.({ text }); - }) - : noopBridge; - try { - const rawResult = await runCliAgent({ - sessionId: params.followupRun.run.sessionId, - sessionKey: params.sessionKey, - agentId: params.followupRun.run.agentId, - trigger: params.isHeartbeat ? "heartbeat" : "user", - sessionFile: params.followupRun.run.sessionFile, - workspaceDir: params.followupRun.run.workspaceDir, - config: runtimeConfig, - prompt: params.commandBody, - transcriptPrompt: params.transcriptCommandBody, - currentInboundEventKind: params.followupRun.currentInboundEventKind, - currentInboundContext: params.followupRun.currentInboundContext, - inputProvenance: params.followupRun.run.inputProvenance, - provider: cliExecutionProvider, - model, - thinkLevel: params.followupRun.run.thinkLevel, - timeoutMs: params.followupRun.run.timeoutMs, - runId, - lane: runLane, - extraSystemPrompt: params.followupRun.run.extraSystemPrompt, - sourceReplyDeliveryMode: params.followupRun.run.sourceReplyDeliveryMode, - silentReplyPromptMode: params.followupRun.run.silentReplyPromptMode, - extraSystemPromptStatic: params.followupRun.run.extraSystemPromptStatic, - ownerNumbers: params.followupRun.run.ownerNumbers, - cliSessionId: cliSessionBinding?.sessionId, - cliSessionBinding, - authProfileId: authProfile.authProfileId, - bootstrapPromptWarningSignaturesSeen, - bootstrapPromptWarningSignature: - bootstrapPromptWarningSignaturesSeen[ - bootstrapPromptWarningSignaturesSeen.length - 1 - ], - images: params.opts?.images, - imageOrder: params.opts?.imageOrder, - skillsSnapshot: params.followupRun.run.skillsSnapshot, - messageChannel: params.followupRun.originatingChannel ?? undefined, - messageProvider: hookMessageProvider, - agentAccountId: params.followupRun.run.agentAccountId, - senderIsOwner: params.followupRun.run.senderIsOwner, - disableTools: params.opts?.disableTools, - abortSignal: params.replyOperation?.abortSignal ?? params.opts?.abortSignal, - replyOperation: params.replyOperation, - }); - const result: EmbeddedAgentRunResult = - isRoomEventCliRun && rawResult.meta.agentMeta - ? (() => { - const { cliSessionBinding: _cliSessionBinding, ...agentMeta } = - rawResult.meta.agentMeta; - return { - ...rawResult, - meta: { - ...rawResult.meta, - agentMeta: { - ...agentMeta, - sessionId: "", - }, + }, + onReasoningText: async (text) => { + await params.opts?.onReasoningStream?.({ text }); + }, + onErrorBeforeLifecycle: async () => { + if (!rollbackFallbackCandidateSelection) { + return; + } + try { + await rollbackFallbackCandidateSelection(); + clearPendingFallbackRollback(rollbackFallbackCandidateSelection); + } catch (rollbackError) { + logVerbose( + `failed to roll back fallback candidate selection (non-fatal): ${String(rollbackError)}`, + ); + } + }, + runParams: { + sessionId: params.followupRun.run.sessionId, + sessionKey: params.sessionKey, + agentId: params.followupRun.run.agentId, + trigger: params.isHeartbeat ? "heartbeat" : "user", + sessionFile: params.followupRun.run.sessionFile, + workspaceDir: params.followupRun.run.workspaceDir, + config: runtimeConfig, + prompt: params.commandBody, + transcriptPrompt: params.transcriptCommandBody, + currentInboundEventKind: params.followupRun.currentInboundEventKind, + currentInboundContext: params.followupRun.currentInboundContext, + inputProvenance: params.followupRun.run.inputProvenance, + provider: cliExecutionProvider, + model, + thinkLevel: params.followupRun.run.thinkLevel, + timeoutMs: params.followupRun.run.timeoutMs, + runId, + lane: runLane, + extraSystemPrompt: params.followupRun.run.extraSystemPrompt, + sourceReplyDeliveryMode: params.followupRun.run.sourceReplyDeliveryMode, + silentReplyPromptMode: params.followupRun.run.silentReplyPromptMode, + extraSystemPromptStatic: params.followupRun.run.extraSystemPromptStatic, + ownerNumbers: params.followupRun.run.ownerNumbers, + cliSessionId: cliSessionBinding?.sessionId, + cliSessionBinding, + authProfileId: authProfile.authProfileId, + bootstrapPromptWarningSignaturesSeen, + bootstrapPromptWarningSignature: + bootstrapPromptWarningSignaturesSeen[ + bootstrapPromptWarningSignaturesSeen.length - 1 + ], + images: params.opts?.images, + imageOrder: params.opts?.imageOrder, + skillsSnapshot: params.followupRun.run.skillsSnapshot, + messageChannel: params.followupRun.originatingChannel ?? undefined, + messageProvider: hookMessageProvider, + agentAccountId: params.followupRun.run.agentAccountId, + senderIsOwner: params.followupRun.run.senderIsOwner, + disableTools: params.opts?.disableTools, + abortSignal: params.replyOperation?.abortSignal ?? params.opts?.abortSignal, + replyOperation: params.replyOperation, + }, + transformResult: (rawResult) => + isRoomEventCliRun && rawResult.meta.agentMeta + ? (() => { + const { cliSessionBinding: _cliSessionBinding, ...agentMeta } = + rawResult.meta.agentMeta; + return { + ...rawResult, + meta: { + ...rawResult.meta, + agentMeta: { + ...agentMeta, + sessionId: "", }, - }; - })() - : rawResult; - bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( - result.meta?.systemPromptReport, - ); - - assistantBridge.unsubscribe(); - reasoningBridge.unsubscribe(); - await assistantBridge.drain(); - await reasoningBridge.drain(); - - // CLI backends don't emit streaming assistant events, so we need to - // emit one with the final text so server-chat can populate its buffer - // and send the response to TUI/WebSocket clients. - const cliText = normalizeOptionalString(result.payloads?.[0]?.text); - if (cliText) { - emitAgentEvent({ - runId, - stream: "assistant", - data: { text: cliText }, - }); - } - - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "end", - startedAt, - endedAt: Date.now(), - }, - }); - lifecycleTerminalEmitted = true; - - return result; - } catch (err) { - assistantBridge.unsubscribe(); - reasoningBridge.unsubscribe(); - await assistantBridge.drain(); - await reasoningBridge.drain(); - if (rollbackFallbackCandidateSelection) { - try { - await rollbackFallbackCandidateSelection(); - clearPendingFallbackRollback(rollbackFallbackCandidateSelection); - } catch (rollbackError) { - logVerbose( - `failed to roll back fallback candidate selection (non-fatal): ${String(rollbackError)}`, - ); - } - } - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "error", - startedAt, - endedAt: Date.now(), - error: String(err), - }, - }); - lifecycleTerminalEmitted = true; - throw err; - } finally { - assistantBridge.unsubscribe(); - reasoningBridge.unsubscribe(); - // Defensive backstop: never let a CLI run complete without a terminal - // lifecycle event, otherwise downstream consumers can hang. - if (!lifecycleTerminalEmitted) { - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "error", - startedAt, - endedAt: Date.now(), - error: "CLI run completed without lifecycle terminal event", - }, - }); - } - } - })(); + }, + }; + })() + : rawResult, + }); + bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + result.meta?.systemPromptReport, + ); + return result; } const { embeddedContext, senderContext, runBaseParams } = buildEmbeddedRunExecutionParams( { diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index a8bdfd90e54..b3566eb6dae 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -8,6 +8,8 @@ import type { SessionEntry } from "../../config/sessions/types.js"; import type { FollowupRun, QueueSettings } from "./queue.js"; const runEmbeddedPiAgentMock = vi.fn(); +const runCliAgentMock = vi.fn(); +const runWithModelFallbackMock = vi.fn(); const compactEmbeddedPiSessionMock = vi.fn(); const routeReplyMock = vi.fn(); const isRoutableChannelMock = vi.fn(); @@ -315,10 +317,9 @@ async function persistRunSessionUsageForFollowupTest( async function loadFreshFollowupRunnerModuleForTest() { vi.resetModules(); vi.doUnmock("../../config/config.js"); - vi.doMock( - "../../agents/model-fallback.js", - async () => await import("../../test-utils/model-fallback.mock.js"), - ); + vi.doMock("../../agents/model-fallback.js", () => ({ + runWithModelFallback: (params: unknown) => runWithModelFallbackMock(params), + })); vi.doMock("../../agents/session-write-lock.js", () => ({ acquireSessionWriteLock: vi.fn(async () => ({ release: async () => {}, @@ -335,6 +336,9 @@ async function loadFreshFollowupRunnerModuleForTest() { runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), waitForEmbeddedPiRunEnd: vi.fn(async () => undefined), })); + vi.doMock("../../agents/cli-runner.js", () => ({ + runCliAgent: (params: unknown) => runCliAgentMock(params), + })); vi.doMock("./queue.js", () => ({ clearFollowupQueue: clearFollowupQueueForFollowupTest, completeFollowupRunLifecycle: (run: Pick) => @@ -440,6 +444,23 @@ beforeAll(async () => { beforeEach(() => { clearRuntimeConfigSnapshot?.(); runEmbeddedPiAgentMock.mockReset(); + runCliAgentMock.mockReset(); + runWithModelFallbackMock.mockReset(); + runWithModelFallbackMock.mockImplementation( + async (params: { + provider: string; + model: string; + run: ( + provider: string, + model: string, + options?: { allowTransientCooldownProbe?: boolean }, + ) => Promise; + }) => ({ + result: await params.run(params.provider, params.model), + provider: params.provider, + model: params.model, + }), + ); compactEmbeddedPiSessionMock.mockReset(); runPreflightCompactionIfNeededMock.mockReset(); resolveCommandSecretRefsViaGatewayMock.mockReset(); @@ -657,6 +678,153 @@ describe("createFollowupRunner auto fallback primary probes", () => { }); describe("createFollowupRunner runtime config", () => { + it("routes queued followups through CLI runtime dispatch when the model selects a CLI backend", async () => { + const runtimeConfig: OpenClawConfig = { + agents: { + defaults: { + cliBackends: { + "claude-cli": { command: "claude" }, + }, + models: { + "anthropic/claude-opus-4-7": { agentRuntime: { id: "claude-cli" } }, + }, + }, + }, + }; + const sessionEntry: SessionEntry = { + sessionId: "session-cli-followup", + updatedAt: Date.now(), + cliSessionBindings: { + "claude-cli": { + sessionId: "cli-session-1", + }, + }, + }; + const sessionStore = { main: sessionEntry }; + runCliAgentMock.mockResolvedValueOnce({ + payloads: [], + meta: { + agentMeta: { + provider: "claude-cli", + model: "claude-opus-4-7", + }, + }, + }); + + const runner = createFollowupRunner({ + typing: createMockTypingController(), + typingMode: "instant", + sessionEntry, + sessionStore, + sessionKey: "main", + defaultModel: "anthropic/claude-opus-4-7", + }); + + await runner( + createQueuedRun({ + originatingChannel: "telegram", + run: { + config: runtimeConfig, + provider: "anthropic", + model: "claude-opus-4-7", + messageProvider: "telegram", + }, + }), + ); + + expect(runEmbeddedPiAgentMock).not.toHaveBeenCalled(); + expect(runCliAgentMock).toHaveBeenCalledTimes(1); + const call = requireLastMockCallArg(runCliAgentMock, "run cli agent"); + expect(call.provider).toBe("claude-cli"); + expect(call.model).toBe("claude-opus-4-7"); + expect(call.config).toBe(runtimeConfig); + expect(call.cliSessionId).toBe("cli-session-1"); + expect(call.messageChannel).toBe("telegram"); + }); + + it("defers queued CLI attempt terminal lifecycle events until fallback settles", async () => { + const realAgentEvents = await vi.importActual( + "../../infra/agent-events.js", + ); + const lifecyclePhases: string[] = []; + const unsubscribe = realAgentEvents.onAgentEvent((evt) => { + if (evt.stream !== "lifecycle") { + return; + } + const phase = typeof evt.data.phase === "string" ? evt.data.phase : undefined; + if (phase) { + lifecyclePhases.push(phase); + } + }); + const runtimeConfig: OpenClawConfig = { + agents: { + defaults: { + cliBackends: { + "claude-cli": { command: "claude" }, + }, + models: { + "anthropic/claude-opus-4-7": { agentRuntime: { id: "claude-cli" } }, + }, + }, + }, + }; + runWithModelFallbackMock.mockImplementationOnce( + async (params: { run: (provider: string, model: string) => Promise }) => { + await expect(params.run("anthropic", "claude-opus-4-7")).rejects.toThrow("cli failed"); + return { + result: await params.run("openai", "gpt-5.4"), + provider: "openai", + model: "gpt-5.4", + }; + }, + ); + runCliAgentMock.mockRejectedValueOnce(new Error("cli failed")); + runEmbeddedPiAgentMock.mockImplementationOnce(async (params: { runId: string }) => { + realAgentEvents.emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { phase: "start", startedAt: Date.now() }, + }); + realAgentEvents.emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { phase: "end", endedAt: Date.now() }, + }); + return { + payloads: [{ text: "fallback ok" }], + meta: {}, + }; + }); + + const runner = createFollowupRunner({ + typing: createMockTypingController(), + typingMode: "instant", + sessionKey: "main", + defaultModel: "anthropic/claude-opus-4-7", + }); + + try { + await runner( + createQueuedRun({ + originatingChannel: "telegram", + originatingTo: "chat-1", + run: { + config: runtimeConfig, + provider: "anthropic", + model: "claude-opus-4-7", + messageProvider: "telegram", + }, + }), + ); + } finally { + unsubscribe(); + } + + expect(runCliAgentMock).toHaveBeenCalledTimes(1); + expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); + expect(lifecyclePhases).toEqual(["start", "start", "end"]); + }); + it("uses the active runtime snapshot for queued embedded followup runs", async () => { const sourceConfig: OpenClawConfig = { models: { diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 957de558f74..e1de0fc8d16 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -6,9 +6,12 @@ import { markAutoFallbackPrimaryProbe, } from "../../agents/agent-scope.js"; import { resolveBootstrapWarningSignaturesSeen } from "../../agents/bootstrap-budget.js"; +import { getCliSessionBinding } from "../../agents/cli-session.js"; import { resolveContextTokensForModel } from "../../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { runWithModelFallback } from "../../agents/model-fallback.js"; +import { resolveCliRuntimeExecutionProvider } from "../../agents/model-runtime-aliases.js"; +import { isCliProvider } from "../../agents/model-selection-cli.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; import { buildAgentRuntimeDeliveryPlan, @@ -17,12 +20,16 @@ import { import { updateSessionStore, type SessionEntry } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; -import { registerAgentRunContext } from "../../infra/agent-events.js"; +import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { defaultRuntime } from "../../runtime.js"; import { isInternalMessageChannel } from "../../utils/message-channel.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; -import { resolveRunAfterAutoFallbackPrimaryProbeRecheck } from "./agent-runner-execution.js"; +import { runCliAgentWithLifecycle } from "./agent-runner-cli-dispatch.js"; +import { + resolveRunAfterAutoFallbackPrimaryProbeRecheck, + resolveSessionRuntimeOverrideForProvider, +} from "./agent-runner-execution.js"; import { runPreflightCompactionIfNeeded } from "./agent-runner-memory.js"; import { resolveQueuedReplyExecutionConfig, @@ -351,6 +358,13 @@ export function createFollowupRunner(params: { fallbackProvider = run.provider; fallbackModel = run.model; replyOperation.setPhase("running"); + let pendingDeferredCliTerminal: + | { + provider: string; + model: string; + startedAt: number; + } + | undefined; try { const outcomePlan = buildAgentRuntimeOutcomePlan(); const fallbackResult = await runWithModelFallback({ @@ -371,8 +385,114 @@ export function createFollowupRunner(params: { const authProfile = resolveRunAuthProfile(candidateRun, provider, { config: runtimeConfig, }); + const sessionRuntimeOverride = resolveSessionRuntimeOverrideForProvider({ + provider, + entry: activeSessionEntry, + }); + const cliExecutionProvider = + sessionRuntimeOverride === "pi" + ? provider + : ((sessionRuntimeOverride && isCliProvider(sessionRuntimeOverride, runtimeConfig) + ? sessionRuntimeOverride + : undefined) ?? + resolveCliRuntimeExecutionProvider({ + provider, + cfg: runtimeConfig, + agentId: run.agentId, + modelId: model, + }) ?? + provider); let attemptCompactionCount = 0; try { + if (isCliProvider(cliExecutionProvider, runtimeConfig)) { + const isRoomEventCliRun = queued.currentInboundEventKind === "room_event"; + const cliSessionBinding = isRoomEventCliRun + ? undefined + : getCliSessionBinding(activeSessionEntry, cliExecutionProvider); + const cliLifecycleStartedAt = Date.now(); + pendingDeferredCliTerminal = { + provider, + model, + startedAt: cliLifecycleStartedAt, + }; + const result = await runCliAgentWithLifecycle({ + runId, + provider: cliExecutionProvider, + startedAt: cliLifecycleStartedAt, + emitLifecycleTerminal: false, + onAgentRunStart: () => opts?.onAgentRunStart?.(runId), + suppressAssistantBridge: run.silentExpected, + runParams: { + replyOperation, + sessionId: run.sessionId, + sessionKey: replySessionKey, + agentId: run.agentId, + trigger: opts?.isHeartbeat === true ? "heartbeat" : "user", + sessionFile: run.sessionFile, + workspaceDir: run.workspaceDir, + config: runtimeConfig, + prompt: queued.prompt, + transcriptPrompt: queued.transcriptPrompt, + currentInboundEventKind: queued.currentInboundEventKind, + currentInboundContext: queued.currentInboundContext, + inputProvenance: run.inputProvenance, + provider: cliExecutionProvider, + model, + ...resolveRunAuthProfile(candidateRun, cliExecutionProvider, { + config: runtimeConfig, + }), + thinkLevel: run.thinkLevel, + timeoutMs: run.timeoutMs, + runId, + extraSystemPrompt: run.extraSystemPrompt, + sourceReplyDeliveryMode: run.sourceReplyDeliveryMode, + silentReplyPromptMode: run.silentReplyPromptMode, + extraSystemPromptStatic: run.extraSystemPromptStatic, + ownerNumbers: run.ownerNumbers, + cliSessionId: cliSessionBinding?.sessionId, + cliSessionBinding, + bootstrapPromptWarningSignaturesSeen, + bootstrapPromptWarningSignature: + bootstrapPromptWarningSignaturesSeen[ + bootstrapPromptWarningSignaturesSeen.length - 1 + ], + images: queuedImages, + imageOrder: queuedImageOrder, + skillsSnapshot: run.skillsSnapshot, + messageChannel: queued.originatingChannel ?? undefined, + messageProvider: resolveOriginMessageProvider({ + originatingChannel: queued.originatingChannel, + provider: run.messageProvider, + }), + agentAccountId: run.agentAccountId, + senderIsOwner: run.senderIsOwner, + disableTools: opts?.disableTools, + abortSignal: queued.abortSignal ?? opts?.abortSignal, + }, + transformResult: (rawResult) => + isRoomEventCliRun && rawResult.meta.agentMeta + ? (() => { + const { cliSessionBinding: _cliSessionBinding, ...agentMeta } = + rawResult.meta.agentMeta; + return { + ...rawResult, + meta: { + ...rawResult.meta, + agentMeta: { + ...agentMeta, + sessionId: "", + }, + }, + }; + })() + : rawResult, + }); + bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + result.meta?.systemPromptReport, + ); + return result; + } + pendingDeferredCliTerminal = undefined; const result = await runEmbeddedPiAgent({ allowGatewaySubagentBinding: true, replyOperation, @@ -466,6 +586,22 @@ export function createFollowupRunner(params: { runResult = fallbackResult.result; fallbackProvider = fallbackResult.provider; fallbackModel = fallbackResult.model; + if ( + pendingDeferredCliTerminal && + pendingDeferredCliTerminal.provider === fallbackProvider && + pendingDeferredCliTerminal.model === fallbackModel + ) { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: pendingDeferredCliTerminal.startedAt, + endedAt: Date.now(), + }, + }); + } + pendingDeferredCliTerminal = undefined; await clearRecoveredAutoFallbackPrimaryProbe({ provider: fallbackProvider, model: fallbackModel, @@ -473,6 +609,19 @@ export function createFollowupRunner(params: { } catch (err) { const message = formatErrorMessage(err); replyOperation.fail("run_failed", err); + if (pendingDeferredCliTerminal) { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "error", + startedAt: pendingDeferredCliTerminal.startedAt, + endedAt: Date.now(), + error: message, + }, + }); + pendingDeferredCliTerminal = undefined; + } defaultRuntime.error?.(`Followup agent failed before reply: ${message}`); return; }