// Keep the runtime class on the package specifier so built agent-core shares // constructor identity with @openclaw/llm-core; source types keep SDK d.ts bundled. import { EventStream as LlmEventStream } from "@openclaw/llm-core"; import type { AssistantMessage, AssistantMessageEvent, Context, EventStream, ToolResultMessage, } from "../../llm-core/src/index.js"; import type { EventStream as SourceEventStream } from "../../llm-core/src/index.js"; import { resolveAgentReasoningOption } from "./reasoning.js"; import { type AgentCoreStreamRuntimeDeps, resolveAgentCoreStreamFn } from "./runtime-deps.js"; import type { AgentContext, AgentEvent, AgentLoopConfig, AgentMessage, AgentTool, AgentToolCall, AgentToolResult, StreamFn, } from "./types.js"; import { validateToolArguments } from "./validation.js"; /** Callback used by synchronous loop runners to publish agent lifecycle events. */ export type AgentEventSink = (event: AgentEvent) => Promise | void; const EMPTY_USAGE = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0, cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, }; const EventStreamConstructor: typeof SourceEventStream = LlmEventStream; type AssistantMessageUpdateEvent = Extract< AssistantMessageEvent, { type: | "text_start" | "text_delta" | "text_end" | "thinking_start" | "thinking_delta" | "thinking_end" | "toolcall_start" | "toolcall_delta" | "toolcall_end"; } >; function appendTextDeltaToAssistantMessage( message: AssistantMessage, contentIndex: number, delta: string, ): AssistantMessage { const content = [...message.content]; const currentContent = content[contentIndex]; content[contentIndex] = currentContent?.type === "text" ? { ...currentContent, text: currentContent.text + delta } : { type: "text", text: delta }; return { ...message, content }; } function resolveAssistantMessageUpdate( event: AssistantMessageUpdateEvent, currentMessage: AssistantMessage, ): AssistantMessage { if ("partial" in event && event.partial) { return event.partial; } if (event.type === "text_delta") { return appendTextDeltaToAssistantMessage(currentMessage, event.contentIndex, event.delta); } return currentMessage; } /** * Start an agent loop with a new prompt message. * The prompt is added to the context and events are emitted for it. */ export function agentLoop( prompts: AgentMessage[], context: AgentContext, config: AgentLoopConfig, signal?: AbortSignal, streamFn?: StreamFn, runtime?: AgentCoreStreamRuntimeDeps, ): EventStream { const stream = createAgentStream(); void runAgentLoop( prompts, context, config, async (event) => { stream.push(event); }, signal, streamFn, runtime, ) .then((messages) => { stream.end(messages); }) .catch((error: unknown) => { pushLoopFailure(stream, config, error, signal?.aborted === true); }); return stream; } /** * Continue an agent loop from the current context without adding a new message. * Used for retries - context already has user message or tool results. * * **Important:** The last message in context must convert to a `user` or `toolResult` message * via `convertToLlm`. If it doesn't, the LLM provider will reject the request. * This cannot be validated here since `convertToLlm` is only called once per turn. */ export function agentLoopContinue( context: AgentContext, config: AgentLoopConfig, signal?: AbortSignal, streamFn?: StreamFn, runtime?: AgentCoreStreamRuntimeDeps, ): EventStream { if (context.messages.length === 0) { throw new Error("Cannot continue: no messages in context"); } if (context.messages[context.messages.length - 1].role === "assistant") { throw new Error("Cannot continue from message role: assistant"); } const stream = createAgentStream(); void runAgentLoopContinue( context, config, async (event) => { stream.push(event); }, signal, streamFn, runtime, ) .then((messages) => { stream.end(messages); }) .catch((error: unknown) => { pushLoopFailure(stream, config, error, signal?.aborted === true); }); return stream; } /** Run a prompt-started loop and emit events through a caller-owned sink. */ export async function runAgentLoop( prompts: AgentMessage[], context: AgentContext, config: AgentLoopConfig, emit: AgentEventSink, signal?: AbortSignal, streamFn?: StreamFn, runtime?: AgentCoreStreamRuntimeDeps, ): Promise { const newMessages: AgentMessage[] = [...prompts]; const currentContext: AgentContext = { ...context, messages: [...context.messages, ...prompts], }; await emit({ type: "agent_start" }); await emit({ type: "turn_start" }); for (const prompt of prompts) { await emit({ type: "message_start", message: prompt }); await emit({ type: "message_end", message: prompt }); } await runLoop(currentContext, newMessages, config, signal, emit, streamFn, runtime); return newMessages; } /** Continue an existing loop context and emit only newly produced messages. */ export async function runAgentLoopContinue( context: AgentContext, config: AgentLoopConfig, emit: AgentEventSink, signal?: AbortSignal, streamFn?: StreamFn, runtime?: AgentCoreStreamRuntimeDeps, ): Promise { if (context.messages.length === 0) { throw new Error("Cannot continue: no messages in context"); } if (context.messages[context.messages.length - 1].role === "assistant") { throw new Error("Cannot continue from message role: assistant"); } const newMessages: AgentMessage[] = []; const currentContext: AgentContext = { ...context }; await emit({ type: "agent_start" }); await emit({ type: "turn_start" }); await runLoop(currentContext, newMessages, config, signal, emit, streamFn, runtime); return newMessages; } function createAgentStream(): EventStream { return new EventStreamConstructor( (event: AgentEvent) => event.type === "agent_end", (event: AgentEvent) => (event.type === "agent_end" ? event.messages : []), ); } function createLoopFailureMessage( config: AgentLoopConfig, error: unknown, aborted: boolean, ): AssistantMessage { return { role: "assistant", content: [{ type: "text", text: "" }], api: config.model.api, provider: config.model.provider, model: config.model.id, usage: EMPTY_USAGE, stopReason: aborted ? "aborted" : "error", errorMessage: error instanceof Error ? error.message : String(error), timestamp: Date.now(), }; } function pushLoopFailure( stream: EventStream, config: AgentLoopConfig, error: unknown, aborted: boolean, ): void { const failureMessage = createLoopFailureMessage(config, error, aborted); stream.push({ type: "message_start", message: failureMessage }); stream.push({ type: "message_end", message: failureMessage }); stream.push({ type: "turn_end", message: failureMessage, toolResults: [] }); stream.push({ type: "agent_end", messages: [failureMessage] }); } /** * Main loop logic shared by agentLoop and agentLoopContinue. */ async function runLoop( initialContext: AgentContext, newMessages: AgentMessage[], initialConfig: AgentLoopConfig, signal: AbortSignal | undefined, emit: AgentEventSink, streamFn?: StreamFn, runtime?: AgentCoreStreamRuntimeDeps, ): Promise { let currentContext = initialContext; let config = initialConfig; let firstTurn = true; let turnOpen = true; // Check for steering messages at start (user may have typed while waiting) let pendingMessages: AgentMessage[] = (await config.getSteeringMessages?.()) || []; const stopIfAborted = async (): Promise => { if (!signal?.aborted) { return false; } // Persist an aborted assistant outcome so session post-processing does not // compact or continue from the preceding toolUse message. const abortedMessage = createLoopFailureMessage( config, signal.reason instanceof Error ? signal.reason : new Error("Agent run aborted"), true, ); newMessages.push(abortedMessage); if (!turnOpen) { await emit({ type: "turn_start" }); turnOpen = true; } await emit({ type: "message_start", message: abortedMessage }); await emit({ type: "message_end", message: abortedMessage }); await emit({ type: "turn_end", message: abortedMessage, toolResults: [] }); turnOpen = false; await emit({ type: "agent_end", messages: newMessages }); return true; }; // Outer loop: continues when queued follow-up messages arrive after agent would stop while (true) { let hasMoreToolCalls = true; // Inner loop: process tool calls and steering messages while (hasMoreToolCalls || pendingMessages.length > 0) { if (await stopIfAborted()) { return; } if (!firstTurn) { await emit({ type: "turn_start" }); turnOpen = true; } else { firstTurn = false; } // Process pending messages (inject before next assistant response) if (pendingMessages.length > 0) { for (const message of pendingMessages) { await emit({ type: "message_start", message }); await emit({ type: "message_end", message }); currentContext.messages.push(message); newMessages.push(message); } } if (await stopIfAborted()) { return; } // Stream assistant response const message = await streamAssistantResponse( currentContext, config, signal, emit, streamFn, runtime, ); newMessages.push(message); if (message.stopReason === "error" || message.stopReason === "aborted") { await emit({ type: "turn_end", message, toolResults: [] }); await emit({ type: "agent_end", messages: newMessages }); return; } // Check for tool calls const toolCalls = message.content.filter((c) => c.type === "toolCall"); const toolResults: ToolResultMessage[] = []; hasMoreToolCalls = false; if (toolCalls.length > 0) { const executedToolBatch = await executeToolCalls( currentContext, message, config, signal, emit, ); toolResults.push(...executedToolBatch.messages); hasMoreToolCalls = !executedToolBatch.terminate; for (const result of toolResults) { currentContext.messages.push(result); newMessages.push(result); } } await emit({ type: "turn_end", message, toolResults }); turnOpen = false; if (await stopIfAborted()) { return; } const nextTurnContext = { message, toolResults, context: currentContext, newMessages, }; const nextTurnSnapshot = await config.prepareNextTurn?.(nextTurnContext); if (nextTurnSnapshot) { currentContext = nextTurnSnapshot.context ?? currentContext; const nextModel = nextTurnSnapshot.model ?? config.model; const nextThinkingLevel = nextTurnSnapshot.thinkingLevel ?? config.thinkingLevel; const shouldResolveReasoning = nextTurnSnapshot.thinkingLevel !== undefined || (nextTurnSnapshot.model !== undefined && nextThinkingLevel !== undefined); const nextReasoning = shouldResolveReasoning && nextThinkingLevel !== undefined ? resolveAgentReasoningOption(nextModel, nextThinkingLevel) : config.reasoning; config = Object.assign({}, config, { model: nextModel, thinkingLevel: nextThinkingLevel, reasoning: nextReasoning, }); } if (await stopIfAborted()) { return; } if ( await config.shouldStopAfterTurn?.({ message, toolResults, context: currentContext, newMessages, }) ) { await emit({ type: "agent_end", messages: newMessages }); return; } pendingMessages = (await config.getSteeringMessages?.()) || []; if (await stopIfAborted()) { return; } } const followUpMessages = (await config.getFollowUpMessages?.()) || []; if (followUpMessages.length > 0) { // Follow-up messages arrive after a turn would otherwise end; route them through the // same pending-message path so event ordering matches steering messages. pendingMessages = followUpMessages; continue; } // No more messages, exit break; } await emit({ type: "agent_end", messages: newMessages }); } /** * Stream an assistant response from the LLM. * This is where AgentMessage[] gets transformed to Message[] for the LLM. */ async function streamAssistantResponse( context: AgentContext, config: AgentLoopConfig, signal: AbortSignal | undefined, emit: AgentEventSink, streamFn?: StreamFn, runtime?: AgentCoreStreamRuntimeDeps, ): Promise { // Apply context transform if configured (AgentMessage[] → AgentMessage[]) let messages = context.messages; if (config.transformContext) { messages = await config.transformContext(messages, signal); } // Convert to LLM-compatible messages (AgentMessage[] → Message[]) const llmMessages = await config.convertToLlm(messages); // Build LLM context const llmContext: Context = { systemPrompt: context.systemPrompt, messages: llmMessages, tools: context.tools, }; const streamFunction = resolveAgentCoreStreamFn(runtime, streamFn); // Resolve API key (important for expiring tokens) const resolvedApiKey = (config.getApiKey ? await config.getApiKey(config.model.provider) : undefined) || config.apiKey; const response = await streamFunction(config.model, llmContext, { ...config, apiKey: resolvedApiKey, signal, }); let partialMessage: AssistantMessage | null = null; let addedPartial = false; for await (const event of response) { switch (event.type) { case "start": { const message = event.partial; partialMessage = message; context.messages.push(message); addedPartial = true; await emit({ type: "message_start", message: { ...message } }); break; } case "text_start": case "text_delta": case "text_end": case "thinking_start": case "thinking_delta": case "thinking_end": case "toolcall_start": case "toolcall_delta": case "toolcall_end": if (partialMessage) { const message = resolveAssistantMessageUpdate(event, partialMessage); partialMessage = message; context.messages[context.messages.length - 1] = message; await emit({ type: "message_update", assistantMessageEvent: event, message: { ...message }, }); } break; case "done": case "error": { const finalMessage = await response.result(); if (addedPartial) { context.messages[context.messages.length - 1] = finalMessage; } else { context.messages.push(finalMessage); } if (!addedPartial) { await emit({ type: "message_start", message: { ...finalMessage } }); } await emit({ type: "message_end", message: finalMessage }); return finalMessage; } } } const finalMessage = await response.result(); if (addedPartial) { context.messages[context.messages.length - 1] = finalMessage; } else { context.messages.push(finalMessage); await emit({ type: "message_start", message: { ...finalMessage } }); } await emit({ type: "message_end", message: finalMessage }); return finalMessage; } /** * Execute tool calls from an assistant message. */ async function executeToolCalls( currentContext: AgentContext, assistantMessage: AssistantMessage, config: AgentLoopConfig, signal: AbortSignal | undefined, emit: AgentEventSink, ): Promise { const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall"); const resolvedToolCalls = new Map(); let hasSequentialToolCall = false; if (config.toolExecution !== "sequential") { for (const toolCall of toolCalls) { const resolution = await resolveToolCallTool( currentContext, assistantMessage, toolCall, config, signal, resolvedToolCalls, ); if (resolution.kind === "resolved" && resolution.tool?.executionMode === "sequential") { hasSequentialToolCall = true; break; } if (signal?.aborted) { break; } } } if (config.toolExecution === "sequential" || hasSequentialToolCall) { return executeToolCallsSequential( currentContext, assistantMessage, toolCalls, resolvedToolCalls, config, signal, emit, ); } return executeToolCallsParallel( currentContext, assistantMessage, toolCalls, resolvedToolCalls, config, signal, emit, ); } type ExecutedToolCallBatch = { messages: ToolResultMessage[]; terminate: boolean; }; type ResolvedToolCallOutcome = | { kind: "resolved"; tool?: AgentTool } | { kind: "error"; error: unknown }; async function executeToolCallsSequential( currentContext: AgentContext, assistantMessage: AssistantMessage, toolCalls: AgentToolCall[], resolvedToolCalls: Map, config: AgentLoopConfig, signal: AbortSignal | undefined, emit: AgentEventSink, ): Promise { const finalizedCalls: FinalizedToolCallOutcome[] = []; const messages: ToolResultMessage[] = []; for (const toolCall of toolCalls) { await emit({ type: "tool_execution_start", toolCallId: toolCall.id, toolName: toolCall.name, args: toolCall.arguments, }); const preparation = await prepareToolCall( currentContext, assistantMessage, toolCall, config, signal, resolvedToolCalls, ); let finalized: FinalizedToolCallOutcome; if (preparation.kind === "immediate") { finalized = { toolCall, result: preparation.result, isError: preparation.isError, executionStarted: false, }; } else { const executed = await executePreparedToolCall(preparation, signal, emit); finalized = await finalizeExecutedToolCall( currentContext, assistantMessage, preparation, executed, config, signal, ); } await emitToolExecutionEnd(finalized, emit); const toolResultMessage = createToolResultMessage(finalized); await emitToolResultMessage(toolResultMessage, emit); finalizedCalls.push(finalized); messages.push(toolResultMessage); if (signal?.aborted) { break; } } return { messages, terminate: shouldTerminateToolBatch(finalizedCalls), }; } async function executeToolCallsParallel( currentContext: AgentContext, assistantMessage: AssistantMessage, toolCalls: AgentToolCall[], resolvedToolCalls: Map, config: AgentLoopConfig, signal: AbortSignal | undefined, emit: AgentEventSink, ): Promise { const finalizedCalls: FinalizedToolCallEntry[] = []; for (const toolCall of toolCalls) { await emit({ type: "tool_execution_start", toolCallId: toolCall.id, toolName: toolCall.name, args: toolCall.arguments, }); const preparation = await prepareToolCall( currentContext, assistantMessage, toolCall, config, signal, resolvedToolCalls, ); if (preparation.kind === "immediate") { const finalized = { toolCall, result: preparation.result, isError: preparation.isError, executionStarted: false, } satisfies FinalizedToolCallOutcome; await emitToolExecutionEnd(finalized, emit); finalizedCalls.push(finalized); if (signal?.aborted) { break; } continue; } finalizedCalls.push(async () => { const executed = await executePreparedToolCall(preparation, signal, emit); const finalized = await finalizeExecutedToolCall( currentContext, assistantMessage, preparation, executed, config, signal, ); await emitToolExecutionEnd(finalized, emit); return finalized; }); if (signal?.aborted) { break; } } const orderedFinalizedCalls = await Promise.all( finalizedCalls.map((entry) => (typeof entry === "function" ? entry() : Promise.resolve(entry))), ); const messages: ToolResultMessage[] = []; for (const finalized of orderedFinalizedCalls) { const toolResultMessage = createToolResultMessage(finalized); await emitToolResultMessage(toolResultMessage, emit); messages.push(toolResultMessage); } return { messages, terminate: shouldTerminateToolBatch(orderedFinalizedCalls), }; } type PreparedToolCall = { kind: "prepared"; toolCall: AgentToolCall; tool: AgentTool; args: unknown; }; type ImmediateToolCallOutcome = { kind: "immediate"; result: AgentToolResult; isError: boolean; }; type ExecutedToolCallOutcome = { result: AgentToolResult; isError: boolean; }; type FinalizedToolCallOutcome = { toolCall: AgentToolCall; result: AgentToolResult; isError: boolean; executionStarted: boolean; }; type FinalizedToolCallEntry = FinalizedToolCallOutcome | (() => Promise); function shouldTerminateToolBatch(finalizedCalls: FinalizedToolCallOutcome[]): boolean { return ( finalizedCalls.length > 0 && finalizedCalls.every((finalized) => finalized.result.terminate === true) ); } function prepareToolCallArguments(tool: AgentTool, toolCall: AgentToolCall): AgentToolCall { if (!tool.prepareArguments) { return toolCall; } const preparedArguments = tool.prepareArguments(toolCall.arguments); if (preparedArguments === toolCall.arguments) { return toolCall; } return { ...toolCall, arguments: preparedArguments as Record, }; } async function resolveToolCallTool( currentContext: AgentContext, assistantMessage: AssistantMessage, toolCall: AgentToolCall, config: AgentLoopConfig, signal: AbortSignal | undefined, resolvedToolCalls?: Map, ): Promise { const cached = resolvedToolCalls?.get(toolCall); if (cached) { return cached; } let resolution: ResolvedToolCallOutcome; try { let tool = currentContext.tools?.find((t) => t.name === toolCall.name); if (!tool) { const resolvedTool = await config.resolveDeferredTool?.( { assistantMessage, toolCall, context: currentContext, }, signal, ); // Keep execution and lifecycle/audit identity aligned with the original model call. if (resolvedTool && resolvedTool.name !== toolCall.name) { throw new Error( `Deferred tool resolver returned "${resolvedTool.name}" for requested "${toolCall.name}"`, ); } tool = resolvedTool; if (tool) { // Make the recovered tool visible to later provider continuations in this run. currentContext.tools = [...(currentContext.tools ?? []), tool]; } } resolution = { kind: "resolved", ...(tool ? { tool } : {}) }; } catch (error) { resolution = { kind: "error", error }; } resolvedToolCalls?.set(toolCall, resolution); return resolution; } async function prepareToolCall( currentContext: AgentContext, assistantMessage: AssistantMessage, toolCall: AgentToolCall, config: AgentLoopConfig, signal: AbortSignal | undefined, resolvedToolCalls: Map, ): Promise { const resolution = await resolveToolCallTool( currentContext, assistantMessage, toolCall, config, signal, resolvedToolCalls, ); if (resolution.kind === "error") { return { kind: "immediate", result: createErrorToolResult( signal?.aborted ? "Operation aborted" : resolution.error instanceof Error ? resolution.error.message : String(resolution.error), ), isError: true, }; } const tool = resolution.tool; if (!tool) { return { kind: "immediate", result: createErrorToolResult(`Tool ${toolCall.name} not found`), isError: true, }; } try { const preparedToolCall = prepareToolCallArguments(tool, toolCall); const validatedArgs = validateToolArguments(tool, preparedToolCall); if (config.beforeToolCall) { const beforeResult = await config.beforeToolCall( { assistantMessage, toolCall, args: validatedArgs, context: currentContext, }, signal, ); if (signal?.aborted) { return { kind: "immediate", result: createErrorToolResult("Operation aborted"), isError: true, }; } if (beforeResult?.block) { return { kind: "immediate", result: createErrorToolResult(beforeResult.reason || "Tool execution was blocked"), isError: true, }; } } if (signal?.aborted) { return { kind: "immediate", result: createErrorToolResult("Operation aborted"), isError: true, }; } return { kind: "prepared", toolCall, tool, args: validatedArgs, }; } catch (error) { return { kind: "immediate", result: createErrorToolResult(error instanceof Error ? error.message : String(error)), isError: true, }; } } async function executePreparedToolCall( prepared: PreparedToolCall, signal: AbortSignal | undefined, emit: AgentEventSink, ): Promise { const updateEvents: Promise[] = []; try { const result = await prepared.tool.execute( prepared.toolCall.id, prepared.args as never, signal, (partialResult) => { updateEvents.push( Promise.resolve( emit({ type: "tool_execution_update", toolCallId: prepared.toolCall.id, toolName: prepared.toolCall.name, args: prepared.toolCall.arguments, partialResult, }), ), ); }, ); await Promise.all(updateEvents); return { result, isError: false }; } catch (error) { await Promise.all(updateEvents); return { result: createErrorToolResult(error instanceof Error ? error.message : String(error)), isError: true, }; } } async function finalizeExecutedToolCall( currentContext: AgentContext, assistantMessage: AssistantMessage, prepared: PreparedToolCall, executed: ExecutedToolCallOutcome, config: AgentLoopConfig, signal: AbortSignal | undefined, ): Promise { let result = executed.result; let isError = executed.isError; if (config.afterToolCall) { try { const afterResult = await config.afterToolCall( { assistantMessage, toolCall: prepared.toolCall, args: prepared.args, result, isError, context: currentContext, }, signal, ); if (afterResult) { result = { content: afterResult.content ?? result.content, details: afterResult.details ?? result.details, terminate: afterResult.terminate ?? result.terminate, }; isError = afterResult.isError ?? isError; } } catch (error) { result = createErrorToolResult(error instanceof Error ? error.message : String(error)); isError = true; } } return { toolCall: prepared.toolCall, result, isError, executionStarted: true, }; } function createErrorToolResult(message: string): AgentToolResult { return { content: [{ type: "text", text: message }], details: {}, }; } async function emitToolExecutionEnd( finalized: FinalizedToolCallOutcome, emit: AgentEventSink, ): Promise { await emit({ type: "tool_execution_end", toolCallId: finalized.toolCall.id, toolName: finalized.toolCall.name, result: finalized.result, isError: finalized.isError, executionStarted: finalized.executionStarted, }); } function createToolResultMessage(finalized: FinalizedToolCallOutcome): ToolResultMessage { return { role: "toolResult", toolCallId: finalized.toolCall.id, toolName: finalized.toolCall.name, content: finalized.result.content, details: finalized.result.details, isError: finalized.isError, timestamp: Date.now(), }; } async function emitToolResultMessage( toolResultMessage: ToolResultMessage, emit: AgentEventSink, ): Promise { await emit({ type: "message_start", message: toolResultMessage }); await emit({ type: "message_end", message: toolResultMessage }); }