diff --git a/src/agents/agent-command.ts b/src/agents/agent-command.ts index 6c6b62f8aec..4d8a4a2e114 100644 --- a/src/agents/agent-command.ts +++ b/src/agents/agent-command.ts @@ -1,27 +1,15 @@ -import fs from "node:fs/promises"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; import { getAcpSessionManager } from "../acp/control-plane/manager.js"; import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../acp/policy.js"; import { toAcpRuntimeError } from "../acp/runtime/errors.js"; import { resolveAcpSessionCwd } from "../acp/runtime/session-identifiers.js"; -import { createSubsystemLogger } from "../logging/subsystem.js"; - -const log = createSubsystemLogger("agents/agent-command"); -import { normalizeReplyPayload } from "../auto-reply/reply/normalize-reply.js"; import { formatThinkingLevels, formatXHighModelHint, normalizeThinkLevel, normalizeVerboseLevel, supportsXHighThinking, - type ThinkLevel, type VerboseLevel, } from "../auto-reply/thinking.js"; -import { - isSilentReplyPrefixText, - isSilentReplyText, - SILENT_REPLY_TOKEN, -} from "../auto-reply/tokens.js"; import { formatCliCommand } from "../cli/command-format.js"; import { resolveCommandSecretRefsViaGateway } from "../cli/command-secret-gateway.js"; import { getAgentRuntimeCommandSecretTargetIds } from "../cli/command-secret-targets.js"; @@ -31,12 +19,7 @@ import { readConfigFileSnapshotForWrite, setRuntimeConfigSnapshot, } from "../config/config.js"; -import { - mergeSessionEntry, - resolveAgentIdFromSessionKey, - type SessionEntry, - updateSessionStore, -} from "../config/sessions.js"; +import { resolveAgentIdFromSessionKey, type SessionEntry } from "../config/sessions.js"; import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js"; import { clearAgentRunContext, @@ -45,12 +28,12 @@ import { } from "../infra/agent-events.js"; import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { getRemoteSkillEligibility } from "../infra/skills-remote.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; import { normalizeAgentId } from "../routing/session-key.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { applyVerboseOverride } from "../sessions/level-overrides.js"; import { applyModelOverrideToSessionEntry } from "../sessions/model-overrides.js"; import { resolveSendPolicy } from "../sessions/send-policy.js"; -import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import { sanitizeForLog } from "../terminal/ansi.js"; import { resolveMessageChannel } from "../utils/message-channel.js"; import { @@ -63,23 +46,29 @@ import { } from "./agent-scope.js"; import { ensureAuthProfileStore } from "./auth-profiles.js"; import { clearSessionAuthProfileOverride } from "./auth-profiles/session-override.js"; -import { resolveBootstrapWarningSignaturesSeen } from "./bootstrap-budget.js"; -import { runCliAgent } from "./cli-runner.js"; -import { clearCliSession, getCliSessionBinding, setCliSessionBinding } from "./cli-session.js"; +import { + buildAcpResult, + createAcpVisibleTextAccumulator, + emitAcpAssistantDelta, + emitAcpLifecycleEnd, + emitAcpLifecycleError, + emitAcpLifecycleStart, + persistAcpTurnTranscript, + persistSessionEntry as persistSessionEntryBase, + prependInternalEventContext, + runAgentAttempt, +} from "./command/attempt-execution.js"; import { deliverAgentCommandResult } from "./command/delivery.js"; import { resolveAgentRunContext } from "./command/run-context.js"; import { updateSessionStoreAfterAgentRun } from "./command/session-store.js"; import { resolveSession } from "./command/session.js"; import type { AgentCommandIngressOpts, AgentCommandOpts } from "./command/types.js"; import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "./defaults.js"; -import { FailoverError } from "./failover-error.js"; -import { formatAgentInternalEventsForPrompt } from "./internal-events.js"; import { AGENT_LANE_SUBAGENT } from "./lanes.js"; import { loadModelCatalog } from "./model-catalog.js"; import { runWithModelFallback } from "./model-fallback.js"; import { buildAllowedModelSet, - isCliProvider, modelKey, normalizeModelRef, parseModelRef, @@ -87,14 +76,14 @@ import { resolveDefaultModelForAgent, resolveThinkingDefault, } from "./model-selection.js"; -import { prepareSessionManagerForRun } from "./pi-embedded-runner/session-manager-init.js"; -import { runEmbeddedPiAgent } from "./pi-embedded.js"; import { buildWorkspaceSkillSnapshot } from "./skills.js"; import { getSkillsSnapshotVersion } from "./skills/refresh.js"; import { normalizeSpawnedRunMetadata } from "./spawned-context.js"; import { resolveAgentTimeoutMs } from "./timeout.js"; import { ensureAgentWorkspace } from "./workspace.js"; +const log = createSubsystemLogger("agents/agent-command"); + type PersistSessionEntryParams = { sessionStore: Record; sessionKey: string; @@ -127,6 +116,13 @@ const OVERRIDE_FIELDS_CLEARED_BY_DELETE: OverrideFieldClearedByDelete[] = [ const OVERRIDE_VALUE_MAX_LENGTH = 256; +async function persistSessionEntry(params: PersistSessionEntryParams): Promise { + await persistSessionEntryBase({ + ...params, + clearedFields: OVERRIDE_FIELDS_CLEARED_BY_DELETE, + }); +} + function containsControlCharacters(value: string): boolean { for (const char of value) { const code = char.codePointAt(0); @@ -155,378 +151,6 @@ function normalizeExplicitOverrideInput(raw: string, kind: "provider" | "model") return trimmed; } -async function persistSessionEntry(params: PersistSessionEntryParams): Promise { - const persisted = await updateSessionStore(params.storePath, (store) => { - const merged = mergeSessionEntry(store[params.sessionKey], params.entry); - // Preserve explicit `delete` clears done by session override helpers. - for (const field of OVERRIDE_FIELDS_CLEARED_BY_DELETE) { - if (!Object.hasOwn(params.entry, field)) { - Reflect.deleteProperty(merged, field); - } - } - store[params.sessionKey] = merged; - return merged; - }); - params.sessionStore[params.sessionKey] = persisted; -} - -function resolveFallbackRetryPrompt(params: { body: string; isFallbackRetry: boolean }): string { - if (!params.isFallbackRetry) { - return params.body; - } - return "Continue where you left off. The previous model attempt failed or timed out."; -} - -function prependInternalEventContext( - body: string, - events: AgentCommandOpts["internalEvents"], -): string { - if (body.includes("OpenClaw runtime context (internal):")) { - return body; - } - const renderedEvents = formatAgentInternalEventsForPrompt(events); - if (!renderedEvents) { - return body; - } - return [renderedEvents, body].filter(Boolean).join("\n\n"); -} - -function createAcpVisibleTextAccumulator() { - let pendingSilentPrefix = ""; - let visibleText = ""; - const startsWithWordChar = (chunk: string): boolean => /^[\p{L}\p{N}]/u.test(chunk); - - const resolveNextCandidate = (base: string, chunk: string): string => { - if (!base) { - return chunk; - } - if ( - isSilentReplyText(base, SILENT_REPLY_TOKEN) && - !chunk.startsWith(base) && - startsWithWordChar(chunk) - ) { - return chunk; - } - // Some ACP backends emit cumulative snapshots even on text_delta-style hooks. - // Accept those only when they strictly extend the buffered text. - if (chunk.startsWith(base) && chunk.length > base.length) { - return chunk; - } - return `${base}${chunk}`; - }; - - const mergeVisibleChunk = (base: string, chunk: string): { text: string; delta: string } => { - if (!base) { - return { text: chunk, delta: chunk }; - } - if (chunk.startsWith(base) && chunk.length > base.length) { - const delta = chunk.slice(base.length); - return { text: chunk, delta }; - } - return { - text: `${base}${chunk}`, - delta: chunk, - }; - }; - - return { - consume(chunk: string): { text: string; delta: string } | null { - if (!chunk) { - return null; - } - - if (!visibleText) { - const leadCandidate = resolveNextCandidate(pendingSilentPrefix, chunk); - const trimmedLeadCandidate = leadCandidate.trim(); - if ( - isSilentReplyText(trimmedLeadCandidate, SILENT_REPLY_TOKEN) || - isSilentReplyPrefixText(trimmedLeadCandidate, SILENT_REPLY_TOKEN) - ) { - pendingSilentPrefix = leadCandidate; - return null; - } - if (pendingSilentPrefix) { - pendingSilentPrefix = ""; - visibleText = leadCandidate; - return { - text: visibleText, - delta: leadCandidate, - }; - } - } - - const nextVisible = mergeVisibleChunk(visibleText, chunk); - visibleText = nextVisible.text; - return nextVisible.delta ? nextVisible : null; - }, - finalize(): string { - return visibleText.trim(); - }, - finalizeRaw(): string { - return visibleText; - }, - }; -} - -const ACP_TRANSCRIPT_USAGE = { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - total: 0, - }, -} as const; - -async function persistAcpTurnTranscript(params: { - body: string; - finalText: string; - sessionId: string; - sessionKey: string; - sessionEntry: SessionEntry | undefined; - sessionStore?: Record; - storePath?: string; - sessionAgentId: string; - threadId?: string | number; - sessionCwd: string; -}): Promise { - const promptText = params.body; - const replyText = params.finalText; - if (!promptText && !replyText) { - return params.sessionEntry; - } - - const { sessionFile, sessionEntry } = await resolveSessionTranscriptFile({ - sessionId: params.sessionId, - sessionKey: params.sessionKey, - sessionEntry: params.sessionEntry, - sessionStore: params.sessionStore, - storePath: params.storePath, - agentId: params.sessionAgentId, - threadId: params.threadId, - }); - const hadSessionFile = await fs - .access(sessionFile) - .then(() => true) - .catch(() => false); - const sessionManager = SessionManager.open(sessionFile); - await prepareSessionManagerForRun({ - sessionManager, - sessionFile, - hadSessionFile, - sessionId: params.sessionId, - cwd: params.sessionCwd, - }); - - if (promptText) { - sessionManager.appendMessage({ - role: "user", - content: promptText, - timestamp: Date.now(), - }); - } - - if (replyText) { - sessionManager.appendMessage({ - role: "assistant", - content: [{ type: "text", text: replyText }], - api: "openai-responses", - provider: "openclaw", - model: "acp-runtime", - usage: ACP_TRANSCRIPT_USAGE, - stopReason: "stop", - timestamp: Date.now(), - }); - } - - emitSessionTranscriptUpdate(sessionFile); - return sessionEntry; -} - -function runAgentAttempt(params: { - providerOverride: string; - modelOverride: string; - cfg: ReturnType; - sessionEntry: SessionEntry | undefined; - sessionId: string; - sessionKey: string | undefined; - sessionAgentId: string; - sessionFile: string; - workspaceDir: string; - body: string; - isFallbackRetry: boolean; - resolvedThinkLevel: ThinkLevel; - timeoutMs: number; - runId: string; - opts: AgentCommandOpts & { senderIsOwner: boolean }; - runContext: ReturnType; - spawnedBy: string | undefined; - messageChannel: ReturnType; - skillsSnapshot: ReturnType | undefined; - resolvedVerboseLevel: VerboseLevel | undefined; - agentDir: string; - onAgentEvent: (evt: { stream: string; data?: Record }) => void; - authProfileProvider: string; - sessionStore?: Record; - storePath?: string; - allowTransientCooldownProbe?: boolean; -}) { - const effectivePrompt = resolveFallbackRetryPrompt({ - body: params.body, - isFallbackRetry: params.isFallbackRetry, - }); - const bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( - params.sessionEntry?.systemPromptReport, - ); - const bootstrapPromptWarningSignature = - bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1]; - const authProfileId = - params.providerOverride === params.authProfileProvider - ? params.sessionEntry?.authProfileOverride - : undefined; - if (isCliProvider(params.providerOverride, params.cfg)) { - const cliSessionBinding = getCliSessionBinding(params.sessionEntry, params.providerOverride); - const runCliWithSession = (nextCliSessionId: string | undefined) => - runCliAgent({ - sessionId: params.sessionId, - sessionKey: params.sessionKey, - agentId: params.sessionAgentId, - sessionFile: params.sessionFile, - workspaceDir: params.workspaceDir, - config: params.cfg, - prompt: effectivePrompt, - provider: params.providerOverride, - model: params.modelOverride, - thinkLevel: params.resolvedThinkLevel, - timeoutMs: params.timeoutMs, - runId: params.runId, - extraSystemPrompt: params.opts.extraSystemPrompt, - cliSessionId: nextCliSessionId, - cliSessionBinding: - nextCliSessionId === cliSessionBinding?.sessionId ? cliSessionBinding : undefined, - authProfileId, - bootstrapPromptWarningSignaturesSeen, - bootstrapPromptWarningSignature, - images: params.isFallbackRetry ? undefined : params.opts.images, - streamParams: params.opts.streamParams, - }); - return runCliWithSession(cliSessionBinding?.sessionId).catch(async (err) => { - // Handle CLI session expired error - if ( - err instanceof FailoverError && - err.reason === "session_expired" && - cliSessionBinding?.sessionId && - params.sessionKey && - params.sessionStore && - params.storePath - ) { - log.warn( - `CLI session expired, clearing from session store: provider=${sanitizeForLog(params.providerOverride)} sessionKey=${params.sessionKey}`, - ); - - // Clear the expired session ID from the session store - const entry = params.sessionStore[params.sessionKey]; - if (entry) { - const updatedEntry = { ...entry }; - clearCliSession(updatedEntry, params.providerOverride); - updatedEntry.updatedAt = Date.now(); - - await persistSessionEntry({ - sessionStore: params.sessionStore, - sessionKey: params.sessionKey, - storePath: params.storePath, - entry: updatedEntry, - }); - - // Update the session entry reference - params.sessionEntry = updatedEntry; - } - - // Retry with no session ID (will create a new session) - return runCliWithSession(undefined).then(async (result) => { - // Update session store with new CLI session ID if available - if ( - result.meta.agentMeta?.cliSessionBinding?.sessionId && - params.sessionKey && - params.sessionStore && - params.storePath - ) { - const entry = params.sessionStore[params.sessionKey]; - if (entry) { - const updatedEntry = { ...entry }; - setCliSessionBinding( - updatedEntry, - params.providerOverride, - result.meta.agentMeta.cliSessionBinding, - ); - updatedEntry.updatedAt = Date.now(); - - await persistSessionEntry({ - sessionStore: params.sessionStore, - sessionKey: params.sessionKey, - storePath: params.storePath, - entry: updatedEntry, - }); - } - } - return result; - }); - } - throw err; - }); - } - - return runEmbeddedPiAgent({ - sessionId: params.sessionId, - sessionKey: params.sessionKey, - agentId: params.sessionAgentId, - trigger: "user", - messageChannel: params.messageChannel, - agentAccountId: params.runContext.accountId, - messageTo: params.opts.replyTo ?? params.opts.to, - messageThreadId: params.opts.threadId, - groupId: params.runContext.groupId, - groupChannel: params.runContext.groupChannel, - groupSpace: params.runContext.groupSpace, - spawnedBy: params.spawnedBy, - currentChannelId: params.runContext.currentChannelId, - currentThreadTs: params.runContext.currentThreadTs, - replyToMode: params.runContext.replyToMode, - hasRepliedRef: params.runContext.hasRepliedRef, - senderIsOwner: params.opts.senderIsOwner, - sessionFile: params.sessionFile, - workspaceDir: params.workspaceDir, - config: params.cfg, - skillsSnapshot: params.skillsSnapshot, - prompt: effectivePrompt, - images: params.isFallbackRetry ? undefined : params.opts.images, - clientTools: params.opts.clientTools, - provider: params.providerOverride, - model: params.modelOverride, - authProfileId, - authProfileIdSource: authProfileId ? params.sessionEntry?.authProfileOverrideSource : undefined, - thinkLevel: params.resolvedThinkLevel, - verboseLevel: params.resolvedVerboseLevel, - timeoutMs: params.timeoutMs, - runId: params.runId, - lane: params.opts.lane, - abortSignal: params.opts.abortSignal, - extraSystemPrompt: params.opts.extraSystemPrompt, - inputProvenance: params.opts.inputProvenance, - streamParams: params.opts.streamParams, - agentDir: params.agentDir, - allowTransientCooldownProbe: params.allowTransientCooldownProbe, - onAgentEvent: params.onAgentEvent, - bootstrapPromptWarningSignaturesSeen, - bootstrapPromptWarningSignature, - }); -} - async function prepareAgentCommandExecution( opts: AgentCommandOpts & { senderIsOwner: boolean }, runtime: RuntimeEnv, @@ -756,14 +380,7 @@ async function agentCommandInternal( registerAgentRunContext(runId, { sessionKey, }); - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "start", - startedAt, - }, - }); + emitAcpLifecycleStart({ runId, startedAt }); const visibleTextAccumulator = createAcpVisibleTextAccumulator(); let stopReason: string | undefined; @@ -805,13 +422,10 @@ async function agentCommandInternal( if (!visibleUpdate) { return; } - emitAgentEvent({ + emitAcpAssistantDelta({ runId, - stream: "assistant", - data: { - text: visibleUpdate.text, - delta: visibleUpdate.delta, - }, + text: visibleUpdate.text, + delta: visibleUpdate.delta, }); }, }); @@ -821,26 +435,14 @@ async function agentCommandInternal( fallbackCode: "ACP_TURN_FAILED", fallbackMessage: "ACP turn failed before completion.", }); - emitAgentEvent({ + emitAcpLifecycleError({ runId, - stream: "lifecycle", - data: { - phase: "error", - error: acpError.message, - endedAt: Date.now(), - }, + message: acpError.message, }); throw acpError; } - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "end", - endedAt: Date.now(), - }, - }); + emitAcpLifecycleEnd({ runId }); const finalTextRaw = visibleTextAccumulator.finalizeRaw(); const finalText = visibleTextAccumulator.finalize(); @@ -863,18 +465,13 @@ async function agentCommandInternal( ); } - const normalizedFinalPayload = normalizeReplyPayload({ - text: finalText, + const result = buildAcpResult({ + payloadText: finalText, + startedAt, + stopReason, + abortSignal: opts.abortSignal, }); - const payloads = normalizedFinalPayload ? [normalizedFinalPayload] : []; - const result = { - payloads, - meta: { - durationMs: Date.now() - startedAt, - aborted: opts.abortSignal?.aborted === true, - stopReason, - }, - }; + const payloads = result.payloads; return await deliverAgentCommandResult({ cfg, @@ -1131,7 +728,7 @@ async function agentCommandInternal( const startedAt = Date.now(); let lifecycleEnded = false; - let result: Awaited>; + let result: Awaited>; let fallbackProvider = provider; let fallbackModel = model; try { diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts new file mode 100644 index 00000000000..b96ebe4ed48 --- /dev/null +++ b/src/agents/command/attempt-execution.ts @@ -0,0 +1,470 @@ +import fs from "node:fs/promises"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { normalizeReplyPayload } from "../../auto-reply/reply/normalize-reply.js"; +import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js"; +import { + isSilentReplyPrefixText, + isSilentReplyText, + SILENT_REPLY_TOKEN, +} from "../../auto-reply/tokens.js"; +import { loadConfig } from "../../config/config.js"; +import { mergeSessionEntry, type SessionEntry, updateSessionStore } from "../../config/sessions.js"; +import { resolveSessionTranscriptFile } from "../../config/sessions/transcript.js"; +import { emitAgentEvent } from "../../infra/agent-events.js"; +import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; +import { sanitizeForLog } from "../../terminal/ansi.js"; +import { resolveMessageChannel } from "../../utils/message-channel.js"; +import { resolveBootstrapWarningSignaturesSeen } from "../bootstrap-budget.js"; +import { runCliAgent } from "../cli-runner.js"; +import { clearCliSession, getCliSessionBinding, setCliSessionBinding } from "../cli-session.js"; +import { FailoverError } from "../failover-error.js"; +import { formatAgentInternalEventsForPrompt } from "../internal-events.js"; +import { isCliProvider } from "../model-selection.js"; +import { prepareSessionManagerForRun } from "../pi-embedded-runner/session-manager-init.js"; +import { runEmbeddedPiAgent } from "../pi-embedded.js"; +import { buildWorkspaceSkillSnapshot } from "../skills.js"; +import { resolveAgentRunContext } from "./run-context.js"; +import type { AgentCommandOpts } from "./types.js"; + +const log = createSubsystemLogger("agents/agent-command"); + +export type PersistSessionEntryParams = { + sessionStore: Record; + sessionKey: string; + storePath: string; + entry: SessionEntry; + clearedFields?: string[]; +}; + +export async function persistSessionEntry(params: PersistSessionEntryParams): Promise { + const persisted = await updateSessionStore(params.storePath, (store) => { + const merged = mergeSessionEntry(store[params.sessionKey], params.entry); + for (const field of params.clearedFields ?? []) { + if (!Object.hasOwn(params.entry, field)) { + Reflect.deleteProperty(merged, field); + } + } + store[params.sessionKey] = merged; + return merged; + }); + params.sessionStore[params.sessionKey] = persisted; +} + +export function resolveFallbackRetryPrompt(params: { + body: string; + isFallbackRetry: boolean; +}): string { + if (!params.isFallbackRetry) { + return params.body; + } + return "Continue where you left off. The previous model attempt failed or timed out."; +} + +export function prependInternalEventContext( + body: string, + events: AgentCommandOpts["internalEvents"], +): string { + if (body.includes("OpenClaw runtime context (internal):")) { + return body; + } + const renderedEvents = formatAgentInternalEventsForPrompt(events); + if (!renderedEvents) { + return body; + } + return [renderedEvents, body].filter(Boolean).join("\n\n"); +} + +export function createAcpVisibleTextAccumulator() { + let pendingSilentPrefix = ""; + let visibleText = ""; + const startsWithWordChar = (chunk: string): boolean => /^[\p{L}\p{N}]/u.test(chunk); + + const resolveNextCandidate = (base: string, chunk: string): string => { + if (!base) { + return chunk; + } + if ( + isSilentReplyText(base, SILENT_REPLY_TOKEN) && + !chunk.startsWith(base) && + startsWithWordChar(chunk) + ) { + return chunk; + } + if (chunk.startsWith(base) && chunk.length > base.length) { + return chunk; + } + return `${base}${chunk}`; + }; + + const mergeVisibleChunk = (base: string, chunk: string): { text: string; delta: string } => { + if (!base) { + return { text: chunk, delta: chunk }; + } + if (chunk.startsWith(base) && chunk.length > base.length) { + const delta = chunk.slice(base.length); + return { text: chunk, delta }; + } + return { + text: `${base}${chunk}`, + delta: chunk, + }; + }; + + return { + consume(chunk: string): { text: string; delta: string } | null { + if (!chunk) { + return null; + } + + if (!visibleText) { + const leadCandidate = resolveNextCandidate(pendingSilentPrefix, chunk); + const trimmedLeadCandidate = leadCandidate.trim(); + if ( + isSilentReplyText(trimmedLeadCandidate, SILENT_REPLY_TOKEN) || + isSilentReplyPrefixText(trimmedLeadCandidate, SILENT_REPLY_TOKEN) + ) { + pendingSilentPrefix = leadCandidate; + return null; + } + if (pendingSilentPrefix) { + pendingSilentPrefix = ""; + visibleText = leadCandidate; + return { + text: visibleText, + delta: leadCandidate, + }; + } + } + + const nextVisible = mergeVisibleChunk(visibleText, chunk); + visibleText = nextVisible.text; + return nextVisible.delta ? nextVisible : null; + }, + finalize(): string { + return visibleText.trim(); + }, + finalizeRaw(): string { + return visibleText; + }, + }; +} + +const ACP_TRANSCRIPT_USAGE = { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }, +} as const; + +export async function persistAcpTurnTranscript(params: { + body: string; + finalText: string; + sessionId: string; + sessionKey: string; + sessionEntry: SessionEntry | undefined; + sessionStore?: Record; + storePath?: string; + sessionAgentId: string; + threadId?: string | number; + sessionCwd: string; +}): Promise { + const promptText = params.body; + const replyText = params.finalText; + if (!promptText && !replyText) { + return params.sessionEntry; + } + + const { sessionFile, sessionEntry } = await resolveSessionTranscriptFile({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionEntry: params.sessionEntry, + sessionStore: params.sessionStore, + storePath: params.storePath, + agentId: params.sessionAgentId, + threadId: params.threadId, + }); + const hadSessionFile = await fs + .access(sessionFile) + .then(() => true) + .catch(() => false); + const sessionManager = SessionManager.open(sessionFile); + await prepareSessionManagerForRun({ + sessionManager, + sessionFile, + hadSessionFile, + sessionId: params.sessionId, + cwd: params.sessionCwd, + }); + + if (promptText) { + sessionManager.appendMessage({ + role: "user", + content: promptText, + timestamp: Date.now(), + }); + } + + if (replyText) { + sessionManager.appendMessage({ + role: "assistant", + content: [{ type: "text", text: replyText }], + api: "openai-responses", + provider: "openclaw", + model: "acp-runtime", + usage: ACP_TRANSCRIPT_USAGE, + stopReason: "stop", + timestamp: Date.now(), + }); + } + + emitSessionTranscriptUpdate(sessionFile); + return sessionEntry; +} + +export function runAgentAttempt(params: { + providerOverride: string; + modelOverride: string; + cfg: ReturnType; + sessionEntry: SessionEntry | undefined; + sessionId: string; + sessionKey: string | undefined; + sessionAgentId: string; + sessionFile: string; + workspaceDir: string; + body: string; + isFallbackRetry: boolean; + resolvedThinkLevel: ThinkLevel; + timeoutMs: number; + runId: string; + opts: AgentCommandOpts & { senderIsOwner: boolean }; + runContext: ReturnType; + spawnedBy: string | undefined; + messageChannel: ReturnType; + skillsSnapshot: ReturnType | undefined; + resolvedVerboseLevel: VerboseLevel | undefined; + agentDir: string; + onAgentEvent: (evt: { stream: string; data?: Record }) => void; + authProfileProvider: string; + sessionStore?: Record; + storePath?: string; + allowTransientCooldownProbe?: boolean; +}) { + const effectivePrompt = resolveFallbackRetryPrompt({ + body: params.body, + isFallbackRetry: params.isFallbackRetry, + }); + const bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + params.sessionEntry?.systemPromptReport, + ); + const bootstrapPromptWarningSignature = + bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1]; + const authProfileId = + params.providerOverride === params.authProfileProvider + ? params.sessionEntry?.authProfileOverride + : undefined; + if (isCliProvider(params.providerOverride, params.cfg)) { + const cliSessionBinding = getCliSessionBinding(params.sessionEntry, params.providerOverride); + const runCliWithSession = (nextCliSessionId: string | undefined) => + runCliAgent({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + agentId: params.sessionAgentId, + sessionFile: params.sessionFile, + workspaceDir: params.workspaceDir, + config: params.cfg, + prompt: effectivePrompt, + provider: params.providerOverride, + model: params.modelOverride, + thinkLevel: params.resolvedThinkLevel, + timeoutMs: params.timeoutMs, + runId: params.runId, + extraSystemPrompt: params.opts.extraSystemPrompt, + cliSessionId: nextCliSessionId, + cliSessionBinding: + nextCliSessionId === cliSessionBinding?.sessionId ? cliSessionBinding : undefined, + authProfileId, + bootstrapPromptWarningSignaturesSeen, + bootstrapPromptWarningSignature, + images: params.isFallbackRetry ? undefined : params.opts.images, + streamParams: params.opts.streamParams, + }); + return runCliWithSession(cliSessionBinding?.sessionId).catch(async (err) => { + if ( + err instanceof FailoverError && + err.reason === "session_expired" && + cliSessionBinding?.sessionId && + params.sessionKey && + params.sessionStore && + params.storePath + ) { + log.warn( + `CLI session expired, clearing from session store: provider=${sanitizeForLog(params.providerOverride)} sessionKey=${params.sessionKey}`, + ); + + const entry = params.sessionStore[params.sessionKey]; + if (entry) { + const updatedEntry = { ...entry }; + clearCliSession(updatedEntry, params.providerOverride); + updatedEntry.updatedAt = Date.now(); + + await persistSessionEntry({ + sessionStore: params.sessionStore, + sessionKey: params.sessionKey, + storePath: params.storePath, + entry: updatedEntry, + }); + + params.sessionEntry = updatedEntry; + } + + return runCliWithSession(undefined).then(async (result) => { + if ( + result.meta.agentMeta?.cliSessionBinding?.sessionId && + params.sessionKey && + params.sessionStore && + params.storePath + ) { + const entry = params.sessionStore[params.sessionKey]; + if (entry) { + const updatedEntry = { ...entry }; + setCliSessionBinding( + updatedEntry, + params.providerOverride, + result.meta.agentMeta.cliSessionBinding, + ); + updatedEntry.updatedAt = Date.now(); + + await persistSessionEntry({ + sessionStore: params.sessionStore, + sessionKey: params.sessionKey, + storePath: params.storePath, + entry: updatedEntry, + }); + } + } + return result; + }); + } + throw err; + }); + } + + return runEmbeddedPiAgent({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + agentId: params.sessionAgentId, + trigger: "user", + messageChannel: params.messageChannel, + agentAccountId: params.runContext.accountId, + messageTo: params.opts.replyTo ?? params.opts.to, + messageThreadId: params.opts.threadId, + groupId: params.runContext.groupId, + groupChannel: params.runContext.groupChannel, + groupSpace: params.runContext.groupSpace, + spawnedBy: params.spawnedBy, + currentChannelId: params.runContext.currentChannelId, + currentThreadTs: params.runContext.currentThreadTs, + replyToMode: params.runContext.replyToMode, + hasRepliedRef: params.runContext.hasRepliedRef, + senderIsOwner: params.opts.senderIsOwner, + sessionFile: params.sessionFile, + workspaceDir: params.workspaceDir, + config: params.cfg, + skillsSnapshot: params.skillsSnapshot, + prompt: effectivePrompt, + images: params.isFallbackRetry ? undefined : params.opts.images, + clientTools: params.opts.clientTools, + provider: params.providerOverride, + model: params.modelOverride, + authProfileId, + authProfileIdSource: authProfileId ? params.sessionEntry?.authProfileOverrideSource : undefined, + thinkLevel: params.resolvedThinkLevel, + verboseLevel: params.resolvedVerboseLevel, + timeoutMs: params.timeoutMs, + runId: params.runId, + lane: params.opts.lane, + abortSignal: params.opts.abortSignal, + extraSystemPrompt: params.opts.extraSystemPrompt, + inputProvenance: params.opts.inputProvenance, + streamParams: params.opts.streamParams, + agentDir: params.agentDir, + allowTransientCooldownProbe: params.allowTransientCooldownProbe, + onAgentEvent: params.onAgentEvent, + bootstrapPromptWarningSignaturesSeen, + bootstrapPromptWarningSignature, + }); +} + +export function buildAcpResult(params: { + payloadText: string; + startedAt: number; + stopReason?: string; + abortSignal?: AbortSignal; +}) { + const normalizedFinalPayload = normalizeReplyPayload({ + text: params.payloadText, + }); + const payloads = normalizedFinalPayload ? [normalizedFinalPayload] : []; + return { + payloads, + meta: { + durationMs: Date.now() - params.startedAt, + aborted: params.abortSignal?.aborted === true, + stopReason: params.stopReason, + }, + }; +} + +export function emitAcpLifecycleStart(params: { runId: string; startedAt: number }) { + emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { + phase: "start", + startedAt: params.startedAt, + }, + }); +} + +export function emitAcpLifecycleEnd(params: { runId: string }) { + emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { + phase: "end", + endedAt: Date.now(), + }, + }); +} + +export function emitAcpLifecycleError(params: { runId: string; message: string }) { + emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { + phase: "error", + error: params.message, + endedAt: Date.now(), + }, + }); +} + +export function emitAcpAssistantDelta(params: { runId: string; text: string; delta: string }) { + emitAgentEvent({ + runId: params.runId, + stream: "assistant", + data: { + text: params.text, + delta: params.delta, + }, + }); +}