Files
openclaw/packages/agent-core/src/agent.ts
Peter Steinberger 85beee613c docs: clarify inline code comments
Comment-only follow-up documenting reusable gateway, auth, proxy, device, Talk, session, and agent helper contracts.\n\nVerification: git diff --check plus targeted tests recorded in PR body.
2026-05-31 14:37:41 +01:00

612 lines
20 KiB
TypeScript

import type {
ImageContent,
Message,
Model,
SimpleStreamOptions,
TextContent,
ThinkingBudgets,
Transport,
} from "../../llm-core/src/index.js";
import { runAgentLoop, runAgentLoopContinue } from "./agent-loop.js";
import { type AgentCoreStreamRuntimeDeps, resolveAgentCoreStreamFn } from "./runtime-deps.js";
import type {
AfterToolCallContext,
AfterToolCallResult,
AgentContext,
AgentEvent,
AgentLoopConfig,
AgentLoopTurnUpdate,
AgentMessage,
AgentState,
AgentTool,
BeforeToolCallContext,
BeforeToolCallResult,
QueueMode,
StreamFn,
ToolExecutionMode,
} from "./types.js";
export type { QueueMode } from "./types.js";
function defaultConvertToLlm(messages: AgentMessage[]): Message[] {
return messages.filter(
(message) =>
message.role === "user" || message.role === "assistant" || message.role === "toolResult",
);
}
const EMPTY_USAGE = {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
};
const DEFAULT_MODEL = {
id: "unknown",
name: "unknown",
api: "unknown",
provider: "unknown",
baseUrl: "",
reasoning: false,
input: [],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 0,
maxTokens: 0,
} satisfies Model;
type MutableAgentState = Omit<
AgentState,
"isStreaming" | "streamingMessage" | "pendingToolCalls" | "errorMessage"
> & {
isStreaming: boolean;
streamingMessage?: AgentMessage;
pendingToolCalls: Set<string>;
errorMessage?: string;
};
function createMutableAgentState(
initialState?: Partial<
Omit<AgentState, "pendingToolCalls" | "isStreaming" | "streamingMessage" | "errorMessage">
>,
): MutableAgentState {
let tools = initialState?.tools?.slice() ?? [];
let messages = initialState?.messages?.slice() ?? [];
return {
systemPrompt: initialState?.systemPrompt ?? "",
model: initialState?.model ?? DEFAULT_MODEL,
thinkingLevel: initialState?.thinkingLevel ?? "off",
get tools() {
return tools;
},
set tools(nextTools: AgentTool[]) {
tools = nextTools.slice();
},
get messages() {
return messages;
},
set messages(nextMessages: AgentMessage[]) {
messages = nextMessages.slice();
},
isStreaming: false,
streamingMessage: undefined,
pendingToolCalls: new Set<string>(),
errorMessage: undefined,
};
}
/** Options for constructing an {@link Agent}. */
export interface AgentOptions {
/** Initial transcript, tools, model, and prompt state. */
initialState?: Partial<
Omit<AgentState, "pendingToolCalls" | "isStreaming" | "streamingMessage" | "errorMessage">
>;
/** Convert agent-owned transcript messages into provider-facing messages. */
convertToLlm?: (messages: AgentMessage[]) => Message[] | Promise<Message[]>;
/** Optionally rewrite context before each provider request. */
transformContext?: (messages: AgentMessage[], signal?: AbortSignal) => Promise<AgentMessage[]>;
/** Injected stream runtime used when streamFn is not supplied. */
runtime?: AgentCoreStreamRuntimeDeps;
/** Explicit stream implementation, preferred over runtime.streamSimple. */
streamFn?: StreamFn;
/** Resolve provider API keys at request time. */
getApiKey?: (provider: string) => Promise<string | undefined> | string | undefined;
/** Inspect the provider payload before it is sent. */
onPayload?: SimpleStreamOptions["onPayload"];
/** Inspect the provider response after it returns. */
onResponse?: SimpleStreamOptions["onResponse"];
/** Hook that may short-circuit or alter a tool call before execution. */
beforeToolCall?: (
context: BeforeToolCallContext,
signal?: AbortSignal,
) => Promise<BeforeToolCallResult | undefined>;
/** Hook that may alter a tool result after execution. */
afterToolCall?: (
context: AfterToolCallContext,
signal?: AbortSignal,
) => Promise<AfterToolCallResult | undefined>;
/** Hook that may update model, reasoning, or context after a turn. */
prepareNextTurn?: (
signal?: AbortSignal,
) => Promise<AgentLoopTurnUpdate | undefined> | AgentLoopTurnUpdate | undefined;
/** Queue drain mode for steering messages injected before the next assistant response. */
steeringMode?: QueueMode;
/** Queue drain mode for follow-up messages injected after the agent would otherwise stop. */
followUpMode?: QueueMode;
/** Session identifier forwarded to cache-aware providers. */
sessionId?: string;
/** Optional per-thinking-level token budgets forwarded to providers. */
thinkingBudgets?: ThinkingBudgets;
/** Preferred provider transport. */
transport?: Transport;
/** Optional cap for provider-requested retry delays. */
maxRetryDelayMs?: number;
/** Default strategy for executing multiple tool calls in one assistant message. */
toolExecution?: ToolExecutionMode;
}
class PendingMessageQueue {
private messages: AgentMessage[] = [];
public mode: QueueMode;
constructor(mode: QueueMode) {
this.mode = mode;
}
enqueue(message: AgentMessage): void {
this.messages.push(message);
}
hasItems(): boolean {
return this.messages.length > 0;
}
drain(): AgentMessage[] {
if (this.mode === "all") {
const drained = this.messages.slice();
this.messages = [];
return drained;
}
// one-at-a-time preserves later queued messages for subsequent loop turns.
const first = this.messages[0];
if (!first) {
return [];
}
this.messages = this.messages.slice(1);
return [first];
}
clear(): void {
this.messages = [];
}
}
type ActiveRun = {
promise: Promise<void>;
resolve: () => void;
abortController: AbortController;
};
/**
* Stateful wrapper around the low-level agent loop.
*
* `Agent` owns the current transcript, emits lifecycle events, executes tools,
* and exposes queueing APIs for steering and follow-up messages.
*/
export class Agent {
private mutableState: MutableAgentState;
private readonly listeners = new Set<
(event: AgentEvent, signal: AbortSignal) => Promise<void> | void
>();
private readonly steeringQueue: PendingMessageQueue;
private readonly followUpQueue: PendingMessageQueue;
public convertToLlm: (messages: AgentMessage[]) => Message[] | Promise<Message[]>;
public transformContext?: (
messages: AgentMessage[],
signal?: AbortSignal,
) => Promise<AgentMessage[]>;
public runtime?: AgentCoreStreamRuntimeDeps;
public streamFn: StreamFn;
public getApiKey?: (provider: string) => Promise<string | undefined> | string | undefined;
public onPayload?: SimpleStreamOptions["onPayload"];
public onResponse?: SimpleStreamOptions["onResponse"];
public beforeToolCall?: (
context: BeforeToolCallContext,
signal?: AbortSignal,
) => Promise<BeforeToolCallResult | undefined>;
public afterToolCall?: (
context: AfterToolCallContext,
signal?: AbortSignal,
) => Promise<AfterToolCallResult | undefined>;
public prepareNextTurn?: (
signal?: AbortSignal,
) => Promise<AgentLoopTurnUpdate | undefined> | AgentLoopTurnUpdate | undefined;
private activeRun?: ActiveRun;
/** Session identifier forwarded to providers for cache-aware backends. */
public sessionId?: string;
/** Optional per-level thinking token budgets forwarded to the stream function. */
public thinkingBudgets?: ThinkingBudgets;
/** Preferred transport forwarded to the stream function. */
public transport: Transport;
/** Optional cap for provider-requested retry delays. */
public maxRetryDelayMs?: number;
/** Tool execution strategy for assistant messages that contain multiple tool calls. */
public toolExecution: ToolExecutionMode;
constructor(options: AgentOptions = {}) {
this.mutableState = createMutableAgentState(options.initialState);
this.convertToLlm = options.convertToLlm ?? defaultConvertToLlm;
this.transformContext = options.transformContext;
this.runtime = options.runtime;
this.streamFn = resolveAgentCoreStreamFn(options.runtime, options.streamFn);
this.getApiKey = options.getApiKey;
this.onPayload = options.onPayload;
this.onResponse = options.onResponse;
this.beforeToolCall = options.beforeToolCall;
this.afterToolCall = options.afterToolCall;
this.prepareNextTurn = options.prepareNextTurn;
this.steeringQueue = new PendingMessageQueue(options.steeringMode ?? "one-at-a-time");
this.followUpQueue = new PendingMessageQueue(options.followUpMode ?? "one-at-a-time");
this.sessionId = options.sessionId;
this.thinkingBudgets = options.thinkingBudgets;
this.transport = options.transport ?? "auto";
this.maxRetryDelayMs = options.maxRetryDelayMs;
this.toolExecution = options.toolExecution ?? "parallel";
}
/**
* Subscribe to agent lifecycle events.
*
* Listener promises are awaited in subscription order and are included in
* the current run's settlement. Listeners also receive the active abort
* signal for the current run.
*
* `agent_end` is the final emitted event for a run, but the agent does not
* become idle until all awaited listeners for that event have settled.
*/
subscribe(
listener: (event: AgentEvent, signal: AbortSignal) => Promise<void> | void,
): () => void {
this.listeners.add(listener);
return () => this.listeners.delete(listener);
}
/**
* Current agent state.
*
* Assigning `state.tools` or `state.messages` copies the provided top-level array.
*/
get state(): AgentState {
return this.mutableState;
}
/** Controls how queued steering messages are drained. */
set steeringMode(mode: QueueMode) {
this.steeringQueue.mode = mode;
}
get steeringMode(): QueueMode {
return this.steeringQueue.mode;
}
/** Controls how queued follow-up messages are drained. */
set followUpMode(mode: QueueMode) {
this.followUpQueue.mode = mode;
}
get followUpMode(): QueueMode {
return this.followUpQueue.mode;
}
/** Queue a message to be injected after the current assistant turn finishes. */
steer(message: AgentMessage): void {
this.steeringQueue.enqueue(message);
}
/** Queue a message to run only after the agent would otherwise stop. */
followUp(message: AgentMessage): void {
this.followUpQueue.enqueue(message);
}
/** Remove all queued steering messages. */
clearSteeringQueue(): void {
this.steeringQueue.clear();
}
/** Remove all queued follow-up messages. */
clearFollowUpQueue(): void {
this.followUpQueue.clear();
}
/** Remove all queued steering and follow-up messages. */
clearAllQueues(): void {
this.clearSteeringQueue();
this.clearFollowUpQueue();
}
/** Returns true when either queue still contains pending messages. */
hasQueuedMessages(): boolean {
return this.steeringQueue.hasItems() || this.followUpQueue.hasItems();
}
/** Active abort signal for the current run, if any. */
get signal(): AbortSignal | undefined {
return this.activeRun?.abortController.signal;
}
/** Abort the current run, if one is active. */
abort(): void {
this.activeRun?.abortController.abort();
}
/**
* Resolve when the current run and all awaited event listeners have finished.
*
* This resolves after `agent_end` listeners settle.
*/
waitForIdle(): Promise<void> {
return this.activeRun?.promise ?? Promise.resolve();
}
/** Clear transcript state, runtime state, and queued messages. */
reset(): void {
this.mutableState.messages = [];
this.mutableState.isStreaming = false;
this.mutableState.streamingMessage = undefined;
this.mutableState.pendingToolCalls = new Set<string>();
this.mutableState.errorMessage = undefined;
this.clearFollowUpQueue();
this.clearSteeringQueue();
}
/** Start a new prompt from text, a single message, or a batch of messages. */
async prompt(message: AgentMessage | AgentMessage[]): Promise<void>;
async prompt(input: string, images?: ImageContent[]): Promise<void>;
async prompt(
input: string | AgentMessage | AgentMessage[],
images?: ImageContent[],
): Promise<void> {
if (this.activeRun) {
throw new Error(
"Agent is already processing a prompt. Use steer() or followUp() to queue messages, or wait for completion.",
);
}
const messages = this.normalizePromptInput(input, images);
await this.runPromptMessages(messages);
}
/** Continue from the current transcript. The last message must be a user or tool-result message. */
async continue(): Promise<void> {
if (this.activeRun) {
throw new Error("Agent is already processing. Wait for completion before continuing.");
}
const lastMessage = this.mutableState.messages[this.mutableState.messages.length - 1];
if (!lastMessage) {
throw new Error("No messages to continue from");
}
if (lastMessage.role === "assistant") {
const queuedSteering = this.steeringQueue.drain();
if (queuedSteering.length > 0) {
await this.runPromptMessages(queuedSteering, { skipInitialSteeringPoll: true });
return;
}
const queuedFollowUps = this.followUpQueue.drain();
if (queuedFollowUps.length > 0) {
await this.runPromptMessages(queuedFollowUps);
return;
}
throw new Error("Cannot continue from message role: assistant");
}
await this.runContinuation();
}
private normalizePromptInput(
input: string | AgentMessage | AgentMessage[],
images?: ImageContent[],
): AgentMessage[] {
if (Array.isArray(input)) {
return input;
}
if (typeof input !== "string") {
return [input];
}
const content: Array<TextContent | ImageContent> = [{ type: "text", text: input }];
if (images && images.length > 0) {
content.push(...images);
}
return [{ role: "user", content, timestamp: Date.now() }];
}
private async runPromptMessages(
messages: AgentMessage[],
options: { skipInitialSteeringPoll?: boolean } = {},
): Promise<void> {
await this.runWithLifecycle(async (signal) => {
await runAgentLoop(
messages,
this.createContextSnapshot(),
this.createLoopConfig(options),
(event) => this.processEvents(event),
signal,
this.streamFn,
);
});
}
private async runContinuation(): Promise<void> {
await this.runWithLifecycle(async (signal) => {
await runAgentLoopContinue(
this.createContextSnapshot(),
this.createLoopConfig(),
(event) => this.processEvents(event),
signal,
this.streamFn,
);
});
}
private createContextSnapshot(): AgentContext {
return {
systemPrompt: this.mutableState.systemPrompt,
messages: this.mutableState.messages.slice(),
tools: this.mutableState.tools.slice(),
};
}
private createLoopConfig(options: { skipInitialSteeringPoll?: boolean } = {}): AgentLoopConfig {
let skipInitialSteeringPoll = options.skipInitialSteeringPoll === true;
return {
model: this.mutableState.model,
reasoning:
this.mutableState.thinkingLevel === "off" ? undefined : this.mutableState.thinkingLevel,
sessionId: this.sessionId,
onPayload: this.onPayload,
onResponse: this.onResponse,
transport: this.transport,
thinkingBudgets: this.thinkingBudgets,
maxRetryDelayMs: this.maxRetryDelayMs,
toolExecution: this.toolExecution,
beforeToolCall: this.beforeToolCall,
afterToolCall: this.afterToolCall,
prepareNextTurn: this.prepareNextTurn
? async () => await this.prepareNextTurn?.(this.signal)
: undefined,
convertToLlm: this.convertToLlm,
transformContext: this.transformContext,
getApiKey: this.getApiKey,
getSteeringMessages: async () => {
if (skipInitialSteeringPoll) {
skipInitialSteeringPoll = false;
return [];
}
return this.steeringQueue.drain();
},
getFollowUpMessages: async () => this.followUpQueue.drain(),
};
}
private async runWithLifecycle(executor: (signal: AbortSignal) => Promise<void>): Promise<void> {
if (this.activeRun) {
throw new Error("Agent is already processing.");
}
const abortController = new AbortController();
let resolvePromise = () => {};
const promise = new Promise<void>((resolve) => {
resolvePromise = resolve;
});
this.activeRun = { promise, resolve: resolvePromise, abortController };
this.mutableState.isStreaming = true;
this.mutableState.streamingMessage = undefined;
this.mutableState.errorMessage = undefined;
try {
await executor(abortController.signal);
} catch (error) {
await this.handleRunFailure(error, abortController.signal.aborted);
} finally {
this.finishRun();
}
}
private async handleRunFailure(error: unknown, aborted: boolean): Promise<void> {
const failureMessage = {
role: "assistant",
content: [{ type: "text", text: "" }],
api: this.mutableState.model.api,
provider: this.mutableState.model.provider,
model: this.mutableState.model.id,
usage: EMPTY_USAGE,
stopReason: aborted ? "aborted" : "error",
errorMessage: error instanceof Error ? error.message : String(error),
timestamp: Date.now(),
} satisfies AgentMessage;
await this.processEvents({ type: "message_start", message: failureMessage });
await this.processEvents({ type: "message_end", message: failureMessage });
await this.processEvents({ type: "turn_end", message: failureMessage, toolResults: [] });
await this.processEvents({ type: "agent_end", messages: [failureMessage] });
}
private finishRun(): void {
this.mutableState.isStreaming = false;
this.mutableState.streamingMessage = undefined;
this.mutableState.pendingToolCalls = new Set<string>();
this.activeRun?.resolve();
this.activeRun = undefined;
}
/**
* Reduce internal state for a loop event, then await listeners.
*
* `agent_end` only means no further loop events will be emitted. The run is
* considered idle later, after all awaited listeners for `agent_end` finish
* and `finishRun()` clears runtime-owned state.
*/
private async processEvents(event: AgentEvent): Promise<void> {
switch (event.type) {
case "agent_start":
case "turn_start":
case "tool_execution_update":
break;
case "message_start":
this.mutableState.streamingMessage = event.message;
break;
case "message_update":
this.mutableState.streamingMessage = event.message;
break;
case "message_end":
this.mutableState.streamingMessage = undefined;
this.mutableState.messages.push(event.message);
break;
case "tool_execution_start": {
const pendingToolCalls = new Set(this.mutableState.pendingToolCalls);
pendingToolCalls.add(event.toolCallId);
this.mutableState.pendingToolCalls = pendingToolCalls;
break;
}
case "tool_execution_end": {
const pendingToolCalls = new Set(this.mutableState.pendingToolCalls);
pendingToolCalls.delete(event.toolCallId);
this.mutableState.pendingToolCalls = pendingToolCalls;
break;
}
case "turn_end":
if (event.message.role === "assistant" && event.message.errorMessage) {
this.mutableState.errorMessage = event.message.errorMessage;
}
break;
case "agent_end":
this.mutableState.streamingMessage = undefined;
break;
}
const signal = this.activeRun?.abortController.signal;
if (!signal) {
throw new Error("Agent listener invoked outside active run");
}
for (const listener of this.listeners) {
await listener(event, signal);
}
}
}