Files
openclaw/packages/agent-core/src/agent.ts
Peter Steinberger bb46b79d3c refactor: internalize OpenClaw agent runtime (#85341)
* refactor: extract agent core package

Introduce packages/agent-core as the OpenClaw-owned home for reusable agent loop, harness, session, prompt, and runtime dependency contracts.

* refactor: extract shared llm runtime

Move provider model registries, stream wrappers, OAuth helpers, and LLM utilities into src/llm with plugin-sdk barrels instead of depending on the old embedded runtime layout.

* refactor: remove pi runtime internals

Rename remaining Pi-shaped agent surfaces to OpenClaw agent runtime names, delete obsolete Pi docs and package graph checks, and add the third-party notice for incorporated code.

* refactor: tighten agent session runtime

Make agent-core/runtime dependencies explicit, consolidate compaction and session transcript helpers, and move model/session helpers behind OpenClaw-owned contracts.

* refactor: remove static model and pi auth paths

Drop static model catalogs and Pi auth bridges, move model/provider facts to manifest-owned runtime contracts, and harden internal embedded-agent utilities.

* refactor: remove legacy provider compat paths

* docs: remove agent parity notes

* fix: skip provider wildcard metadata parsing

* refactor: share session extension sdk loading

* refactor: inline acpx proxy error formatter

* refactor: fold edit recovery into edit tool

* fix: accept extension batch separator

* test: align startup provider plugin expectations

* fix: restore provider-scoped release discovery

* test: align static asset packaging expectations

* fix: run static provider catalogs during scoped discovery

* fix: add provider entry catalogs for scoped live discovery

* fix: load lightweight provider catalog entries

* fix: refresh provider-scoped plugin metadata

* fix: keep provider catalog entries on release live path

* fix: keep static manifest models in release live checks

* fix: harden release model discovery

* fix: reduce OpenAI live cache probe reasoning

* fix: disable OpenAI cache probe reasoning

* ci: extend OpenAI gateway live timeout

* fix: extend live gateway model budget

* fix: stabilize release validation regressions

* fix: honor provider aliases in model rows

* fix: stabilize release validation lanes

* fix: stabilize release memory qa

* ci: stabilize release validation lanes

* ci: prefer ipv4 for live docker node calls

* fix: restore shared tool-call stream wrapper

* ci: remove legacy pi test shard alias

* fix: clean up embedded agent test drift

* fix: stabilize runtime alias status

* fix: clean up embedded agent ci drift

* fix: restore release ci invariants

* fix: clean up post-rebase runtime drift

* fix: restore release ci checks

* fix: restore release ci after rebase

* fix: remove stale pi runtime path

* test: align compaction runtime expectations

* test: update plugin prerelease expectations

* fix: handle claude live tool approvals

* fix: stabilize release validation gates

* fix: finish agent runtime import

* test: finish post-rebase agent runtime mocks

* fix: keep codex compaction native

* fix: stabilize codex app-server hook tests

* test: isolate codex diagnostic active run

* test: remove codex diagnostic completion race

# Conflicts:
#	extensions/codex/src/app-server/run-attempt.test.ts

* ci: fix full release manifest performance run id

* refactor: narrow llm plugin sdk boundary

* chore: drop generated google boundary stamps

* fix: repair rebase fallout

* fix: clean up rebased runtime references

* fix: decode codex jwt payloads as base64url

* fix: preserve shipped pi runtime alias

* fix: add scoped sdk virtual modules

* fix: decode llm codex oauth jwt as base64url

* fix: avoid stale vertex adc negative cache

* fix: harden tool arg decoding and codeql path

* fix: keep vertex adc negative checks live

* refactor: consolidate codex jwt and edit helpers

* fix: await codex oauth node runtime imports

* fix: preserve sdk tool and notice contracts

* fix: preserve shipped compat config boundaries

* fix: align codex oauth callback host

* fix: terminate agent-core loop streams on failure

* fix: keep codex oauth callback alive during fallback

* ci: include session tools in critical codeql scans

* fix: keep Cloudflare Anthropic provider auth header

* docs: redirect legacy pi runtime pages

* fix: honor bundled web provider compat discovery

* fix: protect session output spill files

* fix: keep legacy agent dir env blocked

* fix: contain auto-discovered skill symlinks

* fix: harden agent core sdk proxy surfaces

* fix: restore approval reaction sdk compat

* fix: keep live docker runs bounded

* fix: keep codex oauth redirect host aligned

* fix: resolve post-rebase agent runtime drift

* fix: redact anthropic oauth parse failures

* fix: preserve responses strict tool shaping

* fix: repair agent runtime rebase cleanup

* docs: redirect retired parity pages

* fix: bound auto-discovered resources to roots

* fix: repair post-rebase agent test drift

* fix: preserve bundled provider allowlist migration

* fix: preserve manifest-owned provider aliases

* fix: declare photon image dependency

* fix: keep provider headers out of proxy body

* fix: preserve shipped env aliases

* fix: refresh control ui i18n generated state

* fix: quote read fallback paths

* fix: preview edits through configured backend

* test: satisfy core test typecheck

* fix: preserve ZAI usage auth fallback

* test: repair codex diagnostic test

* fix: repair agent runtime rebase drift

* test: finish embedded runner import rename

* fix: repair agent runtime rebase integrations

* test: align compaction oauth fallback expectations

* fix: allow sdk-auth session models

* fix: update doctor tool schema import

* fix: preserve bedrock plugin region

* fix: stream harmony-like prose immediately

* ci: include session runtime in codeql shards

* fix: repair latest rebase integrations

* fix: honor explicit codex websocket transport

* fix: keep openai-compatible credentials provider-scoped

* fix: refresh sdk api baseline after rebase

* fix: route cli runtime aliases through openclaw harness

* test: rename stale harness mock expectation

* test: rename embedded agent overflow calls

* test: clean embedded auth test wording

* test: use openclaw stream types in deepinfra cache test

* fix: refresh sdk api baseline on latest main

* fix: honor bundled discovery compat allowlists

* fix: refresh sdk api baseline after latest rebase

* fix: remove stale rebase imports

* test: rename stale model catalog mock

* test: mock renamed doctor runtime modules

* fix: map canonical kimi env auth

* fix: use internal model registry in bench script

* fix: migrate deepinfra provider catalog entry

* fix: enforce builtin tool suppression

* fix: route compaction auth and proxy payloads safely

* refactor: prune unused llm registry leftovers

* test: update codex hooks session import

* test: fix model picker ci coverage

* test: align model picker auth mock types
2026-05-27 19:24:04 +01:00

593 lines
18 KiB
TypeScript

import { runAgentLoop, runAgentLoopContinue } from "./agent-loop.js";
import {
type ImageContent,
type Message,
type Model,
type SimpleStreamOptions,
type TextContent,
type ThinkingBudgets,
type Transport,
} from "./llm.js";
import { type AgentCoreStreamRuntimeDeps, resolveAgentCoreStreamFn } from "./runtime-deps.js";
import type {
AfterToolCallContext,
AfterToolCallResult,
AgentContext,
AgentEvent,
AgentLoopConfig,
AgentLoopTurnUpdate,
AgentMessage,
AgentState,
AgentTool,
BeforeToolCallContext,
BeforeToolCallResult,
QueueMode,
StreamFn,
ToolExecutionMode,
} from "./types.js";
export type { QueueMode } from "./types.js";
function defaultConvertToLlm(messages: AgentMessage[]): Message[] {
return messages.filter(
(message) =>
message.role === "user" || message.role === "assistant" || message.role === "toolResult",
);
}
const EMPTY_USAGE = {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
};
const DEFAULT_MODEL = {
id: "unknown",
name: "unknown",
api: "unknown",
provider: "unknown",
baseUrl: "",
reasoning: false,
input: [],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 0,
maxTokens: 0,
} satisfies Model;
type MutableAgentState = Omit<
AgentState,
"isStreaming" | "streamingMessage" | "pendingToolCalls" | "errorMessage"
> & {
isStreaming: boolean;
streamingMessage?: AgentMessage;
pendingToolCalls: Set<string>;
errorMessage?: string;
};
function createMutableAgentState(
initialState?: Partial<
Omit<AgentState, "pendingToolCalls" | "isStreaming" | "streamingMessage" | "errorMessage">
>,
): MutableAgentState {
let tools = initialState?.tools?.slice() ?? [];
let messages = initialState?.messages?.slice() ?? [];
return {
systemPrompt: initialState?.systemPrompt ?? "",
model: initialState?.model ?? DEFAULT_MODEL,
thinkingLevel: initialState?.thinkingLevel ?? "off",
get tools() {
return tools;
},
set tools(nextTools: AgentTool[]) {
tools = nextTools.slice();
},
get messages() {
return messages;
},
set messages(nextMessages: AgentMessage[]) {
messages = nextMessages.slice();
},
isStreaming: false,
streamingMessage: undefined,
pendingToolCalls: new Set<string>(),
errorMessage: undefined,
};
}
/** Options for constructing an {@link Agent}. */
export interface AgentOptions {
initialState?: Partial<
Omit<AgentState, "pendingToolCalls" | "isStreaming" | "streamingMessage" | "errorMessage">
>;
convertToLlm?: (messages: AgentMessage[]) => Message[] | Promise<Message[]>;
transformContext?: (messages: AgentMessage[], signal?: AbortSignal) => Promise<AgentMessage[]>;
runtime?: AgentCoreStreamRuntimeDeps;
streamFn?: StreamFn;
getApiKey?: (provider: string) => Promise<string | undefined> | string | undefined;
onPayload?: SimpleStreamOptions["onPayload"];
onResponse?: SimpleStreamOptions["onResponse"];
beforeToolCall?: (
context: BeforeToolCallContext,
signal?: AbortSignal,
) => Promise<BeforeToolCallResult | undefined>;
afterToolCall?: (
context: AfterToolCallContext,
signal?: AbortSignal,
) => Promise<AfterToolCallResult | undefined>;
prepareNextTurn?: (
signal?: AbortSignal,
) => Promise<AgentLoopTurnUpdate | undefined> | AgentLoopTurnUpdate | undefined;
steeringMode?: QueueMode;
followUpMode?: QueueMode;
sessionId?: string;
thinkingBudgets?: ThinkingBudgets;
transport?: Transport;
maxRetryDelayMs?: number;
toolExecution?: ToolExecutionMode;
}
class PendingMessageQueue {
private messages: AgentMessage[] = [];
public mode: QueueMode;
constructor(mode: QueueMode) {
this.mode = mode;
}
enqueue(message: AgentMessage): void {
this.messages.push(message);
}
hasItems(): boolean {
return this.messages.length > 0;
}
drain(): AgentMessage[] {
if (this.mode === "all") {
const drained = this.messages.slice();
this.messages = [];
return drained;
}
const first = this.messages[0];
if (!first) {
return [];
}
this.messages = this.messages.slice(1);
return [first];
}
clear(): void {
this.messages = [];
}
}
type ActiveRun = {
promise: Promise<void>;
resolve: () => void;
abortController: AbortController;
};
/**
* Stateful wrapper around the low-level agent loop.
*
* `Agent` owns the current transcript, emits lifecycle events, executes tools,
* and exposes queueing APIs for steering and follow-up messages.
*/
export class Agent {
private mutableState: MutableAgentState;
private readonly listeners = new Set<
(event: AgentEvent, signal: AbortSignal) => Promise<void> | void
>();
private readonly steeringQueue: PendingMessageQueue;
private readonly followUpQueue: PendingMessageQueue;
public convertToLlm: (messages: AgentMessage[]) => Message[] | Promise<Message[]>;
public transformContext?: (
messages: AgentMessage[],
signal?: AbortSignal,
) => Promise<AgentMessage[]>;
public runtime?: AgentCoreStreamRuntimeDeps;
public streamFn: StreamFn;
public getApiKey?: (provider: string) => Promise<string | undefined> | string | undefined;
public onPayload?: SimpleStreamOptions["onPayload"];
public onResponse?: SimpleStreamOptions["onResponse"];
public beforeToolCall?: (
context: BeforeToolCallContext,
signal?: AbortSignal,
) => Promise<BeforeToolCallResult | undefined>;
public afterToolCall?: (
context: AfterToolCallContext,
signal?: AbortSignal,
) => Promise<AfterToolCallResult | undefined>;
public prepareNextTurn?: (
signal?: AbortSignal,
) => Promise<AgentLoopTurnUpdate | undefined> | AgentLoopTurnUpdate | undefined;
private activeRun?: ActiveRun;
/** Session identifier forwarded to providers for cache-aware backends. */
public sessionId?: string;
/** Optional per-level thinking token budgets forwarded to the stream function. */
public thinkingBudgets?: ThinkingBudgets;
/** Preferred transport forwarded to the stream function. */
public transport: Transport;
/** Optional cap for provider-requested retry delays. */
public maxRetryDelayMs?: number;
/** Tool execution strategy for assistant messages that contain multiple tool calls. */
public toolExecution: ToolExecutionMode;
constructor(options: AgentOptions = {}) {
this.mutableState = createMutableAgentState(options.initialState);
this.convertToLlm = options.convertToLlm ?? defaultConvertToLlm;
this.transformContext = options.transformContext;
this.runtime = options.runtime;
this.streamFn = resolveAgentCoreStreamFn(options.runtime, options.streamFn);
this.getApiKey = options.getApiKey;
this.onPayload = options.onPayload;
this.onResponse = options.onResponse;
this.beforeToolCall = options.beforeToolCall;
this.afterToolCall = options.afterToolCall;
this.prepareNextTurn = options.prepareNextTurn;
this.steeringQueue = new PendingMessageQueue(options.steeringMode ?? "one-at-a-time");
this.followUpQueue = new PendingMessageQueue(options.followUpMode ?? "one-at-a-time");
this.sessionId = options.sessionId;
this.thinkingBudgets = options.thinkingBudgets;
this.transport = options.transport ?? "auto";
this.maxRetryDelayMs = options.maxRetryDelayMs;
this.toolExecution = options.toolExecution ?? "parallel";
}
/**
* Subscribe to agent lifecycle events.
*
* Listener promises are awaited in subscription order and are included in
* the current run's settlement. Listeners also receive the active abort
* signal for the current run.
*
* `agent_end` is the final emitted event for a run, but the agent does not
* become idle until all awaited listeners for that event have settled.
*/
subscribe(
listener: (event: AgentEvent, signal: AbortSignal) => Promise<void> | void,
): () => void {
this.listeners.add(listener);
return () => this.listeners.delete(listener);
}
/**
* Current agent state.
*
* Assigning `state.tools` or `state.messages` copies the provided top-level array.
*/
get state(): AgentState {
return this.mutableState;
}
/** Controls how queued steering messages are drained. */
set steeringMode(mode: QueueMode) {
this.steeringQueue.mode = mode;
}
get steeringMode(): QueueMode {
return this.steeringQueue.mode;
}
/** Controls how queued follow-up messages are drained. */
set followUpMode(mode: QueueMode) {
this.followUpQueue.mode = mode;
}
get followUpMode(): QueueMode {
return this.followUpQueue.mode;
}
/** Queue a message to be injected after the current assistant turn finishes. */
steer(message: AgentMessage): void {
this.steeringQueue.enqueue(message);
}
/** Queue a message to run only after the agent would otherwise stop. */
followUp(message: AgentMessage): void {
this.followUpQueue.enqueue(message);
}
/** Remove all queued steering messages. */
clearSteeringQueue(): void {
this.steeringQueue.clear();
}
/** Remove all queued follow-up messages. */
clearFollowUpQueue(): void {
this.followUpQueue.clear();
}
/** Remove all queued steering and follow-up messages. */
clearAllQueues(): void {
this.clearSteeringQueue();
this.clearFollowUpQueue();
}
/** Returns true when either queue still contains pending messages. */
hasQueuedMessages(): boolean {
return this.steeringQueue.hasItems() || this.followUpQueue.hasItems();
}
/** Active abort signal for the current run, if any. */
get signal(): AbortSignal | undefined {
return this.activeRun?.abortController.signal;
}
/** Abort the current run, if one is active. */
abort(): void {
this.activeRun?.abortController.abort();
}
/**
* Resolve when the current run and all awaited event listeners have finished.
*
* This resolves after `agent_end` listeners settle.
*/
waitForIdle(): Promise<void> {
return this.activeRun?.promise ?? Promise.resolve();
}
/** Clear transcript state, runtime state, and queued messages. */
reset(): void {
this.mutableState.messages = [];
this.mutableState.isStreaming = false;
this.mutableState.streamingMessage = undefined;
this.mutableState.pendingToolCalls = new Set<string>();
this.mutableState.errorMessage = undefined;
this.clearFollowUpQueue();
this.clearSteeringQueue();
}
/** Start a new prompt from text, a single message, or a batch of messages. */
async prompt(message: AgentMessage | AgentMessage[]): Promise<void>;
async prompt(input: string, images?: ImageContent[]): Promise<void>;
async prompt(
input: string | AgentMessage | AgentMessage[],
images?: ImageContent[],
): Promise<void> {
if (this.activeRun) {
throw new Error(
"Agent is already processing a prompt. Use steer() or followUp() to queue messages, or wait for completion.",
);
}
const messages = this.normalizePromptInput(input, images);
await this.runPromptMessages(messages);
}
/** Continue from the current transcript. The last message must be a user or tool-result message. */
async continue(): Promise<void> {
if (this.activeRun) {
throw new Error("Agent is already processing. Wait for completion before continuing.");
}
const lastMessage = this.mutableState.messages[this.mutableState.messages.length - 1];
if (!lastMessage) {
throw new Error("No messages to continue from");
}
if (lastMessage.role === "assistant") {
const queuedSteering = this.steeringQueue.drain();
if (queuedSteering.length > 0) {
await this.runPromptMessages(queuedSteering, { skipInitialSteeringPoll: true });
return;
}
const queuedFollowUps = this.followUpQueue.drain();
if (queuedFollowUps.length > 0) {
await this.runPromptMessages(queuedFollowUps);
return;
}
throw new Error("Cannot continue from message role: assistant");
}
await this.runContinuation();
}
private normalizePromptInput(
input: string | AgentMessage | AgentMessage[],
images?: ImageContent[],
): AgentMessage[] {
if (Array.isArray(input)) {
return input;
}
if (typeof input !== "string") {
return [input];
}
const content: Array<TextContent | ImageContent> = [{ type: "text", text: input }];
if (images && images.length > 0) {
content.push(...images);
}
return [{ role: "user", content, timestamp: Date.now() }];
}
private async runPromptMessages(
messages: AgentMessage[],
options: { skipInitialSteeringPoll?: boolean } = {},
): Promise<void> {
await this.runWithLifecycle(async (signal) => {
await runAgentLoop(
messages,
this.createContextSnapshot(),
this.createLoopConfig(options),
(event) => this.processEvents(event),
signal,
this.streamFn,
);
});
}
private async runContinuation(): Promise<void> {
await this.runWithLifecycle(async (signal) => {
await runAgentLoopContinue(
this.createContextSnapshot(),
this.createLoopConfig(),
(event) => this.processEvents(event),
signal,
this.streamFn,
);
});
}
private createContextSnapshot(): AgentContext {
return {
systemPrompt: this.mutableState.systemPrompt,
messages: this.mutableState.messages.slice(),
tools: this.mutableState.tools.slice(),
};
}
private createLoopConfig(options: { skipInitialSteeringPoll?: boolean } = {}): AgentLoopConfig {
let skipInitialSteeringPoll = options.skipInitialSteeringPoll === true;
return {
model: this.mutableState.model,
reasoning:
this.mutableState.thinkingLevel === "off" ? undefined : this.mutableState.thinkingLevel,
sessionId: this.sessionId,
onPayload: this.onPayload,
onResponse: this.onResponse,
transport: this.transport,
thinkingBudgets: this.thinkingBudgets,
maxRetryDelayMs: this.maxRetryDelayMs,
toolExecution: this.toolExecution,
beforeToolCall: this.beforeToolCall,
afterToolCall: this.afterToolCall,
prepareNextTurn: this.prepareNextTurn
? async () => await this.prepareNextTurn?.(this.signal)
: undefined,
convertToLlm: this.convertToLlm,
transformContext: this.transformContext,
getApiKey: this.getApiKey,
getSteeringMessages: async () => {
if (skipInitialSteeringPoll) {
skipInitialSteeringPoll = false;
return [];
}
return this.steeringQueue.drain();
},
getFollowUpMessages: async () => this.followUpQueue.drain(),
};
}
private async runWithLifecycle(executor: (signal: AbortSignal) => Promise<void>): Promise<void> {
if (this.activeRun) {
throw new Error("Agent is already processing.");
}
const abortController = new AbortController();
let resolvePromise = () => {};
const promise = new Promise<void>((resolve) => {
resolvePromise = resolve;
});
this.activeRun = { promise, resolve: resolvePromise, abortController };
this.mutableState.isStreaming = true;
this.mutableState.streamingMessage = undefined;
this.mutableState.errorMessage = undefined;
try {
await executor(abortController.signal);
} catch (error) {
await this.handleRunFailure(error, abortController.signal.aborted);
} finally {
this.finishRun();
}
}
private async handleRunFailure(error: unknown, aborted: boolean): Promise<void> {
const failureMessage = {
role: "assistant",
content: [{ type: "text", text: "" }],
api: this.mutableState.model.api,
provider: this.mutableState.model.provider,
model: this.mutableState.model.id,
usage: EMPTY_USAGE,
stopReason: aborted ? "aborted" : "error",
errorMessage: error instanceof Error ? error.message : String(error),
timestamp: Date.now(),
} satisfies AgentMessage;
await this.processEvents({ type: "message_start", message: failureMessage });
await this.processEvents({ type: "message_end", message: failureMessage });
await this.processEvents({ type: "turn_end", message: failureMessage, toolResults: [] });
await this.processEvents({ type: "agent_end", messages: [failureMessage] });
}
private finishRun(): void {
this.mutableState.isStreaming = false;
this.mutableState.streamingMessage = undefined;
this.mutableState.pendingToolCalls = new Set<string>();
this.activeRun?.resolve();
this.activeRun = undefined;
}
/**
* Reduce internal state for a loop event, then await listeners.
*
* `agent_end` only means no further loop events will be emitted. The run is
* considered idle later, after all awaited listeners for `agent_end` finish
* and `finishRun()` clears runtime-owned state.
*/
private async processEvents(event: AgentEvent): Promise<void> {
switch (event.type) {
case "agent_start":
case "turn_start":
case "tool_execution_update":
break;
case "message_start":
this.mutableState.streamingMessage = event.message;
break;
case "message_update":
this.mutableState.streamingMessage = event.message;
break;
case "message_end":
this.mutableState.streamingMessage = undefined;
this.mutableState.messages.push(event.message);
break;
case "tool_execution_start": {
const pendingToolCalls = new Set(this.mutableState.pendingToolCalls);
pendingToolCalls.add(event.toolCallId);
this.mutableState.pendingToolCalls = pendingToolCalls;
break;
}
case "tool_execution_end": {
const pendingToolCalls = new Set(this.mutableState.pendingToolCalls);
pendingToolCalls.delete(event.toolCallId);
this.mutableState.pendingToolCalls = pendingToolCalls;
break;
}
case "turn_end":
if (event.message.role === "assistant" && event.message.errorMessage) {
this.mutableState.errorMessage = event.message.errorMessage;
}
break;
case "agent_end":
this.mutableState.streamingMessage = undefined;
break;
}
const signal = this.activeRun?.abortController.signal;
if (!signal) {
throw new Error("Agent listener invoked outside active run");
}
for (const listener of this.listeners) {
await listener(event, signal);
}
}
}