From 83d29dae2bc4c29189ddaa4423cec60657f0517f Mon Sep 17 00:00:00 2001 From: Bob Date: Sun, 5 Apr 2026 11:05:40 +0200 Subject: [PATCH] [codex] Reproduce session stall and restart drain bugs (#61225) * Tests: reproduce session stall and drain bugs * Docs: add reply lifecycle unification plan * Docs: lock down reply lifecycle plan * Delete docs/experiments/plans/reply-lifecycle-unification.md --------- Co-authored-by: Onur <2453968+osolmaz@users.noreply.github.com> Co-authored-by: Vincent Koc --- .../reply/agent-runner-execution.test.ts | 37 ++++ .../agent-runner.misc.runreplyagent.test.ts | 170 ++++++++++++++++++ .../reply/get-reply-run.media-only.test.ts | 20 +++ 3 files changed, 227 insertions(+) diff --git a/src/auto-reply/reply/agent-runner-execution.test.ts b/src/auto-reply/reply/agent-runner-execution.test.ts index e539ad176ef..4a85bb647d0 100644 --- a/src/auto-reply/reply/agent-runner-execution.test.ts +++ b/src/auto-reply/reply/agent-runner-execution.test.ts @@ -1,6 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { LiveSessionModelSwitchError } from "../../agents/live-model-switch-error.js"; import type { SessionEntry } from "../../config/sessions.js"; +import { GatewayDrainingError } from "../../process/command-queue.js"; import type { TemplateContext } from "../templating.js"; import type { GetReplyOptions } from "../types.js"; import { MAX_LIVE_SWITCH_RETRIES } from "./agent-runner-execution.js"; @@ -460,6 +461,42 @@ describe("runAgentTurnWithFallback", () => { } }); + it("returns a restart-specific error when the gateway is draining", async () => { + state.runEmbeddedPiAgentMock.mockRejectedValueOnce(new GatewayDrainingError()); + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + commandBody: "hello", + followupRun: createFollowupRun(), + sessionCtx: { + Provider: "whatsapp", + MessageSid: "msg", + } as unknown as TemplateContext, + opts: {}, + typingSignals: createMockTypingSignaler(), + blockReplyPipeline: null, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + applyReplyToMode: (payload) => payload, + shouldEmitToolResult: () => true, + shouldEmitToolOutput: () => false, + pendingToolTasks: new Set(), + resetSessionAfterCompactionFailure: async () => false, + resetSessionAfterRoleOrderingConflict: async () => false, + isHeartbeat: false, + sessionKey: "main", + getActiveSessionEntry: () => undefined, + resolvedVerboseLevel: "off", + }); + + expect(result.kind).toBe("final"); + if (result.kind === "final") { + expect(result.payload.text).toBe( + "⚠️ Gateway is restarting. Please wait a few seconds and try again.", + ); + } + }); + it("returns a session reset hint for Bedrock tool mismatch errors on external chat channels", async () => { state.runEmbeddedPiAgentMock.mockRejectedValueOnce( new Error( diff --git a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts index e9f8c9368f1..6e7f797fd2e 100644 --- a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts @@ -3,6 +3,8 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { __testing as embeddedRunsTesting } from "../../agents/pi-embedded-runner/runs.js"; +import type { OpenClawConfig } from "../../config/config.js"; import type { SessionEntry } from "../../config/sessions.js"; import { loadSessionStore, saveSessionStore } from "../../config/sessions.js"; import { onAgentEvent } from "../../infra/agent-events.js"; @@ -12,7 +14,9 @@ import { registerMemoryFlushPlanResolver, } from "../../plugins/memory-state.js"; import type { TemplateContext } from "../templating.js"; +import { __testing as abortTesting, tryFastAbortFromMessage } from "./abort.js"; import type { FollowupRun, QueueSettings } from "./queue.js"; +import { buildTestCtx } from "./test-ctx.js"; import { createMockTypingController } from "./test-helpers.js"; function createCliBackendTestConfig() { @@ -32,6 +36,22 @@ const runEmbeddedPiAgentMock = vi.fn(); const runCliAgentMock = vi.fn(); const runWithModelFallbackMock = vi.fn(); const runtimeErrorMock = vi.fn(); +const compactState = vi.hoisted(() => ({ + compactEmbeddedPiSessionMock: vi.fn(), + actualCompactEmbeddedPiSession: undefined as + | typeof import("../../agents/pi-embedded.js").compactEmbeddedPiSession + | undefined, +})); + +function createDeferred() { + let resolve!: (value: T) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} vi.mock("../../agents/model-fallback.js", () => ({ runWithModelFallback: (params: { @@ -49,8 +69,11 @@ vi.mock("../../agents/pi-embedded.js", async () => { const actual = await vi.importActual( "../../agents/pi-embedded.js", ); + compactState.actualCompactEmbeddedPiSession = actual.compactEmbeddedPiSession; return { ...actual, + compactEmbeddedPiSession: (params: unknown) => + compactState.compactEmbeddedPiSessionMock(params), queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), }; @@ -110,10 +133,19 @@ beforeEach(() => { runCliAgentMock.mockClear(); runWithModelFallbackMock.mockClear(); runtimeErrorMock.mockClear(); + compactState.compactEmbeddedPiSessionMock.mockReset(); loadCronStoreMock.mockClear(); // Default: no cron jobs in store. loadCronStoreMock.mockResolvedValue({ version: 1, jobs: [] }); resetSystemEventsForTest(); + embeddedRunsTesting.resetActiveEmbeddedRuns(); + abortTesting.resetDepsForTests(); + compactState.compactEmbeddedPiSessionMock.mockImplementation((params: unknown) => { + if (!compactState.actualCompactEmbeddedPiSession) { + throw new Error("compactEmbeddedPiSession actual implementation unavailable"); + } + return compactState.actualCompactEmbeddedPiSession(params as never); + }); // Default: no provider switch; execute the chosen provider+model. runWithModelFallbackMock.mockImplementation( @@ -129,6 +161,8 @@ afterEach(() => { vi.useRealTimers(); resetSystemEventsForTest(); clearMemoryPluginState(); + abortTesting.resetDepsForTests(); + embeddedRunsTesting.resetActiveEmbeddedRuns(); }); describe("runReplyAgent onAgentRunStart", () => { @@ -520,6 +554,142 @@ describe("runReplyAgent auto-compaction token update", () => { return { typing, sessionCtx, resolvedQueue, followupRun }; } + it("lets /stop abort a run that is still in preflight compaction", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-preflight-stop-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionFile = "session-relative.jsonl"; + const workspaceDir = tmp; + const transcriptPath = path.join(tmp, sessionFile); + const cfg = { session: { store: storePath } } as OpenClawConfig; + + await fs.writeFile( + transcriptPath, + `${JSON.stringify({ + message: { + role: "user", + content: "x".repeat(320_000), + timestamp: Date.now(), + }, + })}\n`, + "utf-8", + ); + + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + sessionFile, + totalTokens: 10, + totalTokensFresh: false, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const compactionDeferred = createDeferred<{ + ok: true; + compacted: true; + result: { + summary: string; + firstKeptEntryId: string; + tokensBefore: number; + tokensAfter: number; + }; + }>(); + + compactState.compactEmbeddedPiSessionMock.mockImplementationOnce( + async () => await compactionDeferred.promise, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }); + + abortTesting.setDepsForTests({ + getAcpSessionManager: (() => + ({ + resolveSession: () => ({ kind: "none" }), + cancelSession: async () => {}, + }) as never) as never, + getLatestSubagentRunByChildSessionKey: () => null, + listSubagentRunsForController: () => [], + markSubagentRunTerminated: () => 0, + }); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config: cfg, + sessionFile, + workspaceDir, + }); + + const runPromise = runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: sessionKey, + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-6", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + try { + await vi.waitFor(() => { + expect(compactState.compactEmbeddedPiSessionMock).toHaveBeenCalledOnce(); + }); + + const abortResult = await tryFastAbortFromMessage({ + ctx: buildTestCtx({ + Body: "/stop", + RawBody: "/stop", + CommandBody: "/stop", + CommandSource: "text", + CommandAuthorized: true, + ChatType: "direct", + Provider: "whatsapp", + Surface: "whatsapp", + From: "whatsapp:+15550001111", + To: "whatsapp:+15550002222", + SessionKey: sessionKey, + }), + cfg, + }); + + expect(abortResult).toMatchObject({ + handled: true, + aborted: true, + }); + } finally { + compactionDeferred.resolve({ + ok: true, + compacted: true, + result: { + summary: "compacted", + firstKeptEntryId: "first-kept", + tokensBefore: 90_000, + tokensAfter: 8_000, + }, + }); + await runPromise; + } + }); + it("updates totalTokens after auto-compaction using lastCallUsage", async () => { const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-tokens-")); const storePath = path.join(tmp, "sessions.json"); diff --git a/src/auto-reply/reply/get-reply-run.media-only.test.ts b/src/auto-reply/reply/get-reply-run.media-only.test.ts index 4f48415e341..24b3aeb21d8 100644 --- a/src/auto-reply/reply/get-reply-run.media-only.test.ts +++ b/src/auto-reply/reply/get-reply-run.media-only.test.ts @@ -102,6 +102,12 @@ let drainFormattedSystemEvents: typeof import("./session-system-events.js").drai let resolveTypingMode: typeof import("./typing-mode.js").resolveTypingMode; let loadScopeCounter = 0; +function createGatewayDrainingError(): Error { + const error = new Error("Gateway is draining for restart; new tasks are not accepted"); + error.name = "GatewayDrainingError"; + return error; +} + async function loadFreshGetReplyRunModuleForTest() { ({ runPreparedReply } = await importFreshModule( import.meta.url, @@ -334,6 +340,20 @@ describe("runPreparedReply media-only handling", () => { expect(vi.mocked(routeReply)).not.toHaveBeenCalled(); }); + it("does not emit a reset notice when /new is attempted during gateway drain", async () => { + vi.mocked(runReplyAgent).mockRejectedValueOnce(createGatewayDrainingError()); + + await expect( + runPreparedReply( + baseParams({ + resetTriggered: true, + }), + ), + ).rejects.toThrow("Gateway is draining for restart; new tasks are not accepted"); + + expect(vi.mocked(routeReply)).not.toHaveBeenCalled(); + }); + it("uses inbound origin channel for run messageProvider", async () => { await runPreparedReply( baseParams({