refactor: split agent command execution helpers

This commit is contained in:
Peter Steinberger
2026-03-27 00:58:14 +00:00
parent 6fd1725a06
commit 046a950877
2 changed files with 507 additions and 440 deletions

View File

@@ -1,27 +1,15 @@
import fs from "node:fs/promises";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { getAcpSessionManager } from "../acp/control-plane/manager.js";
import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../acp/policy.js";
import { toAcpRuntimeError } from "../acp/runtime/errors.js";
import { resolveAcpSessionCwd } from "../acp/runtime/session-identifiers.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
const log = createSubsystemLogger("agents/agent-command");
import { normalizeReplyPayload } from "../auto-reply/reply/normalize-reply.js";
import {
formatThinkingLevels,
formatXHighModelHint,
normalizeThinkLevel,
normalizeVerboseLevel,
supportsXHighThinking,
type ThinkLevel,
type VerboseLevel,
} from "../auto-reply/thinking.js";
import {
isSilentReplyPrefixText,
isSilentReplyText,
SILENT_REPLY_TOKEN,
} from "../auto-reply/tokens.js";
import { formatCliCommand } from "../cli/command-format.js";
import { resolveCommandSecretRefsViaGateway } from "../cli/command-secret-gateway.js";
import { getAgentRuntimeCommandSecretTargetIds } from "../cli/command-secret-targets.js";
@@ -31,12 +19,7 @@ import {
readConfigFileSnapshotForWrite,
setRuntimeConfigSnapshot,
} from "../config/config.js";
import {
mergeSessionEntry,
resolveAgentIdFromSessionKey,
type SessionEntry,
updateSessionStore,
} from "../config/sessions.js";
import { resolveAgentIdFromSessionKey, type SessionEntry } from "../config/sessions.js";
import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js";
import {
clearAgentRunContext,
@@ -45,12 +28,12 @@ import {
} from "../infra/agent-events.js";
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
import { getRemoteSkillEligibility } from "../infra/skills-remote.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { normalizeAgentId } from "../routing/session-key.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { applyVerboseOverride } from "../sessions/level-overrides.js";
import { applyModelOverrideToSessionEntry } from "../sessions/model-overrides.js";
import { resolveSendPolicy } from "../sessions/send-policy.js";
import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import { sanitizeForLog } from "../terminal/ansi.js";
import { resolveMessageChannel } from "../utils/message-channel.js";
import {
@@ -63,23 +46,29 @@ import {
} from "./agent-scope.js";
import { ensureAuthProfileStore } from "./auth-profiles.js";
import { clearSessionAuthProfileOverride } from "./auth-profiles/session-override.js";
import { resolveBootstrapWarningSignaturesSeen } from "./bootstrap-budget.js";
import { runCliAgent } from "./cli-runner.js";
import { clearCliSession, getCliSessionBinding, setCliSessionBinding } from "./cli-session.js";
import {
buildAcpResult,
createAcpVisibleTextAccumulator,
emitAcpAssistantDelta,
emitAcpLifecycleEnd,
emitAcpLifecycleError,
emitAcpLifecycleStart,
persistAcpTurnTranscript,
persistSessionEntry as persistSessionEntryBase,
prependInternalEventContext,
runAgentAttempt,
} from "./command/attempt-execution.js";
import { deliverAgentCommandResult } from "./command/delivery.js";
import { resolveAgentRunContext } from "./command/run-context.js";
import { updateSessionStoreAfterAgentRun } from "./command/session-store.js";
import { resolveSession } from "./command/session.js";
import type { AgentCommandIngressOpts, AgentCommandOpts } from "./command/types.js";
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "./defaults.js";
import { FailoverError } from "./failover-error.js";
import { formatAgentInternalEventsForPrompt } from "./internal-events.js";
import { AGENT_LANE_SUBAGENT } from "./lanes.js";
import { loadModelCatalog } from "./model-catalog.js";
import { runWithModelFallback } from "./model-fallback.js";
import {
buildAllowedModelSet,
isCliProvider,
modelKey,
normalizeModelRef,
parseModelRef,
@@ -87,14 +76,14 @@ import {
resolveDefaultModelForAgent,
resolveThinkingDefault,
} from "./model-selection.js";
import { prepareSessionManagerForRun } from "./pi-embedded-runner/session-manager-init.js";
import { runEmbeddedPiAgent } from "./pi-embedded.js";
import { buildWorkspaceSkillSnapshot } from "./skills.js";
import { getSkillsSnapshotVersion } from "./skills/refresh.js";
import { normalizeSpawnedRunMetadata } from "./spawned-context.js";
import { resolveAgentTimeoutMs } from "./timeout.js";
import { ensureAgentWorkspace } from "./workspace.js";
const log = createSubsystemLogger("agents/agent-command");
type PersistSessionEntryParams = {
sessionStore: Record<string, SessionEntry>;
sessionKey: string;
@@ -127,6 +116,13 @@ const OVERRIDE_FIELDS_CLEARED_BY_DELETE: OverrideFieldClearedByDelete[] = [
const OVERRIDE_VALUE_MAX_LENGTH = 256;
async function persistSessionEntry(params: PersistSessionEntryParams): Promise<void> {
await persistSessionEntryBase({
...params,
clearedFields: OVERRIDE_FIELDS_CLEARED_BY_DELETE,
});
}
function containsControlCharacters(value: string): boolean {
for (const char of value) {
const code = char.codePointAt(0);
@@ -155,378 +151,6 @@ function normalizeExplicitOverrideInput(raw: string, kind: "provider" | "model")
return trimmed;
}
async function persistSessionEntry(params: PersistSessionEntryParams): Promise<void> {
const persisted = await updateSessionStore(params.storePath, (store) => {
const merged = mergeSessionEntry(store[params.sessionKey], params.entry);
// Preserve explicit `delete` clears done by session override helpers.
for (const field of OVERRIDE_FIELDS_CLEARED_BY_DELETE) {
if (!Object.hasOwn(params.entry, field)) {
Reflect.deleteProperty(merged, field);
}
}
store[params.sessionKey] = merged;
return merged;
});
params.sessionStore[params.sessionKey] = persisted;
}
function resolveFallbackRetryPrompt(params: { body: string; isFallbackRetry: boolean }): string {
if (!params.isFallbackRetry) {
return params.body;
}
return "Continue where you left off. The previous model attempt failed or timed out.";
}
function prependInternalEventContext(
body: string,
events: AgentCommandOpts["internalEvents"],
): string {
if (body.includes("OpenClaw runtime context (internal):")) {
return body;
}
const renderedEvents = formatAgentInternalEventsForPrompt(events);
if (!renderedEvents) {
return body;
}
return [renderedEvents, body].filter(Boolean).join("\n\n");
}
function createAcpVisibleTextAccumulator() {
let pendingSilentPrefix = "";
let visibleText = "";
const startsWithWordChar = (chunk: string): boolean => /^[\p{L}\p{N}]/u.test(chunk);
const resolveNextCandidate = (base: string, chunk: string): string => {
if (!base) {
return chunk;
}
if (
isSilentReplyText(base, SILENT_REPLY_TOKEN) &&
!chunk.startsWith(base) &&
startsWithWordChar(chunk)
) {
return chunk;
}
// Some ACP backends emit cumulative snapshots even on text_delta-style hooks.
// Accept those only when they strictly extend the buffered text.
if (chunk.startsWith(base) && chunk.length > base.length) {
return chunk;
}
return `${base}${chunk}`;
};
const mergeVisibleChunk = (base: string, chunk: string): { text: string; delta: string } => {
if (!base) {
return { text: chunk, delta: chunk };
}
if (chunk.startsWith(base) && chunk.length > base.length) {
const delta = chunk.slice(base.length);
return { text: chunk, delta };
}
return {
text: `${base}${chunk}`,
delta: chunk,
};
};
return {
consume(chunk: string): { text: string; delta: string } | null {
if (!chunk) {
return null;
}
if (!visibleText) {
const leadCandidate = resolveNextCandidate(pendingSilentPrefix, chunk);
const trimmedLeadCandidate = leadCandidate.trim();
if (
isSilentReplyText(trimmedLeadCandidate, SILENT_REPLY_TOKEN) ||
isSilentReplyPrefixText(trimmedLeadCandidate, SILENT_REPLY_TOKEN)
) {
pendingSilentPrefix = leadCandidate;
return null;
}
if (pendingSilentPrefix) {
pendingSilentPrefix = "";
visibleText = leadCandidate;
return {
text: visibleText,
delta: leadCandidate,
};
}
}
const nextVisible = mergeVisibleChunk(visibleText, chunk);
visibleText = nextVisible.text;
return nextVisible.delta ? nextVisible : null;
},
finalize(): string {
return visibleText.trim();
},
finalizeRaw(): string {
return visibleText;
},
};
}
const ACP_TRANSCRIPT_USAGE = {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
total: 0,
},
} as const;
async function persistAcpTurnTranscript(params: {
body: string;
finalText: string;
sessionId: string;
sessionKey: string;
sessionEntry: SessionEntry | undefined;
sessionStore?: Record<string, SessionEntry>;
storePath?: string;
sessionAgentId: string;
threadId?: string | number;
sessionCwd: string;
}): Promise<SessionEntry | undefined> {
const promptText = params.body;
const replyText = params.finalText;
if (!promptText && !replyText) {
return params.sessionEntry;
}
const { sessionFile, sessionEntry } = await resolveSessionTranscriptFile({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,
storePath: params.storePath,
agentId: params.sessionAgentId,
threadId: params.threadId,
});
const hadSessionFile = await fs
.access(sessionFile)
.then(() => true)
.catch(() => false);
const sessionManager = SessionManager.open(sessionFile);
await prepareSessionManagerForRun({
sessionManager,
sessionFile,
hadSessionFile,
sessionId: params.sessionId,
cwd: params.sessionCwd,
});
if (promptText) {
sessionManager.appendMessage({
role: "user",
content: promptText,
timestamp: Date.now(),
});
}
if (replyText) {
sessionManager.appendMessage({
role: "assistant",
content: [{ type: "text", text: replyText }],
api: "openai-responses",
provider: "openclaw",
model: "acp-runtime",
usage: ACP_TRANSCRIPT_USAGE,
stopReason: "stop",
timestamp: Date.now(),
});
}
emitSessionTranscriptUpdate(sessionFile);
return sessionEntry;
}
function runAgentAttempt(params: {
providerOverride: string;
modelOverride: string;
cfg: ReturnType<typeof loadConfig>;
sessionEntry: SessionEntry | undefined;
sessionId: string;
sessionKey: string | undefined;
sessionAgentId: string;
sessionFile: string;
workspaceDir: string;
body: string;
isFallbackRetry: boolean;
resolvedThinkLevel: ThinkLevel;
timeoutMs: number;
runId: string;
opts: AgentCommandOpts & { senderIsOwner: boolean };
runContext: ReturnType<typeof resolveAgentRunContext>;
spawnedBy: string | undefined;
messageChannel: ReturnType<typeof resolveMessageChannel>;
skillsSnapshot: ReturnType<typeof buildWorkspaceSkillSnapshot> | undefined;
resolvedVerboseLevel: VerboseLevel | undefined;
agentDir: string;
onAgentEvent: (evt: { stream: string; data?: Record<string, unknown> }) => void;
authProfileProvider: string;
sessionStore?: Record<string, SessionEntry>;
storePath?: string;
allowTransientCooldownProbe?: boolean;
}) {
const effectivePrompt = resolveFallbackRetryPrompt({
body: params.body,
isFallbackRetry: params.isFallbackRetry,
});
const bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
params.sessionEntry?.systemPromptReport,
);
const bootstrapPromptWarningSignature =
bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1];
const authProfileId =
params.providerOverride === params.authProfileProvider
? params.sessionEntry?.authProfileOverride
: undefined;
if (isCliProvider(params.providerOverride, params.cfg)) {
const cliSessionBinding = getCliSessionBinding(params.sessionEntry, params.providerOverride);
const runCliWithSession = (nextCliSessionId: string | undefined) =>
runCliAgent({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
agentId: params.sessionAgentId,
sessionFile: params.sessionFile,
workspaceDir: params.workspaceDir,
config: params.cfg,
prompt: effectivePrompt,
provider: params.providerOverride,
model: params.modelOverride,
thinkLevel: params.resolvedThinkLevel,
timeoutMs: params.timeoutMs,
runId: params.runId,
extraSystemPrompt: params.opts.extraSystemPrompt,
cliSessionId: nextCliSessionId,
cliSessionBinding:
nextCliSessionId === cliSessionBinding?.sessionId ? cliSessionBinding : undefined,
authProfileId,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature,
images: params.isFallbackRetry ? undefined : params.opts.images,
streamParams: params.opts.streamParams,
});
return runCliWithSession(cliSessionBinding?.sessionId).catch(async (err) => {
// Handle CLI session expired error
if (
err instanceof FailoverError &&
err.reason === "session_expired" &&
cliSessionBinding?.sessionId &&
params.sessionKey &&
params.sessionStore &&
params.storePath
) {
log.warn(
`CLI session expired, clearing from session store: provider=${sanitizeForLog(params.providerOverride)} sessionKey=${params.sessionKey}`,
);
// Clear the expired session ID from the session store
const entry = params.sessionStore[params.sessionKey];
if (entry) {
const updatedEntry = { ...entry };
clearCliSession(updatedEntry, params.providerOverride);
updatedEntry.updatedAt = Date.now();
await persistSessionEntry({
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
entry: updatedEntry,
});
// Update the session entry reference
params.sessionEntry = updatedEntry;
}
// Retry with no session ID (will create a new session)
return runCliWithSession(undefined).then(async (result) => {
// Update session store with new CLI session ID if available
if (
result.meta.agentMeta?.cliSessionBinding?.sessionId &&
params.sessionKey &&
params.sessionStore &&
params.storePath
) {
const entry = params.sessionStore[params.sessionKey];
if (entry) {
const updatedEntry = { ...entry };
setCliSessionBinding(
updatedEntry,
params.providerOverride,
result.meta.agentMeta.cliSessionBinding,
);
updatedEntry.updatedAt = Date.now();
await persistSessionEntry({
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
entry: updatedEntry,
});
}
}
return result;
});
}
throw err;
});
}
return runEmbeddedPiAgent({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
agentId: params.sessionAgentId,
trigger: "user",
messageChannel: params.messageChannel,
agentAccountId: params.runContext.accountId,
messageTo: params.opts.replyTo ?? params.opts.to,
messageThreadId: params.opts.threadId,
groupId: params.runContext.groupId,
groupChannel: params.runContext.groupChannel,
groupSpace: params.runContext.groupSpace,
spawnedBy: params.spawnedBy,
currentChannelId: params.runContext.currentChannelId,
currentThreadTs: params.runContext.currentThreadTs,
replyToMode: params.runContext.replyToMode,
hasRepliedRef: params.runContext.hasRepliedRef,
senderIsOwner: params.opts.senderIsOwner,
sessionFile: params.sessionFile,
workspaceDir: params.workspaceDir,
config: params.cfg,
skillsSnapshot: params.skillsSnapshot,
prompt: effectivePrompt,
images: params.isFallbackRetry ? undefined : params.opts.images,
clientTools: params.opts.clientTools,
provider: params.providerOverride,
model: params.modelOverride,
authProfileId,
authProfileIdSource: authProfileId ? params.sessionEntry?.authProfileOverrideSource : undefined,
thinkLevel: params.resolvedThinkLevel,
verboseLevel: params.resolvedVerboseLevel,
timeoutMs: params.timeoutMs,
runId: params.runId,
lane: params.opts.lane,
abortSignal: params.opts.abortSignal,
extraSystemPrompt: params.opts.extraSystemPrompt,
inputProvenance: params.opts.inputProvenance,
streamParams: params.opts.streamParams,
agentDir: params.agentDir,
allowTransientCooldownProbe: params.allowTransientCooldownProbe,
onAgentEvent: params.onAgentEvent,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature,
});
}
async function prepareAgentCommandExecution(
opts: AgentCommandOpts & { senderIsOwner: boolean },
runtime: RuntimeEnv,
@@ -756,14 +380,7 @@ async function agentCommandInternal(
registerAgentRunContext(runId, {
sessionKey,
});
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "start",
startedAt,
},
});
emitAcpLifecycleStart({ runId, startedAt });
const visibleTextAccumulator = createAcpVisibleTextAccumulator();
let stopReason: string | undefined;
@@ -805,13 +422,10 @@ async function agentCommandInternal(
if (!visibleUpdate) {
return;
}
emitAgentEvent({
emitAcpAssistantDelta({
runId,
stream: "assistant",
data: {
text: visibleUpdate.text,
delta: visibleUpdate.delta,
},
text: visibleUpdate.text,
delta: visibleUpdate.delta,
});
},
});
@@ -821,26 +435,14 @@ async function agentCommandInternal(
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP turn failed before completion.",
});
emitAgentEvent({
emitAcpLifecycleError({
runId,
stream: "lifecycle",
data: {
phase: "error",
error: acpError.message,
endedAt: Date.now(),
},
message: acpError.message,
});
throw acpError;
}
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "end",
endedAt: Date.now(),
},
});
emitAcpLifecycleEnd({ runId });
const finalTextRaw = visibleTextAccumulator.finalizeRaw();
const finalText = visibleTextAccumulator.finalize();
@@ -863,18 +465,13 @@ async function agentCommandInternal(
);
}
const normalizedFinalPayload = normalizeReplyPayload({
text: finalText,
const result = buildAcpResult({
payloadText: finalText,
startedAt,
stopReason,
abortSignal: opts.abortSignal,
});
const payloads = normalizedFinalPayload ? [normalizedFinalPayload] : [];
const result = {
payloads,
meta: {
durationMs: Date.now() - startedAt,
aborted: opts.abortSignal?.aborted === true,
stopReason,
},
};
const payloads = result.payloads;
return await deliverAgentCommandResult({
cfg,
@@ -1131,7 +728,7 @@ async function agentCommandInternal(
const startedAt = Date.now();
let lifecycleEnded = false;
let result: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let result: Awaited<ReturnType<typeof runAgentAttempt>>;
let fallbackProvider = provider;
let fallbackModel = model;
try {

View File

@@ -0,0 +1,470 @@
import fs from "node:fs/promises";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { normalizeReplyPayload } from "../../auto-reply/reply/normalize-reply.js";
import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js";
import {
isSilentReplyPrefixText,
isSilentReplyText,
SILENT_REPLY_TOKEN,
} from "../../auto-reply/tokens.js";
import { loadConfig } from "../../config/config.js";
import { mergeSessionEntry, type SessionEntry, updateSessionStore } from "../../config/sessions.js";
import { resolveSessionTranscriptFile } from "../../config/sessions/transcript.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
import { sanitizeForLog } from "../../terminal/ansi.js";
import { resolveMessageChannel } from "../../utils/message-channel.js";
import { resolveBootstrapWarningSignaturesSeen } from "../bootstrap-budget.js";
import { runCliAgent } from "../cli-runner.js";
import { clearCliSession, getCliSessionBinding, setCliSessionBinding } from "../cli-session.js";
import { FailoverError } from "../failover-error.js";
import { formatAgentInternalEventsForPrompt } from "../internal-events.js";
import { isCliProvider } from "../model-selection.js";
import { prepareSessionManagerForRun } from "../pi-embedded-runner/session-manager-init.js";
import { runEmbeddedPiAgent } from "../pi-embedded.js";
import { buildWorkspaceSkillSnapshot } from "../skills.js";
import { resolveAgentRunContext } from "./run-context.js";
import type { AgentCommandOpts } from "./types.js";
const log = createSubsystemLogger("agents/agent-command");
export type PersistSessionEntryParams = {
sessionStore: Record<string, SessionEntry>;
sessionKey: string;
storePath: string;
entry: SessionEntry;
clearedFields?: string[];
};
export async function persistSessionEntry(params: PersistSessionEntryParams): Promise<void> {
const persisted = await updateSessionStore(params.storePath, (store) => {
const merged = mergeSessionEntry(store[params.sessionKey], params.entry);
for (const field of params.clearedFields ?? []) {
if (!Object.hasOwn(params.entry, field)) {
Reflect.deleteProperty(merged, field);
}
}
store[params.sessionKey] = merged;
return merged;
});
params.sessionStore[params.sessionKey] = persisted;
}
export function resolveFallbackRetryPrompt(params: {
body: string;
isFallbackRetry: boolean;
}): string {
if (!params.isFallbackRetry) {
return params.body;
}
return "Continue where you left off. The previous model attempt failed or timed out.";
}
export function prependInternalEventContext(
body: string,
events: AgentCommandOpts["internalEvents"],
): string {
if (body.includes("OpenClaw runtime context (internal):")) {
return body;
}
const renderedEvents = formatAgentInternalEventsForPrompt(events);
if (!renderedEvents) {
return body;
}
return [renderedEvents, body].filter(Boolean).join("\n\n");
}
export function createAcpVisibleTextAccumulator() {
let pendingSilentPrefix = "";
let visibleText = "";
const startsWithWordChar = (chunk: string): boolean => /^[\p{L}\p{N}]/u.test(chunk);
const resolveNextCandidate = (base: string, chunk: string): string => {
if (!base) {
return chunk;
}
if (
isSilentReplyText(base, SILENT_REPLY_TOKEN) &&
!chunk.startsWith(base) &&
startsWithWordChar(chunk)
) {
return chunk;
}
if (chunk.startsWith(base) && chunk.length > base.length) {
return chunk;
}
return `${base}${chunk}`;
};
const mergeVisibleChunk = (base: string, chunk: string): { text: string; delta: string } => {
if (!base) {
return { text: chunk, delta: chunk };
}
if (chunk.startsWith(base) && chunk.length > base.length) {
const delta = chunk.slice(base.length);
return { text: chunk, delta };
}
return {
text: `${base}${chunk}`,
delta: chunk,
};
};
return {
consume(chunk: string): { text: string; delta: string } | null {
if (!chunk) {
return null;
}
if (!visibleText) {
const leadCandidate = resolveNextCandidate(pendingSilentPrefix, chunk);
const trimmedLeadCandidate = leadCandidate.trim();
if (
isSilentReplyText(trimmedLeadCandidate, SILENT_REPLY_TOKEN) ||
isSilentReplyPrefixText(trimmedLeadCandidate, SILENT_REPLY_TOKEN)
) {
pendingSilentPrefix = leadCandidate;
return null;
}
if (pendingSilentPrefix) {
pendingSilentPrefix = "";
visibleText = leadCandidate;
return {
text: visibleText,
delta: leadCandidate,
};
}
}
const nextVisible = mergeVisibleChunk(visibleText, chunk);
visibleText = nextVisible.text;
return nextVisible.delta ? nextVisible : null;
},
finalize(): string {
return visibleText.trim();
},
finalizeRaw(): string {
return visibleText;
},
};
}
const ACP_TRANSCRIPT_USAGE = {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
total: 0,
},
} as const;
export async function persistAcpTurnTranscript(params: {
body: string;
finalText: string;
sessionId: string;
sessionKey: string;
sessionEntry: SessionEntry | undefined;
sessionStore?: Record<string, SessionEntry>;
storePath?: string;
sessionAgentId: string;
threadId?: string | number;
sessionCwd: string;
}): Promise<SessionEntry | undefined> {
const promptText = params.body;
const replyText = params.finalText;
if (!promptText && !replyText) {
return params.sessionEntry;
}
const { sessionFile, sessionEntry } = await resolveSessionTranscriptFile({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,
storePath: params.storePath,
agentId: params.sessionAgentId,
threadId: params.threadId,
});
const hadSessionFile = await fs
.access(sessionFile)
.then(() => true)
.catch(() => false);
const sessionManager = SessionManager.open(sessionFile);
await prepareSessionManagerForRun({
sessionManager,
sessionFile,
hadSessionFile,
sessionId: params.sessionId,
cwd: params.sessionCwd,
});
if (promptText) {
sessionManager.appendMessage({
role: "user",
content: promptText,
timestamp: Date.now(),
});
}
if (replyText) {
sessionManager.appendMessage({
role: "assistant",
content: [{ type: "text", text: replyText }],
api: "openai-responses",
provider: "openclaw",
model: "acp-runtime",
usage: ACP_TRANSCRIPT_USAGE,
stopReason: "stop",
timestamp: Date.now(),
});
}
emitSessionTranscriptUpdate(sessionFile);
return sessionEntry;
}
export function runAgentAttempt(params: {
providerOverride: string;
modelOverride: string;
cfg: ReturnType<typeof loadConfig>;
sessionEntry: SessionEntry | undefined;
sessionId: string;
sessionKey: string | undefined;
sessionAgentId: string;
sessionFile: string;
workspaceDir: string;
body: string;
isFallbackRetry: boolean;
resolvedThinkLevel: ThinkLevel;
timeoutMs: number;
runId: string;
opts: AgentCommandOpts & { senderIsOwner: boolean };
runContext: ReturnType<typeof resolveAgentRunContext>;
spawnedBy: string | undefined;
messageChannel: ReturnType<typeof resolveMessageChannel>;
skillsSnapshot: ReturnType<typeof buildWorkspaceSkillSnapshot> | undefined;
resolvedVerboseLevel: VerboseLevel | undefined;
agentDir: string;
onAgentEvent: (evt: { stream: string; data?: Record<string, unknown> }) => void;
authProfileProvider: string;
sessionStore?: Record<string, SessionEntry>;
storePath?: string;
allowTransientCooldownProbe?: boolean;
}) {
const effectivePrompt = resolveFallbackRetryPrompt({
body: params.body,
isFallbackRetry: params.isFallbackRetry,
});
const bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
params.sessionEntry?.systemPromptReport,
);
const bootstrapPromptWarningSignature =
bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1];
const authProfileId =
params.providerOverride === params.authProfileProvider
? params.sessionEntry?.authProfileOverride
: undefined;
if (isCliProvider(params.providerOverride, params.cfg)) {
const cliSessionBinding = getCliSessionBinding(params.sessionEntry, params.providerOverride);
const runCliWithSession = (nextCliSessionId: string | undefined) =>
runCliAgent({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
agentId: params.sessionAgentId,
sessionFile: params.sessionFile,
workspaceDir: params.workspaceDir,
config: params.cfg,
prompt: effectivePrompt,
provider: params.providerOverride,
model: params.modelOverride,
thinkLevel: params.resolvedThinkLevel,
timeoutMs: params.timeoutMs,
runId: params.runId,
extraSystemPrompt: params.opts.extraSystemPrompt,
cliSessionId: nextCliSessionId,
cliSessionBinding:
nextCliSessionId === cliSessionBinding?.sessionId ? cliSessionBinding : undefined,
authProfileId,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature,
images: params.isFallbackRetry ? undefined : params.opts.images,
streamParams: params.opts.streamParams,
});
return runCliWithSession(cliSessionBinding?.sessionId).catch(async (err) => {
if (
err instanceof FailoverError &&
err.reason === "session_expired" &&
cliSessionBinding?.sessionId &&
params.sessionKey &&
params.sessionStore &&
params.storePath
) {
log.warn(
`CLI session expired, clearing from session store: provider=${sanitizeForLog(params.providerOverride)} sessionKey=${params.sessionKey}`,
);
const entry = params.sessionStore[params.sessionKey];
if (entry) {
const updatedEntry = { ...entry };
clearCliSession(updatedEntry, params.providerOverride);
updatedEntry.updatedAt = Date.now();
await persistSessionEntry({
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
entry: updatedEntry,
});
params.sessionEntry = updatedEntry;
}
return runCliWithSession(undefined).then(async (result) => {
if (
result.meta.agentMeta?.cliSessionBinding?.sessionId &&
params.sessionKey &&
params.sessionStore &&
params.storePath
) {
const entry = params.sessionStore[params.sessionKey];
if (entry) {
const updatedEntry = { ...entry };
setCliSessionBinding(
updatedEntry,
params.providerOverride,
result.meta.agentMeta.cliSessionBinding,
);
updatedEntry.updatedAt = Date.now();
await persistSessionEntry({
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
entry: updatedEntry,
});
}
}
return result;
});
}
throw err;
});
}
return runEmbeddedPiAgent({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
agentId: params.sessionAgentId,
trigger: "user",
messageChannel: params.messageChannel,
agentAccountId: params.runContext.accountId,
messageTo: params.opts.replyTo ?? params.opts.to,
messageThreadId: params.opts.threadId,
groupId: params.runContext.groupId,
groupChannel: params.runContext.groupChannel,
groupSpace: params.runContext.groupSpace,
spawnedBy: params.spawnedBy,
currentChannelId: params.runContext.currentChannelId,
currentThreadTs: params.runContext.currentThreadTs,
replyToMode: params.runContext.replyToMode,
hasRepliedRef: params.runContext.hasRepliedRef,
senderIsOwner: params.opts.senderIsOwner,
sessionFile: params.sessionFile,
workspaceDir: params.workspaceDir,
config: params.cfg,
skillsSnapshot: params.skillsSnapshot,
prompt: effectivePrompt,
images: params.isFallbackRetry ? undefined : params.opts.images,
clientTools: params.opts.clientTools,
provider: params.providerOverride,
model: params.modelOverride,
authProfileId,
authProfileIdSource: authProfileId ? params.sessionEntry?.authProfileOverrideSource : undefined,
thinkLevel: params.resolvedThinkLevel,
verboseLevel: params.resolvedVerboseLevel,
timeoutMs: params.timeoutMs,
runId: params.runId,
lane: params.opts.lane,
abortSignal: params.opts.abortSignal,
extraSystemPrompt: params.opts.extraSystemPrompt,
inputProvenance: params.opts.inputProvenance,
streamParams: params.opts.streamParams,
agentDir: params.agentDir,
allowTransientCooldownProbe: params.allowTransientCooldownProbe,
onAgentEvent: params.onAgentEvent,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature,
});
}
export function buildAcpResult(params: {
payloadText: string;
startedAt: number;
stopReason?: string;
abortSignal?: AbortSignal;
}) {
const normalizedFinalPayload = normalizeReplyPayload({
text: params.payloadText,
});
const payloads = normalizedFinalPayload ? [normalizedFinalPayload] : [];
return {
payloads,
meta: {
durationMs: Date.now() - params.startedAt,
aborted: params.abortSignal?.aborted === true,
stopReason: params.stopReason,
},
};
}
export function emitAcpLifecycleStart(params: { runId: string; startedAt: number }) {
emitAgentEvent({
runId: params.runId,
stream: "lifecycle",
data: {
phase: "start",
startedAt: params.startedAt,
},
});
}
export function emitAcpLifecycleEnd(params: { runId: string }) {
emitAgentEvent({
runId: params.runId,
stream: "lifecycle",
data: {
phase: "end",
endedAt: Date.now(),
},
});
}
export function emitAcpLifecycleError(params: { runId: string; message: string }) {
emitAgentEvent({
runId: params.runId,
stream: "lifecycle",
data: {
phase: "error",
error: params.message,
endedAt: Date.now(),
},
});
}
export function emitAcpAssistantDelta(params: { runId: string; text: string; delta: string }) {
emitAgentEvent({
runId: params.runId,
stream: "assistant",
data: {
text: params.text,
delta: params.delta,
},
});
}