diff --git a/src/agents/pi-tools.policy.ts b/src/agents/pi-tools.policy.ts index 2d57a8ab527..048194b71c1 100644 --- a/src/agents/pi-tools.policy.ts +++ b/src/agents/pi-tools.policy.ts @@ -1,4 +1,4 @@ -import { getChannelPlugin } from "../channels/plugins/index.js"; +import { getLoadedChannelPlugin } from "../channels/plugins/index.js"; import { resolveSessionConversation } from "../channels/plugins/session-conversation.js"; import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js"; import { resolveChannelGroupToolsPolicy } from "../config/group-policy.js"; @@ -388,7 +388,7 @@ export function resolveGroupToolPolicy(params: { } let plugin; try { - plugin = getChannelPlugin(channel); + plugin = getLoadedChannelPlugin(channel); } catch { plugin = undefined; } diff --git a/src/channels/plugins/session-conversation.bundled-fallback.test.ts b/src/channels/plugins/session-conversation.bundled-fallback.test.ts index 8ce5d4c418b..e3c2e3850d2 100644 --- a/src/channels/plugins/session-conversation.bundled-fallback.test.ts +++ b/src/channels/plugins/session-conversation.bundled-fallback.test.ts @@ -4,6 +4,7 @@ import { resetPluginRuntimeStateForTest } from "../../plugins/runtime.js"; const fallbackState = vi.hoisted(() => ({ activeDirName: null as string | null, + loadCalls: 0, resolveSessionConversation: null as | ((params: { kind: "group" | "channel"; rawId: string }) => { id: string; @@ -20,10 +21,12 @@ vi.mock("../../plugin-sdk/facade-runtime.js", async () => { ); return { ...actual, - tryLoadActivatedBundledPluginPublicSurfaceModuleSync: ({ dirName }: { dirName: string }) => + tryLoadActivatedBundledPluginPublicSurfaceModuleSync: ({ dirName }: { dirName: string }) => ( + (fallbackState.loadCalls += 1), dirName === fallbackState.activeDirName && fallbackState.resolveSessionConversation ? { resolveSessionConversation: fallbackState.resolveSessionConversation } - : null, + : null + ), }; }); @@ -32,6 +35,7 @@ import { resolveSessionConversationRef, resolveSessionThreadInfo } from "./sessi describe("session conversation bundled fallback", () => { beforeEach(() => { fallbackState.activeDirName = null; + fallbackState.loadCalls = 0; fallbackState.resolveSessionConversation = null; resetPluginRuntimeStateForTest(); }); @@ -148,4 +152,42 @@ describe("session conversation bundled fallback", () => { parentConversationCandidates: ["room:topic:root", "room"], }); }); + + it("reuses the bundled fallback loader result across repeated calls", () => { + fallbackState.activeDirName = "mock-threaded"; + fallbackState.resolveSessionConversation = ({ rawId }) => { + const [conversationId, threadId] = rawId.split(":topic:"); + return { + id: conversationId, + threadId, + baseConversationId: conversationId, + parentConversationCandidates: [conversationId], + }; + }; + setRuntimeConfigSnapshot({ + plugins: { + entries: { + "mock-threaded": { + enabled: true, + }, + }, + }, + }); + + expect(resolveSessionConversationRef("agent:main:mock-threaded:group:room:topic:42")).toEqual( + expect.objectContaining({ + channel: "mock-threaded", + id: "room", + threadId: "42", + }), + ); + expect(resolveSessionConversationRef("agent:main:mock-threaded:group:room:topic:43")).toEqual( + expect.objectContaining({ + channel: "mock-threaded", + id: "room", + threadId: "43", + }), + ); + expect(fallbackState.loadCalls).toBe(1); + }); }); diff --git a/src/channels/plugins/session-conversation.ts b/src/channels/plugins/session-conversation.ts index 46d5e60cdca..d3dcf3f5cc8 100644 --- a/src/channels/plugins/session-conversation.ts +++ b/src/channels/plugins/session-conversation.ts @@ -1,5 +1,6 @@ import { getRuntimeConfigSnapshot } from "../../config/runtime-snapshot.js"; import { tryLoadActivatedBundledPluginPublicSurfaceModuleSync } from "../../plugin-sdk/facade-runtime.js"; +import { getActivePluginChannelRegistryVersion } from "../../plugins/runtime.js"; import { parseRawSessionConversationRef, parseThreadSessionSuffix, @@ -58,6 +59,16 @@ type NormalizedSessionConversationResolution = ResolvedSessionConversation & { hasExplicitParentConversationCandidates: boolean; }; +type BundledSessionConversationFallbackCacheEntry = { + version: number; + resolveSessionConversation: BundledSessionKeyModule["resolveSessionConversation"] | null; +}; + +const bundledSessionConversationFallbackCache = new Map< + string, + BundledSessionConversationFallbackCacheEntry +>(); + function normalizeResolvedChannel(channel: string): string { return ( normalizeAnyChannelId(channel) ?? @@ -148,22 +159,35 @@ function resolveBundledSessionConversationFallback(params: { return null; } const dirName = normalizeResolvedChannel(params.channel); - let resolveSessionConversation: BundledSessionKeyModule["resolveSessionConversation"]; - try { - resolveSessionConversation = - tryLoadActivatedBundledPluginPublicSurfaceModuleSync({ + const version = getActivePluginChannelRegistryVersion(); + let cached = bundledSessionConversationFallbackCache.get(dirName); + if (!cached || cached.version !== version) { + let resolveSessionConversation: BundledSessionKeyModule["resolveSessionConversation"] | null = + null; + try { + const loaded = tryLoadActivatedBundledPluginPublicSurfaceModuleSync({ dirName, artifactBasename: SESSION_KEY_API_ARTIFACT_BASENAME, - })?.resolveSessionConversation; - } catch { - return null; + }); + resolveSessionConversation = + typeof loaded?.resolveSessionConversation === "function" + ? loaded.resolveSessionConversation + : null; + } catch { + resolveSessionConversation = null; + } + cached = { + version, + resolveSessionConversation, + }; + bundledSessionConversationFallbackCache.set(dirName, cached); } - if (typeof resolveSessionConversation !== "function") { + if (typeof cached.resolveSessionConversation !== "function") { return null; } return normalizeSessionConversationResolution( - resolveSessionConversation({ + cached.resolveSessionConversation({ kind: params.kind, rawId: params.rawId, }), @@ -182,6 +206,10 @@ function isBundledSessionConversationFallbackDisabled(channel: string): boolean return !!entry && typeof entry === "object" && entry.enabled === false; } +function shouldProbeBundledSessionConversationFallback(rawId: string): boolean { + return rawId.includes(":"); +} + function resolveSessionConversationResolution(params: { channel: string; kind: "group" | "channel"; @@ -200,7 +228,10 @@ function resolveSessionConversationResolution(params: { rawId, }), ); - const shouldTryBundledFallback = params.bundledFallback !== false && !messaging; + const shouldTryBundledFallback = + params.bundledFallback !== false && + !messaging && + shouldProbeBundledSessionConversationFallback(rawId); const resolved = pluginResolved ?? (shouldTryBundledFallback diff --git a/src/infra/exec-approval-forwarder.test.ts b/src/infra/exec-approval-forwarder.test.ts index 9221b5209b2..e5ed2e2cb3c 100644 --- a/src/infra/exec-approval-forwarder.test.ts +++ b/src/infra/exec-approval-forwarder.test.ts @@ -346,7 +346,7 @@ describe("exec approval forwarder", () => { }); expect(deliver).toHaveBeenCalledTimes(2); - await vi.runAllTimersAsync(); + await vi.advanceTimersByTimeAsync(baseRequest.expiresAtMs - baseRequest.createdAtMs); expect(deliver).toHaveBeenCalledTimes(2); }); @@ -358,7 +358,7 @@ describe("exec approval forwarder", () => { await Promise.resolve(); expect(deliver).toHaveBeenCalledTimes(1); - await vi.runAllTimersAsync(); + await vi.advanceTimersByTimeAsync(baseRequest.expiresAtMs - baseRequest.createdAtMs); expect(deliver).toHaveBeenCalledTimes(2); }); diff --git a/src/infra/exec-approval-forwarder.ts b/src/infra/exec-approval-forwarder.ts index 69d8fd689b7..56211baf431 100644 --- a/src/infra/exec-approval-forwarder.ts +++ b/src/infra/exec-approval-forwarder.ts @@ -1,5 +1,8 @@ import type { ReplyPayload } from "../auto-reply/types.js"; -import { getChannelPlugin, resolveChannelApprovalAdapter } from "../channels/plugins/index.js"; +import { + getLoadedChannelPlugin, + resolveChannelApprovalAdapter, +} from "../channels/plugins/index.js"; import { loadConfig } from "../config/config.js"; import type { ExecApprovalForwardingConfig, @@ -198,7 +201,7 @@ function shouldSkipForwardingFallback(params: { if (!channel) { return false; } - const adapter = resolveChannelApprovalAdapter(getChannelPlugin(channel)); + const adapter = resolveChannelApprovalAdapter(getLoadedChannelPlugin(channel)); return ( adapter?.delivery?.shouldSuppressForwardingFallback?.({ cfg: params.cfg, @@ -377,7 +380,7 @@ function buildApprovalRenderPayload(params: { }): ReplyPayload { const channel = normalizeMessageChannel(params.target.channel) ?? params.target.channel; const adapterPayload = channel - ? params.resolveRenderer(resolveChannelApprovalAdapter(getChannelPlugin(channel)))?.( + ? params.resolveRenderer(resolveChannelApprovalAdapter(getLoadedChannelPlugin(channel)))?.( params.renderParams, ) : null; @@ -583,7 +586,7 @@ function createApprovalHandlers< if (!channel) { return; } - await getChannelPlugin(channel)?.outbound?.beforeDeliverPayload?.({ + await getLoadedChannelPlugin(channel)?.outbound?.beforeDeliverPayload?.({ cfg, target, payload, diff --git a/src/infra/restart-stale-pids.test.ts b/src/infra/restart-stale-pids.test.ts index ed8195c9142..968b335dea0 100644 --- a/src/infra/restart-stale-pids.test.ts +++ b/src/infra/restart-stale-pids.test.ts @@ -822,6 +822,8 @@ describe.skipIf(isWindows)("restart-stale-pids", () => { const stalePid = process.pid + 912; Object.defineProperty(process, "platform", { value: "win32", configurable: true }); try { + let fakeNow = 0; + __testing.setDateNowOverride(() => fakeNow); mockReadWindowsListeningPidsResult.mockReturnValue({ ok: true, pids: [stalePid] }); mockReadWindowsProcessArgs.mockReturnValue(["openclaw", "gateway"]); mockReadWindowsProcessArgsResult.mockReturnValue({ @@ -847,6 +849,9 @@ describe.skipIf(isWindows)("restart-stale-pids", () => { } return true; }); + __testing.setSleepSyncOverride((ms) => { + fakeNow += ms; + }); expect(cleanStaleGatewayProcessesSync()).toEqual([]); expect(mockSpawnSync).toHaveBeenNthCalledWith( @@ -862,6 +867,8 @@ describe.skipIf(isWindows)("restart-stale-pids", () => { expect.objectContaining({ timeout: 5000 }), ); } finally { + __testing.setSleepSyncOverride(null); + __testing.setDateNowOverride(null); if (origDescriptor) { Object.defineProperty(process, "platform", origDescriptor); }