mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 21:34:46 +00:00
* fix(followup): route CLI runtime drains through CLI runner * fix(followup): route queued CLI runtimes --------- Co-authored-by: hclsys <hclsys@openclaw.ai> Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
@@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai
|
||||
### Fixes
|
||||
|
||||
- Gateway/Docker: fail closed for non-loopback gateway starts without explicit shared-secret or trusted-proxy auth, and stop the image default command from bypassing config validation. Fixes #82865. (#82866) Thanks @coygeek.
|
||||
- Agents/followups: route queued followup turns through CLI runtime backends instead of embedded harness lookup, preventing `claude-cli`/`google-gemini-cli` followups from failing before delivery. Fixes #82847. (#82857) Thanks @hclsys.
|
||||
- CLI/sessions: let `openclaw sessions cleanup --fix-missing` prune malformed rows with unresolvable transcript metadata instead of throwing. Fixes #80970. (#82745) Thanks @IWhatsskill.
|
||||
- Gateway/usage: refresh large session usage summaries in the background and reuse durable transcript metadata so `sessions.usage` no longer blocks Gateway requests on full transcript rescans. Fixes #82773. (#82778) Thanks @hclsys.
|
||||
- TUI: restore the submitted draft when chat is busy instead of clearing it or queueing another run. Fixes #45326. (#82774) Thanks @hyspacex.
|
||||
|
||||
164
src/auto-reply/reply/agent-runner-cli-dispatch.ts
Normal file
164
src/auto-reply/reply/agent-runner-cli-dispatch.ts
Normal file
@@ -0,0 +1,164 @@
|
||||
import { runCliAgent } from "../../agents/cli-runner.js";
|
||||
import type { RunCliAgentParams } from "../../agents/cli-runner/types.js";
|
||||
import type { EmbeddedPiRunResult } from "../../agents/pi-embedded.js";
|
||||
import { emitAgentEvent, onAgentEvent } from "../../infra/agent-events.js";
|
||||
import {
|
||||
normalizeLowercaseStringOrEmpty,
|
||||
normalizeOptionalString,
|
||||
} from "../../shared/string-coerce.js";
|
||||
|
||||
function shouldBridgeCliAssistantTextToReasoning(provider: string): boolean {
|
||||
return normalizeLowercaseStringOrEmpty(provider) === "claude-cli";
|
||||
}
|
||||
|
||||
function createAssistantTextBridge(params: {
|
||||
runId: string;
|
||||
suppressed?: boolean;
|
||||
deliver?: (text: string) => Promise<void>;
|
||||
}) {
|
||||
const deliver = params.deliver;
|
||||
if (!deliver) {
|
||||
return {
|
||||
unsubscribe: () => undefined,
|
||||
drain: async (): Promise<void> => undefined,
|
||||
};
|
||||
}
|
||||
let lastText: string | undefined;
|
||||
let unsubscribed = false;
|
||||
let delivery = Promise.resolve();
|
||||
const rawUnsubscribe = onAgentEvent((evt) => {
|
||||
if (evt.runId !== params.runId || evt.stream !== "assistant") {
|
||||
return;
|
||||
}
|
||||
if (params.suppressed) {
|
||||
return;
|
||||
}
|
||||
const text = typeof evt.data.text === "string" ? evt.data.text : undefined;
|
||||
if (text === undefined || text === lastText) {
|
||||
return;
|
||||
}
|
||||
lastText = text;
|
||||
delivery = delivery.then(() => deliver(text)).catch(() => undefined);
|
||||
});
|
||||
return {
|
||||
unsubscribe() {
|
||||
if (unsubscribed) {
|
||||
return;
|
||||
}
|
||||
unsubscribed = true;
|
||||
rawUnsubscribe();
|
||||
},
|
||||
async drain(): Promise<void> {
|
||||
await delivery;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export async function runCliAgentWithLifecycle(params: {
|
||||
runId: string;
|
||||
provider: string;
|
||||
runParams: RunCliAgentParams;
|
||||
startedAt?: number;
|
||||
emitLifecycleStart?: boolean;
|
||||
emitLifecycleTerminal?: boolean;
|
||||
onAgentRunStart?: () => void;
|
||||
suppressAssistantBridge?: boolean;
|
||||
onAssistantText?: (text: string) => Promise<void>;
|
||||
onReasoningText?: (text: string) => Promise<void>;
|
||||
onErrorBeforeLifecycle?: (err: unknown) => Promise<void>;
|
||||
transformResult?: (result: EmbeddedPiRunResult) => EmbeddedPiRunResult;
|
||||
}): Promise<EmbeddedPiRunResult> {
|
||||
const startedAt = params.startedAt ?? Date.now();
|
||||
const emitLifecycleStart = params.emitLifecycleStart ?? true;
|
||||
const emitLifecycleTerminal = params.emitLifecycleTerminal ?? true;
|
||||
params.onAgentRunStart?.();
|
||||
if (emitLifecycleStart) {
|
||||
emitAgentEvent({
|
||||
runId: params.runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "start",
|
||||
startedAt,
|
||||
},
|
||||
});
|
||||
}
|
||||
const assistantBridge = createAssistantTextBridge({
|
||||
runId: params.runId,
|
||||
suppressed: params.suppressAssistantBridge,
|
||||
deliver: params.onAssistantText,
|
||||
});
|
||||
const reasoningBridge = createAssistantTextBridge({
|
||||
runId: params.runId,
|
||||
suppressed: params.suppressAssistantBridge,
|
||||
deliver: shouldBridgeCliAssistantTextToReasoning(params.provider)
|
||||
? params.onReasoningText
|
||||
: undefined,
|
||||
});
|
||||
let lifecycleTerminalEmitted = false;
|
||||
try {
|
||||
const rawResult = await runCliAgent(params.runParams);
|
||||
const result = params.transformResult?.(rawResult) ?? rawResult;
|
||||
assistantBridge.unsubscribe();
|
||||
reasoningBridge.unsubscribe();
|
||||
await assistantBridge.drain();
|
||||
await reasoningBridge.drain();
|
||||
|
||||
const cliText = normalizeOptionalString(result.payloads?.[0]?.text);
|
||||
if (cliText) {
|
||||
emitAgentEvent({
|
||||
runId: params.runId,
|
||||
stream: "assistant",
|
||||
data: { text: cliText },
|
||||
});
|
||||
}
|
||||
|
||||
if (emitLifecycleTerminal) {
|
||||
emitAgentEvent({
|
||||
runId: params.runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "end",
|
||||
startedAt,
|
||||
endedAt: Date.now(),
|
||||
},
|
||||
});
|
||||
lifecycleTerminalEmitted = true;
|
||||
}
|
||||
return result;
|
||||
} catch (err) {
|
||||
assistantBridge.unsubscribe();
|
||||
reasoningBridge.unsubscribe();
|
||||
await assistantBridge.drain();
|
||||
await reasoningBridge.drain();
|
||||
await params.onErrorBeforeLifecycle?.(err);
|
||||
if (emitLifecycleTerminal) {
|
||||
emitAgentEvent({
|
||||
runId: params.runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
startedAt,
|
||||
endedAt: Date.now(),
|
||||
error: String(err),
|
||||
},
|
||||
});
|
||||
lifecycleTerminalEmitted = true;
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
assistantBridge.unsubscribe();
|
||||
reasoningBridge.unsubscribe();
|
||||
if (emitLifecycleTerminal && !lifecycleTerminalEmitted) {
|
||||
emitAgentEvent({
|
||||
runId: params.runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
startedAt,
|
||||
endedAt: Date.now(),
|
||||
error: "CLI run completed without lifecycle terminal event",
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,6 @@ import {
|
||||
classifyOAuthRefreshFailure,
|
||||
} from "../../agents/auth-profiles/oauth-refresh-failure.js";
|
||||
import { resolveBootstrapWarningSignaturesSeen } from "../../agents/bootstrap-budget.js";
|
||||
import { runCliAgent } from "../../agents/cli-runner.js";
|
||||
import { getCliSessionBinding } from "../../agents/cli-session.js";
|
||||
import { resolveContextTokensForModel } from "../../agents/context.js";
|
||||
import { resolveAgentHarnessPolicy } from "../../agents/harness/selection.js";
|
||||
@@ -56,7 +55,7 @@ import {
|
||||
import { resolveSilentReplyPolicy } from "../../config/silent-reply.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { emitAgentEvent, onAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
|
||||
import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js";
|
||||
import { CommandLane } from "../../process/lanes.js";
|
||||
@@ -87,6 +86,7 @@ import {
|
||||
} from "../tokens.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
import { resolveRunAuthProfile } from "./agent-runner-auth-profile.js";
|
||||
import { runCliAgentWithLifecycle } from "./agent-runner-cli-dispatch.js";
|
||||
import {
|
||||
GENERIC_EXTERNAL_RUN_FAILURE_TEXT,
|
||||
HEARTBEAT_EXTERNAL_RUN_FAILURE_TEXT,
|
||||
@@ -115,10 +115,6 @@ const GPT_CHAT_BREVITY_ACK_MAX_SENTENCES = 3;
|
||||
const GPT_CHAT_BREVITY_SOFT_MAX_CHARS = 900;
|
||||
const GPT_CHAT_BREVITY_SOFT_MAX_SENTENCES = 6;
|
||||
|
||||
function shouldBridgeCliAssistantTextToReasoning(provider: string): boolean {
|
||||
return normalizeLowercaseStringOrEmpty(provider) === "claude-cli";
|
||||
}
|
||||
|
||||
function readApprovalScopeValue(value: unknown): "turn" | "session" | undefined {
|
||||
return value === "turn" || value === "session" ? value : undefined;
|
||||
}
|
||||
@@ -1124,7 +1120,7 @@ function emitModelFallbackStepLifecycle(params: {
|
||||
});
|
||||
}
|
||||
|
||||
function resolveSessionRuntimeOverrideForProvider(params: {
|
||||
export function resolveSessionRuntimeOverrideForProvider(params: {
|
||||
provider: string;
|
||||
entry?: Pick<SessionEntry, "agentRuntimeOverride">;
|
||||
}): string | undefined {
|
||||
@@ -1747,16 +1743,6 @@ export async function runAgentTurnWithFallback(params: {
|
||||
provider);
|
||||
|
||||
if (isCliProvider(cliExecutionProvider, runtimeConfig)) {
|
||||
const startedAt = Date.now();
|
||||
notifyAgentRunStart();
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "start",
|
||||
startedAt,
|
||||
},
|
||||
});
|
||||
const isRoomEventCliRun = params.followupRun.currentInboundEventKind === "room_event";
|
||||
const cliSessionBinding = isRoomEventCliRun
|
||||
? undefined
|
||||
@@ -1768,195 +1754,99 @@ export async function runAgentTurnWithFallback(params: {
|
||||
originatingChannel: params.followupRun.originatingChannel,
|
||||
provider: params.sessionCtx.Provider,
|
||||
});
|
||||
return (async () => {
|
||||
let lifecycleTerminalEmitted = false;
|
||||
const createAssistantTextBridge = (deliver: (text: string) => Promise<void>) => {
|
||||
let lastText: string | undefined;
|
||||
let unsubscribed = false;
|
||||
let delivery = Promise.resolve();
|
||||
const rawUnsubscribe = onAgentEvent((evt) => {
|
||||
if (evt.runId !== runId || evt.stream !== "assistant") {
|
||||
return;
|
||||
}
|
||||
if (params.followupRun.run.silentExpected) {
|
||||
return;
|
||||
}
|
||||
const text = typeof evt.data.text === "string" ? evt.data.text : undefined;
|
||||
if (text === undefined || text === lastText) {
|
||||
return;
|
||||
}
|
||||
lastText = text;
|
||||
delivery = delivery.then(() => deliver(text)).catch(() => undefined);
|
||||
});
|
||||
return {
|
||||
unsubscribe() {
|
||||
if (unsubscribed) {
|
||||
return;
|
||||
}
|
||||
unsubscribed = true;
|
||||
rawUnsubscribe();
|
||||
},
|
||||
async drain(): Promise<void> {
|
||||
await delivery;
|
||||
},
|
||||
};
|
||||
};
|
||||
const noopBridge = {
|
||||
unsubscribe: () => undefined,
|
||||
drain: async (): Promise<void> => undefined,
|
||||
};
|
||||
const assistantBridge = createAssistantTextBridge(async (text) => {
|
||||
const result = await runCliAgentWithLifecycle({
|
||||
runId,
|
||||
provider: cliExecutionProvider,
|
||||
onAgentRunStart: notifyAgentRunStart,
|
||||
suppressAssistantBridge: params.followupRun.run.silentExpected,
|
||||
onAssistantText: async (text) => {
|
||||
const textForTyping = await handlePartialForTyping({ text } as ReplyPayload);
|
||||
if (textForTyping === undefined || !params.opts?.onPartialReply) {
|
||||
return;
|
||||
}
|
||||
await params.opts.onPartialReply({ text: textForTyping });
|
||||
});
|
||||
const reasoningBridge = shouldBridgeCliAssistantTextToReasoning(cliExecutionProvider)
|
||||
? createAssistantTextBridge(async (text) => {
|
||||
await params.opts?.onReasoningStream?.({ text });
|
||||
})
|
||||
: noopBridge;
|
||||
try {
|
||||
const rawResult = await runCliAgent({
|
||||
sessionId: params.followupRun.run.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
agentId: params.followupRun.run.agentId,
|
||||
trigger: params.isHeartbeat ? "heartbeat" : "user",
|
||||
sessionFile: params.followupRun.run.sessionFile,
|
||||
workspaceDir: params.followupRun.run.workspaceDir,
|
||||
config: runtimeConfig,
|
||||
prompt: params.commandBody,
|
||||
transcriptPrompt: params.transcriptCommandBody,
|
||||
currentInboundEventKind: params.followupRun.currentInboundEventKind,
|
||||
currentInboundContext: params.followupRun.currentInboundContext,
|
||||
inputProvenance: params.followupRun.run.inputProvenance,
|
||||
provider: cliExecutionProvider,
|
||||
model,
|
||||
thinkLevel: params.followupRun.run.thinkLevel,
|
||||
timeoutMs: params.followupRun.run.timeoutMs,
|
||||
runId,
|
||||
lane: runLane,
|
||||
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
|
||||
sourceReplyDeliveryMode: params.followupRun.run.sourceReplyDeliveryMode,
|
||||
silentReplyPromptMode: params.followupRun.run.silentReplyPromptMode,
|
||||
extraSystemPromptStatic: params.followupRun.run.extraSystemPromptStatic,
|
||||
ownerNumbers: params.followupRun.run.ownerNumbers,
|
||||
cliSessionId: cliSessionBinding?.sessionId,
|
||||
cliSessionBinding,
|
||||
authProfileId: authProfile.authProfileId,
|
||||
bootstrapPromptWarningSignaturesSeen,
|
||||
bootstrapPromptWarningSignature:
|
||||
bootstrapPromptWarningSignaturesSeen[
|
||||
bootstrapPromptWarningSignaturesSeen.length - 1
|
||||
],
|
||||
images: params.opts?.images,
|
||||
imageOrder: params.opts?.imageOrder,
|
||||
skillsSnapshot: params.followupRun.run.skillsSnapshot,
|
||||
messageChannel: params.followupRun.originatingChannel ?? undefined,
|
||||
messageProvider: hookMessageProvider,
|
||||
agentAccountId: params.followupRun.run.agentAccountId,
|
||||
senderIsOwner: params.followupRun.run.senderIsOwner,
|
||||
disableTools: params.opts?.disableTools,
|
||||
abortSignal: params.replyOperation?.abortSignal ?? params.opts?.abortSignal,
|
||||
replyOperation: params.replyOperation,
|
||||
});
|
||||
const result: EmbeddedAgentRunResult =
|
||||
isRoomEventCliRun && rawResult.meta.agentMeta
|
||||
? (() => {
|
||||
const { cliSessionBinding: _cliSessionBinding, ...agentMeta } =
|
||||
rawResult.meta.agentMeta;
|
||||
return {
|
||||
...rawResult,
|
||||
meta: {
|
||||
...rawResult.meta,
|
||||
agentMeta: {
|
||||
...agentMeta,
|
||||
sessionId: "",
|
||||
},
|
||||
},
|
||||
onReasoningText: async (text) => {
|
||||
await params.opts?.onReasoningStream?.({ text });
|
||||
},
|
||||
onErrorBeforeLifecycle: async () => {
|
||||
if (!rollbackFallbackCandidateSelection) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await rollbackFallbackCandidateSelection();
|
||||
clearPendingFallbackRollback(rollbackFallbackCandidateSelection);
|
||||
} catch (rollbackError) {
|
||||
logVerbose(
|
||||
`failed to roll back fallback candidate selection (non-fatal): ${String(rollbackError)}`,
|
||||
);
|
||||
}
|
||||
},
|
||||
runParams: {
|
||||
sessionId: params.followupRun.run.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
agentId: params.followupRun.run.agentId,
|
||||
trigger: params.isHeartbeat ? "heartbeat" : "user",
|
||||
sessionFile: params.followupRun.run.sessionFile,
|
||||
workspaceDir: params.followupRun.run.workspaceDir,
|
||||
config: runtimeConfig,
|
||||
prompt: params.commandBody,
|
||||
transcriptPrompt: params.transcriptCommandBody,
|
||||
currentInboundEventKind: params.followupRun.currentInboundEventKind,
|
||||
currentInboundContext: params.followupRun.currentInboundContext,
|
||||
inputProvenance: params.followupRun.run.inputProvenance,
|
||||
provider: cliExecutionProvider,
|
||||
model,
|
||||
thinkLevel: params.followupRun.run.thinkLevel,
|
||||
timeoutMs: params.followupRun.run.timeoutMs,
|
||||
runId,
|
||||
lane: runLane,
|
||||
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
|
||||
sourceReplyDeliveryMode: params.followupRun.run.sourceReplyDeliveryMode,
|
||||
silentReplyPromptMode: params.followupRun.run.silentReplyPromptMode,
|
||||
extraSystemPromptStatic: params.followupRun.run.extraSystemPromptStatic,
|
||||
ownerNumbers: params.followupRun.run.ownerNumbers,
|
||||
cliSessionId: cliSessionBinding?.sessionId,
|
||||
cliSessionBinding,
|
||||
authProfileId: authProfile.authProfileId,
|
||||
bootstrapPromptWarningSignaturesSeen,
|
||||
bootstrapPromptWarningSignature:
|
||||
bootstrapPromptWarningSignaturesSeen[
|
||||
bootstrapPromptWarningSignaturesSeen.length - 1
|
||||
],
|
||||
images: params.opts?.images,
|
||||
imageOrder: params.opts?.imageOrder,
|
||||
skillsSnapshot: params.followupRun.run.skillsSnapshot,
|
||||
messageChannel: params.followupRun.originatingChannel ?? undefined,
|
||||
messageProvider: hookMessageProvider,
|
||||
agentAccountId: params.followupRun.run.agentAccountId,
|
||||
senderIsOwner: params.followupRun.run.senderIsOwner,
|
||||
disableTools: params.opts?.disableTools,
|
||||
abortSignal: params.replyOperation?.abortSignal ?? params.opts?.abortSignal,
|
||||
replyOperation: params.replyOperation,
|
||||
},
|
||||
transformResult: (rawResult) =>
|
||||
isRoomEventCliRun && rawResult.meta.agentMeta
|
||||
? (() => {
|
||||
const { cliSessionBinding: _cliSessionBinding, ...agentMeta } =
|
||||
rawResult.meta.agentMeta;
|
||||
return {
|
||||
...rawResult,
|
||||
meta: {
|
||||
...rawResult.meta,
|
||||
agentMeta: {
|
||||
...agentMeta,
|
||||
sessionId: "",
|
||||
},
|
||||
};
|
||||
})()
|
||||
: rawResult;
|
||||
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
|
||||
result.meta?.systemPromptReport,
|
||||
);
|
||||
|
||||
assistantBridge.unsubscribe();
|
||||
reasoningBridge.unsubscribe();
|
||||
await assistantBridge.drain();
|
||||
await reasoningBridge.drain();
|
||||
|
||||
// CLI backends don't emit streaming assistant events, so we need to
|
||||
// emit one with the final text so server-chat can populate its buffer
|
||||
// and send the response to TUI/WebSocket clients.
|
||||
const cliText = normalizeOptionalString(result.payloads?.[0]?.text);
|
||||
if (cliText) {
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "assistant",
|
||||
data: { text: cliText },
|
||||
});
|
||||
}
|
||||
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "end",
|
||||
startedAt,
|
||||
endedAt: Date.now(),
|
||||
},
|
||||
});
|
||||
lifecycleTerminalEmitted = true;
|
||||
|
||||
return result;
|
||||
} catch (err) {
|
||||
assistantBridge.unsubscribe();
|
||||
reasoningBridge.unsubscribe();
|
||||
await assistantBridge.drain();
|
||||
await reasoningBridge.drain();
|
||||
if (rollbackFallbackCandidateSelection) {
|
||||
try {
|
||||
await rollbackFallbackCandidateSelection();
|
||||
clearPendingFallbackRollback(rollbackFallbackCandidateSelection);
|
||||
} catch (rollbackError) {
|
||||
logVerbose(
|
||||
`failed to roll back fallback candidate selection (non-fatal): ${String(rollbackError)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
startedAt,
|
||||
endedAt: Date.now(),
|
||||
error: String(err),
|
||||
},
|
||||
});
|
||||
lifecycleTerminalEmitted = true;
|
||||
throw err;
|
||||
} finally {
|
||||
assistantBridge.unsubscribe();
|
||||
reasoningBridge.unsubscribe();
|
||||
// Defensive backstop: never let a CLI run complete without a terminal
|
||||
// lifecycle event, otherwise downstream consumers can hang.
|
||||
if (!lifecycleTerminalEmitted) {
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
startedAt,
|
||||
endedAt: Date.now(),
|
||||
error: "CLI run completed without lifecycle terminal event",
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
})();
|
||||
},
|
||||
};
|
||||
})()
|
||||
: rawResult,
|
||||
});
|
||||
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
|
||||
result.meta?.systemPromptReport,
|
||||
);
|
||||
return result;
|
||||
}
|
||||
const { embeddedContext, senderContext, runBaseParams } = buildEmbeddedRunExecutionParams(
|
||||
{
|
||||
|
||||
@@ -8,6 +8,8 @@ import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { FollowupRun, QueueSettings } from "./queue.js";
|
||||
|
||||
const runEmbeddedPiAgentMock = vi.fn();
|
||||
const runCliAgentMock = vi.fn();
|
||||
const runWithModelFallbackMock = vi.fn();
|
||||
const compactEmbeddedPiSessionMock = vi.fn();
|
||||
const routeReplyMock = vi.fn();
|
||||
const isRoutableChannelMock = vi.fn();
|
||||
@@ -315,10 +317,9 @@ async function persistRunSessionUsageForFollowupTest(
|
||||
async function loadFreshFollowupRunnerModuleForTest() {
|
||||
vi.resetModules();
|
||||
vi.doUnmock("../../config/config.js");
|
||||
vi.doMock(
|
||||
"../../agents/model-fallback.js",
|
||||
async () => await import("../../test-utils/model-fallback.mock.js"),
|
||||
);
|
||||
vi.doMock("../../agents/model-fallback.js", () => ({
|
||||
runWithModelFallback: (params: unknown) => runWithModelFallbackMock(params),
|
||||
}));
|
||||
vi.doMock("../../agents/session-write-lock.js", () => ({
|
||||
acquireSessionWriteLock: vi.fn(async () => ({
|
||||
release: async () => {},
|
||||
@@ -335,6 +336,9 @@ async function loadFreshFollowupRunnerModuleForTest() {
|
||||
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
|
||||
waitForEmbeddedPiRunEnd: vi.fn(async () => undefined),
|
||||
}));
|
||||
vi.doMock("../../agents/cli-runner.js", () => ({
|
||||
runCliAgent: (params: unknown) => runCliAgentMock(params),
|
||||
}));
|
||||
vi.doMock("./queue.js", () => ({
|
||||
clearFollowupQueue: clearFollowupQueueForFollowupTest,
|
||||
completeFollowupRunLifecycle: (run: Pick<FollowupRun, "queuedLifecycle">) =>
|
||||
@@ -440,6 +444,23 @@ beforeAll(async () => {
|
||||
beforeEach(() => {
|
||||
clearRuntimeConfigSnapshot?.();
|
||||
runEmbeddedPiAgentMock.mockReset();
|
||||
runCliAgentMock.mockReset();
|
||||
runWithModelFallbackMock.mockReset();
|
||||
runWithModelFallbackMock.mockImplementation(
|
||||
async (params: {
|
||||
provider: string;
|
||||
model: string;
|
||||
run: (
|
||||
provider: string,
|
||||
model: string,
|
||||
options?: { allowTransientCooldownProbe?: boolean },
|
||||
) => Promise<unknown>;
|
||||
}) => ({
|
||||
result: await params.run(params.provider, params.model),
|
||||
provider: params.provider,
|
||||
model: params.model,
|
||||
}),
|
||||
);
|
||||
compactEmbeddedPiSessionMock.mockReset();
|
||||
runPreflightCompactionIfNeededMock.mockReset();
|
||||
resolveCommandSecretRefsViaGatewayMock.mockReset();
|
||||
@@ -657,6 +678,153 @@ describe("createFollowupRunner auto fallback primary probes", () => {
|
||||
});
|
||||
|
||||
describe("createFollowupRunner runtime config", () => {
|
||||
it("routes queued followups through CLI runtime dispatch when the model selects a CLI backend", async () => {
|
||||
const runtimeConfig: OpenClawConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
cliBackends: {
|
||||
"claude-cli": { command: "claude" },
|
||||
},
|
||||
models: {
|
||||
"anthropic/claude-opus-4-7": { agentRuntime: { id: "claude-cli" } },
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
const sessionEntry: SessionEntry = {
|
||||
sessionId: "session-cli-followup",
|
||||
updatedAt: Date.now(),
|
||||
cliSessionBindings: {
|
||||
"claude-cli": {
|
||||
sessionId: "cli-session-1",
|
||||
},
|
||||
},
|
||||
};
|
||||
const sessionStore = { main: sessionEntry };
|
||||
runCliAgentMock.mockResolvedValueOnce({
|
||||
payloads: [],
|
||||
meta: {
|
||||
agentMeta: {
|
||||
provider: "claude-cli",
|
||||
model: "claude-opus-4-7",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const runner = createFollowupRunner({
|
||||
typing: createMockTypingController(),
|
||||
typingMode: "instant",
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey: "main",
|
||||
defaultModel: "anthropic/claude-opus-4-7",
|
||||
});
|
||||
|
||||
await runner(
|
||||
createQueuedRun({
|
||||
originatingChannel: "telegram",
|
||||
run: {
|
||||
config: runtimeConfig,
|
||||
provider: "anthropic",
|
||||
model: "claude-opus-4-7",
|
||||
messageProvider: "telegram",
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
expect(runEmbeddedPiAgentMock).not.toHaveBeenCalled();
|
||||
expect(runCliAgentMock).toHaveBeenCalledTimes(1);
|
||||
const call = requireLastMockCallArg(runCliAgentMock, "run cli agent");
|
||||
expect(call.provider).toBe("claude-cli");
|
||||
expect(call.model).toBe("claude-opus-4-7");
|
||||
expect(call.config).toBe(runtimeConfig);
|
||||
expect(call.cliSessionId).toBe("cli-session-1");
|
||||
expect(call.messageChannel).toBe("telegram");
|
||||
});
|
||||
|
||||
it("defers queued CLI attempt terminal lifecycle events until fallback settles", async () => {
|
||||
const realAgentEvents = await vi.importActual<typeof import("../../infra/agent-events.js")>(
|
||||
"../../infra/agent-events.js",
|
||||
);
|
||||
const lifecyclePhases: string[] = [];
|
||||
const unsubscribe = realAgentEvents.onAgentEvent((evt) => {
|
||||
if (evt.stream !== "lifecycle") {
|
||||
return;
|
||||
}
|
||||
const phase = typeof evt.data.phase === "string" ? evt.data.phase : undefined;
|
||||
if (phase) {
|
||||
lifecyclePhases.push(phase);
|
||||
}
|
||||
});
|
||||
const runtimeConfig: OpenClawConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
cliBackends: {
|
||||
"claude-cli": { command: "claude" },
|
||||
},
|
||||
models: {
|
||||
"anthropic/claude-opus-4-7": { agentRuntime: { id: "claude-cli" } },
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
runWithModelFallbackMock.mockImplementationOnce(
|
||||
async (params: { run: (provider: string, model: string) => Promise<unknown> }) => {
|
||||
await expect(params.run("anthropic", "claude-opus-4-7")).rejects.toThrow("cli failed");
|
||||
return {
|
||||
result: await params.run("openai", "gpt-5.4"),
|
||||
provider: "openai",
|
||||
model: "gpt-5.4",
|
||||
};
|
||||
},
|
||||
);
|
||||
runCliAgentMock.mockRejectedValueOnce(new Error("cli failed"));
|
||||
runEmbeddedPiAgentMock.mockImplementationOnce(async (params: { runId: string }) => {
|
||||
realAgentEvents.emitAgentEvent({
|
||||
runId: params.runId,
|
||||
stream: "lifecycle",
|
||||
data: { phase: "start", startedAt: Date.now() },
|
||||
});
|
||||
realAgentEvents.emitAgentEvent({
|
||||
runId: params.runId,
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end", endedAt: Date.now() },
|
||||
});
|
||||
return {
|
||||
payloads: [{ text: "fallback ok" }],
|
||||
meta: {},
|
||||
};
|
||||
});
|
||||
|
||||
const runner = createFollowupRunner({
|
||||
typing: createMockTypingController(),
|
||||
typingMode: "instant",
|
||||
sessionKey: "main",
|
||||
defaultModel: "anthropic/claude-opus-4-7",
|
||||
});
|
||||
|
||||
try {
|
||||
await runner(
|
||||
createQueuedRun({
|
||||
originatingChannel: "telegram",
|
||||
originatingTo: "chat-1",
|
||||
run: {
|
||||
config: runtimeConfig,
|
||||
provider: "anthropic",
|
||||
model: "claude-opus-4-7",
|
||||
messageProvider: "telegram",
|
||||
},
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
unsubscribe();
|
||||
}
|
||||
|
||||
expect(runCliAgentMock).toHaveBeenCalledTimes(1);
|
||||
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
|
||||
expect(lifecyclePhases).toEqual(["start", "start", "end"]);
|
||||
});
|
||||
|
||||
it("uses the active runtime snapshot for queued embedded followup runs", async () => {
|
||||
const sourceConfig: OpenClawConfig = {
|
||||
models: {
|
||||
|
||||
@@ -6,9 +6,12 @@ import {
|
||||
markAutoFallbackPrimaryProbe,
|
||||
} from "../../agents/agent-scope.js";
|
||||
import { resolveBootstrapWarningSignaturesSeen } from "../../agents/bootstrap-budget.js";
|
||||
import { getCliSessionBinding } from "../../agents/cli-session.js";
|
||||
import { resolveContextTokensForModel } from "../../agents/context.js";
|
||||
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
|
||||
import { runWithModelFallback } from "../../agents/model-fallback.js";
|
||||
import { resolveCliRuntimeExecutionProvider } from "../../agents/model-runtime-aliases.js";
|
||||
import { isCliProvider } from "../../agents/model-selection-cli.js";
|
||||
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
|
||||
import {
|
||||
buildAgentRuntimeDeliveryPlan,
|
||||
@@ -17,12 +20,16 @@ import {
|
||||
import { updateSessionStore, type SessionEntry } from "../../config/sessions.js";
|
||||
import type { TypingMode } from "../../config/types.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { registerAgentRunContext } from "../../infra/agent-events.js";
|
||||
import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { isInternalMessageChannel } from "../../utils/message-channel.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
import { resolveRunAfterAutoFallbackPrimaryProbeRecheck } from "./agent-runner-execution.js";
|
||||
import { runCliAgentWithLifecycle } from "./agent-runner-cli-dispatch.js";
|
||||
import {
|
||||
resolveRunAfterAutoFallbackPrimaryProbeRecheck,
|
||||
resolveSessionRuntimeOverrideForProvider,
|
||||
} from "./agent-runner-execution.js";
|
||||
import { runPreflightCompactionIfNeeded } from "./agent-runner-memory.js";
|
||||
import {
|
||||
resolveQueuedReplyExecutionConfig,
|
||||
@@ -351,6 +358,13 @@ export function createFollowupRunner(params: {
|
||||
fallbackProvider = run.provider;
|
||||
fallbackModel = run.model;
|
||||
replyOperation.setPhase("running");
|
||||
let pendingDeferredCliTerminal:
|
||||
| {
|
||||
provider: string;
|
||||
model: string;
|
||||
startedAt: number;
|
||||
}
|
||||
| undefined;
|
||||
try {
|
||||
const outcomePlan = buildAgentRuntimeOutcomePlan();
|
||||
const fallbackResult = await runWithModelFallback<EmbeddedAgentRunResult>({
|
||||
@@ -371,8 +385,114 @@ export function createFollowupRunner(params: {
|
||||
const authProfile = resolveRunAuthProfile(candidateRun, provider, {
|
||||
config: runtimeConfig,
|
||||
});
|
||||
const sessionRuntimeOverride = resolveSessionRuntimeOverrideForProvider({
|
||||
provider,
|
||||
entry: activeSessionEntry,
|
||||
});
|
||||
const cliExecutionProvider =
|
||||
sessionRuntimeOverride === "pi"
|
||||
? provider
|
||||
: ((sessionRuntimeOverride && isCliProvider(sessionRuntimeOverride, runtimeConfig)
|
||||
? sessionRuntimeOverride
|
||||
: undefined) ??
|
||||
resolveCliRuntimeExecutionProvider({
|
||||
provider,
|
||||
cfg: runtimeConfig,
|
||||
agentId: run.agentId,
|
||||
modelId: model,
|
||||
}) ??
|
||||
provider);
|
||||
let attemptCompactionCount = 0;
|
||||
try {
|
||||
if (isCliProvider(cliExecutionProvider, runtimeConfig)) {
|
||||
const isRoomEventCliRun = queued.currentInboundEventKind === "room_event";
|
||||
const cliSessionBinding = isRoomEventCliRun
|
||||
? undefined
|
||||
: getCliSessionBinding(activeSessionEntry, cliExecutionProvider);
|
||||
const cliLifecycleStartedAt = Date.now();
|
||||
pendingDeferredCliTerminal = {
|
||||
provider,
|
||||
model,
|
||||
startedAt: cliLifecycleStartedAt,
|
||||
};
|
||||
const result = await runCliAgentWithLifecycle({
|
||||
runId,
|
||||
provider: cliExecutionProvider,
|
||||
startedAt: cliLifecycleStartedAt,
|
||||
emitLifecycleTerminal: false,
|
||||
onAgentRunStart: () => opts?.onAgentRunStart?.(runId),
|
||||
suppressAssistantBridge: run.silentExpected,
|
||||
runParams: {
|
||||
replyOperation,
|
||||
sessionId: run.sessionId,
|
||||
sessionKey: replySessionKey,
|
||||
agentId: run.agentId,
|
||||
trigger: opts?.isHeartbeat === true ? "heartbeat" : "user",
|
||||
sessionFile: run.sessionFile,
|
||||
workspaceDir: run.workspaceDir,
|
||||
config: runtimeConfig,
|
||||
prompt: queued.prompt,
|
||||
transcriptPrompt: queued.transcriptPrompt,
|
||||
currentInboundEventKind: queued.currentInboundEventKind,
|
||||
currentInboundContext: queued.currentInboundContext,
|
||||
inputProvenance: run.inputProvenance,
|
||||
provider: cliExecutionProvider,
|
||||
model,
|
||||
...resolveRunAuthProfile(candidateRun, cliExecutionProvider, {
|
||||
config: runtimeConfig,
|
||||
}),
|
||||
thinkLevel: run.thinkLevel,
|
||||
timeoutMs: run.timeoutMs,
|
||||
runId,
|
||||
extraSystemPrompt: run.extraSystemPrompt,
|
||||
sourceReplyDeliveryMode: run.sourceReplyDeliveryMode,
|
||||
silentReplyPromptMode: run.silentReplyPromptMode,
|
||||
extraSystemPromptStatic: run.extraSystemPromptStatic,
|
||||
ownerNumbers: run.ownerNumbers,
|
||||
cliSessionId: cliSessionBinding?.sessionId,
|
||||
cliSessionBinding,
|
||||
bootstrapPromptWarningSignaturesSeen,
|
||||
bootstrapPromptWarningSignature:
|
||||
bootstrapPromptWarningSignaturesSeen[
|
||||
bootstrapPromptWarningSignaturesSeen.length - 1
|
||||
],
|
||||
images: queuedImages,
|
||||
imageOrder: queuedImageOrder,
|
||||
skillsSnapshot: run.skillsSnapshot,
|
||||
messageChannel: queued.originatingChannel ?? undefined,
|
||||
messageProvider: resolveOriginMessageProvider({
|
||||
originatingChannel: queued.originatingChannel,
|
||||
provider: run.messageProvider,
|
||||
}),
|
||||
agentAccountId: run.agentAccountId,
|
||||
senderIsOwner: run.senderIsOwner,
|
||||
disableTools: opts?.disableTools,
|
||||
abortSignal: queued.abortSignal ?? opts?.abortSignal,
|
||||
},
|
||||
transformResult: (rawResult) =>
|
||||
isRoomEventCliRun && rawResult.meta.agentMeta
|
||||
? (() => {
|
||||
const { cliSessionBinding: _cliSessionBinding, ...agentMeta } =
|
||||
rawResult.meta.agentMeta;
|
||||
return {
|
||||
...rawResult,
|
||||
meta: {
|
||||
...rawResult.meta,
|
||||
agentMeta: {
|
||||
...agentMeta,
|
||||
sessionId: "",
|
||||
},
|
||||
},
|
||||
};
|
||||
})()
|
||||
: rawResult,
|
||||
});
|
||||
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
|
||||
result.meta?.systemPromptReport,
|
||||
);
|
||||
return result;
|
||||
}
|
||||
pendingDeferredCliTerminal = undefined;
|
||||
const result = await runEmbeddedPiAgent({
|
||||
allowGatewaySubagentBinding: true,
|
||||
replyOperation,
|
||||
@@ -466,6 +586,22 @@ export function createFollowupRunner(params: {
|
||||
runResult = fallbackResult.result;
|
||||
fallbackProvider = fallbackResult.provider;
|
||||
fallbackModel = fallbackResult.model;
|
||||
if (
|
||||
pendingDeferredCliTerminal &&
|
||||
pendingDeferredCliTerminal.provider === fallbackProvider &&
|
||||
pendingDeferredCliTerminal.model === fallbackModel
|
||||
) {
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "end",
|
||||
startedAt: pendingDeferredCliTerminal.startedAt,
|
||||
endedAt: Date.now(),
|
||||
},
|
||||
});
|
||||
}
|
||||
pendingDeferredCliTerminal = undefined;
|
||||
await clearRecoveredAutoFallbackPrimaryProbe({
|
||||
provider: fallbackProvider,
|
||||
model: fallbackModel,
|
||||
@@ -473,6 +609,19 @@ export function createFollowupRunner(params: {
|
||||
} catch (err) {
|
||||
const message = formatErrorMessage(err);
|
||||
replyOperation.fail("run_failed", err);
|
||||
if (pendingDeferredCliTerminal) {
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "error",
|
||||
startedAt: pendingDeferredCliTerminal.startedAt,
|
||||
endedAt: Date.now(),
|
||||
error: message,
|
||||
},
|
||||
});
|
||||
pendingDeferredCliTerminal = undefined;
|
||||
}
|
||||
defaultRuntime.error?.(`Followup agent failed before reply: ${message}`);
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user