diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ddeac6e990..1809d8d70e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,6 +66,7 @@ Docs: https://docs.openclaw.ai - Gateway/watch: suppress sync-I/O trace output during `pnpm gateway:watch --benchmark` unless explicitly requested, so CPU profiling no longer floods the terminal with stack traces. - Gateway/watch: when benchmark sync-I/O tracing is explicitly enabled, tee trace blocks to the benchmark output log and filter them from the terminal pane while keeping normal Gateway logs visible. - Plugins/runtime-deps: include `json5` in the memory-core plugin runtime dependency set so packaged `memory_search` sandboxes can resolve generated OpenClaw runtime chunks that parse JSON5 config. Fixes #77461. +- Codex harness: preserve app-server usage-limit reset details and deliver OpenClaw-owned runtime failure notices through tool-only source-reply mode, so Telegram and other chat channels tell users when Codex subscription limits or API failures block a turn instead of going silent. - Agents/OpenAI: default direct OpenAI Responses models to the SSE transport instead of WebSocket auto-selection, preventing pi runtime chat turns from hanging on servers where the WebSocket path stalls while the OpenAI HTTP stream works. Thanks @vincentkoc. - Discord: prefer IPv4 for Discord REST and gateway WebSocket startup paths so IPv4-only networks no longer stall before Gateway READY and inbound message dispatch. Fixes #77398; refs #77526. Thanks @Beandon13. - Channels/plugins: key bundled package-state probes, env/config presence, and read-only command defaults by channel id instead of manifest plugin id, preserving setup and native-command detection for channel plugins whose package id differs from the channel alias. Thanks @vincentkoc. diff --git a/docs/plugins/codex-harness.md b/docs/plugins/codex-harness.md index 06953a0e8be..5c4d229be79 100644 --- a/docs/plugins/codex-harness.md +++ b/docs/plugins/codex-harness.md @@ -845,6 +845,10 @@ Common forms: - `/codex mcp` lists Codex app-server MCP server status. - `/codex skills` lists Codex app-server skills. +When Codex reports a usage-limit failure, OpenClaw includes the next +app-server reset time when Codex provided one. Use `/codex account` in the same +conversation to inspect the current account and rate-limit windows. + ### Common debugging workflow When a Codex-backed agent does something surprising in Telegram, Discord, Slack, diff --git a/extensions/codex/src/app-server/event-projector.test.ts b/extensions/codex/src/app-server/event-projector.test.ts index 631f0228017..0ad6ff06c53 100644 --- a/extensions/codex/src/app-server/event-projector.test.ts +++ b/extensions/codex/src/app-server/event-projector.test.ts @@ -14,6 +14,7 @@ import { CodexAppServerEventProjector, type CodexAppServerToolTelemetry, } from "./event-projector.js"; +import { rememberCodexRateLimits, resetCodexRateLimitCacheForTests } from "./rate-limit-cache.js"; import { createCodexTestModel } from "./test-support.js"; const THREAD_ID = "thread-1"; @@ -86,6 +87,7 @@ beforeEach(() => { afterEach(async () => { resetAgentEventsForTest(); resetGlobalHookRunner(); + resetCodexRateLimitCacheForTests(); vi.restoreAllMocks(); for (const tempDir of tempDirs) { await fs.rm(tempDir, { recursive: true, force: true }); @@ -140,6 +142,23 @@ function appServerError(params: { message: string; willRetry: boolean }): Projec }); } +function rateLimitsUpdated(resetsAt: number): ProjectorNotification { + return { + method: "account/rateLimits/updated", + params: { + rateLimits: { + limitId: "codex", + limitName: "Codex", + primary: { usedPercent: 100, windowDurationMins: 300, resetsAt }, + secondary: null, + credits: null, + planType: "plus", + rateLimitReachedType: "rate_limit_reached", + }, + }, + } as ProjectorNotification; +} + function turnCompleted(items: unknown[] = []): ProjectorNotification { return { method: "turn/completed", @@ -280,6 +299,95 @@ describe("CodexAppServerEventProjector", () => { expect(result.lastAssistant).toBeUndefined(); }); + it("uses Codex rate-limit resets for usage-limit app-server errors", async () => { + const projector = await createProjector(); + const resetsAt = Math.ceil(Date.now() / 1000) + 120; + + await projector.handleNotification(rateLimitsUpdated(resetsAt)); + await projector.handleNotification( + forCurrentTurn("error", { + error: { + message: "You've reached your usage limit.", + codexErrorInfo: "usageLimitExceeded", + additionalDetails: null, + }, + willRetry: false, + }), + ); + + const result = projector.buildResult(buildEmptyToolTelemetry()); + + expect(result.promptError).toContain("You've reached your Codex subscription usage limit."); + expect(result.promptError).toContain("Next reset in"); + expect(result.promptError).toContain("Run /codex account"); + expect(result.promptErrorSource).toBe("prompt"); + }); + + it("uses Codex rate-limit resets for failed turns", async () => { + const projector = await createProjector(); + const resetsAt = Math.ceil(Date.now() / 1000) + 120; + + await projector.handleNotification(rateLimitsUpdated(resetsAt)); + await projector.handleNotification( + forCurrentTurn("turn/completed", { + turn: { + id: TURN_ID, + status: "failed", + error: { + message: "You've reached your usage limit.", + codexErrorInfo: "usageLimitExceeded", + additionalDetails: null, + }, + items: [], + }, + }), + ); + + const result = projector.buildResult(buildEmptyToolTelemetry()); + + expect(result.promptError).toContain("You've reached your Codex subscription usage limit."); + expect(result.promptError).toContain("Next reset in"); + expect(result.promptErrorSource).toBe("prompt"); + }); + + it("uses a recent Codex rate-limit snapshot when failed turns omit reset details", async () => { + const projector = await createProjector(); + const resetsAt = Math.ceil(Date.now() / 1000) + 120; + rememberCodexRateLimits({ + rateLimits: { + limitId: "codex", + limitName: "Codex", + primary: { usedPercent: 100, windowDurationMins: 300, resetsAt }, + secondary: null, + credits: null, + planType: "plus", + rateLimitReachedType: "rate_limit_reached", + }, + rateLimitsByLimitId: null, + }); + + await projector.handleNotification( + forCurrentTurn("turn/completed", { + turn: { + id: TURN_ID, + status: "failed", + error: { + message: "You've reached your usage limit.", + codexErrorInfo: "usageLimitExceeded", + additionalDetails: null, + }, + items: [], + }, + }), + ); + + const result = projector.buildResult(buildEmptyToolTelemetry()); + + expect(result.promptError).toContain("You've reached your Codex subscription usage limit."); + expect(result.promptError).toContain("Next reset in"); + expect(result.promptErrorSource).toBe("prompt"); + }); + it("normalizes snake_case current token usage fields", async () => { const projector = await createProjector(); diff --git a/extensions/codex/src/app-server/event-projector.ts b/extensions/codex/src/app-server/event-projector.ts index d0f79c6a55f..2f9752c6788 100644 --- a/extensions/codex/src/app-server/event-projector.ts +++ b/extensions/codex/src/app-server/event-projector.ts @@ -27,6 +27,8 @@ import { type JsonObject, type JsonValue, } from "./protocol.js"; +import { readRecentCodexRateLimits, rememberCodexRateLimits } from "./rate-limit-cache.js"; +import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js"; import { readCodexMirroredSessionHistoryMessages } from "./session-history.js"; import { attachCodexMirrorIdentity } from "./transcript-mirror.js"; @@ -100,6 +102,7 @@ export class CodexAppServerEventProjector { private tokenUsage: ReturnType; private guardianReviewCount = 0; private completedCompactionCount = 0; + private latestRateLimits: JsonValue | undefined; constructor( private readonly params: EmbeddedRunAttemptParams, @@ -112,6 +115,11 @@ export class CodexAppServerEventProjector { if (!params) { return; } + if (notification.method === "account/rateLimits/updated") { + this.latestRateLimits = params; + rememberCodexRateLimits(params); + return; + } if (isHookNotificationMethod(notification.method)) { if (!this.isHookNotificationForCurrentThread(params)) { return; @@ -167,7 +175,7 @@ export class CodexAppServerEventProjector { if (readBooleanAlias(params, ["willRetry", "will_retry"]) === true) { break; } - this.promptError = readCodexErrorNotificationMessage(params) ?? "codex app-server error"; + this.promptError = this.formatCodexErrorMessage(params) ?? "codex app-server error"; this.promptErrorSource = "prompt"; break; default: @@ -523,7 +531,14 @@ export class CodexAppServerEventProjector { this.aborted = true; } if (turn.status === "failed") { - this.promptError = turn.error?.message ?? "codex app-server turn failed"; + this.promptError = + formatCodexUsageLimitErrorMessage({ + message: turn.error?.message, + codexErrorInfo: turn.error?.codexErrorInfo as JsonValue | null | undefined, + rateLimits: this.latestRateLimits ?? readRecentCodexRateLimits(), + }) ?? + turn.error?.message ?? + "codex app-server turn failed"; this.promptErrorSource = "prompt"; } for (const item of turn.items ?? []) { @@ -746,6 +761,17 @@ export class CodexAppServerEventProjector { }); } + private formatCodexErrorMessage(params: JsonObject): string | undefined { + const error = isJsonObject(params.error) ? params.error : undefined; + return ( + formatCodexUsageLimitErrorMessage({ + message: error ? readString(error, "message") : undefined, + codexErrorInfo: error?.codexErrorInfo, + rateLimits: this.latestRateLimits ?? readRecentCodexRateLimits(), + }) ?? readCodexErrorNotificationMessage(params) + ); + } + private emitAgentEvent( event: Parameters>[0], ): void { diff --git a/extensions/codex/src/app-server/rate-limit-cache.ts b/extensions/codex/src/app-server/rate-limit-cache.ts new file mode 100644 index 00000000000..d9ee376d61f --- /dev/null +++ b/extensions/codex/src/app-server/rate-limit-cache.ts @@ -0,0 +1,48 @@ +import type { JsonValue } from "./protocol.js"; + +const DEFAULT_CODEX_RATE_LIMIT_CACHE_MAX_AGE_MS = 10 * 60_000; +const CODEX_RATE_LIMIT_CACHE_STATE = Symbol.for("openclaw.codexRateLimitCacheState"); + +type CodexRateLimitCacheState = { + value?: JsonValue; + updatedAtMs?: number; +}; + +function getCodexRateLimitCacheState(): CodexRateLimitCacheState { + const globalState = globalThis as typeof globalThis & { + [CODEX_RATE_LIMIT_CACHE_STATE]?: CodexRateLimitCacheState; + }; + globalState[CODEX_RATE_LIMIT_CACHE_STATE] ??= {}; + return globalState[CODEX_RATE_LIMIT_CACHE_STATE]; +} + +export function rememberCodexRateLimits(value: JsonValue | undefined, nowMs = Date.now()): void { + if (value === undefined) { + return; + } + const state = getCodexRateLimitCacheState(); + state.value = value; + state.updatedAtMs = nowMs; +} + +export function readRecentCodexRateLimits(options?: { + nowMs?: number; + maxAgeMs?: number; +}): JsonValue | undefined { + const state = getCodexRateLimitCacheState(); + if (state.value === undefined || state.updatedAtMs === undefined) { + return undefined; + } + const nowMs = options?.nowMs ?? Date.now(); + const maxAgeMs = options?.maxAgeMs ?? DEFAULT_CODEX_RATE_LIMIT_CACHE_MAX_AGE_MS; + if (maxAgeMs >= 0 && nowMs - state.updatedAtMs > maxAgeMs) { + return undefined; + } + return state.value; +} + +export function resetCodexRateLimitCacheForTests(): void { + const state = getCodexRateLimitCacheState(); + state.value = undefined; + state.updatedAtMs = undefined; +} diff --git a/extensions/codex/src/app-server/rate-limits.ts b/extensions/codex/src/app-server/rate-limits.ts new file mode 100644 index 00000000000..a01b8600568 --- /dev/null +++ b/extensions/codex/src/app-server/rate-limits.ts @@ -0,0 +1,262 @@ +import { isJsonObject, type JsonObject, type JsonValue } from "./protocol.js"; + +const CODEX_LIMIT_ID = "codex"; +const LIMIT_WINDOW_KEYS = ["primary", "secondary"] as const; +const ONE_MINUTE_MS = 60_000; +const ONE_HOUR_MS = 60 * ONE_MINUTE_MS; +const ONE_DAY_MS = 24 * ONE_HOUR_MS; + +type LimitWindowKey = (typeof LIMIT_WINDOW_KEYS)[number]; + +type RateLimitReset = { + resetsAtMs: number; + usedPercent?: number; +}; + +export function formatCodexUsageLimitErrorMessage(params: { + message?: string | null; + codexErrorInfo?: JsonValue | null; + rateLimits?: JsonValue; + nowMs?: number; +}): string | undefined { + const message = normalizeText(params.message); + if (!isCodexUsageLimitError(params.codexErrorInfo, message)) { + return undefined; + } + const nowMs = params.nowMs ?? Date.now(); + const nextReset = selectNextRateLimitReset(params.rateLimits, nowMs); + const parts = ["You've reached your Codex subscription usage limit."]; + if (nextReset) { + parts.push(`Next reset ${formatResetTime(nextReset.resetsAtMs, nowMs)}.`); + } else { + parts.push("Codex did not return a reset time for this limit."); + } + parts.push("Run /codex account for current usage details."); + return parts.join(" "); +} + +export function summarizeCodexRateLimits( + value: JsonValue | undefined, + nowMs = Date.now(), +): string | undefined { + const snapshots = collectCodexRateLimitSnapshots(value); + if (snapshots.length === 0) { + return undefined; + } + return snapshots + .slice(0, 4) + .map((snapshot) => summarizeRateLimitSnapshot(snapshot, nowMs)) + .join("; "); +} + +function isCodexUsageLimitError( + codexErrorInfo: JsonValue | null | undefined, + message: string | undefined, +): boolean { + if (codexErrorInfo === "usageLimitExceeded") { + return true; + } + if (typeof codexErrorInfo === "string") { + const normalized = codexErrorInfo.replace(/[_\s-]/gu, "").toLowerCase(); + if (normalized === "usagelimitexceeded") { + return true; + } + } + return Boolean(message?.toLowerCase().includes("usage limit")); +} + +function selectNextRateLimitReset( + value: JsonValue | undefined, + nowMs: number, +): RateLimitReset | undefined { + const windows = collectCodexRateLimitSnapshots(value).flatMap((snapshot) => + LIMIT_WINDOW_KEYS.flatMap((key) => readRateLimitWindow(snapshot, key) ?? []), + ); + const futureWindows = windows.filter((window) => window.resetsAtMs > nowMs); + if (futureWindows.length === 0) { + return undefined; + } + const exhaustedWindows = futureWindows.filter( + (window) => window.usedPercent !== undefined && window.usedPercent >= 100, + ); + const candidates = exhaustedWindows.length > 0 ? exhaustedWindows : futureWindows; + candidates.sort((left, right) => left.resetsAtMs - right.resetsAtMs); + return candidates[0]; +} + +function summarizeRateLimitSnapshot(snapshot: JsonObject, nowMs: number): string { + const label = formatLimitLabel(snapshot); + const windows = LIMIT_WINDOW_KEYS.flatMap((key) => { + const window = readRateLimitWindow(snapshot, key); + return window ? [formatRateLimitWindow(key, window, nowMs)] : []; + }); + const reachedType = readString(snapshot, "rateLimitReachedType"); + const suffix = reachedType ? ` (${formatReachedType(reachedType)})` : ""; + return `${label}: ${windows.join(", ") || "available"}${suffix}`; +} + +function collectCodexRateLimitSnapshots(value: JsonValue | undefined): JsonObject[] { + const snapshots: JsonObject[] = []; + const seen = new Set(); + collectRateLimitSnapshots(value, snapshots, seen); + return snapshots; +} + +function collectRateLimitSnapshots( + value: JsonValue | undefined, + snapshots: JsonObject[], + seen: Set, +): void { + if (Array.isArray(value)) { + for (const entry of value) { + collectRateLimitSnapshots(entry, snapshots, seen); + } + return; + } + if (!isJsonObject(value)) { + return; + } + if (isRateLimitSnapshot(value)) { + addRateLimitSnapshot(value, snapshots, seen); + return; + } + const byLimitId = value.rateLimitsByLimitId; + if (isJsonObject(byLimitId)) { + for (const key of sortedRateLimitKeys(Object.keys(byLimitId))) { + collectRateLimitSnapshots(byLimitId[key], snapshots, seen); + } + } + collectRateLimitSnapshots(value.rateLimits, snapshots, seen); + collectRateLimitSnapshots(value.data, snapshots, seen); + collectRateLimitSnapshots(value.items, snapshots, seen); +} + +function sortedRateLimitKeys(keys: string[]): string[] { + return keys.toSorted((left, right) => { + if (left === CODEX_LIMIT_ID) { + return -1; + } + if (right === CODEX_LIMIT_ID) { + return 1; + } + return left.localeCompare(right); + }); +} + +function addRateLimitSnapshot( + snapshot: JsonObject, + snapshots: JsonObject[], + seen: Set, +): void { + const signature = [ + readNullableString(snapshot, "limitId") ?? "", + readNullableString(snapshot, "limitName") ?? "", + formatWindowSignature(snapshot.primary), + formatWindowSignature(snapshot.secondary), + ].join("|"); + if (seen.has(signature)) { + return; + } + seen.add(signature); + snapshots.push(snapshot); +} + +function isRateLimitSnapshot(value: JsonObject): boolean { + return ( + isJsonObject(value.primary) || + isJsonObject(value.secondary) || + value.rateLimitReachedType !== undefined || + value.limitId !== undefined || + value.limitName !== undefined + ); +} + +function readRateLimitWindow( + snapshot: JsonObject, + key: LimitWindowKey, +): RateLimitReset | undefined { + const window = snapshot[key]; + if (!isJsonObject(window)) { + return undefined; + } + const resetsAt = readNumber(window, "resetsAt"); + return { + ...(typeof resetsAt === "number" && Number.isFinite(resetsAt) && resetsAt > 0 + ? { resetsAtMs: resetsAt * 1000 } + : { resetsAtMs: 0 }), + ...readOptionalNumberField(window, "usedPercent"), + }; +} + +function readOptionalNumberField(record: JsonObject, key: string): { usedPercent?: number } { + const value = readNumber(record, key); + return value === undefined ? {} : { usedPercent: value }; +} + +function formatRateLimitWindow(key: LimitWindowKey, window: RateLimitReset, nowMs: number): string { + const usedPercent = + window.usedPercent === undefined ? "usage unknown" : `${Math.round(window.usedPercent)}%`; + const reset = + window.resetsAtMs > nowMs ? `, resets ${formatResetTime(window.resetsAtMs, nowMs)}` : ""; + return `${key} ${usedPercent}${reset}`; +} + +function formatLimitLabel(snapshot: JsonObject): string { + const label = + readNullableString(snapshot, "limitName") ?? readNullableString(snapshot, "limitId"); + if (!label || label === CODEX_LIMIT_ID) { + return "Codex"; + } + return label.replace(/[_-]+/gu, " ").replace(/\s+/gu, " ").trim(); +} + +function formatReachedType(value: string): string { + return value.replace(/[_-]+/gu, " ").replace(/\s+/gu, " ").trim(); +} + +function formatResetTime(resetsAtMs: number, nowMs: number): string { + return `in ${formatRelativeDuration(resetsAtMs - nowMs)} (${new Date(resetsAtMs).toISOString()})`; +} + +function formatRelativeDuration(durationMs: number): string { + const safeMs = Math.max(1_000, durationMs); + if (safeMs < ONE_MINUTE_MS) { + return `${Math.ceil(safeMs / 1000)} seconds`; + } + if (safeMs < ONE_HOUR_MS) { + const minutes = Math.ceil(safeMs / ONE_MINUTE_MS); + return `${minutes} ${minutes === 1 ? "minute" : "minutes"}`; + } + if (safeMs < ONE_DAY_MS) { + const hours = Math.ceil(safeMs / ONE_HOUR_MS); + return `${hours} ${hours === 1 ? "hour" : "hours"}`; + } + const days = Math.ceil(safeMs / ONE_DAY_MS); + return `${days} ${days === 1 ? "day" : "days"}`; +} + +function formatWindowSignature(value: JsonValue | undefined): string { + if (!isJsonObject(value)) { + return ""; + } + return `${readNumber(value, "usedPercent") ?? ""}:${readNumber(value, "resetsAt") ?? ""}`; +} + +function readString(record: JsonObject, key: string): string | undefined { + const value = record[key]; + return typeof value === "string" && value.trim() ? value.trim() : undefined; +} + +function readNullableString(record: JsonObject, key: string): string | undefined { + return readString(record, key) ?? undefined; +} + +function readNumber(record: JsonObject, key: string): number | undefined { + const value = record[key]; + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function normalizeText(value: string | null | undefined): string | undefined { + const text = value?.trim(); + return text ? text : undefined; +} diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index bf33227e364..37f815e8bfb 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -24,6 +24,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { CODEX_GPT5_BEHAVIOR_CONTRACT } from "../../prompt-overlay.js"; import * as elicitationBridge from "./elicitation-bridge.js"; import type { CodexServerNotification } from "./protocol.js"; +import { rememberCodexRateLimits, resetCodexRateLimitCacheForTests } from "./rate-limit-cache.js"; import { runCodexAppServerAttempt, __testing } from "./run-attempt.js"; import { readCodexAppServerBinding, writeCodexAppServerBinding } from "./session-binding.js"; import { createCodexTestModel } from "./test-support.js"; @@ -126,6 +127,23 @@ function turnStartResult(turnId = "turn-1", status = "inProgress") { }; } +function rateLimitsUpdated(resetsAt: number): CodexServerNotification { + return { + method: "account/rateLimits/updated", + params: { + rateLimits: { + limitId: "codex", + limitName: "Codex", + primary: { usedPercent: 100, windowDurationMins: 300, resetsAt }, + secondary: null, + credits: null, + planType: "plus", + rateLimitReachedType: "rate_limit_reached", + }, + }, + }; +} + function assistantMessage(text: string, timestamp: number) { return { role: "assistant" as const, @@ -351,6 +369,7 @@ describe("runCodexAppServerAttempt", () => { afterEach(async () => { __testing.resetCodexAppServerClientFactoryForTests(); + resetCodexRateLimitCacheForTests(); nativeHookRelayTesting.clearNativeHookRelaysForTests(); resetAgentEventsForTest(); resetGlobalHookRunner(); @@ -1171,6 +1190,71 @@ describe("runCodexAppServerAttempt", () => { expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined(); }); + it("preserves Codex usage-limit reset details when turn/start fails", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const resetsAt = Math.ceil(Date.now() / 1000) + 120; + let harness!: ReturnType; + harness = createStartedThreadHarness(async (method) => { + if (method === "turn/start") { + await harness.notify(rateLimitsUpdated(resetsAt)); + throw Object.assign(new Error("You've reached your usage limit."), { + data: { codexErrorInfo: "usageLimitExceeded" }, + }); + } + return undefined; + }); + + const runError = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir)).catch( + (error: unknown) => error, + ); + + const error = await runError; + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toContain( + "You've reached your Codex subscription usage limit.", + ); + expect((error as Error).message).toContain("Next reset in"); + }); + + it("uses a recent Codex rate-limit snapshot when turn/start omits reset details", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const resetsAt = Math.ceil(Date.now() / 1000) + 120; + rememberCodexRateLimits({ + rateLimits: { + limitId: "codex", + limitName: "Codex", + primary: { usedPercent: 100, windowDurationMins: 300, resetsAt }, + secondary: null, + credits: null, + planType: "plus", + rateLimitReachedType: "rate_limit_reached", + }, + rateLimitsByLimitId: null, + }); + const harness = createStartedThreadHarness(async (method) => { + if (method === "turn/start") { + throw Object.assign(new Error("You've reached your usage limit."), { + data: { codexErrorInfo: "usageLimitExceeded" }, + }); + } + return undefined; + }); + + const runError = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir)).catch( + (error: unknown) => error, + ); + await harness.waitForMethod("turn/start"); + + const error = await runError; + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toContain( + "You've reached your Codex subscription usage limit.", + ); + expect((error as Error).message).toContain("Next reset in"); + }); + it("cleans up native hook relay state when the Codex turn aborts", async () => { const sessionFile = path.join(tempDir, "session.jsonl"); const workspaceDir = path.join(tempDir, "workspace"); diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 3850b072220..c87f92b58d4 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -84,6 +84,8 @@ import { type JsonObject, type JsonValue, } from "./protocol.js"; +import { readRecentCodexRateLimits, rememberCodexRateLimits } from "./rate-limit-cache.js"; +import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js"; import { readCodexAppServerBinding, type CodexAppServerThreadBinding } from "./session-binding.js"; import { readCodexMirroredSessionHistoryMessages } from "./session-history.js"; import { clearSharedCodexAppServerClientIfCurrent } from "./shared-client.js"; @@ -1034,16 +1036,18 @@ export async function runCodexAppServerAttempt( ), ); } catch (error) { + const usageLimitError = formatCodexTurnStartUsageLimitError(error, pendingNotifications); + const turnStartErrorMessage = usageLimitError ?? formatErrorMessage(error); emitCodexAppServerEvent(params, { stream: "codex_app_server.lifecycle", - data: { phase: "turn_start_failed", error: formatErrorMessage(error) }, + data: { phase: "turn_start_failed", error: turnStartErrorMessage }, }); trajectoryRecorder?.recordEvent("session.ended", { status: "error", threadId: thread.threadId, timedOut, aborted: runAbortController.signal.aborted, - promptError: normalizeCodexTrajectoryError(error), + promptError: turnStartErrorMessage, }); trajectoryEndRecorded = true; runAgentHarnessLlmOutputHook({ @@ -1065,7 +1069,7 @@ export async function runCodexAppServerAttempt( event: { messages: turnStartFailureMessages, success: false, - error: formatErrorMessage(error), + error: turnStartErrorMessage, durationMs: Date.now() - attemptStartedAt, }, ctx: hookContext, @@ -1083,6 +1087,11 @@ export async function runCodexAppServerAttempt( }, }); params.abortSignal?.removeEventListener("abort", abortFromUpstream); + if (usageLimitError) { + throw new Error(usageLimitError, { + cause: error, + }); + } throw error; } turnId = turn.turn.id; @@ -1644,6 +1653,76 @@ function readDynamicToolCallParams( return readCodexDynamicToolCallParams(value); } +function formatCodexTurnStartUsageLimitError( + error: unknown, + pendingNotifications: CodexServerNotification[], +): string | undefined { + const notificationError = readLatestCodexErrorNotification(pendingNotifications); + const errorPayload = readCodexErrorPayload(error); + return formatCodexUsageLimitErrorMessage({ + message: notificationError?.message ?? errorPayload.message ?? formatErrorMessage(error), + codexErrorInfo: notificationError?.codexErrorInfo ?? errorPayload.codexErrorInfo, + rateLimits: + readLatestRateLimitNotificationPayload(pendingNotifications) ?? + errorPayload.rateLimits ?? + readRecentCodexRateLimits(), + }); +} + +function readLatestRateLimitNotificationPayload( + notifications: CodexServerNotification[], +): JsonValue | undefined { + for (let index = notifications.length - 1; index >= 0; index -= 1) { + const notification = notifications[index]; + if (notification?.method === "account/rateLimits/updated") { + rememberCodexRateLimits(notification.params); + return notification.params; + } + } + return undefined; +} + +function readLatestCodexErrorNotification( + notifications: CodexServerNotification[], +): { message?: string; codexErrorInfo?: JsonValue | null } | undefined { + for (let index = notifications.length - 1; index >= 0; index -= 1) { + const notification = notifications[index]; + if (notification?.method !== "error" || !isJsonObject(notification.params)) { + continue; + } + const error = notification.params.error; + if (!isJsonObject(error)) { + continue; + } + return { + message: readString(error, "message"), + codexErrorInfo: error.codexErrorInfo, + }; + } + return undefined; +} + +function readCodexErrorPayload(error: unknown): { + message?: string; + codexErrorInfo?: JsonValue | null; + rateLimits?: JsonValue; +} { + const message = error instanceof Error ? error.message : undefined; + if (!error || typeof error !== "object" || !("data" in error)) { + return { message }; + } + const data = (error as { data?: unknown }).data as JsonValue | undefined; + if (!isJsonObject(data)) { + return { message }; + } + const nestedError = isJsonObject(data.error) ? data.error : data; + return { + message: readString(nestedError, "message") ?? message, + codexErrorInfo: nestedError.codexErrorInfo, + rateLimits: nestedError.rateLimits ?? data.rateLimits, + }; +} + function isTurnNotification( value: JsonValue | undefined, threadId: string, diff --git a/extensions/codex/src/command-formatters.ts b/extensions/codex/src/command-formatters.ts index bf8855e5598..5385d440128 100644 --- a/extensions/codex/src/command-formatters.ts +++ b/extensions/codex/src/command-formatters.ts @@ -1,6 +1,7 @@ import type { CodexComputerUseStatus } from "./app-server/computer-use.js"; import type { CodexAppServerModelListResult } from "./app-server/models.js"; import { isJsonObject, type JsonObject, type JsonValue } from "./app-server/protocol.js"; +import { summarizeCodexRateLimits } from "./app-server/rate-limits.js"; import type { SafeValue } from "./command-rpc.js"; type CodexStatusProbes = { @@ -37,7 +38,7 @@ export function formatCodexStatus(probes: CodexStatusProbes): string { lines.push( `Rate limits: ${ probes.limits.ok - ? summarizeRateLimits(probes.limits.value) + ? formatCodexRateLimitSummary(probes.limits.value) : formatCodexDisplayText(probes.limits.error) }`, ); @@ -104,7 +105,9 @@ export function formatAccount( ): string { return [ `Account: ${account.ok ? formatCodexAccountSummary(account.value) : formatCodexDisplayText(account.error)}`, - `Rate limits: ${limits.ok ? summarizeRateLimits(limits.value) : formatCodexDisplayText(limits.error)}`, + `Rate limits: ${ + limits.ok ? formatCodexRateLimitSummary(limits.value) : formatCodexDisplayText(limits.error) + }`, ].join("\n"); } @@ -276,6 +279,10 @@ function summarizeArrayLike(value: JsonValue | undefined): string { return `${entries.length}`; } +function formatCodexRateLimitSummary(value: JsonValue | undefined): string { + return formatCodexDisplayText(summarizeCodexRateLimits(value) ?? summarizeRateLimits(value)); +} + function summarizeRateLimits(value: JsonValue | undefined): string { const entries = extractArray(value); if (entries.length > 0) { diff --git a/extensions/codex/src/command-handlers.ts b/extensions/codex/src/command-handlers.ts index b6d141859c2..4dcbf577c7e 100644 --- a/extensions/codex/src/command-handlers.ts +++ b/extensions/codex/src/command-handlers.ts @@ -9,6 +9,7 @@ import { import type { CodexComputerUseConfig } from "./app-server/config.js"; import { listAllCodexAppServerModels } from "./app-server/models.js"; import { isJsonObject, type JsonValue } from "./app-server/protocol.js"; +import { rememberCodexRateLimits } from "./app-server/rate-limit-cache.js"; import { clearCodexAppServerBinding, readCodexAppServerBinding, @@ -321,6 +322,9 @@ export async function handleCodexSubcommand( undefined, ), ]); + if (limits.ok) { + rememberCodexRateLimits(limits.value); + } return { text: formatAccount(account, limits) }; } return { text: `Unknown Codex command: ${formatCodexDisplayText(subcommand)}\n\n${buildHelp()}` }; diff --git a/extensions/codex/src/commands.test.ts b/extensions/codex/src/commands.test.ts index e669ae7ed1b..aea475d6858 100644 --- a/extensions/codex/src/commands.test.ts +++ b/extensions/codex/src/commands.test.ts @@ -6,6 +6,10 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { CODEX_CONTROL_METHODS } from "./app-server/capabilities.js"; import type { CodexComputerUseStatus } from "./app-server/computer-use.js"; import type { CodexAppServerStartOptions } from "./app-server/config.js"; +import { + readRecentCodexRateLimits, + resetCodexRateLimitCacheForTests, +} from "./app-server/rate-limit-cache.js"; import { resetSharedCodexAppServerClientForTests } from "./app-server/shared-client.js"; import { resetCodexDiagnosticsFeedbackStateForTests, @@ -101,6 +105,7 @@ describe("codex command", () => { afterEach(async () => { resetCodexDiagnosticsFeedbackStateForTests(); + resetCodexRateLimitCacheForTests(); resetSharedCodexAppServerClientForTests(); await fs.rm(tempDir, { recursive: true, force: true }); }); @@ -422,10 +427,10 @@ describe("codex command", () => { }); await expect(handleCodexCommand(createContext("status"), { deps })).resolves.toMatchObject({ - text: expect.stringContaining("Rate limits: 1"), + text: expect.stringContaining("Rate limits: Codex: primary 42%"), }); await expect(handleCodexCommand(createContext("account"), { deps })).resolves.toMatchObject({ - text: expect.stringContaining("Rate limits: 1"), + text: expect.stringContaining("Rate limits: Codex: primary 42%"), }); }); @@ -476,6 +481,7 @@ describe("codex command", () => { }); it("formats generated account/read responses", async () => { + const resetsAt = Math.ceil(Date.now() / 1000) + 120; const safeCodexControlRequest = vi .fn() .mockResolvedValueOnce({ @@ -485,14 +491,30 @@ describe("codex command", () => { requiresOpenaiAuth: false, }, }) - .mockResolvedValueOnce({ ok: true, value: { data: [{ name: "primary" }] } }); + .mockResolvedValueOnce({ + ok: true, + value: { + rateLimits: { + limitId: "codex", + limitName: "Codex", + primary: { usedPercent: 50, windowDurationMins: 300, resetsAt }, + secondary: null, + credits: null, + planType: "plus", + rateLimitReachedType: null, + }, + rateLimitsByLimitId: null, + }, + }); - await expect( - handleCodexCommand(createContext("account"), { - deps: createDeps({ safeCodexControlRequest }), - }), - ).resolves.toEqual({ - text: ["Account: codex@example.com", "Rate limits: 1"].join("\n"), + const result = await handleCodexCommand(createContext("account"), { + deps: createDeps({ safeCodexControlRequest }), + }); + + expect(result.text).toContain("Account: codex@example.com"); + expect(result.text).toContain("Rate limits: Codex: primary 50%, resets in"); + expect(readRecentCodexRateLimits()).toMatchObject({ + rateLimits: { limitId: "codex" }, }); expect(safeCodexControlRequest).toHaveBeenCalledWith(undefined, CODEX_CONTROL_METHODS.account, { refreshToken: false, diff --git a/extensions/telegram/src/bot.create-telegram-bot.test.ts b/extensions/telegram/src/bot.create-telegram-bot.test.ts index 1d6c747f93f..88bd603d0fe 100644 --- a/extensions/telegram/src/bot.create-telegram-bot.test.ts +++ b/extensions/telegram/src/bot.create-telegram-bot.test.ts @@ -2762,6 +2762,37 @@ describe("createTelegramBot", () => { expect(sendMessageSpy).toHaveBeenCalledTimes(1); expect(sendMessageSpy.mock.calls[0][1]).toBe("PFX final reply"); }); + + it("sends Codex usage-limit reset details as the Telegram reply body", async () => { + const codexRateLimitText = + "⚠️ You've reached your Codex subscription usage limit. Next reset in 42 minutes (2026-05-04T21:34:00.000Z). Run /codex account for current usage details."; + replySpy.mockResolvedValue({ text: codexRateLimitText }); + loadConfig.mockReturnValue({ + channels: { + telegram: { dmPolicy: "open", allowFrom: ["*"] }, + }, + }); + + createTelegramBot({ token: "tok" }); + const handler = getOnHandler("message") as (ctx: Record) => Promise; + await handler({ + message: { + chat: { id: 5, type: "private" }, + text: "hi", + date: 1736380800, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({ download: async () => new Uint8Array() }), + }); + + expect(sendMessageSpy).toHaveBeenCalledTimes(1); + expect(sendMessageSpy.mock.calls[0][0]).toBe("5"); + expect(sendMessageSpy.mock.calls[0][1]).toBe(codexRateLimitText); + expect(String(sendMessageSpy.mock.calls[0][1])).not.toContain( + "All models are temporarily rate-limited", + ); + }); + it("honors threaded replies for replyToMode=first/all", async () => { for (const [mode, messageId] of [ ["first", 101], diff --git a/src/auto-reply/reply-payload.ts b/src/auto-reply/reply-payload.ts index a2e2af3315d..18778d6d819 100644 --- a/src/auto-reply/reply-payload.ts +++ b/src/auto-reply/reply-payload.ts @@ -50,6 +50,12 @@ export type ReplyPayload = { export type ReplyPayloadMetadata = { assistantMessageIndex?: number; + /** + * Internal OpenClaw notices generated after a runtime/provider failure are + * not assistant source replies. Dispatch may deliver them even when normal + * assistant source replies are message-tool-only; sendPolicy deny still wins. + */ + deliverDespiteSourceReplySuppression?: boolean; }; const replyPayloadMetadata = new WeakMap(); @@ -66,3 +72,14 @@ export function setReplyPayloadMetadata( export function getReplyPayloadMetadata(payload: object): ReplyPayloadMetadata | undefined { return replyPayloadMetadata.get(payload); } + +export function copyReplyPayloadMetadata(source: object, payload: T): T { + const metadata = getReplyPayloadMetadata(source); + return metadata ? setReplyPayloadMetadata(payload, metadata) : payload; +} + +export function markReplyPayloadForSourceSuppressionDelivery(payload: T): T { + return setReplyPayloadMetadata(payload, { + deliverDespiteSourceReplySuppression: true, + }); +} diff --git a/src/auto-reply/reply/agent-runner-direct-runtime-config.test.ts b/src/auto-reply/reply/agent-runner-direct-runtime-config.test.ts index e95c171ac0f..c8fdeb87f17 100644 --- a/src/auto-reply/reply/agent-runner-direct-runtime-config.test.ts +++ b/src/auto-reply/reply/agent-runner-direct-runtime-config.test.ts @@ -1,4 +1,5 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import { getReplyPayloadMetadata } from "../reply-payload.js"; import type { TemplateContext } from "../templating.js"; import { createTestFollowupRun } from "./agent-runner.test-fixtures.js"; import type { QueueSettings } from "./queue.js"; @@ -201,6 +202,26 @@ describe("runReplyAgent runtime config", () => { ); }); + it("surfaces known pre-run Codex usage-limit failures instead of dropping the reply", async () => { + const { replyParams } = createDirectRuntimeReplyParams({ + shouldFollowup: false, + isActive: false, + }); + const codexMessage = + "You've reached your Codex subscription usage limit. Codex did not return a reset time for this limit. Run /codex account for current usage details."; + runPreflightCompactionIfNeededMock.mockRejectedValue(new Error(codexMessage)); + runMemoryFlushIfNeededMock.mockResolvedValue(undefined); + + const result = await runReplyAgent(replyParams); + + expect(result).toMatchObject({ + text: `⚠️ ${codexMessage}`, + }); + expect(result ? getReplyPayloadMetadata(result) : undefined).toMatchObject({ + deliverDespiteSourceReplySuppression: true, + }); + }); + it("does not resolve secrets before the enqueue-followup queue path", async () => { const { followupRun, resolvedQueue, replyParams } = createDirectRuntimeReplyParams({ shouldFollowup: true, diff --git a/src/auto-reply/reply/agent-runner-execution.test.ts b/src/auto-reply/reply/agent-runner-execution.test.ts index cdcc4b29b71..82b878750f6 100644 --- a/src/auto-reply/reply/agent-runner-execution.test.ts +++ b/src/auto-reply/reply/agent-runner-execution.test.ts @@ -3,6 +3,7 @@ import { LiveSessionModelSwitchError } from "../../agents/live-model-switch-erro import type { SessionEntry } from "../../config/sessions.js"; import type { ModelDefinitionConfig } from "../../config/types.models.js"; import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js"; +import { getReplyPayloadMetadata } from "../reply-payload.js"; import type { TemplateContext } from "../templating.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; @@ -88,7 +89,8 @@ vi.mock("../../agents/pi-embedded-helpers.js", () => ({ isBillingErrorMessage: () => false, isLikelyContextOverflowError: () => false, isOverloadedErrorMessage: (message: string) => /overloaded|capacity/i.test(message), - isRateLimitErrorMessage: () => false, + isRateLimitErrorMessage: (message: string) => + /rate.limit|too many requests|429|usage limit/i.test(message), isTransientHttpError: () => false, sanitizeUserFacingText: (text?: string) => text ?? "", })); @@ -2024,6 +2026,98 @@ describe("runAgentTurnWithFallback", () => { } }); + it("surfaces Codex usage-limit reset details for pure fallback exhaustion", async () => { + const codexMessage = + "You've reached your Codex subscription usage limit. Next reset in 42 minutes (2026-05-04T21:34:00.000Z). Run /codex account for current usage details."; + state.runWithModelFallbackMock.mockRejectedValueOnce( + Object.assign(new Error(`All models failed (1): openai/gpt-5.5: ${codexMessage}`), { + name: "FallbackSummaryError", + attempts: [ + { + provider: "openai", + model: "gpt-5.5", + error: codexMessage, + reason: "rate_limit", + }, + ], + soonestCooldownExpiry: null, + }), + ); + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + commandBody: "hello", + followupRun: createFollowupRun(), + sessionCtx: { + Provider: "telegram", + MessageSid: "msg", + } as unknown as TemplateContext, + opts: {}, + typingSignals: createMockTypingSignaler(), + blockReplyPipeline: null, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + applyReplyToMode: (payload) => payload, + shouldEmitToolResult: () => true, + shouldEmitToolOutput: () => false, + pendingToolTasks: new Set(), + resetSessionAfterCompactionFailure: async () => false, + resetSessionAfterRoleOrderingConflict: async () => false, + isHeartbeat: false, + sessionKey: "main", + getActiveSessionEntry: () => undefined, + resolvedVerboseLevel: "off", + }); + + expect(result.kind).toBe("final"); + if (result.kind === "final") { + expect(result.payload.text).toBe(`⚠️ ${codexMessage}`); + expect(result.payload.text).not.toContain("All models failed"); + expect(getReplyPayloadMetadata(result.payload)).toMatchObject({ + deliverDespiteSourceReplySuppression: true, + }); + } + }); + + it("surfaces direct Codex usage-limit errors when fallback does not wrap one attempt", async () => { + const codexMessage = + "You've reached your Codex subscription usage limit. Codex did not return a reset time for this limit. Run /codex account for current usage details."; + state.runWithModelFallbackMock.mockRejectedValueOnce(new Error(codexMessage)); + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + commandBody: "hello", + followupRun: createFollowupRun(), + sessionCtx: { + Provider: "telegram", + MessageSid: "msg", + } as unknown as TemplateContext, + opts: {}, + typingSignals: createMockTypingSignaler(), + blockReplyPipeline: null, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + applyReplyToMode: (payload) => payload, + shouldEmitToolResult: () => true, + shouldEmitToolOutput: () => false, + pendingToolTasks: new Set(), + resetSessionAfterCompactionFailure: async () => false, + resetSessionAfterRoleOrderingConflict: async () => false, + isHeartbeat: false, + sessionKey: "main", + getActiveSessionEntry: () => undefined, + resolvedVerboseLevel: "off", + }); + + expect(result.kind).toBe("final"); + if (result.kind === "final") { + expect(result.payload.text).toBe(`⚠️ ${codexMessage}`); + expect(getReplyPayloadMetadata(result.payload)).toMatchObject({ + deliverDespiteSourceReplySuppression: true, + }); + } + }); + it("surfaces billing guidance for pure billing cooldown fallback exhaustion", async () => { state.runWithModelFallbackMock.mockRejectedValueOnce( Object.assign( diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 984e41bc05d..41364a70de4 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -59,6 +59,7 @@ import { } from "../../utils/message-channel.js"; import { isInternalMessageChannel } from "../../utils/message-channel.js"; import { stripHeartbeatToken } from "../heartbeat.js"; +import { markReplyPayloadForSourceSuppressionDelivery } from "../reply-payload.js"; import type { TemplateContext } from "../templating.js"; import type { VerboseLevel } from "../thinking.js"; import { @@ -300,6 +301,10 @@ function rollbackFallbackSelectionStateIfUnchanged( * Includes a countdown when the soonest cooldown expiry is known. */ function buildRateLimitCooldownMessage(err: unknown): string { + const codexUsageLimitMessage = extractCodexUsageLimitErrorMessage(err); + if (codexUsageLimitMessage) { + return codexUsageLimitMessage; + } if (!isFallbackSummaryError(err)) { return "⚠️ All models are temporarily rate-limited. Please try again in a few minutes."; } @@ -316,6 +321,44 @@ function buildRateLimitCooldownMessage(err: unknown): string { return "⚠️ All models are temporarily rate-limited. Please try again in a few minutes."; } +function extractCodexUsageLimitErrorMessage(err: unknown): string | undefined { + if (isFallbackSummaryError(err)) { + for (const attempt of err.attempts) { + const message = extractCodexUsageLimitMessage(attempt.error); + if (message) { + return `⚠️ ${message}`; + } + } + return undefined; + } + const message = extractCodexUsageLimitMessage(formatErrorMessage(err)); + return message ? `⚠️ ${message}` : undefined; +} + +function extractCodexUsageLimitMessage(text: string): string | undefined { + const markers = [ + "You've reached your Codex subscription usage limit.", + "Codex usage limit reached.", + ]; + const markerIndex = markers + .map((marker) => text.indexOf(marker)) + .filter((index) => index >= 0) + .toSorted((left, right) => left - right)[0]; + if (markerIndex === undefined) { + return undefined; + } + const message = sanitizeUserFacingText(text.slice(markerIndex), { errorContext: true }) + .split(/\r?\n/u) + .map((line) => line.trim()) + .filter(Boolean) + .join(" ") + .trim(); + if (!message) { + return undefined; + } + return message.length > 500 ? `${message.slice(0, 497)}...` : message; +} + function isPureTransientRateLimitSummary(err: unknown): boolean { return ( isFallbackSummaryError(err) && @@ -459,6 +502,74 @@ function buildExternalRunFailureReply( }; } +function markAgentRunFailureReplyPayload(payload: T): T { + return markReplyPayloadForSourceSuppressionDelivery(payload); +} + +export function buildKnownAgentRunFailureReplyPayload(params: { + err: unknown; + sessionCtx: TemplateContext; + resolvedVerboseLevel: VerboseLevel | undefined; +}): ReplyPayload | undefined { + const message = formatErrorMessage(params.err); + const isFallbackSummary = isFallbackSummaryError(params.err); + const isBilling = isFallbackSummary + ? isPureBillingSummary(params.err) + : isBillingErrorMessage(message); + if (isBilling) { + return markAgentRunFailureReplyPayload({ + text: resolveExternalRunFailureTextForConversation({ + text: BILLING_ERROR_USER_MESSAGE, + sessionCtx: params.sessionCtx, + isGenericRunnerFailure: false, + }), + }); + } + + const isPureTransientSummary = isFallbackSummary + ? isPureTransientRateLimitSummary(params.err) + : false; + const isRateLimit = isFallbackSummary ? isPureTransientSummary : isRateLimitErrorMessage(message); + const rateLimitOrOverloadedCopy = + !isFallbackSummary || isPureTransientSummary + ? formatRateLimitOrOverloadedErrorCopy(message) + : undefined; + + if (isRateLimit && !isOverloadedErrorMessage(message)) { + return markAgentRunFailureReplyPayload({ + text: resolveExternalRunFailureTextForConversation({ + text: buildRateLimitCooldownMessage(params.err), + sessionCtx: params.sessionCtx, + isGenericRunnerFailure: false, + }), + }); + } + + if (rateLimitOrOverloadedCopy) { + return markAgentRunFailureReplyPayload({ + text: resolveExternalRunFailureTextForConversation({ + text: rateLimitOrOverloadedCopy, + sessionCtx: params.sessionCtx, + isGenericRunnerFailure: false, + }), + }); + } + + const externalRunFailureReply = buildExternalRunFailureReply(message, { + includeDetails: isVerboseFailureDetailEnabled(params.resolvedVerboseLevel), + }); + if (externalRunFailureReply.isGenericRunnerFailure) { + return undefined; + } + return markAgentRunFailureReplyPayload({ + text: resolveExternalRunFailureTextForConversation({ + text: externalRunFailureReply.text, + sessionCtx: params.sessionCtx, + isGenericRunnerFailure: false, + }), + }); +} + const CONTEXT_OVERFLOW_RESET_HINT = "\n\nTo prevent this, increase your compaction buffer by setting " + "`agents.defaults.compaction.reserveTokensFloor` to 20000 or higher in your config."; @@ -1771,7 +1882,7 @@ export async function runAgentTurnWithFallback(params: { params.replyOperation?.fail("run_failed", embeddedError); return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: buildContextOverflowRecoveryText({ cfg: runtimeConfig, agentId: params.followupRun.run.agentId, @@ -1779,7 +1890,7 @@ export async function runAgentTurnWithFallback(params: { primaryModel: params.followupRun.run.model, activeSessionEntry: params.getActiveSessionEntry(), }), - }, + }), }; } if (embeddedError?.kind === "role_ordering") { @@ -1788,9 +1899,9 @@ export async function runAgentTurnWithFallback(params: { params.replyOperation?.fail("run_failed", embeddedError); return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: "⚠️ Message ordering conflict. I've reset the conversation - please try again.", - }, + }), }; } } @@ -1820,13 +1931,13 @@ export async function runAgentTurnWithFallback(params: { params.replyOperation?.fail("run_failed", err); return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: resolveExternalRunFailureTextForConversation({ text: switchErrorText, sessionCtx: params.sessionCtx, isGenericRunnerFailure: !shouldSurfaceToControlUi, }), - }, + }), }; } params.followupRun.run.provider = err.provider; @@ -1852,9 +1963,9 @@ export async function runAgentTurnWithFallback(params: { if (isReplyOperationRestartAbort(params.replyOperation)) { return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: buildRestartLifecycleReplyText(), - }, + }), }; } @@ -1872,9 +1983,9 @@ export async function runAgentTurnWithFallback(params: { params.replyOperation?.fail("gateway_draining", restartLifecycleError); return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: buildRestartLifecycleReplyText(), - }, + }), }; } @@ -1882,9 +1993,9 @@ export async function runAgentTurnWithFallback(params: { params.replyOperation?.fail("command_lane_cleared", restartLifecycleError); return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: buildRestartLifecycleReplyText(), - }, + }), }; } @@ -1897,7 +2008,7 @@ export async function runAgentTurnWithFallback(params: { params.replyOperation?.fail("run_failed", err); return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: buildContextOverflowRecoveryText({ duringCompaction: true, cfg: runtimeConfig, @@ -1906,7 +2017,7 @@ export async function runAgentTurnWithFallback(params: { primaryModel: params.followupRun.run.model, activeSessionEntry: params.getActiveSessionEntry(), }), - }, + }), }; } if (isRoleOrderingError) { @@ -1915,9 +2026,9 @@ export async function runAgentTurnWithFallback(params: { params.replyOperation?.fail("run_failed", err); return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: "⚠️ Message ordering conflict. I've reset the conversation - please try again.", - }, + }), }; } } @@ -1962,9 +2073,9 @@ export async function runAgentTurnWithFallback(params: { params.replyOperation?.fail("session_corruption_reset", err); return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: "⚠️ Session history was corrupted. I've reset the conversation - please try again!", - }, + }), }; } @@ -2036,9 +2147,9 @@ export async function runAgentTurnWithFallback(params: { params.replyOperation?.fail("run_failed", err); return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: userVisibleFallbackText, - }, + }), }; } } @@ -2056,9 +2167,9 @@ export async function runAgentTurnWithFallback(params: { params.replyOperation?.fail("run_failed", finalEmbeddedError); return { kind: "final", - payload: { + payload: markAgentRunFailureReplyPayload({ text: "⚠️ Context overflow — this conversation is too large for the model. Use /new to start a fresh session.", - }, + }), }; } } @@ -2093,10 +2204,10 @@ export async function runAgentTurnWithFallback(params: { : undefined; if (formattedErrorCandidate) { runResult.payloads = [ - { + markAgentRunFailureReplyPayload({ text: formattedErrorCandidate, isError: true, - }, + }), ]; } } diff --git a/src/auto-reply/reply/agent-runner-payloads.test.ts b/src/auto-reply/reply/agent-runner-payloads.test.ts index 2ca5c1f99c0..886f9d72e4c 100644 --- a/src/auto-reply/reply/agent-runner-payloads.test.ts +++ b/src/auto-reply/reply/agent-runner-payloads.test.ts @@ -1,6 +1,10 @@ import { describe, expect, it } from "vitest"; import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../../plugins/runtime.js"; import { createTestRegistry } from "../../test-utils/channel-plugins.js"; +import { + getReplyPayloadMetadata, + markReplyPayloadForSourceSuppressionDelivery, +} from "../reply-payload.js"; import { buildReplyPayloads } from "./agent-runner-payloads.js"; const baseParams = { @@ -47,6 +51,28 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads[0]?.text).toBe("Before\n\n\nAfter"); }); + it("preserves internal delivery metadata through final payload normalization", async () => { + const payload = markReplyPayloadForSourceSuppressionDelivery({ + text: "⚠️ API rate limit reached.\n[[reply_to_current]]", + }); + + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + payloads: [payload], + replyToMode: "all", + currentMessageId: "msg-1", + }); + + expect(replyPayloads).toHaveLength(1); + expect(replyPayloads[0]).toMatchObject({ + text: "⚠️ API rate limit reached.", + replyToId: "msg-1", + }); + expect(getReplyPayloadMetadata(replyPayloads[0])).toMatchObject({ + deliverDespiteSourceReplySuppression: true, + }); + }); + it("strips media URL from payload when in messagingToolSentMediaUrls", async () => { const { replyPayloads } = await buildReplyPayloads({ ...baseParams, diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index 43ac234731e..6f87cf6afd5 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -5,6 +5,7 @@ import { logVerbose } from "../../globals.js"; import { createLazyImportLoader } from "../../shared/lazy-promise.js"; import { stripLegacyBracketToolCallBlocks } from "../../shared/text/assistant-visible-text.js"; import { stripHeartbeatToken } from "../heartbeat.js"; +import { copyReplyPayloadMetadata } from "../reply-payload.js"; import type { OriginatingChannelType } from "../templating.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { ReplyPayload, ReplyThreadingPolicy } from "../types.js"; @@ -35,15 +36,16 @@ async function normalizeReplyPayloadMedia(params: { } try { - return await params.normalizeMediaPaths(params.payload); + const normalized = await params.normalizeMediaPaths(params.payload); + return copyReplyPayloadMetadata(params.payload, normalized); } catch (err) { logVerbose(`reply payload media normalization failed: ${String(err)}`); - return { + return copyReplyPayloadMetadata(params.payload, { ...params.payload, mediaUrl: undefined, mediaUrls: undefined, audioAsVoice: false, - }; + }); } } @@ -102,7 +104,7 @@ function sanitizeHeartbeatPayload(payload: ReplyPayload): ReplyPayload { return payload; } logVerbose("Stripped legacy tool-call block from heartbeat reply"); - return { ...payload, text: cleaned }; + return copyReplyPayloadMetadata(payload, { ...payload, text: cleaned }); } export async function buildReplyPayloads(params: { @@ -139,7 +141,7 @@ export async function buildReplyPayloads(params: { } if (!text || !text.includes("HEARTBEAT_OK")) { - return [{ ...payload, text }]; + return [copyReplyPayloadMetadata(payload, { ...payload, text })]; } const stripped = stripHeartbeatToken(text, { mode: "message" }); if (stripped.didStrip && !didLogHeartbeatStrip) { @@ -150,7 +152,7 @@ export async function buildReplyPayloads(params: { if (stripped.shouldSkip && !hasMedia) { return []; } - return [{ ...payload, text: stripped.text }]; + return [copyReplyPayloadMetadata(payload, { ...payload, text: stripped.text })]; }); const replyTaggedPayloads = ( @@ -275,20 +277,20 @@ export async function buildReplyPayloads(params: { if (!reply.trimmedText) { return payload; } - const textOnlyPayload = { + const textOnlyPayload = copyReplyPayloadMetadata(payload, { ...payload, mediaUrl: undefined, mediaUrls: undefined, audioAsVoice: undefined, - }; + }); if (!params.blockReplyPipeline?.hasSentPayload(textOnlyPayload)) { return payload; } - return { + return copyReplyPayloadMetadata(payload, { ...payload, text: undefined, audioAsVoice: payload.audioAsVoice || undefined, - }; + }); }; const contentSuppressedPayloads = shouldDropFinalPayloads ? dedupedPayloads.flatMap((payload) => preserveUnsentMediaAfterBlockStream(payload) ?? []) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index eddefddae0a..160fd18cde1 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -38,11 +38,15 @@ import { buildFallbackNotice, resolveFallbackTransition, } from "../fallback-state.js"; +import { markReplyPayloadForSourceSuppressionDelivery } from "../reply-payload.js"; import type { OriginatingChannelType, TemplateContext } from "../templating.js"; import { resolveResponseUsageMode, type VerboseLevel } from "../thinking.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; -import { runAgentTurnWithFallback } from "./agent-runner-execution.js"; +import { + buildKnownAgentRunFailureReplyPayload, + runAgentTurnWithFallback, +} from "./agent-runner-execution.js"; import { createShouldEmitToolOutput, createShouldEmitToolResult, @@ -1141,9 +1145,9 @@ export async function runReplyAgent(params: { } catch (error) { if (error instanceof ReplyRunAlreadyActiveError) { typing.cleanup(); - return { + return markReplyPayloadForSourceSuppressionDelivery({ text: "⚠️ Previous run is still shutting down. Please try again in a moment.", - }; + }); } throw error; } @@ -1883,24 +1887,39 @@ export async function runReplyAgent(params: { replyOperation.result?.kind === "aborted" && replyOperation.result.code === "aborted_for_restart" ) { - return returnWithQueuedFollowupDrain({ - text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", - }); + return returnWithQueuedFollowupDrain( + markReplyPayloadForSourceSuppressionDelivery({ + text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", + }), + ); } if (replyOperation.result?.kind === "aborted") { return returnWithQueuedFollowupDrain({ text: SILENT_REPLY_TOKEN }); } if (error instanceof GatewayDrainingError) { replyOperation.fail("gateway_draining", error); - return returnWithQueuedFollowupDrain({ - text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", - }); + return returnWithQueuedFollowupDrain( + markReplyPayloadForSourceSuppressionDelivery({ + text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", + }), + ); } if (error instanceof CommandLaneClearedError) { replyOperation.fail("command_lane_cleared", error); - return returnWithQueuedFollowupDrain({ - text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", - }); + return returnWithQueuedFollowupDrain( + markReplyPayloadForSourceSuppressionDelivery({ + text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.", + }), + ); + } + const knownFailurePayload = buildKnownAgentRunFailureReplyPayload({ + err: error, + sessionCtx, + resolvedVerboseLevel, + }); + if (knownFailurePayload) { + replyOperation.fail("run_failed", error); + return returnWithQueuedFollowupDrain(knownFailurePayload); } replyOperation.fail("run_failed", error); // Keep the followup queue moving even when an unexpected exception escapes diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 40f5d4427cd..a63125e3270 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -25,7 +25,7 @@ import { } from "../../test-utils/channel-plugins.js"; import { createInternalHookEventPayload } from "../../test-utils/internal-hook-event-payload.js"; import type { MsgContext } from "../templating.js"; -import type { GetReplyOptions, ReplyPayload } from "../types.js"; +import { setReplyPayloadMetadata, type GetReplyOptions, type ReplyPayload } from "../types.js"; import type { ReplyDispatcher } from "./reply-dispatcher.js"; import { buildTestCtx } from "./test-ctx.js"; @@ -4162,6 +4162,7 @@ describe("before_dispatch hook", () => { describe("sendPolicy deny — suppress delivery, not processing (#53328)", () => { beforeEach(() => { resetInboundDedupe(); + sessionStoreMocks.currentEntry = undefined; sessionBindingMocks.resolveByConversation.mockReset(); sessionBindingMocks.resolveByConversation.mockReturnValue(null); sessionBindingMocks.touch.mockReset(); @@ -4579,6 +4580,72 @@ describe("sendPolicy deny — suppress delivery, not processing (#53328)", () => ); }); + it("delivers marked runtime failure notices in message-tool-only mode", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "allow", + }; + const dispatcher = createDispatcher(); + const failureNotice = setReplyPayloadMetadata( + { text: "⚠️ You've reached your Codex subscription usage limit." }, + { deliverDespiteSourceReplySuppression: true }, + ); + const replyResolver = vi.fn(async () => failureNotice satisfies ReplyPayload); + const ctx = buildTestCtx({ SessionKey: "test:session" }); + + const result = await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver, + replyOptions: { + sourceReplyDeliveryMode: "message_tool_only", + }, + }); + + expect(replyResolver).toHaveBeenCalledTimes(1); + expect(result.queuedFinal).toBe(true); + expect(result.sourceReplyDeliveryMode).toBe("message_tool_only"); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(failureNotice); + expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); + expect(dispatcher.sendToolResult).not.toHaveBeenCalled(); + }); + + it("does not deliver marked runtime failure notices when sendPolicy denies delivery", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "deny", + }; + const dispatcher = createDispatcher(); + const replyResolver = vi.fn( + async () => + setReplyPayloadMetadata( + { text: "⚠️ You've reached your Codex subscription usage limit." }, + { deliverDespiteSourceReplySuppression: true }, + ) satisfies ReplyPayload, + ); + const ctx = buildTestCtx({ SessionKey: "test:session" }); + + const result = await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver, + replyOptions: { + sourceReplyDeliveryMode: "message_tool_only", + }, + }); + + expect(replyResolver).toHaveBeenCalledTimes(1); + expect(result.queuedFinal).toBe(false); + expect(result.sourceReplyDeliveryMode).toBe("message_tool_only"); + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + }); + it("defaults group/channel turns to message-tool-only source delivery", async () => { setNoAbort(); const dispatcher = createDispatcher(); diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 7ca3c5cea9a..45ac7e35d47 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -779,6 +779,7 @@ export async function dispatchReplyFromConfig( sourceReplyDeliveryMode, suppressAutomaticSourceDelivery, suppressDelivery, + sendPolicyDenied, deliverySuppressionReason, suppressHookUserDelivery, suppressHookReplyLifecycle, @@ -1501,29 +1502,36 @@ export async function dispatchReplyFromConfig( let routedFinalCount = 0; let attemptedFinalDelivery = false; let finalDeliveryFailed = false; + const shouldDeliverDespiteSourceReplySuppression = (reply: ReplyPayload) => + suppressAutomaticSourceDelivery && + !sendPolicyDenied && + getReplyPayloadMetadata(reply)?.deliverDespiteSourceReplySuppression === true; + for (const reply of replies) { + // Suppress reasoning payloads from channel delivery — channels using this + // generic dispatch path do not have a dedicated reasoning lane. + if (reply.isReasoning === true) { + continue; + } + if (suppressDelivery && !shouldDeliverDespiteSourceReplySuppression(reply)) { + continue; + } + attemptedFinalDelivery = true; + const finalReply = await sendFinalPayload(reply); + queuedFinal = finalReply.queuedFinal || queuedFinal; + routedFinalCount += finalReply.routedFinalCount; + if (!finalReply.queuedFinal && finalReply.routedFinalCount === 0) { + finalDeliveryFailed = true; + } + } + + if (attemptedFinalDelivery && !finalDeliveryFailed) { + await clearPendingFinalDeliveryAfterSuccess({ + storePath: sessionStoreEntry.storePath, + sessionKey: sessionStoreEntry.sessionKey ?? sessionKey, + }); + } + if (!suppressDelivery) { - for (const reply of replies) { - // Suppress reasoning payloads from channel delivery — channels using this - // generic dispatch path do not have a dedicated reasoning lane. - if (reply.isReasoning === true) { - continue; - } - attemptedFinalDelivery = true; - const finalReply = await sendFinalPayload(reply); - queuedFinal = finalReply.queuedFinal || queuedFinal; - routedFinalCount += finalReply.routedFinalCount; - if (!finalReply.queuedFinal && finalReply.routedFinalCount === 0) { - finalDeliveryFailed = true; - } - } - - if (attemptedFinalDelivery && !finalDeliveryFailed) { - await clearPendingFinalDeliveryAfterSuccess({ - storePath: sessionStoreEntry.storePath, - sessionKey: sessionStoreEntry.sessionKey ?? sessionKey, - }); - } - const ttsMode = resolveConfiguredTtsMode(cfg, { agentId: sessionAgentId, channelId: deliveryChannel, diff --git a/src/auto-reply/reply/reply-delivery.ts b/src/auto-reply/reply/reply-delivery.ts index 9a428e16ec5..d3c92bc5cdf 100644 --- a/src/auto-reply/reply/reply-delivery.ts +++ b/src/auto-reply/reply/reply-delivery.ts @@ -1,6 +1,6 @@ import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; import { logVerbose } from "../../globals.js"; -import { getReplyPayloadMetadata, setReplyPayloadMetadata } from "../reply-payload.js"; +import { copyReplyPayloadMetadata } from "../reply-payload.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { BlockReplyContext, ReplyPayload, ReplyThreadingPolicy } from "../types.js"; import type { BlockReplyPipeline } from "./block-reply-pipeline.js"; @@ -48,7 +48,7 @@ export function normalizeReplyPayloadDirectives(params: { const mediaUrl = params.payload.mediaUrl ?? parsed?.mediaUrl ?? mediaUrls?.[0]; return { - payload: { + payload: copyReplyPayloadMetadata(params.payload, { ...params.payload, text, mediaUrls, @@ -57,16 +57,11 @@ export function normalizeReplyPayloadDirectives(params: { replyToTag: params.payload.replyToTag || parsed?.replyToTag, replyToCurrent: params.payload.replyToCurrent || parsed?.replyToCurrent, audioAsVoice: Boolean(params.payload.audioAsVoice || parsed?.audioAsVoice), - }, + }), isSilent: parsed?.isSilent ?? false, }; } -function carryReplyPayloadMetadata(source: ReplyPayload, target: ReplyPayload): ReplyPayload { - const metadata = getReplyPayloadMetadata(source); - return metadata ? setReplyPayloadMetadata(target, metadata) : target; -} - async function sendDirectBlockReply(params: { onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise | void; directlySentBlockKeys: Set; @@ -130,7 +125,7 @@ export function createBlockReplyDeliveryHandler(params: { const mediaNormalizedPayload = params.normalizeMediaPaths ? await params.normalizeMediaPaths(normalized.payload) : normalized.payload; - const blockPayload = carryReplyPayloadMetadata( + const blockPayload = copyReplyPayloadMetadata( payload, params.applyReplyToMode(mediaNormalizedPayload), ); diff --git a/src/auto-reply/reply/reply-payloads-base.ts b/src/auto-reply/reply/reply-payloads-base.ts index 5f55cd9cb64..86e601695ef 100644 --- a/src/auto-reply/reply/reply-payloads-base.ts +++ b/src/auto-reply/reply/reply-payloads-base.ts @@ -1,6 +1,7 @@ import type { ReplyToMode } from "../../config/types.js"; import { hasReplyPayloadContent } from "../../interactive/payload.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; +import { copyReplyPayloadMetadata } from "../reply-payload.js"; import type { OriginatingChannelType } from "../templating.js"; import type { ReplyPayload, ReplyThreadingPolicy } from "../types.js"; import { extractReplyToTag } from "./reply-tags.js"; @@ -42,27 +43,30 @@ function resolveReplyThreadingForPayload(params: { !implicitReplyToId || !allowImplicitReplyToCurrentMessage ? params.payload - : { ...params.payload, replyToId: implicitReplyToId }; + : copyReplyPayloadMetadata(params.payload, { + ...params.payload, + replyToId: implicitReplyToId, + }); if (typeof resolved.text === "string" && resolved.text.includes("[[")) { const { cleaned, replyToId, replyToCurrent, hasTag } = extractReplyToTag( resolved.text, currentMessageId, ); - resolved = { + resolved = copyReplyPayloadMetadata(resolved, { ...resolved, text: cleaned ? cleaned : undefined, replyToId: replyToId ?? resolved.replyToId, replyToTag: hasTag || resolved.replyToTag, replyToCurrent: replyToCurrent || resolved.replyToCurrent, - }; + }); } if (resolved.replyToCurrent && !resolved.replyToId && currentMessageId) { - resolved = { + resolved = copyReplyPayloadMetadata(resolved, { ...resolved, replyToId: currentMessageId, - }; + }); } return resolved; diff --git a/src/auto-reply/reply/reply-threading.ts b/src/auto-reply/reply/reply-threading.ts index 32698b8aa07..c784c100055 100644 --- a/src/auto-reply/reply/reply-threading.ts +++ b/src/auto-reply/reply/reply-threading.ts @@ -4,6 +4,7 @@ import { normalizeAnyChannelId } from "../../channels/registry.js"; import type { ReplyToMode } from "../../config/types.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { normalizeOptionalLowercaseString } from "../../shared/string-coerce.js"; +import { copyReplyPayloadMetadata } from "../reply-payload.js"; import type { OriginatingChannelType } from "../templating.js"; import type { ReplyPayload, ReplyThreadingPolicy } from "../types.js"; import { isSingleUseReplyToMode } from "./reply-reference.js"; @@ -107,7 +108,7 @@ export function createReplyToModeFilter( if (opts.allowExplicitReplyTagsWhenOff && isExplicit && !payload.isCompactionNotice) { return payload; } - return { ...payload, replyToId: undefined }; + return copyReplyPayloadMetadata(payload, { ...payload, replyToId: undefined }); } if (mode === "all") { return payload; @@ -119,7 +120,7 @@ export function createReplyToModeFilter( if (payload.isCompactionNotice) { return payload; } - return { ...payload, replyToId: undefined }; + return copyReplyPayloadMetadata(payload, { ...payload, replyToId: undefined }); } // Compaction notices are transient status messages — they should be // threaded (so they appear in-context), but they must not consume the diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 5aa2119caae..711e7b5d50f 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -4,5 +4,9 @@ export type { ReplyThreadingPolicy, TypingPolicy, } from "./get-reply-options.types.js"; -export { setReplyPayloadMetadata } from "./reply-payload.js"; +export { + copyReplyPayloadMetadata, + markReplyPayloadForSourceSuppressionDelivery, + setReplyPayloadMetadata, +} from "./reply-payload.js"; export type { ReplyPayload } from "./reply-payload.js"; diff --git a/src/infra/heartbeat-runner.tool-response.test.ts b/src/infra/heartbeat-runner.tool-response.test.ts index c6cf5337022..f2d8f4044ff 100644 --- a/src/infra/heartbeat-runner.tool-response.test.ts +++ b/src/infra/heartbeat-runner.tool-response.test.ts @@ -5,6 +5,7 @@ import { createHeartbeatToolResponsePayload, type HeartbeatToolResponse, } from "../auto-reply/heartbeat-tool-response.js"; +import { markReplyPayloadForSourceSuppressionDelivery } from "../auto-reply/types.js"; import type { OpenClawConfig } from "../config/config.js"; import { runHeartbeatOnce, type HeartbeatDeps } from "./heartbeat-runner.js"; import { installHeartbeatRunnerTestRuntime } from "./heartbeat-runner.test-harness.js"; @@ -184,6 +185,44 @@ describe("runHeartbeatOnce heartbeat response tool", () => { }); }); + it("delivers Codex runtime failure notices during Codex heartbeat message-tool mode", async () => { + await withTempTelegramHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => { + const cfg = createConfig({ tmpDir, storePath }); + await seedMainSessionStore(storePath, cfg, { + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: TELEGRAM_GROUP, + agentHarnessId: "codex", + }); + const usageLimitMessage = + "⚠️ You've reached your Codex subscription usage limit. Next reset in 42 minutes (2026-05-04T21:34:00.000Z). Run /codex account for current usage details."; + replySpy.mockResolvedValue( + markReplyPayloadForSourceSuppressionDelivery({ + text: usageLimitMessage, + isError: true, + }), + ); + const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1" }); + + const result = await runHeartbeatOnce({ + cfg, + deps: createDeps({ sendTelegram, getReplyFromConfig: replySpy }), + }); + + const calledOpts = replySpy.mock.calls[0]?.[1] as { + sourceReplyDeliveryMode?: string; + }; + expect(result.status).toBe("ran"); + expect(calledOpts.sourceReplyDeliveryMode).toBe("message_tool_only"); + expect(sendTelegram).toHaveBeenCalledTimes(1); + expect(sendTelegram).toHaveBeenCalledWith( + TELEGRAM_GROUP, + usageLimitMessage, + expect.any(Object), + ); + }); + }); + it("uses the heartbeat response tool prompt for auto-selected Codex model sessions", async () => { await withTempTelegramHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => { const cfg = createConfig({