diff --git a/CHANGELOG.md b/CHANGELOG.md index b7d91d1db5a..e3bed1b826a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ Docs: https://docs.openclaw.ai - Channels/status reactions: remove stale non-terminal lifecycle reactions when a run reaches done or error, so Discord does not leave a permanent thinking emoji after completion. Fixes #75458. Thanks @davelutztx. - Discord/doctor: migrate unsupported per-channel `agentId` entries under guild channel config into top-level `bindings[]` routes, so `openclaw doctor --fix` preserves the intended agent route instead of stripping it as an unknown key. Fixes #62455. Thanks @lobster-biscuit. - Gateway/config: log config health-state write failures instead of silently hiding config observe-recovery write errors. Thanks @sallyom. +- Diagnostics: reset stuck-session timers on reply, tool, status, block, and ACP progress events, and back off repeated `session.stuck` diagnostics while a session remains unchanged. Supersedes #72010. Thanks @rubencu. ## 2026.4.30 diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index 45a5c39e2ea..e1b7d10e5ea 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -74530fefef9ed55cab302802bc0be413ec56929e73c12d4bf4f1e4d290813adc config-baseline.json -21db87c2ebec8844e20bf66ea474c08f3adab842234ff334870fe3e8d87995b4 config-baseline.core.json +8bbb620e445cba64aa8a451cfc1a7142ac24e8c80088d74a2fc813ee9e221680 config-baseline.json +d145a87759d16d5f58873db337a25cb134ab25e776cd454812dca99bb9cb12a7 config-baseline.core.json c401cd3450f1737bc92418cfea301d20b54b7fbef9e6049834acc01af338e538 config-baseline.channel.json 7731a0b93cb335b56fac4c807447ba659fea51ea7a6cd844dc0ef5616669ee75 config-baseline.plugin.json diff --git a/docs/concepts/agent-loop.md b/docs/concepts/agent-loop.md index d04f0d9d1ad..002554092d5 100644 --- a/docs/concepts/agent-loop.md +++ b/docs/concepts/agent-loop.md @@ -164,7 +164,7 @@ surfaces, while Codex native hooks remain a separate lower-level Codex mechanism - `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides. - Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer. - Cron runtime: isolated agent-turn `timeoutSeconds` is owned by cron. The scheduler starts that timer when execution begins, aborts the underlying run at the configured deadline, then runs bounded cleanup before recording the timeout so a stale child session cannot keep the lane stuck. -- Session liveness diagnostics: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` classifies long `processing` sessions. Active embedded runs, model calls, and tool calls report as `session.long_running`; active work with no recent progress reports as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path releases the affected session lane so queued startup work can drain. +- Session liveness diagnostics: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` classifies long `processing` sessions that have no observed reply, tool, status, block, or ACP progress. Active embedded runs, model calls, and tool calls report as `session.long_running`; active work with no recent progress reports as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path releases the affected session lane so queued startup work can drain. Repeated `session.stuck` diagnostics back off while the session remains unchanged. - Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers..timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout. - Provider HTTP request timeout: `models.providers..timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout. diff --git a/docs/concepts/queue.md b/docs/concepts/queue.md index 10c847a7c6d..ae2463d7d43 100644 --- a/docs/concepts/queue.md +++ b/docs/concepts/queue.md @@ -115,7 +115,7 @@ keys. - If commands seem stuck, enable verbose logs and look for “queued for …ms” lines to confirm the queue is draining. - If you need queue depth, enable verbose logs and watch for queue timing lines. - Codex app-server runs that accept a turn and then stop emitting progress are interrupted by the Codex adapter so the active session lane can release instead of waiting for the outer run timeout. -- When diagnostics are enabled, sessions that remain in `processing` past `diagnostics.stuckSessionWarnMs` are classified by current activity. Active work logs as `session.long_running`; active work with no recent progress logs as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path can release the affected session lane so queued work drains. +- When diagnostics are enabled, sessions that remain in `processing` past `diagnostics.stuckSessionWarnMs` with no observed reply, tool, status, block, or ACP progress are classified by current activity. Active work logs as `session.long_running`; active work with no recent progress logs as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path can release the affected session lane so queued work drains. Repeated `session.stuck` diagnostics back off while the session remains unchanged. ## Related diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index b63f1f4b407..0c4a5d02cdb 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -937,7 +937,7 @@ Notes: - `enabled`: master toggle for instrumentation output (default: `true`). - `flags`: array of flag strings enabling targeted log output (supports wildcards like `"telegram.*"` or `"*"`). -- `stuckSessionWarnMs`: age threshold in ms for classifying long-running processing sessions as `session.long_running`, `session.stalled`, or `session.stuck`. +- `stuckSessionWarnMs`: no-progress age threshold in ms for classifying long-running processing sessions as `session.long_running`, `session.stalled`, or `session.stuck`. Reply, tool, status, block, and ACP progress reset the timer; repeated `session.stuck` diagnostics back off while unchanged. - `otel.enabled`: enables the OpenTelemetry export pipeline (default: `false`). For the full configuration, signal catalog, and privacy model, see [OpenTelemetry export](/gateway/opentelemetry). - `otel.endpoint`: collector URL for OTel export. - `otel.tracesEndpoint` / `otel.metricsEndpoint` / `otel.logsEndpoint`: optional signal-specific OTLP endpoints. When set, they override `otel.endpoint` for that signal only. diff --git a/src/auto-reply/reply/acp-projector.test.ts b/src/auto-reply/reply/acp-projector.test.ts index f6667c7ff1a..e413b07888f 100644 --- a/src/auto-reply/reply/acp-projector.test.ts +++ b/src/auto-reply/reply/acp-projector.test.ts @@ -5,7 +5,10 @@ import { createAcpTestConfig as createCfg } from "./test-fixtures/acp-runtime.js type Delivery = { kind: string; text?: string }; -function createProjectorHarness(cfgOverrides?: Parameters[0]) { +function createProjectorHarness( + cfgOverrides?: Parameters[0], + opts?: { onProgress?: () => void }, +) { const deliveries: Delivery[] = []; const projector = createAcpReplyProjector({ cfg: createCfg(cfgOverrides), @@ -14,6 +17,7 @@ function createProjectorHarness(cfgOverrides?: Parameters[0]) deliveries.push({ kind, text: payload.text }); return true; }, + onProgress: opts?.onProgress, }); return { deliveries, projector }; } @@ -175,6 +179,28 @@ async function runHiddenBoundaryCase(params: { } describe("createAcpReplyProjector", () => { + it("reports progress for ACP runtime events before delivery filtering", async () => { + const onProgress = vi.fn(); + const { projector } = createProjectorHarness(undefined, { onProgress }); + + await projector.onEvent({ + type: "text_delta", + stream: "thought", + text: "hidden reasoning", + tag: "agent_message_chunk", + }); + await projector.onEvent({ + type: "tool_call", + tag: "tool_call", + toolCallId: "tool-1", + status: "in_progress", + title: "Run command", + text: "Run command", + }); + + expect(onProgress).toHaveBeenCalledTimes(2); + }); + it("coalesces text deltas into bounded block chunks", async () => { const { deliveries, projector } = createProjectorHarness(); diff --git a/src/auto-reply/reply/acp-projector.ts b/src/auto-reply/reply/acp-projector.ts index 3105afbcc21..0ab3887476d 100644 --- a/src/auto-reply/reply/acp-projector.ts +++ b/src/auto-reply/reply/acp-projector.ts @@ -173,6 +173,7 @@ export function createAcpReplyProjector(params: { payload: ReplyPayload, meta?: AcpProjectedDeliveryMeta, ) => Promise; + onProgress?: () => void; provider?: string; accountId?: string; }): AcpReplyProjector { @@ -403,6 +404,7 @@ export function createAcpReplyProjector(params: { }; const onEvent = async (event: AcpRuntimeEvent): Promise => { + params.onProgress?.(); if (event.type === "text_delta") { if (event.stream && event.stream !== "output") { return; diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts index 6991d7d1814..57d18cb8693 100644 --- a/src/auto-reply/reply/dispatch-acp.test.ts +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -74,6 +74,10 @@ const mediaUnderstandingMocks = vi.hoisted(() => ({ applyMediaUnderstanding: vi.fn(async (_params: unknown) => undefined), })); +const diagnosticMocks = vi.hoisted(() => ({ + markDiagnosticSessionProgress: vi.fn(), +})); + const sessionMetaMocks = vi.hoisted(() => ({ readAcpSessionEntry: vi.fn< (params: { sessionKey: string; cfg?: OpenClawConfig }) => AcpSessionStoreEntry | null @@ -168,6 +172,10 @@ vi.mock("./dispatch-acp-session.runtime.js", () => ({ sessionMetaMocks.readAcpSessionEntry(params), })); +vi.mock("../../logging/diagnostic.js", () => ({ + markDiagnosticSessionProgress: diagnosticMocks.markDiagnosticSessionProgress, +})); + vi.mock("./dispatch-acp-transcript.runtime.js", () => ({ persistAcpDispatchTranscript: (params: unknown) => transcriptMocks.persistAcpDispatchTranscript(params), @@ -374,6 +382,7 @@ describe("tryDispatchAcpReply", () => { ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); mediaUnderstandingMocks.applyMediaUnderstanding.mockReset(); mediaUnderstandingMocks.applyMediaUnderstanding.mockResolvedValue(undefined); + diagnosticMocks.markDiagnosticSessionProgress.mockReset(); sessionMetaMocks.readAcpSessionEntry.mockReset(); sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null); transcriptMocks.persistAcpDispatchTranscript.mockClear(); @@ -545,6 +554,18 @@ describe("tryDispatchAcpReply", () => { expect(onReplyStart).toHaveBeenCalledTimes(1); }); + it("does not mark ACP diagnostic progress when diagnostics are disabled", async () => { + setReadyAcpResolution(); + mockVisibleTextTurn(); + + await runDispatch({ + bodyForAgent: "visible", + cfg: createAcpTestConfig({ diagnostics: { enabled: false } }), + }); + + expect(diagnosticMocks.markDiagnosticSessionProgress).not.toHaveBeenCalled(); + }); + it("does not start reply lifecycle for empty ACP prompt", async () => { setReadyAcpResolution(); const onReplyStart = vi.fn(); diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 85ce1c06ce2..20f31760bc7 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -11,9 +11,11 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js"; import type { TtsAutoMode } from "../../config/types.tts.js"; import { logVerbose } from "../../globals.js"; import { emitAgentEvent } from "../../infra/agent-events.js"; +import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { generateSecureUuid } from "../../infra/secure-random.js"; import { prefixSystemMessage } from "../../infra/system-message.js"; +import { markDiagnosticSessionProgress } from "../../logging/diagnostic.js"; import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; import { normalizeLowercaseStringOrEmpty, @@ -342,6 +344,23 @@ export async function tryDispatchAcpReply(params: { } const canonicalSessionKey = acpResolution.sessionKey; const acpAgentId = resolveAgentIdFromSessionKey(canonicalSessionKey); + const progressSessionKeys = isDiagnosticsEnabled(params.cfg) + ? Array.from( + new Set( + [params.ctx.SessionKey, sessionKey, canonicalSessionKey] + .map((key) => normalizeOptionalString(key)) + .filter((key): key is string => Boolean(key)), + ), + ) + : []; + const markAcpProgress = + progressSessionKeys.length > 0 + ? () => { + for (const key of progressSessionKeys) { + markDiagnosticSessionProgress({ sessionKey: key }); + } + } + : undefined; let queuedFinal = false; const delivery = createAcpDispatchDeliveryCoordinator({ @@ -401,6 +420,7 @@ export async function tryDispatchAcpReply(params: { cfg: params.cfg, shouldSendToolSummaries: params.shouldSendToolSummaries, deliver: delivery.deliver, + onProgress: markAcpProgress, provider: params.ctx.Surface ?? params.ctx.Provider, accountId: effectiveDispatchAccountId, }); diff --git a/src/auto-reply/reply/dispatch-from-config.acp-abort.test.ts b/src/auto-reply/reply/dispatch-from-config.acp-abort.test.ts index 80f97ef2f20..a674b40da10 100644 --- a/src/auto-reply/reply/dispatch-from-config.acp-abort.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.acp-abort.test.ts @@ -178,6 +178,7 @@ describe("dispatchReplyFromConfig ACP abort", () => { diagnosticMocks.logMessageQueued.mockReset(); diagnosticMocks.logMessageProcessed.mockReset(); diagnosticMocks.logSessionStateChange.mockReset(); + diagnosticMocks.markDiagnosticSessionProgress.mockReset(); agentEventMocks.emitAgentEvent.mockReset(); agentEventMocks.onAgentEvent.mockReset().mockImplementation(() => () => {}); setNoAbort(); diff --git a/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts b/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts index 102e442b9eb..51b7b32503d 100644 --- a/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts @@ -84,6 +84,7 @@ describe("dispatchReplyFromConfig reply_dispatch hook", () => { diagnosticMocks.logMessageQueued.mockReset(); diagnosticMocks.logMessageProcessed.mockReset(); diagnosticMocks.logSessionStateChange.mockReset(); + diagnosticMocks.markDiagnosticSessionProgress.mockReset(); runtimePluginMocks.ensureRuntimePluginsLoaded.mockReset(); resetPluginTtsAndThreadMocks(); }); diff --git a/src/auto-reply/reply/dispatch-from-config.shared.test-harness.ts b/src/auto-reply/reply/dispatch-from-config.shared.test-harness.ts index e14f40dbeae..2e1bc8c2ac6 100644 --- a/src/auto-reply/reply/dispatch-from-config.shared.test-harness.ts +++ b/src/auto-reply/reply/dispatch-from-config.shared.test-harness.ts @@ -28,6 +28,7 @@ const diagnosticMocks = vi.hoisted(() => ({ logMessageQueued: vi.fn(), logMessageProcessed: vi.fn(), logSessionStateChange: vi.fn(), + markDiagnosticSessionProgress: vi.fn(), })); const hookMocks = vi.hoisted(() => ({ registry: { @@ -177,6 +178,7 @@ vi.mock("../../logging/diagnostic.js", () => ({ logMessageQueued: diagnosticMocks.logMessageQueued, logMessageProcessed: diagnosticMocks.logMessageProcessed, logSessionStateChange: diagnosticMocks.logSessionStateChange, + markDiagnosticSessionProgress: diagnosticMocks.markDiagnosticSessionProgress, })); vi.mock("../../config/sessions/thread-info.js", () => ({ parseSessionThreadInfo: (sessionKey: string | undefined) => diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 7558da0eb4e..e97e966948d 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -42,6 +42,7 @@ const diagnosticMocks = vi.hoisted(() => ({ logMessageQueued: vi.fn(), logMessageProcessed: vi.fn(), logSessionStateChange: vi.fn(), + markDiagnosticSessionProgress: vi.fn(), })); const hookMocks = vi.hoisted(() => ({ registry: { @@ -343,6 +344,7 @@ vi.mock("../../logging/diagnostic.js", () => ({ logMessageQueued: diagnosticMocks.logMessageQueued, logMessageProcessed: diagnosticMocks.logMessageProcessed, logSessionStateChange: diagnosticMocks.logSessionStateChange, + markDiagnosticSessionProgress: diagnosticMocks.markDiagnosticSessionProgress, })); vi.mock("../../config/sessions/thread-info.js", () => ({ parseSessionThreadInfo: (sessionKey: string | undefined) => @@ -749,6 +751,7 @@ describe("dispatchReplyFromConfig", () => { diagnosticMocks.logMessageQueued.mockClear(); diagnosticMocks.logMessageProcessed.mockClear(); diagnosticMocks.logSessionStateChange.mockClear(); + diagnosticMocks.markDiagnosticSessionProgress.mockClear(); hookMocks.runner.hasHooks.mockClear(); hookMocks.runner.hasHooks.mockImplementation( (hookName?: string) => hookName === "reply_dispatch", @@ -2959,6 +2962,89 @@ describe("dispatchReplyFromConfig", () => { ); }); + it("marks diagnostic progress for real reply events but not reply start callbacks", async () => { + setNoAbort(); + const cfg = { diagnostics: { enabled: true } } as OpenClawConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "slack", + Surface: "slack", + SessionKey: "agent:main:main", + To: "slack:C123", + }); + const onReplyStart = vi.fn(async () => {}); + const replyResolver = async ( + _ctx: MsgContext, + opts?: GetReplyOptions, + ): Promise => { + await opts?.onReplyStart?.(); + await opts?.onToolResult?.({ text: "tool progress" }); + return { text: "hi" }; + }; + + await dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher, + replyOptions: { onReplyStart }, + replyResolver, + }); + + expect(onReplyStart).toHaveBeenCalledTimes(1); + expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledTimes(1); + expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledWith({ + sessionKey: "agent:main:main", + }); + }); + + it("keeps diagnostic progress when source progress callbacks are suppressed", async () => { + setNoAbort(); + const cfg = { diagnostics: { enabled: true } } as OpenClawConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "discord", + Surface: "discord", + ChatType: "channel", + SessionKey: "agent:main:discord:channel:C1", + To: "discord:channel:C1", + }); + const callbacks = { + toolStart: vi.fn(async () => {}), + itemEvent: vi.fn(async () => {}), + commandOutput: vi.fn(async () => {}), + }; + const replyResolver = async ( + _ctx: MsgContext, + opts?: GetReplyOptions, + ): Promise => { + await opts?.onToolStart?.({ name: "lookup" }); + await opts?.onItemEvent?.({ progressText: "working" }); + await opts?.onCommandOutput?.({ output: "line", status: "running" }); + return { text: "hi" }; + }; + + await dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher, + replyOptions: { + sourceReplyDeliveryMode: "message_tool_only", + onToolStart: callbacks.toolStart, + onItemEvent: callbacks.itemEvent, + onCommandOutput: callbacks.commandOutput, + }, + replyResolver, + }); + + expect(callbacks.toolStart).not.toHaveBeenCalled(); + expect(callbacks.itemEvent).not.toHaveBeenCalled(); + expect(callbacks.commandOutput).not.toHaveBeenCalled(); + expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledTimes(3); + expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledWith({ + sessionKey: "agent:main:discord:channel:C1", + }); + }); + it("routes plugin-owned bindings to the owning plugin before generic inbound claim broadcast", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockImplementation( diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index e7f6de6f9f9..470b4653463 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -45,6 +45,7 @@ import { logMessageProcessed, logMessageQueued, logSessionStateChange, + markDiagnosticSessionProgress, } from "../../logging/diagnostic.js"; import { buildPluginBindingDeclinedText, @@ -411,6 +412,15 @@ export async function dispatchReplyFromConfig( const boundAcpDispatchSessionKey = resolveBoundAcpDispatchSessionKey({ ctx, cfg }); const acpDispatchSessionKey = boundAcpDispatchSessionKey ?? initialSessionStoreEntry.sessionKey ?? sessionKey; + const markProgress = () => { + if (!canTrackSession || !sessionKey) { + return; + } + markDiagnosticSessionProgress({ sessionKey }); + if (acpDispatchSessionKey && acpDispatchSessionKey !== sessionKey) { + markDiagnosticSessionProgress({ sessionKey: acpDispatchSessionKey }); + } + }; const sessionStoreEntry = boundAcpDispatchSessionKey ? resolveSessionStoreLookup({ ...ctx, SessionKey: boundAcpDispatchSessionKey }, cfg) : initialSessionStoreEntry; @@ -1190,6 +1200,19 @@ export async function dispatchReplyFromConfig( const onPlanUpdateFromReplyOptions = params.replyOptions?.onPlanUpdate; const onApprovalEventFromReplyOptions = params.replyOptions?.onApprovalEvent; const onPatchSummaryFromReplyOptions = params.replyOptions?.onPatchSummary; + const wrapProgressCallback = ( + callback: ((...args: Args) => Promise | void) | undefined, + ): ((...args: Args) => Promise) | undefined => { + if (!callback && (!suppressAutomaticSourceDelivery || !canTrackSession)) { + return undefined; + } + return async (...args: Args) => { + markProgress(); + if (!suppressAutomaticSourceDelivery) { + await callback?.(...args); + } + }; + }; const replyResolver = params.replyResolver ?? (await loadGetReplyFromConfigRuntime()).getReplyFromConfig; @@ -1203,33 +1226,18 @@ export async function dispatchReplyFromConfig( sourceReplyDeliveryMode, typingPolicy: typing.typingPolicy, suppressTyping: typing.suppressTyping, - onPartialReply: suppressAutomaticSourceDelivery - ? undefined - : params.replyOptions?.onPartialReply, - onReasoningStream: suppressAutomaticSourceDelivery - ? undefined - : params.replyOptions?.onReasoningStream, - onReasoningEnd: suppressAutomaticSourceDelivery - ? undefined - : params.replyOptions?.onReasoningEnd, - onAssistantMessageStart: suppressAutomaticSourceDelivery - ? undefined - : params.replyOptions?.onAssistantMessageStart, - onBlockReplyQueued: suppressAutomaticSourceDelivery - ? undefined - : params.replyOptions?.onBlockReplyQueued, - onToolStart: suppressAutomaticSourceDelivery ? undefined : params.replyOptions?.onToolStart, - onItemEvent: suppressAutomaticSourceDelivery ? undefined : params.replyOptions?.onItemEvent, - onCommandOutput: suppressAutomaticSourceDelivery - ? undefined - : params.replyOptions?.onCommandOutput, - onCompactionStart: suppressAutomaticSourceDelivery - ? undefined - : params.replyOptions?.onCompactionStart, - onCompactionEnd: suppressAutomaticSourceDelivery - ? undefined - : params.replyOptions?.onCompactionEnd, + onPartialReply: wrapProgressCallback(params.replyOptions?.onPartialReply), + onReasoningStream: wrapProgressCallback(params.replyOptions?.onReasoningStream), + onReasoningEnd: wrapProgressCallback(params.replyOptions?.onReasoningEnd), + onAssistantMessageStart: wrapProgressCallback(params.replyOptions?.onAssistantMessageStart), + onBlockReplyQueued: wrapProgressCallback(params.replyOptions?.onBlockReplyQueued), + onToolStart: wrapProgressCallback(params.replyOptions?.onToolStart), + onItemEvent: wrapProgressCallback(params.replyOptions?.onItemEvent), + onCommandOutput: wrapProgressCallback(params.replyOptions?.onCommandOutput), + onCompactionStart: wrapProgressCallback(params.replyOptions?.onCompactionStart), + onCompactionEnd: wrapProgressCallback(params.replyOptions?.onCompactionEnd), onToolResult: (payload: ReplyPayload) => { + markProgress(); const run = async () => { markInboundDedupeReplayUnsafe(); if (!suppressAutomaticSourceDelivery) { @@ -1277,6 +1285,7 @@ export async function dispatchReplyFromConfig( return run(); }, onPlanUpdate: async (payload) => { + markProgress(); markInboundDedupeReplayUnsafe(); if (!suppressAutomaticSourceDelivery) { await onPlanUpdateFromReplyOptions?.(payload); @@ -1287,6 +1296,7 @@ export async function dispatchReplyFromConfig( await sendPlanUpdate({ explanation: payload.explanation, steps: payload.steps }); }, onApprovalEvent: async (payload) => { + markProgress(); markInboundDedupeReplayUnsafe(); if (!suppressAutomaticSourceDelivery) { await onApprovalEventFromReplyOptions?.(payload); @@ -1305,6 +1315,7 @@ export async function dispatchReplyFromConfig( await maybeSendWorkingStatus(label); }, onPatchSummary: async (payload) => { + markProgress(); markInboundDedupeReplayUnsafe(); if (!suppressAutomaticSourceDelivery) { await onPatchSummaryFromReplyOptions?.(payload); @@ -1319,6 +1330,7 @@ export async function dispatchReplyFromConfig( await maybeSendWorkingStatus(label); }, onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => { + markProgress(); const run = async () => { if ( payload.isReasoning !== true && diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts index f0799f24813..a315c114ba6 100644 --- a/src/config/schema.base.generated.ts +++ b/src/config/schema.base.generated.ts @@ -149,7 +149,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { maximum: 9007199254740991, title: "Session Liveness Threshold (ms)", description: - "Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.", + "No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.", }, otel: { type: "object", @@ -24485,7 +24485,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { }, "diagnostics.stuckSessionWarnMs": { label: "Session Liveness Threshold (ms)", - help: "Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.", + help: "No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.", tags: ["observability", "storage"], }, "diagnostics.otel.enabled": { diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 34ea63e73c6..9a81525af3e 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -585,7 +585,7 @@ export const FIELD_HELP: Record = { "diagnostics.enabled": "Master toggle for diagnostics instrumentation output in logs and telemetry wiring paths. Defaults to enabled; set false only in tightly constrained environments.", "diagnostics.stuckSessionWarnMs": - "Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.", + "No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.", "diagnostics.otel.enabled": "Enables OpenTelemetry export pipeline for traces, metrics, and logs based on configured endpoint/protocol settings. Keep disabled unless your collector endpoint and auth are fully configured.", "diagnostics.otel.endpoint": diff --git a/src/config/types.base.ts b/src/config/types.base.ts index 198f2bdabc8..79b53813bee 100644 --- a/src/config/types.base.ts +++ b/src/config/types.base.ts @@ -276,7 +276,7 @@ export type DiagnosticsConfig = { enabled?: boolean; /** Optional ad-hoc diagnostics flags (e.g. "telegram.http"). */ flags?: string[]; - /** Threshold in ms before a processing session logs "stuck session" diagnostics. */ + /** Threshold in ms before a processing session with no observed progress logs diagnostics. */ stuckSessionWarnMs?: number; otel?: DiagnosticsOtelConfig; cacheTrace?: DiagnosticsCacheTraceConfig; diff --git a/src/logging/diagnostic-session-state.ts b/src/logging/diagnostic-session-state.ts index 965aa9f1f8d..69a70681213 100644 --- a/src/logging/diagnostic-session-state.ts +++ b/src/logging/diagnostic-session-state.ts @@ -4,6 +4,7 @@ export type SessionState = { sessionId?: string; sessionKey?: string; lastActivity: number; + lastStuckWarnAgeMs?: number; state: SessionStateValue; queueDepth: number; toolCallHistory?: ToolCallRecord[]; diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts index 355aa964ba4..5eff8fb922b 100644 --- a/src/logging/diagnostic.test.ts +++ b/src/logging/diagnostic.test.ts @@ -25,6 +25,7 @@ import { import { logSessionStateChange, logMessageQueued, + markDiagnosticSessionProgress, resetDiagnosticStateForTest, resolveStuckSessionWarnMs, startDiagnosticHeartbeat, @@ -202,6 +203,63 @@ describe("stuck session diagnostics threshold", () => { }); }); + it("does not warn while a processing session continues reporting progress", () => { + const events: DiagnosticEventPayload[] = []; + const unsubscribe = onDiagnosticEvent((event) => { + events.push(event); + }); + try { + startDiagnosticHeartbeat({ + diagnostics: { + enabled: true, + stuckSessionWarnMs: 30_000, + }, + }); + logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); + vi.advanceTimersByTime(45_000); + markDiagnosticSessionProgress({ sessionId: "s1", sessionKey: "main" }); + vi.advanceTimersByTime(16_000); + } finally { + unsubscribe(); + } + + expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(0); + expect(events.filter((event) => event.type === "session.stalled")).toHaveLength(0); + expect(events.filter((event) => event.type === "session.long_running")).toHaveLength(0); + }); + + it("backs off repeated stuck warnings while a session remains unchanged", () => { + const events: Array<{ ageMs?: number }> = []; + const recoverStuckSession = vi.fn(); + const unsubscribe = onDiagnosticEvent((event) => { + if (event.type === "session.stuck") { + events.push({ ageMs: event.ageMs }); + } + }); + try { + startDiagnosticHeartbeat( + { + diagnostics: { + enabled: true, + stuckSessionWarnMs: 30_000, + }, + }, + { recoverStuckSession }, + ); + logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); + vi.advanceTimersByTime(91_000); + expect(events).toHaveLength(1); + expect(recoverStuckSession).toHaveBeenCalledTimes(1); + + vi.advanceTimersByTime(30_000); + } finally { + unsubscribe(); + } + + expect(events.map((event) => event.ageMs)).toEqual([60_000, 120_000]); + expect(recoverStuckSession).toHaveBeenCalledTimes(2); + }); + it("reports active sessions as stalled instead of stuck when active work stops progressing", () => { const events: DiagnosticEventPayload[] = []; const recoverStuckSession = vi.fn(); diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index 23e5d913e7f..87f5fccc124 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -416,6 +416,7 @@ export function logMessageQueued(params: { const state = getDiagnosticSessionState(params); state.queueDepth += 1; state.lastActivity = Date.now(); + state.lastStuckWarnAgeMs = undefined; if (diag.isEnabled("debug")) { diag.debug( `message queued: sessionId=${state.sessionId ?? "unknown"} sessionKey=${ @@ -494,6 +495,7 @@ export function logSessionStateChange( const prevState = state.state; state.state = params.state; state.lastActivity = Date.now(); + state.lastStuckWarnAgeMs = undefined; if (params.state === "idle") { state.queueDepth = Math.max(0, state.queueDepth - 1); } @@ -518,6 +520,16 @@ export function logSessionStateChange( markActivity(); } +export function markDiagnosticSessionProgress(params: SessionRef) { + if (!areDiagnosticsEnabledForProcess()) { + return; + } + const state = getDiagnosticSessionState(params); + state.lastActivity = Date.now(); + state.lastStuckWarnAgeMs = undefined; + markActivity(); +} + function sessionAttentionFields(params: { classification: SessionAttentionClassification; activity: DiagnosticSessionActivitySnapshot; @@ -562,6 +574,16 @@ export function logSessionAttention( activity, staleMs: params.thresholdMs, }); + if (classification.eventType === "session.stuck") { + const nextWarnAgeMs = + state.lastStuckWarnAgeMs === undefined + ? params.thresholdMs + : Math.max(state.lastStuckWarnAgeMs + params.thresholdMs, state.lastStuckWarnAgeMs * 2); + if (params.ageMs < nextWarnAgeMs) { + return undefined; + } + state.lastStuckWarnAgeMs = params.ageMs; + } const label = classification.eventType === "session.stuck" ? "stuck session"