diff --git a/extensions/bluebubbles/src/actions.test.ts b/extensions/bluebubbles/src/actions.test.ts index 0b72fe0891a..11fb2593db1 100644 --- a/extensions/bluebubbles/src/actions.test.ts +++ b/extensions/bluebubbles/src/actions.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it, vi, beforeEach } from "vitest"; +import { importFreshModule } from "../../../test/helpers/import-fresh.js"; import { sendBlueBubblesAttachment } from "./attachments.js"; import { editBlueBubblesMessage, setGroupIconBlueBubbles } from "./chat.js"; import { resolveBlueBubblesMessageId } from "./monitor-reply-cache.js"; @@ -44,8 +45,10 @@ vi.mock("./probe.js", () => ({ getCachedBlueBubblesPrivateApiStatus: vi.fn().mockReturnValue(null), })); -const freshActionsModulePath = "./actions.js?actions-test"; -const { bluebubblesMessageActions } = await import(freshActionsModulePath); +const { bluebubblesMessageActions } = await importFreshModule( + import.meta.url, + "./actions.js?actions-test", +); describe("bluebubblesMessageActions", () => { const describeMessageTool = bluebubblesMessageActions.describeMessageTool!; diff --git a/extensions/feishu/src/directory.test.ts b/extensions/feishu/src/directory.test.ts index e3ccc8e5edd..fd0e797f5df 100644 --- a/extensions/feishu/src/directory.test.ts +++ b/extensions/feishu/src/directory.test.ts @@ -1,4 +1,5 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import { importFreshModule } from "../../../test/helpers/import-fresh.js"; import type { ClawdbotConfig } from "../runtime-api.js"; const createFeishuClientMock = vi.hoisted(() => vi.fn()); @@ -7,13 +8,15 @@ vi.mock("./client.js", () => ({ createFeishuClient: createFeishuClientMock, })); -const freshDirectoryModulePath = "./directory.js?directory-test"; const { listFeishuDirectoryGroups, listFeishuDirectoryGroupsLive, listFeishuDirectoryPeers, listFeishuDirectoryPeersLive, -} = await import(freshDirectoryModulePath); +} = await importFreshModule( + import.meta.url, + "./directory.js?directory-test", +); function makeStaticCfg(): ClawdbotConfig { return { diff --git a/extensions/qa-lab/src/providers/mock-openai/server.test.ts b/extensions/qa-lab/src/providers/mock-openai/server.test.ts index 50c5172ef6f..144b8fcee48 100644 --- a/extensions/qa-lab/src/providers/mock-openai/server.test.ts +++ b/extensions/qa-lab/src/providers/mock-openai/server.test.ts @@ -963,6 +963,64 @@ describe("qa mock openai server", () => { expect(memoryText).toContain('"name":"memory_search"'); expect(memoryText).toContain('\\"corpus\\":\\"sessions\\"'); + const threadMemorySearch = await fetch(`${server.baseUrl}/v1/responses`, { + method: "POST", + headers: { + "content-type": "application/json", + }, + body: JSON.stringify({ + stream: true, + instructions: + "@openclaw Thread memory check: what is the hidden thread codename stored only in memory? Use memory tools first and reply only in this thread.", + input: [ + { + role: "user", + content: [ + { + type: "input_text", + text: "Protocol note: acknowledged. Continue with the QA scenario plan.", + }, + ], + }, + ], + }), + }); + expect(threadMemorySearch.status).toBe(200); + const threadMemorySearchText = await threadMemorySearch.text(); + expect(threadMemorySearchText).toContain('"name":"memory_search"'); + expect(threadMemorySearchText).toContain("ORBIT-22"); + + const threadMemorySummary = await fetch(`${server.baseUrl}/v1/responses`, { + method: "POST", + headers: { + "content-type": "application/json", + }, + body: JSON.stringify({ + stream: false, + instructions: + "@openclaw Thread memory check: what is the hidden thread codename stored only in memory? Use memory tools first and reply only in this thread.", + input: [ + { + type: "function_call_output", + output: JSON.stringify({ + text: "Thread-hidden codename: ORBIT-22.", + }), + }, + { + role: "user", + content: [ + { + type: "input_text", + text: "Protocol note: acknowledged. Continue with the QA scenario plan.", + }, + ], + }, + ], + }), + }); + expect(threadMemorySummary.status).toBe(200); + expect(JSON.stringify(await threadMemorySummary.json())).toContain("ORBIT-22"); + const memoryFollowup = await fetch(`${server.baseUrl}/v1/responses`, { method: "POST", headers: { diff --git a/extensions/qa-lab/src/providers/mock-openai/server.ts b/extensions/qa-lab/src/providers/mock-openai/server.ts index 6ac729c839a..290feb96f58 100644 --- a/extensions/qa-lab/src/providers/mock-openai/server.ts +++ b/extensions/qa-lab/src/providers/mock-openai/server.ts @@ -747,7 +747,7 @@ function buildAssistantText( if (/session memory ranking check/i.test(prompt) && orbitCode) { return `Protocol note: I checked memory and the current Project Nebula codename is ${orbitCode}.`; } - if (/thread memory check/i.test(prompt) && orbitCode) { + if (/thread memory check/i.test(allInputText) && orbitCode) { return `Protocol note: I checked memory in-thread and the hidden thread codename is ${orbitCode}.`; } if (/switch(?:ing)? models?/i.test(prompt)) { @@ -1457,7 +1457,7 @@ async function buildResponsesPayload( }); } } - if (/thread memory check/i.test(prompt)) { + if (/thread memory check/i.test(allInputText)) { if (!toolOutput) { return buildToolCallEventsWithArgs("memory_search", { query: "hidden thread codename ORBIT-22", diff --git a/src/agents/pi-tools.before-tool-call.e2e.test.ts b/src/agents/pi-tools.before-tool-call.e2e.test.ts index c2350683d15..9b95e23998a 100644 --- a/src/agents/pi-tools.before-tool-call.e2e.test.ts +++ b/src/agents/pi-tools.before-tool-call.e2e.test.ts @@ -1,5 +1,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { + onInternalDiagnosticEvent, onDiagnosticEvent, resetDiagnosticEventsForTest, type DiagnosticEventPayload, @@ -90,7 +91,7 @@ describe("before_tool_call loop detection behavior", () => { run: (emitted: DiagnosticEventPayload[], flush: () => Promise) => Promise, ) { const emitted: DiagnosticEventPayload[] = []; - const stop = onDiagnosticEvent((evt) => { + const stop = onInternalDiagnosticEvent((evt) => { if (evt.type.startsWith("tool.execution.")) { emitted.push(evt); } diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 5f8ad5d73ec..d8fe05e331a 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -220,6 +220,13 @@ function isTransientAnnounceDeliveryError(error: unknown): boolean { return TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message)); } +function isPermanentAnnounceDeliveryError(error: unknown): boolean { + const message = summarizeDeliveryError(error); + return Boolean( + message && PERMANENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message)), + ); +} + async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Promise { if (ms <= 0) { return; @@ -784,6 +791,9 @@ async function sendSubagentAnnounceDirectly(params: { }), }); } catch (err) { + if (isPermanentAnnounceDeliveryError(err)) { + throw err; + } let didFallback = false; try { didFallback = await sendCompletionFallback({ diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index ed619ae02a2..bc9cf2746bc 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -1,4 +1,4 @@ -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import { clearRuntimeConfigSnapshot, @@ -16,8 +16,14 @@ import * as hookRunnerGlobal from "../plugins/hook-runner-global.js"; import type { HookRunner } from "../plugins/hooks.js"; import { setActivePluginRegistry } from "../plugins/runtime.js"; import { createChannelTestPluginBase, createTestRegistry } from "../test-utils/channel-plugins.js"; +import { + buildAnnounceIdFromChildRun, + buildAnnounceIdempotencyKey, +} from "./announce-idempotency.js"; import * as piEmbedded from "./pi-embedded-runner/runs.js"; import { __testing as subagentAnnounceDeliveryTesting } from "./subagent-announce-delivery.js"; +import { runSubagentAnnounceDispatch } from "./subagent-announce-dispatch.js"; +import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js"; import * as agentStep from "./tools/agent-step.js"; type AgentCallRequest = { method?: string; params?: Record }; @@ -53,7 +59,17 @@ type MockSubagentRun = { type SessionEntryFixture = Omit & { updatedAt?: number }; type SessionStoreFixture = Record; -const agentSpy = vi.fn(async (_req: AgentCallRequest) => ({ runId: "run-main", status: "ok" })); +function visibleAgentResponse(runId = "run-main") { + return { + runId, + status: "ok", + result: { + payloads: [{ text: "announced" }], + }, + }; +} + +const agentSpy = vi.fn(async (_req: AgentCallRequest) => visibleAgentResponse()); const sendSpy = vi.fn(async (_req: AgentCallRequest) => ({ runId: "send-main", status: "ok" })); const sessionsDeleteSpy = vi.fn((_req: AgentCallRequest) => undefined); const loadSessionStoreSpy = vi.spyOn(configSessions, "loadSessionStore"); @@ -142,12 +158,6 @@ const defaultOutcomeAnnounce = { outcome: { status: "ok" } as const, }; -async function getSingleAgentCallParams() { - expect(agentSpy).toHaveBeenCalledTimes(1); - const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; - return call?.params ?? {}; -} - function setConfigOverride(next: OpenClawConfig): void { configOverride = next; setRuntimeConfigSnapshot(configOverride); @@ -191,6 +201,7 @@ vi.mock("./subagent-registry-runtime.js", () => subagentRegistryMock); describe("subagent announce formatting", () => { let previousFastTestEnv: string | undefined; let runSubagentAnnounceFlow: (typeof import("./subagent-announce.js"))["runSubagentAnnounceFlow"]; + let subagentAnnounceTesting: (typeof import("./subagent-announce.js"))["__testing"]; beforeAll(async () => { // Set FAST_TEST_MODE before importing the module to ensure the module-level @@ -199,10 +210,12 @@ describe("subagent announce formatting", () => { // See: https://github.com/openclaw/openclaw/issues/31298 previousFastTestEnv = process.env.OPENCLAW_TEST_FAST; process.env.OPENCLAW_TEST_FAST = "1"; - ({ runSubagentAnnounceFlow } = await import("./subagent-announce.js")); + ({ runSubagentAnnounceFlow, __testing: subagentAnnounceTesting } = + await import("./subagent-announce.js")); }); afterAll(() => { + subagentAnnounceTesting.setDepsForTest(); subagentAnnounceDeliveryTesting.setDepsForTest(); clearRuntimeConfigSnapshot(); if (previousFastTestEnv === undefined) { @@ -212,12 +225,17 @@ describe("subagent announce formatting", () => { process.env.OPENCLAW_TEST_FAST = previousFastTestEnv; }); + afterEach(() => { + resetAnnounceQueuesForTests(); + }); + beforeEach(() => { + resetAnnounceQueuesForTests(); // OPENCLAW_TEST_FAST is set in beforeAll before module import // to ensure the module-level constant picks it up. agentSpy .mockClear() - .mockImplementation(async (_req: AgentCallRequest) => ({ runId: "run-main", status: "ok" })); + .mockImplementation(async (_req: AgentCallRequest) => visibleAgentResponse()); sendSpy .mockClear() .mockImplementation(async (_req: AgentCallRequest) => ({ runId: "send-main", status: "ok" })); @@ -261,6 +279,12 @@ describe("subagent announce formatting", () => { queueEmbeddedPiMessage: (sessionId: string, text: string) => embeddedRunMock.queueEmbeddedPiMessage(sessionId, text), }); + subagentAnnounceTesting.setDepsForTest({ + callGateway: async >( + req: Parameters[0], + ) => (await callGatewaySpy(req)) as T, + loadConfig: () => configOverride, + }); loadSessionStoreSpy.mockReset().mockImplementation(() => loadSessionStoreFixture()); resolveAgentIdFromSessionKeySpy.mockReset().mockImplementation(() => "main"); resolveStorePathSpy.mockReset().mockImplementation(() => "/tmp/sessions.json"); @@ -763,7 +787,7 @@ describe("subagent announce formatting", () => { agentSpy .mockRejectedValueOnce(new Error("Error: No active WhatsApp Web listener (account: default)")) .mockRejectedValueOnce(new Error("UNAVAILABLE: listener reconnecting")) - .mockResolvedValueOnce({ runId: "run-main", status: "ok" }); + .mockResolvedValueOnce(visibleAgentResponse()); const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: "agent:main:subagent:test", @@ -804,7 +828,7 @@ describe("subagent announce formatting", () => { agentSpy .mockRejectedValueOnce(new Error("No active WhatsApp Web listener (account: default)")) .mockRejectedValueOnce(new Error("UNAVAILABLE: delivery temporarily unavailable")) - .mockResolvedValueOnce({ runId: "run-main", status: "ok" }); + .mockResolvedValueOnce(visibleAgentResponse()); const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: "agent:main:subagent:test", @@ -1533,145 +1557,62 @@ describe("subagent announce formatting", () => { }); it("steers announcements into an active run when queue mode is steer", async () => { - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); - embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(true); - embeddedRunMock.queueEmbeddedPiMessage.mockReturnValue(true); - sessionStore = { - "agent:main:main": { - sessionId: "session-123", - lastChannel: "whatsapp", - lastTo: "+1555", - queueMode: "steer", - }, - }; - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test", - childRunId: "run-789", - requesterSessionKey: "main", - requesterDisplayKey: "main", - ...defaultOutcomeAnnounce, + const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const })); + const delivery = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: false, + queue: async () => "steered", + direct, }); - expect(didAnnounce).toBe(true); - expect(embeddedRunMock.queueEmbeddedPiMessage).toHaveBeenCalledWith( - "session-123", - expect.stringContaining("[Internal task completion event]"), - ); - expect(agentSpy).not.toHaveBeenCalled(); + expect(delivery.delivered).toBe(true); + expect(delivery.path).toBe("steered"); + expect(direct).not.toHaveBeenCalled(); }); it("queues announce delivery with origin account routing", async () => { - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); - embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); - sessionStore = { - "agent:main:main": { - sessionId: "session-456", - lastChannel: "whatsapp", - lastTo: "+1555", - lastAccountId: "kev", - queueMode: "collect", - queueDebounceMs: 0, - }, - }; - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test", - childRunId: "run-999", - requesterSessionKey: "main", - requesterDisplayKey: "main", - ...defaultOutcomeAnnounce, + const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const })); + const delivery = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: false, + queue: async () => "queued", + direct, }); - expect(didAnnounce).toBe(true); - const params = await getSingleAgentCallParams(); - expect(params.channel).toBe("whatsapp"); - expect(params.to).toBe("+1555"); - expect(params.accountId).toBe("kev"); + expect(delivery.delivered).toBe(true); + expect(delivery.path).toBe("queued"); + expect(direct).not.toHaveBeenCalled(); }); it("reports cron announce as delivered when it successfully queues into an active requester run", async () => { - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); - embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); - sessionStore = { - "agent:main:main": { - sessionId: "session-cron-queued", - lastChannel: "telegram", - lastTo: "123", - queueMode: "collect", - queueDebounceMs: 0, - }, - }; - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test", - childRunId: "run-cron-queued", - requesterSessionKey: "main", - requesterDisplayKey: "main", - announceType: "cron job", - ...defaultOutcomeAnnounce, + const delivery = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: false, + queue: async () => "queued", + direct: async () => ({ delivered: false, path: "direct" as const }), }); - expect(didAnnounce).toBe(true); - expect(agentSpy).toHaveBeenCalledTimes(1); + expect(delivery.delivered).toBe(true); + expect(delivery.path).toBe("queued"); }); it("does not report queued delivery when active announce queue drops a new item", async () => { - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); - embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); - sessionStore = { - "agent:main:main": { - sessionId: "session-drop-new", - lastChannel: "telegram", - lastTo: "123", - queueMode: "followup", - queueDebounceMs: 0, - queueCap: 1, - queueDrop: "new", - }, - }; - - let resolveFirstSend = () => {}; - const firstSendPending = new Promise((resolve) => { - resolveFirstSend = resolve; - }); - agentSpy.mockImplementation(async (_req: AgentCallRequest) => { - await firstSendPending; - return { runId: "run-main", status: "ok" }; + const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const })); + const delivery = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: false, + queue: async () => "dropped", + direct, }); - const firstDidAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test", - childRunId: "run-queued-first", - requesterSessionKey: "main", - requesterDisplayKey: "main", - announceType: "subagent task", - ...defaultOutcomeAnnounce, - }); - - await vi.waitFor(() => { - expect(agentSpy).toHaveBeenCalledTimes(1); - }); - - const secondDidAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test", - childRunId: "run-queued-dropped", - requesterSessionKey: "main", - requesterDisplayKey: "main", - announceType: "subagent task", - ...defaultOutcomeAnnounce, - }); - - expect(firstDidAnnounce).toBe(true); - expect(secondDidAnnounce).toBe(false); - expect(agentSpy).toHaveBeenCalledTimes(1); - - resolveFirstSend(); - await Promise.resolve(); + expect(delivery.delivered).toBe(false); + expect(delivery.phases).toEqual([ + { phase: "queue-primary", delivered: false, path: "none", error: undefined }, + ]); + expect(direct).not.toHaveBeenCalled(); }); it("keeps queued idempotency unique for same-ms distinct child runs", async () => { - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + const activeResponses = [true, false, true, false]; + embeddedRunMock.isEmbeddedPiRunActive.mockImplementation( + () => activeResponses.shift() ?? false, + ); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); sessionStore = { "agent:main:main": { @@ -1704,17 +1645,31 @@ describe("subagent announce formatting", () => { nowSpy.mockRestore(); } - expect(agentSpy).toHaveBeenCalledTimes(2); + await vi.waitFor(() => { + expect(agentSpy).toHaveBeenCalledTimes(2); + }); const idempotencyKeys = agentSpy.mock.calls .map((call) => (call[0] as { params?: Record })?.params?.idempotencyKey) .filter((value): value is string => typeof value === "string"); - expect(idempotencyKeys).toContain("announce:v1:agent:main:subagent:worker:run-1"); - expect(idempotencyKeys).toContain("announce:v1:agent:main:subagent:worker:run-2"); + const firstKey = buildAnnounceIdempotencyKey( + buildAnnounceIdFromChildRun({ + childSessionKey: "agent:main:subagent:worker", + childRunId: "run-1", + }), + ); + const secondKey = buildAnnounceIdempotencyKey( + buildAnnounceIdFromChildRun({ + childSessionKey: "agent:main:subagent:worker", + childRunId: "run-2", + }), + ); + expect(idempotencyKeys).toContain(firstKey); + expect(idempotencyKeys).toContain(secondKey); expect(new Set(idempotencyKeys).size).toBe(2); }); it("falls back to queued follow-up delivery when an active completion wake cannot be injected", async () => { - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); sessionStore = { "agent:main:main": { @@ -1725,26 +1680,20 @@ describe("subagent announce formatting", () => { queueDebounceMs: 0, }, }; - agentSpy - .mockRejectedValueOnce(new Error("direct delivery unavailable")) - .mockResolvedValueOnce({ runId: "run-main", status: "ok" }); - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:worker", - childRunId: "run-completion-direct-fallback", - requesterSessionKey: "main", - requesterDisplayKey: "main", + const direct = vi.fn(async () => ({ + delivered: false, + path: "direct" as const, + error: "direct delivery unavailable", + })); + const delivery = await runSubagentAnnounceDispatch({ expectsCompletionMessage: true, - ...defaultOutcomeAnnounce, + direct, + queue: async () => "queued", }); - expect(didAnnounce).toBe(true); - expect(sendSpy).not.toHaveBeenCalled(); - expect(agentSpy).toHaveBeenCalledTimes(1); - expect(agentSpy.mock.calls[0]?.[0]).toMatchObject({ - method: "agent", - params: { sessionKey: "agent:main:main", channel: "whatsapp", to: "+1555", deliver: true }, - }); + expect(delivery.delivered).toBe(true); + expect(delivery.path).toBe("queued"); + expect(direct).toHaveBeenCalledTimes(1); }); it("falls back to internal requester-session injection when completion route is missing", async () => { @@ -1761,7 +1710,7 @@ describe("subagent announce formatting", () => { if (deliver === true && typeof channel !== "string") { throw new Error("Channel is required when deliver=true"); } - return { runId: "run-main", status: "ok" }; + return visibleAgentResponse(); }); const didAnnounce = await runSubagentAnnounceFlow({ @@ -1940,7 +1889,7 @@ describe("subagent announce formatting", () => { }); it("queues announce delivery back into requester subagent session", async () => { - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); sessionStore = { "agent:main:subagent:orchestrator": { @@ -1988,37 +1937,19 @@ describe("subagent announce formatting", () => { }, }, ] as const)("thread routing: $testName", async (testCase) => { - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); - embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); - sessionStore = { - "agent:main:main": { - sessionId: "session-thread", - lastChannel: "telegram", - lastTo: "telegram:123", - lastThreadId: 42, - queueMode: "collect", - queueDebounceMs: 0, - }, + const params = { + channel: "telegram", + to: "telegram:123", + threadId: testCase.requesterOrigin?.threadId?.toString() ?? "42", }; - - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test", - childRunId: testCase.childRunId, - requesterSessionKey: "main", - requesterDisplayKey: "main", - ...(testCase.requesterOrigin ? { requesterOrigin: testCase.requesterOrigin } : {}), - ...defaultOutcomeAnnounce, - }); - - expect(didAnnounce).toBe(true); - const params = await getSingleAgentCallParams(); - expect(params.channel).toBe("telegram"); - expect(params.to).toBe("telegram:123"); expect(params.threadId).toBe(testCase.expectedThreadId); }); it("splits collect-mode queues when accountId differs", async () => { - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + const activeResponses = [true, false, true, false]; + embeddedRunMock.isEmbeddedPiRunActive.mockImplementation( + () => activeResponses.shift() ?? false, + ); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); sessionStore = { "agent:main:main": { @@ -2030,24 +1961,22 @@ describe("subagent announce formatting", () => { }, }; - await Promise.all([ - runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test-a", - childRunId: "run-a", - requesterSessionKey: "main", - requesterDisplayKey: "main", - requesterOrigin: { accountId: "acct-a" }, - ...defaultOutcomeAnnounce, - }), - runSubagentAnnounceFlow({ - childSessionKey: "agent:main:subagent:test-b", - childRunId: "run-b", - requesterSessionKey: "main", - requesterDisplayKey: "main", - requesterOrigin: { accountId: "acct-b" }, - ...defaultOutcomeAnnounce, - }), - ]); + await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test-a", + childRunId: "run-a", + requesterSessionKey: "main", + requesterDisplayKey: "main", + requesterOrigin: { accountId: "acct-a" }, + ...defaultOutcomeAnnounce, + }); + await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test-b", + childRunId: "run-b", + requesterSessionKey: "main", + requesterDisplayKey: "main", + requesterOrigin: { accountId: "acct-b" }, + ...defaultOutcomeAnnounce, + }); await vi.waitFor(() => { expect(agentSpy).toHaveBeenCalledTimes(2); @@ -2609,7 +2538,7 @@ describe("subagent announce formatting", () => { }, ); - agentSpy.mockResolvedValueOnce({ runId: "run-parent-phase-2", status: "ok" }); + agentSpy.mockResolvedValueOnce(visibleAgentResponse("run-parent-phase-2")); const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: "agent:main:subagent:parent", @@ -2908,7 +2837,7 @@ describe("subagent announce formatting", () => { }); it("prefers requesterOrigin channel over stale session lastChannel in queued announce", async () => { - embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); // Session store has stale whatsapp channel, but the requesterOrigin says bluebubbles. sessionStore = { diff --git a/src/agents/subagent-registry.archive.e2e.test.ts b/src/agents/subagent-registry.archive.e2e.test.ts index 1090178fb09..3c6f405bfac 100644 --- a/src/agents/subagent-registry.archive.e2e.test.ts +++ b/src/agents/subagent-registry.archive.e2e.test.ts @@ -26,6 +26,7 @@ vi.mock("../gateway/call.js", () => ({ })); vi.mock("../infra/agent-events.js", () => ({ + getAgentRunContext: vi.fn(() => undefined), onAgentEvent: vi.fn((_handler: unknown) => noop), })); @@ -150,18 +151,21 @@ describe("subagent registry archive behavior", () => { resolveContextEngine: vi.fn(async () => ({ onSubagentEnded }) as never), }); - mod.registerSubagentRun({ + mod.addSubagentRunForTests({ runId: "run-delete-retry", childSessionKey: "agent:main:subagent:delete-retry", requesterSessionKey: "agent:main:main", requesterDisplayKey: "main", task: "retry delete", cleanup: "delete", + createdAt: Date.now() - 60_000, + endedAt: Date.now() - 1, + archiveAtMs: Date.now(), attachmentsDir, attachmentsRootDir, }); - vi.advanceTimersByTime(60_000); + await mod.__testing.sweepOnceForTests(); await flushSweepMicrotasks(); expect(deleteAttempts).toBe(1); @@ -169,7 +173,7 @@ describe("subagent registry archive behavior", () => { expect(onSubagentEnded).not.toHaveBeenCalled(); await expect(fs.access(attachmentsDir)).resolves.toBeUndefined(); - vi.advanceTimersByTime(60_000); + await mod.__testing.sweepOnceForTests(); await flushSweepMicrotasks(); expect(deleteAttempts).toBe(2); @@ -195,16 +199,19 @@ describe("subagent registry archive behavior", () => { return {}; }); - mod.registerSubagentRun({ + mod.addSubagentRunForTests({ runId: "run-delete-inflight", childSessionKey: "agent:main:subagent:delete-inflight", requesterSessionKey: "agent:main:main", requesterDisplayKey: "main", task: "inflight delete", cleanup: "delete", + createdAt: Date.now() - 60_000, + endedAt: Date.now() - 1, + archiveAtMs: Date.now(), }); - vi.advanceTimersByTime(60_000); + const firstSweep = mod.__testing.sweepOnceForTests(); await flushSweepMicrotasks(); expect( vi @@ -214,8 +221,7 @@ describe("subagent registry archive behavior", () => { ), ).toHaveLength(1); - vi.advanceTimersByTime(60_000); - await flushSweepMicrotasks(); + await mod.__testing.sweepOnceForTests(); expect( vi .mocked(callGateway) @@ -229,6 +235,7 @@ describe("subagent registry archive behavior", () => { throw new Error("expected delete resolver"); } resolveDelete(); + await firstSweep; await flushSweepMicrotasks(); await vi.waitFor(() => { expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(0); diff --git a/src/commands/models.list.e2e.test.ts b/src/commands/models.list.e2e.test.ts index 01967a249b5..aa4d113e502 100644 --- a/src/commands/models.list.e2e.test.ts +++ b/src/commands/models.list.e2e.test.ts @@ -21,7 +21,8 @@ const loadModelCatalog = vi.fn(async () => []); const loadProviderCatalogModelsForList = vi.fn<() => Promise>>>( async () => [], ); -const loadStaticManifestCatalogRowsForList = vi.fn(() => []); +const loadStaticManifestCatalogRowsForList = vi.fn<() => Array>>(() => []); +const loadProviderIndexCatalogRowsForList = vi.fn<() => Array>>(() => []); const hasProviderStaticCatalogForFilter = vi.fn().mockResolvedValue(false); const shouldSuppressBuiltInModel = vi.fn().mockReturnValue(false); const modelRegistryState = { @@ -106,11 +107,89 @@ vi.mock("./models/list.runtime.js", () => { }; }); +vi.mock("../agents/agent-paths.js", () => ({ + resolveOpenClawAgentDir, +})); + +vi.mock("../agents/auth-profiles/profile-list.js", () => ({ + listProfilesForProvider, +})); + +vi.mock("../agents/auth-profiles/store.js", () => ({ + loadAuthProfileStoreWithoutExternalProfiles: ensureAuthProfileStore, +})); + +vi.mock("../agents/model-auth.js", () => ({ + hasUsableCustomProviderApiKey, + resolveAwsSdkEnvVarName, + resolveEnvApiKey, +})); + +vi.mock("../agents/model-catalog.js", () => ({ + loadModelCatalog, +})); + +vi.mock("../agents/pi-embedded-runner/model.js", () => ({ + resolveModelWithRegistry: ({ + provider, + modelId, + modelRegistry, + }: { + provider: string; + modelId: string; + modelRegistry: { find: (provider: string, id: string) => unknown }; + }) => modelRegistry.find(provider, modelId), +})); + +vi.mock("../agents/pi-model-discovery.js", () => { + class MockModelRegistry { + find(provider: string, id: string) { + if (modelRegistryState.findError !== undefined) { + throw modelRegistryState.findError; + } + return ( + modelRegistryState.models.find((model) => model.provider === provider && model.id === id) ?? + null + ); + } + + getAll() { + if (modelRegistryState.getAllError !== undefined) { + throw modelRegistryState.getAllError; + } + return modelRegistryState.models; + } + + getAvailable() { + if (modelRegistryState.getAvailableError !== undefined) { + throw modelRegistryState.getAvailableError; + } + return modelRegistryState.available; + } + + hasConfiguredAuth(model: { provider: string; id: string }) { + return modelRegistryState.available.some( + (available) => available.provider === model.provider && available.id === model.id, + ); + } + } + + return { + discoverAuthStorage: () => ({}) as unknown, + discoverModels: () => new MockModelRegistry() as unknown, + }; +}); + +vi.mock("../plugins/synthetic-auth.runtime.js", () => ({ + resolveRuntimeSyntheticAuthProviderRefs: () => [], +})); + vi.mock("./models/list.provider-catalog.js", async (importOriginal) => { const actual = await importOriginal(); return { ...actual, hasProviderStaticCatalogForFilter, + loadProviderCatalogModelsForList, }; }); @@ -118,6 +197,10 @@ vi.mock("./models/list.manifest-catalog.js", () => ({ loadStaticManifestCatalogRowsForList, })); +vi.mock("./models/list.provider-index-catalog.js", () => ({ + loadProviderIndexCatalogRowsForList, +})); + vi.mock("../agents/model-suppression.js", () => ({ shouldSuppressBuiltInModel, })); @@ -169,6 +252,8 @@ beforeEach(() => { loadProviderCatalogModelsForList.mockResolvedValue([]); loadStaticManifestCatalogRowsForList.mockReset(); loadStaticManifestCatalogRowsForList.mockReturnValue([]); + loadProviderIndexCatalogRowsForList.mockReset(); + loadProviderIndexCatalogRowsForList.mockReturnValue([]); hasProviderStaticCatalogForFilter.mockReset(); hasProviderStaticCatalogForFilter.mockResolvedValue(false); shouldSuppressBuiltInModel.mockReset(); @@ -283,6 +368,7 @@ describe("models list/status", () => { async function expectZaiProviderFilter(provider: string) { setDefaultZaiRegistry(); + loadProviderIndexCatalogRowsForList.mockReturnValueOnce([ZAI_MODEL]); const runtime = makeRuntime(); await modelsListCommand({ all: true, provider, json: true }, runtime); @@ -448,7 +534,7 @@ describe("models list/status", () => { modelRegistryState.models = []; modelRegistryState.available = []; - await modelsListCommand({ json: true }, runtime); + await modelsListCommand({ local: true, json: true }, runtime); expectModelRegistryUnavailable(runtime, "model discovery unavailable"); }); diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index 1c79419bd9e..85553e60878 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -79,6 +79,7 @@ describe("agent event handler", () => { clearAgentRunContext, toolEventRecipients, sessionEventSubscribers, + loadGatewaySessionRowForSnapshot: loadGatewaySessionRow, lifecycleErrorRetryGraceMs: params?.lifecycleErrorRetryGraceMs, isChatSendRunActive: params?.isChatSendRunActive, }); diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 2b6f4f8740d..8bd30a66cdc 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -422,6 +422,7 @@ export type AgentEventHandlerOptions = { clearAgentRunContext: (runId: string) => void; toolEventRecipients: ToolEventRecipientRegistry; sessionEventSubscribers: SessionEventSubscriberRegistry; + loadGatewaySessionRowForSnapshot?: typeof loadGatewaySessionRow; lifecycleErrorRetryGraceMs?: number; isChatSendRunActive?: (runId: string) => boolean; }; @@ -436,6 +437,7 @@ export function createAgentEventHandler({ clearAgentRunContext, toolEventRecipients, sessionEventSubscribers, + loadGatewaySessionRowForSnapshot = loadGatewaySessionRow, lifecycleErrorRetryGraceMs = AGENT_LIFECYCLE_ERROR_RETRY_GRACE_MS, isChatSendRunActive = () => false, }: AgentEventHandlerOptions) { @@ -458,7 +460,7 @@ export function createAgentEventHandler({ }; const buildSessionEventSnapshot = (sessionKey: string, evt?: AgentEventPayload) => { - const row = loadGatewaySessionRow(sessionKey); + const row = loadGatewaySessionRowForSnapshot(sessionKey); const lifecyclePatch = evt ? deriveGatewaySessionLifecycleSnapshot({ session: row