From f6124f3e17a9614ef16e07d7c65bac0c7a618033 Mon Sep 17 00:00:00 2001 From: Bob Date: Tue, 7 Apr 2026 12:23:50 +0200 Subject: [PATCH] ACP: harden Discord recovery and reset flow (#62132) * ACP: harden Discord recovery and reset flow * CI: harden bundled vitest excludes * ACP: fix Claude launch and reset recovery * Discord: use follow-up replies after slash defer * ACP: route bound resets through gateway service * ACP: unify bound reset authority * ACPX: update OpenClaw branch to 0.5.2 * ACP: fix rebuilt branch replay fallout * ACP: fix CI regressions after ACPX 0.5.2 update --------- Co-authored-by: Onur <2453968+osolmaz@users.noreply.github.com> --- extensions/acpx/package.json | 2 +- extensions/acpx/src/acpx-runtime-compat.d.ts | 6 +- extensions/acpx/src/runtime.test.ts | 137 +++---- extensions/acpx/src/runtime.ts | 1 + .../discord/src/monitor/agent-components.ts | 8 +- .../monitor/message-handler.process.test.ts | 43 +- .../src/monitor/message-handler.process.ts | 23 +- .../native-command.commands-allowfrom.test.ts | 11 +- .../native-command.plugin-dispatch.test.ts | 12 +- .../discord/src/monitor/native-command.ts | 4 +- extensions/discord/src/monitor/typing.test.ts | 42 ++ extensions/discord/src/monitor/typing.ts | 28 +- pnpm-lock.yaml | 10 +- src/acp/control-plane/manager.core.ts | 209 +++++++--- src/acp/control-plane/manager.test.ts | 373 ++++++++++++++++++ src/acp/control-plane/manager.types.ts | 4 +- src/acp/persistent-bindings.lifecycle.test.ts | 103 ++++- src/acp/persistent-bindings.lifecycle.ts | 32 +- src/acp/persistent-bindings.test.ts | 56 ++- .../reply/commands-reset-hooks.test.ts | 72 +++- src/auto-reply/reply/commands-reset.ts | 57 +-- .../reply/dispatch-acp-delivery.test.ts | 20 + src/auto-reply/reply/dispatch-acp-delivery.ts | 6 +- .../acp-stateful-target-driver.test.ts | 77 ++++ .../plugins/acp-stateful-target-driver.ts | 49 ++- .../acp-stateful-target-reset.runtime.ts | 1 + src/channels/plugins/binding-targets.test.ts | 2 + src/channels/plugins/binding-targets.ts | 1 + .../plugins/stateful-target-drivers.ts | 1 + ...sessions.gateway-server-sessions-a.test.ts | 39 ++ src/gateway/session-reset-service.ts | 79 ++++ src/infra/path-env.test.ts | 20 + src/infra/path-env.ts | 11 + test/vitest-scoped-config.test.ts | 22 ++ vitest.bundled.config.ts | 22 +- 35 files changed, 1294 insertions(+), 289 deletions(-) create mode 100644 extensions/discord/src/monitor/typing.test.ts create mode 100644 src/channels/plugins/acp-stateful-target-driver.test.ts create mode 100644 src/channels/plugins/acp-stateful-target-reset.runtime.ts diff --git a/extensions/acpx/package.json b/extensions/acpx/package.json index b763d2614fa..4566d520436 100644 --- a/extensions/acpx/package.json +++ b/extensions/acpx/package.json @@ -4,7 +4,7 @@ "description": "OpenClaw ACP runtime backend", "type": "module", "dependencies": { - "acpx": "0.5.1" + "acpx": "0.5.2" }, "devDependencies": { "@openclaw/plugin-sdk": "workspace:*" diff --git a/extensions/acpx/src/acpx-runtime-compat.d.ts b/extensions/acpx/src/acpx-runtime-compat.d.ts index 03ea9e9b00a..444903bc2f7 100644 --- a/extensions/acpx/src/acpx-runtime-compat.d.ts +++ b/extensions/acpx/src/acpx-runtime-compat.d.ts @@ -45,7 +45,11 @@ declare module "acpx/runtime" { setMode(input: { handle: AcpRuntimeHandle; mode: string }): Promise; setConfigOption(input: { handle: AcpRuntimeHandle; key: string; value: string }): Promise; cancel(input: { handle: AcpRuntimeHandle; reason?: string }): Promise; - close(input: { handle: AcpRuntimeHandle; reason?: string }): Promise; + close(input: { + handle: AcpRuntimeHandle; + reason?: string; + discardPersistentState?: boolean; + }): Promise; } export function createAcpRuntime(...args: unknown[]): AcpxRuntime; diff --git a/extensions/acpx/src/runtime.test.ts b/extensions/acpx/src/runtime.test.ts index 0e7186cfa6e..888935ffd80 100644 --- a/extensions/acpx/src/runtime.test.ts +++ b/extensions/acpx/src/runtime.test.ts @@ -1,74 +1,37 @@ -import type { AcpRuntimeHandle, AcpRuntimeOptions, AcpSessionStore } from "acpx/runtime"; +import type { AcpSessionStore } from "acpx/runtime"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { AcpRuntime } from "../runtime-api.js"; +import { AcpxRuntime } from "./runtime.js"; -const mocks = vi.hoisted(() => { - const state = { - capturedStore: undefined as AcpSessionStore | undefined, - }; - - class MockAcpxRuntime { - constructor(options: AcpRuntimeOptions) { - state.capturedStore = options.sessionStore; - } - - isHealthy() { - return true; - } - - async probeAvailability() {} - - async doctor() { - return { ok: true, message: "ok" }; - } - - async ensureSession() { - return { - sessionKey: "agent:codex:acp:binding:test", - backend: "acpx", - runtimeSessionName: "agent:codex:acp:binding:test", - } satisfies AcpRuntimeHandle; - } - - async *runTurn() {} - - getCapabilities() { - return { controls: [] }; - } - - async getStatus() { - return {}; - } - - async setMode() {} - - async setConfigOption() {} - - async cancel() {} - - async close() {} - } +function makeRuntime(baseStore: AcpSessionStore): { + runtime: AcpxRuntime; + wrappedStore: AcpSessionStore & { markFresh: (sessionKey: string) => void }; + delegate: { close: AcpRuntime["close"] }; +} { + const runtime = new AcpxRuntime({ + cwd: "/tmp", + sessionStore: baseStore, + agentRegistry: { + resolve: () => "codex", + list: () => ["codex"], + }, + permissionMode: "approve-reads", + }); return { - state, - MockAcpxRuntime, + runtime, + wrappedStore: ( + runtime as unknown as { + sessionStore: AcpSessionStore & { markFresh: (sessionKey: string) => void }; + } + ).sessionStore, + delegate: (runtime as unknown as { delegate: { close: AcpRuntime["close"] } }).delegate, }; -}); - -vi.mock("acpx/runtime", () => ({ - ACPX_BACKEND_ID: "acpx", - AcpxRuntime: mocks.MockAcpxRuntime, - createAcpRuntime: vi.fn(), - createAgentRegistry: vi.fn(), - createFileSessionStore: vi.fn(), - decodeAcpxRuntimeHandleState: vi.fn(), - encodeAcpxRuntimeHandleState: vi.fn(), -})); - -import { AcpxRuntime } from "./runtime.js"; +} describe("AcpxRuntime fresh reset wrapper", () => { beforeEach(() => { - mocks.state.capturedStore = undefined; + vi.restoreAllMocks(); }); it("keeps stale persistent loads hidden until a fresh record is saved", async () => { @@ -77,20 +40,9 @@ describe("AcpxRuntime fresh reset wrapper", () => { save: vi.fn(async () => {}), }; - const runtime = new AcpxRuntime({ - cwd: "/tmp", - sessionStore: baseStore, - agentRegistry: { - resolve: () => "codex", - list: () => ["codex"], - }, - permissionMode: "approve-reads", - }); + const { runtime, wrappedStore } = makeRuntime(baseStore); - const wrappedStore = mocks.state.capturedStore; - expect(wrappedStore).toBeDefined(); - - expect(await wrappedStore?.load("agent:codex:acp:binding:test")).toEqual({ + expect(await wrappedStore.load("agent:codex:acp:binding:test")).toEqual({ acpxRecordId: "stale", }); expect(baseStore.load).toHaveBeenCalledTimes(1); @@ -99,17 +51,17 @@ describe("AcpxRuntime fresh reset wrapper", () => { sessionKey: "agent:codex:acp:binding:test", }); - expect(await wrappedStore?.load("agent:codex:acp:binding:test")).toBeUndefined(); + expect(await wrappedStore.load("agent:codex:acp:binding:test")).toBeUndefined(); expect(baseStore.load).toHaveBeenCalledTimes(1); - expect(await wrappedStore?.load("agent:codex:acp:binding:test")).toBeUndefined(); + expect(await wrappedStore.load("agent:codex:acp:binding:test")).toBeUndefined(); expect(baseStore.load).toHaveBeenCalledTimes(1); - await wrappedStore?.save({ + await wrappedStore.save({ acpxRecordId: "fresh-record", name: "agent:codex:acp:binding:test", } as never); - expect(await wrappedStore?.load("agent:codex:acp:binding:test")).toEqual({ + expect(await wrappedStore.load("agent:codex:acp:binding:test")).toEqual({ acpxRecordId: "stale", }); expect(baseStore.load).toHaveBeenCalledTimes(2); @@ -121,18 +73,8 @@ describe("AcpxRuntime fresh reset wrapper", () => { save: vi.fn(async () => {}), }; - const runtime = new AcpxRuntime({ - cwd: "/tmp", - sessionStore: baseStore, - agentRegistry: { - resolve: () => "codex", - list: () => ["codex"], - }, - permissionMode: "approve-reads", - }); - - const wrappedStore = mocks.state.capturedStore; - expect(wrappedStore).toBeDefined(); + const { runtime, wrappedStore, delegate } = makeRuntime(baseStore); + const close = vi.spyOn(delegate, "close").mockResolvedValue(undefined); await runtime.close({ handle: { @@ -144,7 +86,16 @@ describe("AcpxRuntime fresh reset wrapper", () => { discardPersistentState: true, }); - expect(await wrappedStore?.load("agent:codex:acp:binding:test")).toBeUndefined(); + expect(close).toHaveBeenCalledWith({ + handle: { + sessionKey: "agent:codex:acp:binding:test", + backend: "acpx", + runtimeSessionName: "agent:codex:acp:binding:test", + }, + reason: "new-in-place-reset", + discardPersistentState: true, + }); + expect(await wrappedStore.load("agent:codex:acp:binding:test")).toBeUndefined(); expect(baseStore.load).not.toHaveBeenCalled(); }); }); diff --git a/extensions/acpx/src/runtime.ts b/extensions/acpx/src/runtime.ts index 6ca0b466012..03e2436343c 100644 --- a/extensions/acpx/src/runtime.ts +++ b/extensions/acpx/src/runtime.ts @@ -131,6 +131,7 @@ export class AcpxRuntime implements AcpxRuntimeLike { .close({ handle: input.handle, reason: input.reason, + discardPersistentState: input.discardPersistentState, }) .then(() => { if (input.discardPersistentState) { diff --git a/extensions/discord/src/monitor/agent-components.ts b/extensions/discord/src/monitor/agent-components.ts index 6f8b05f3cc8..f7ab0acaaa8 100644 --- a/extensions/discord/src/monitor/agent-components.ts +++ b/extensions/discord/src/monitor/agent-components.ts @@ -30,6 +30,7 @@ import { createNonExitingRuntime, logVerbose } from "openclaw/plugin-sdk/runtime import { resolveOpenProviderRuntimeGroupPolicy } from "openclaw/plugin-sdk/runtime-group-policy"; import { logDebug, logError } from "openclaw/plugin-sdk/text-runtime"; import { resolveDiscordMaxLinesPerMessage } from "../accounts.js"; +import { createDiscordRestClient } from "../client.js"; import { parseDiscordComponentCustomIdForCarbon, parseDiscordModalCustomIdForCarbon, @@ -512,6 +513,11 @@ async function dispatchDiscordComponentEvent(params: { fallbackLimit: 2000, }); const token = ctx.token ?? ""; + const feedbackRest = createDiscordRestClient({ + cfg: ctx.cfg, + token, + accountId, + }).rest; const mediaLocalRoots = getAgentScopedMediaLocalRoots(ctx.cfg, agentId); const replyToMode = ctx.discordConfig?.replyToMode ?? ctx.cfg.channels?.discord?.replyToMode ?? "off"; @@ -554,7 +560,7 @@ async function dispatchDiscordComponentEvent(params: { onReplyStart: async () => { try { const { sendTyping } = await loadTypingRuntime(); - await sendTyping({ client: interaction.client, channelId: typingChannelId }); + await sendTyping({ rest: feedbackRest, channelId: typingChannelId }); } catch (err) { logVerbose(`discord: typing failed for component reply: ${String(err)}`); } diff --git a/extensions/discord/src/monitor/message-handler.process.test.ts b/extensions/discord/src/monitor/message-handler.process.test.ts index 359415aeceb..9c56737dae9 100644 --- a/extensions/discord/src/monitor/message-handler.process.test.ts +++ b/extensions/discord/src/monitor/message-handler.process.test.ts @@ -158,6 +158,9 @@ vi.spyOn(configRuntimeModule, "resolveStorePath").mockImplementation( ) => configSessionsMocks.resolveStorePath(path, opts) as never) as never, ); +const clientModule = await import("../client.js"); +const createDiscordRestClientSpy = vi.spyOn(clientModule, "createDiscordRestClient"); + const BASE_CHANNEL_ROUTE = { agentId: "main", channel: "discord", @@ -214,6 +217,7 @@ beforeEach(() => { recordInboundSession.mockClear(); readSessionUpdatedAt.mockClear(); resolveStorePath.mockClear(); + createDiscordRestClientSpy.mockClear(); dispatchInboundMessage.mockResolvedValue(createNoQueuedDispatchResult()); recordInboundSession.mockResolvedValue(undefined); readSessionUpdatedAt.mockReturnValue(undefined); @@ -278,7 +282,7 @@ function expectAckReactionRuntimeOptions(params?: { messages.removeAckAfterReply = params.removeAckAfterReply; } return expect.objectContaining({ - rest: {}, + rest: expect.anything(), ...(Object.keys(messages).length > 0 ? { cfg: expect.objectContaining({ messages: expect.objectContaining(messages) }) } : {}), @@ -337,7 +341,7 @@ function expectSinglePreviewEdit() { "c1", "preview-1", { content: "Hello\nWorld" }, - { rest: {} }, + expect.objectContaining({ rest: expect.anything() }), ); expect(deliverDiscordReply).not.toHaveBeenCalled(); } @@ -397,6 +401,39 @@ describe("processDiscordMessage ack reactions", () => { }); }); + it("uses separate REST clients for feedback and reply delivery", async () => { + const feedbackRest = { post: vi.fn(async () => undefined) }; + const deliveryRest = { post: vi.fn(async () => undefined) }; + createDiscordRestClientSpy + .mockReturnValueOnce({ + token: "feedback-token", + rest: feedbackRest as never, + account: { config: {} } as never, + }) + .mockReturnValueOnce({ + token: "delivery-token", + rest: deliveryRest as never, + account: { config: {} } as never, + }); + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendFinalReply({ text: "hello" }); + return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } }; + }); + + const ctx = await createBaseContext(); + + await runProcessDiscordMessage(ctx); + + expect(sendMocks.reactMessageDiscord).toHaveBeenCalled(); + expect(sendMocks.reactMessageDiscord.mock.calls[0]?.[3]).toEqual( + expect.objectContaining({ rest: feedbackRest }), + ); + expect(deliverDiscordReply).toHaveBeenCalledWith( + expect.objectContaining({ rest: deliveryRest }), + ); + expect(feedbackRest).not.toBe(deliveryRest); + }); + it("debounces intermediate phase reactions and jumps to done for short runs", async () => { dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { await params?.replyOptions?.onReasoningStream?.(); @@ -733,7 +770,7 @@ describe("processDiscordMessage draft streaming", () => { "c1", "preview-1", { content: longReply }, - { rest: {} }, + expect.objectContaining({ rest: expect.anything() }), ); expect(deliverDiscordReply).not.toHaveBeenCalled(); }); diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index dc048b6d786..ee65c09d1a3 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -45,6 +45,7 @@ import { } from "openclaw/plugin-sdk/text-runtime"; import { resolveDiscordMaxLinesPerMessage } from "../accounts.js"; import { chunkDiscordTextWithMode } from "../chunk.js"; +import { createDiscordRestClient } from "../client.js"; import { resolveDiscordDraftStreamingChunking } from "../draft-chunking.js"; import { createDiscordDraftStream } from "../draft-stream.js"; import { resolveDiscordPreviewStreamMode } from "../preview-streaming.js"; @@ -209,9 +210,19 @@ export async function processDiscordMessage( const shouldSendAckReaction = shouldAckReaction(); const statusReactionsEnabled = shouldSendAckReaction && cfg.messages?.statusReactions?.enabled !== false; + const feedbackRest = createDiscordRestClient({ + cfg, + token, + accountId, + }).rest as unknown as RequestClient; + const deliveryRest = createDiscordRestClient({ + cfg, + token, + accountId, + }).rest as unknown as RequestClient; // Discord outbound helpers expect Carbon's request client shape explicitly. const ackReactionContext = createDiscordAckReactionContext({ - rest: client.rest as unknown as RequestClient, + rest: feedbackRest, cfg, accountId, }); @@ -522,7 +533,7 @@ export async function processDiscordMessage( channel: "discord", accountId: route.accountId, typing: { - start: () => sendTyping({ client, channelId: typingChannelId }), + start: () => sendTyping({ rest: feedbackRest, channelId: typingChannelId }), onStartError: (err) => { logTypingFailure({ log: logVerbose, @@ -560,7 +571,7 @@ export async function processDiscordMessage( : messageChannelId; const draftStream = canStreamDraft ? createDiscordDraftStream({ - rest: client.rest, + rest: deliveryRest, channelId: deliverChannelId, maxChars: draftMaxChars, replyToMessageId: draftReplyToMessageId, @@ -746,7 +757,7 @@ export async function processDiscordMessage( deliverChannelId, previewMessageId, { content: previewFinalText }, - { rest: client.rest }, + { rest: deliveryRest }, ); finalizedViaPreviewMessage = true; replyReference.markSent(); @@ -779,7 +790,7 @@ export async function processDiscordMessage( deliverChannelId, messageIdAfterStop, { content: previewFinalText }, - { rest: client.rest }, + { rest: deliveryRest }, ); finalizedViaPreviewMessage = true; replyReference.markSent(); @@ -812,7 +823,7 @@ export async function processDiscordMessage( target: deliverTarget, token, accountId, - rest: client.rest, + rest: deliveryRest, runtime, replyToId, replyToMode, diff --git a/extensions/discord/src/monitor/native-command.commands-allowfrom.test.ts b/extensions/discord/src/monitor/native-command.commands-allowfrom.test.ts index 7bbece51bdc..bd7e9c9305d 100644 --- a/extensions/discord/src/monitor/native-command.commands-allowfrom.test.ts +++ b/extensions/discord/src/monitor/native-command.commands-allowfrom.test.ts @@ -95,18 +95,19 @@ async function runGuildSlashCommand(params?: { } function expectNotUnauthorizedReply(interaction: MockCommandInteraction) { - expect(interaction.reply).not.toHaveBeenCalledWith( + expect(interaction.followUp).not.toHaveBeenCalledWith( expect.objectContaining({ content: "You are not authorized to use this command." }), ); } function expectUnauthorizedReply(interaction: MockCommandInteraction) { - expect(interaction.reply).toHaveBeenCalledWith( + expect(interaction.followUp).toHaveBeenCalledWith( expect.objectContaining({ content: "You are not authorized to use this command.", ephemeral: true, }), ); + expect(interaction.reply).not.toHaveBeenCalled(); } describe("Discord native slash commands with commands.allowFrom", () => { @@ -279,8 +280,10 @@ describe("Discord native slash commands with commands.allowFrom", () => { | undefined; await dispatchCall?.dispatcherOptions.deliver({ text: longReply }, { kind: "final" }); - expect(interaction.reply).toHaveBeenCalledWith(expect.objectContaining({ content: longReply })); - expect(interaction.followUp).not.toHaveBeenCalled(); + expect(interaction.followUp).toHaveBeenCalledWith( + expect.objectContaining({ content: longReply }), + ); + expect(interaction.reply).not.toHaveBeenCalled(); }); it("swallows expired slash interactions before dispatch when defer returns Unknown interaction", async () => { diff --git a/extensions/discord/src/monitor/native-command.plugin-dispatch.test.ts b/extensions/discord/src/monitor/native-command.plugin-dispatch.test.ts index 3dccdbb6f3f..84887695426 100644 --- a/extensions/discord/src/monitor/native-command.plugin-dispatch.test.ts +++ b/extensions/discord/src/monitor/native-command.plugin-dispatch.test.ts @@ -282,9 +282,10 @@ async function expectPairCommandReply(params: { ); expect(dispatchSpy).not.toHaveBeenCalled(); - expect(params.interaction.reply).toHaveBeenCalledWith( + expect(params.interaction.followUp).toHaveBeenCalledWith( expect.objectContaining({ content: "paired:now" }), ); + expect(params.interaction.reply).not.toHaveBeenCalled(); } async function createStatusCommand(cfg: OpenClawConfig) { @@ -465,12 +466,13 @@ describe("Discord native plugin command dispatch", () => { expect(executeSpy).not.toHaveBeenCalled(); expect(dispatchSpy).not.toHaveBeenCalled(); - expect(interaction.reply).toHaveBeenCalledWith( + expect(interaction.followUp).toHaveBeenCalledWith( expect.objectContaining({ content: "You are not authorized to use this command.", ephemeral: true, }), ); + expect(interaction.reply).not.toHaveBeenCalled(); }); it("rejects group DM slash commands outside dm.groupChannels before dispatch", async () => { @@ -501,11 +503,12 @@ describe("Discord native plugin command dispatch", () => { await (command as { run: (interaction: unknown) => Promise }).run(interaction as unknown); expect(dispatchSpy).not.toHaveBeenCalled(); - expect(interaction.reply).toHaveBeenCalledWith( + expect(interaction.followUp).toHaveBeenCalledWith( expect.objectContaining({ content: "This group DM is not allowed.", }), ); + expect(interaction.reply).not.toHaveBeenCalled(); }); it("executes matched plugin commands directly without invoking the agent dispatcher", async () => { @@ -540,9 +543,10 @@ describe("Discord native plugin command dispatch", () => { expect(executeSpy).toHaveBeenCalledTimes(1); expect(dispatchSpy).not.toHaveBeenCalled(); - expect(interaction.reply).toHaveBeenCalledWith( + expect(interaction.followUp).toHaveBeenCalledWith( expect.objectContaining({ content: "direct plugin output" }), ); + expect(interaction.reply).not.toHaveBeenCalled(); }); it("forwards Discord thread metadata into direct plugin command execution", async () => { diff --git a/extensions/discord/src/monitor/native-command.ts b/extensions/discord/src/monitor/native-command.ts index ac2a8ecd099..f49a9355917 100644 --- a/extensions/discord/src/monitor/native-command.ts +++ b/extensions/discord/src/monitor/native-command.ts @@ -715,7 +715,9 @@ export function createDiscordNativeCommand(params: { discordConfig, accountId, sessionPrefix, - preferFollowUp: false, + // Slash commands are deferred up front, so all later responses must use + // follow-up/edit semantics instead of the initial reply endpoint. + preferFollowUp: true, threadBindings, }); } diff --git a/extensions/discord/src/monitor/typing.test.ts b/extensions/discord/src/monitor/typing.test.ts new file mode 100644 index 00000000000..ce1a37b38c3 --- /dev/null +++ b/extensions/discord/src/monitor/typing.test.ts @@ -0,0 +1,42 @@ +import { Routes } from "discord-api-types/v10"; +import { describe, expect, it, vi } from "vitest"; +import { sendTyping } from "./typing.js"; + +describe("sendTyping", () => { + it("uses the direct Discord typing REST endpoint", async () => { + const rest = { + post: vi.fn(async () => {}), + }; + + await sendTyping({ + // @ts-expect-error test stub only needs rest.post + rest, + channelId: "12345", + }); + + expect(rest.post).toHaveBeenCalledTimes(1); + expect(rest.post).toHaveBeenCalledWith(Routes.channelTyping("12345")); + }); + + it("times out when the typing endpoint hangs", async () => { + vi.useFakeTimers(); + try { + const rest = { + post: vi.fn(() => new Promise(() => {})), + }; + + const promise = sendTyping({ + // @ts-expect-error test stub only needs rest.post + rest, + channelId: "12345", + }); + const rejection = expect(promise).rejects.toThrow("discord typing start timed out"); + + await vi.advanceTimersByTimeAsync(5_000); + + await rejection; + } finally { + vi.useRealTimers(); + } + }); +}); diff --git a/extensions/discord/src/monitor/typing.ts b/extensions/discord/src/monitor/typing.ts index 9c53612277b..347dbc32408 100644 --- a/extensions/discord/src/monitor/typing.ts +++ b/extensions/discord/src/monitor/typing.ts @@ -1,11 +1,23 @@ -import type { Client } from "@buape/carbon"; +import type { RequestClient } from "@buape/carbon"; +import { Routes } from "discord-api-types/v10"; -export async function sendTyping(params: { client: Client; channelId: string }) { - const channel = await params.client.fetchChannel(params.channelId); - if (!channel) { - return; - } - if ("triggerTyping" in channel && typeof channel.triggerTyping === "function") { - await channel.triggerTyping(); +const DISCORD_TYPING_START_TIMEOUT_MS = 5_000; + +export async function sendTyping(params: { rest: RequestClient; channelId: string }) { + let timer: NodeJS.Timeout | undefined; + const timeoutPromise = new Promise((_, reject) => { + timer = setTimeout(() => { + reject( + new Error(`discord typing start timed out after ${DISCORD_TYPING_START_TIMEOUT_MS}ms`), + ); + }, DISCORD_TYPING_START_TIMEOUT_MS); + timer.unref?.(); + }); + try { + await Promise.race([params.rest.post(Routes.channelTyping(params.channelId)), timeoutPromise]); + } finally { + if (timer) { + clearTimeout(timer); + } } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a86ab265275..ac1e6c4db99 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -267,8 +267,8 @@ importers: extensions/acpx: dependencies: acpx: - specifier: 0.5.1 - version: 0.5.1 + specifier: 0.5.2 + version: 0.5.2 devDependencies: '@openclaw/plugin-sdk': specifier: workspace:* @@ -4081,8 +4081,8 @@ packages: engines: {node: '>=0.4.0'} hasBin: true - acpx@0.5.1: - resolution: {integrity: sha512-r2sWGsztSwsO8JGJAswltQkMnRkKNmTH9faxwRWS9Ad28y2jZcyt7jR7auyCk0zwvDr+Zm/H1byVkWFpJWqzQQ==} + acpx@0.5.2: + resolution: {integrity: sha512-c+jibFqgK2WzUt+hO8e007W6655ROpTfvxAUkC6a/jRK1oaPmsxXmgySCq1/x1uBIbb61qhOvfCS6lW7gZeRgg==} engines: {node: '>=22.12.0'} hasBin: true @@ -10637,7 +10637,7 @@ snapshots: acorn@8.16.0: {} - acpx@0.5.1: + acpx@0.5.2: dependencies: '@agentclientprotocol/sdk': 0.17.1(zod@4.3.6) commander: 14.0.3 diff --git a/src/acp/control-plane/manager.core.ts b/src/acp/control-plane/manager.core.ts index a482f16765b..5d7744efa13 100644 --- a/src/acp/control-plane/manager.core.ts +++ b/src/acp/control-plane/manager.core.ts @@ -18,6 +18,7 @@ import { } from "../runtime/errors.js"; import { createIdentityFromEnsure, + identityHasStableSessionId, identityEquals, isSessionIdentityPending, mergeSessionIdentity, @@ -246,7 +247,10 @@ export class AcpSessionManager { continue; } const currentIdentity = resolveSessionIdentityFromMeta(session.acp); - if (!isSessionIdentityPending(currentIdentity)) { + if ( + !isSessionIdentityPending(currentIdentity) || + !identityHasStableSessionId(currentIdentity) + ) { continue; } @@ -1247,65 +1251,96 @@ export class AcpSessionManager { }; } const meta = requireReadySessionMeta(resolution); + const currentIdentity = resolveSessionIdentityFromMeta(meta); + const shouldSkipRuntimeClose = + input.discardPersistentState && + currentIdentity != null && + !identityHasStableSessionId(currentIdentity); let runtimeClosed = false; let runtimeNotice: string | undefined; - try { - const { runtime: ensuredRuntime, handle } = await this.ensureRuntimeHandle({ - cfg: input.cfg, - sessionKey, - meta, - }); - await withAcpRuntimeErrorBoundary({ - run: async () => - await ensuredRuntime.close({ - handle, - reason: input.reason, - discardPersistentState: input.discardPersistentState, - }), - fallbackCode: "ACP_TURN_FAILED", - fallbackMessage: "ACP close failed before completion.", - }); - runtimeClosed = true; - this.clearCachedRuntimeState(sessionKey); - } catch (error) { - const acpError = toAcpRuntimeError({ - error, - fallbackCode: "ACP_TURN_FAILED", - fallbackMessage: "ACP close failed before completion.", - }); - if ( - input.allowBackendUnavailable && - (acpError.code === "ACP_BACKEND_MISSING" || - acpError.code === "ACP_BACKEND_UNAVAILABLE" || - (input.discardPersistentState && acpError.code === "ACP_SESSION_INIT_FAILED") || - this.isRecoverableAcpxExitError(acpError.message)) - ) { - if (input.discardPersistentState) { - const configuredBackend = (meta.backend || input.cfg.acp?.backend || "").trim(); - try { - const runtimeBackend = this.deps.requireRuntimeBackend( - configuredBackend || undefined, - ); - await runtimeBackend.runtime.prepareFreshSession?.({ + if (shouldSkipRuntimeClose) { + if (input.discardPersistentState) { + const configuredBackend = (meta.backend || input.cfg.acp?.backend || "").trim(); + try { + await this.deps + .getRuntimeBackend(configuredBackend || undefined) + ?.runtime.prepareFreshSession?.({ sessionKey, }); - } catch (recoveryError) { - logVerbose( - `acp close recovery: unable to prepare fresh session for ${sessionKey}: ${formatErrorMessage(recoveryError)}`, - ); - } + } catch (error) { + logVerbose( + `acp close fast-reset: unable to prepare fresh session for ${sessionKey}: ${error instanceof Error ? error.message : String(error)}`, + ); } - // Treat unavailable backends as terminal for this cached handle so it - // cannot continue counting against maxConcurrentSessions. + } + this.clearCachedRuntimeState(sessionKey); + } else { + try { + const { runtime: ensuredRuntime, handle } = await this.ensureRuntimeHandle({ + cfg: input.cfg, + sessionKey, + meta, + }); + await withAcpRuntimeErrorBoundary({ + run: async () => + await ensuredRuntime.close({ + handle, + reason: input.reason, + discardPersistentState: input.discardPersistentState, + }), + fallbackCode: "ACP_TURN_FAILED", + fallbackMessage: "ACP close failed before completion.", + }); + runtimeClosed = true; this.clearCachedRuntimeState(sessionKey); - runtimeNotice = acpError.message; - } else { - throw acpError; + } catch (error) { + const acpError = toAcpRuntimeError({ + error, + fallbackCode: "ACP_TURN_FAILED", + fallbackMessage: "ACP close failed before completion.", + }); + if ( + input.allowBackendUnavailable && + (acpError.code === "ACP_BACKEND_MISSING" || + acpError.code === "ACP_BACKEND_UNAVAILABLE" || + (input.discardPersistentState && acpError.code === "ACP_SESSION_INIT_FAILED") || + this.isRecoverableAcpxExitError(acpError.message)) + ) { + if (input.discardPersistentState) { + const configuredBackend = (meta.backend || input.cfg.acp?.backend || "").trim(); + try { + const runtimeBackend = this.deps.getRuntimeBackend(configuredBackend || undefined); + if (!runtimeBackend) { + throw acpError; + } + await runtimeBackend.runtime.prepareFreshSession?.({ + sessionKey, + }); + } catch (recoveryError) { + logVerbose( + `acp close recovery: unable to prepare fresh session for ${sessionKey}: ${recoveryError instanceof Error ? recoveryError.message : String(recoveryError)}`, + ); + } + } + // Treat unavailable backends as terminal for this cached handle so it + // cannot continue counting against maxConcurrentSessions. + this.clearCachedRuntimeState(sessionKey); + runtimeNotice = acpError.message; + } else { + throw acpError; + } } } let metaCleared = false; + if (input.discardPersistentState && !input.clearMeta) { + await this.discardPersistedRuntimeState({ + cfg: input.cfg, + sessionKey, + }); + } + if (input.clearMeta) { await this.writeSessionMeta({ cfg: input.cfg, @@ -1346,11 +1381,16 @@ export class AcpSessionManager { const agentMatches = cached.agent === agent; const modeMatches = cached.mode === mode; const cwdMatches = (cached.cwd ?? "") === (cwd ?? ""); + const handleMatchesMeta = this.runtimeHandleMatchesMeta({ + handle: cached.handle, + meta: params.meta, + }); if ( backendMatches && agentMatches && modeMatches && cwdMatches && + handleMatchesMeta && (await this.isCachedRuntimeHandleReusable({ sessionKey: params.sessionKey, runtime: cached.runtime, @@ -1378,6 +1418,10 @@ export class AcpSessionManager { let identityForEnsure = previousIdentity; const persistedResumeSessionId = mode === "persistent" ? resolveRuntimeResumeSessionId(previousIdentity) : undefined; + const shouldPrepareFreshPersistentSession = + mode === "persistent" && + previousIdentity != null && + !identityHasStableSessionId(previousIdentity); const ensureSession = async (resumeSessionId?: string) => await withAcpRuntimeErrorBoundary({ run: async () => @@ -1392,6 +1436,11 @@ export class AcpSessionManager { fallbackMessage: "Could not initialize ACP session runtime.", }); let ensured: AcpRuntimeHandle; + if (shouldPrepareFreshPersistentSession) { + await runtime.prepareFreshSession?.({ + sessionKey: params.sessionKey, + }); + } if (persistedResumeSessionId) { try { ensured = await ensureSession(persistedResumeSessionId); @@ -1739,6 +1788,49 @@ export class AcpSessionManager { return true; } + private async discardPersistedRuntimeState(params: { + cfg: OpenClawConfig; + sessionKey: string; + }): Promise { + const now = Date.now(); + await this.writeSessionMeta({ + cfg: params.cfg, + sessionKey: params.sessionKey, + mutate: (current, entry) => { + if (!entry) { + return null; + } + const base = current ?? entry.acp; + if (!base) { + return null; + } + const currentIdentity = resolveSessionIdentityFromMeta(base); + const nextIdentity = currentIdentity + ? { + state: "pending" as const, + ...(currentIdentity.acpxRecordId + ? { acpxRecordId: currentIdentity.acpxRecordId } + : {}), + source: currentIdentity.source, + lastUpdatedAt: now, + } + : undefined; + return { + backend: base.backend, + agent: base.agent, + runtimeSessionName: base.runtimeSessionName, + ...(nextIdentity ? { identity: nextIdentity } : {}), + mode: base.mode, + ...(base.runtimeOptions ? { runtimeOptions: base.runtimeOptions } : {}), + ...(base.cwd ? { cwd: base.cwd } : {}), + state: "idle", + lastActivityAt: now, + }; + }, + failOnError: true, + }); + } + private async evictIdleRuntimeHandles(params: { cfg: OpenClawConfig }): Promise { const idleTtlMs = resolveRuntimeIdleTtlMs(params.cfg); if (idleTtlMs <= 0 || this.runtimeCache.size() === 0) { @@ -1994,6 +2086,25 @@ export class AcpSessionManager { ); } + private runtimeHandleMatchesMeta(params: { + handle: AcpRuntimeHandle; + meta: SessionAcpMeta; + }): boolean { + const identity = resolveSessionIdentityFromMeta(params.meta); + const expectedHandleIds = resolveRuntimeHandleIdentifiersFromIdentity(identity); + if ((params.handle.backendSessionId ?? "") !== (expectedHandleIds.backendSessionId ?? "")) { + return false; + } + if ((params.handle.agentSessionId ?? "") !== (expectedHandleIds.agentSessionId ?? "")) { + return false; + } + + const expectedAcpxRecordId = identity?.acpxRecordId ?? ""; + const actualAcpxRecordId = + normalizeText((params.handle as { acpxRecordId?: unknown }).acpxRecordId) ?? ""; + return actualAcpxRecordId === expectedAcpxRecordId; + } + private resolveBackgroundTaskContext(params: { cfg: OpenClawConfig; sessionKey: string; diff --git a/src/acp/control-plane/manager.test.ts b/src/acp/control-plane/manager.test.ts index 89ddbc9e6d1..c9d44a4502d 100644 --- a/src/acp/control-plane/manager.test.ts +++ b/src/acp/control-plane/manager.test.ts @@ -12,11 +12,13 @@ const hoisted = vi.hoisted(() => { const listAcpSessionEntriesMock = vi.fn(); const readAcpSessionEntryMock = vi.fn(); const upsertAcpSessionMetaMock = vi.fn(); + const getAcpRuntimeBackendMock = vi.fn(); const requireAcpRuntimeBackendMock = vi.fn(); return { listAcpSessionEntriesMock, readAcpSessionEntryMock, upsertAcpSessionMetaMock, + getAcpRuntimeBackendMock, requireAcpRuntimeBackendMock, }; }); @@ -32,6 +34,7 @@ vi.mock("../runtime/registry.js", async () => { await vi.importActual("../runtime/registry.js"); return { ...actual, + getAcpRuntimeBackend: (backendId?: string) => hoisted.getAcpRuntimeBackendMock(backendId), requireAcpRuntimeBackend: (backendId?: string) => hoisted.requireAcpRuntimeBackendMock(backendId), }; @@ -217,6 +220,13 @@ describe("AcpSessionManager", () => { hoisted.readAcpSessionEntryMock.mockReset(); hoisted.upsertAcpSessionMetaMock.mockReset().mockResolvedValue(null); hoisted.requireAcpRuntimeBackendMock.mockReset(); + hoisted.getAcpRuntimeBackendMock.mockReset().mockImplementation((backendId?: string) => { + try { + return hoisted.requireAcpRuntimeBackendMock(backendId); + } catch { + return null; + } + }); }); afterEach(() => { @@ -789,6 +799,79 @@ describe("AcpSessionManager", () => { expect(runtimeState.runTurn).toHaveBeenCalledTimes(2); }); + it("re-ensures cached runtime handles when persisted ACP session identity changes", async () => { + const runtimeState = createRuntime(); + runtimeState.ensureSession + .mockResolvedValueOnce({ + sessionKey: "agent:codex:acp:session-1", + backend: "acpx", + runtimeSessionName: "runtime-1", + acpxRecordId: "record-1", + backendSessionId: "acpx-session-1", + agentSessionId: "agent-session-1", + }) + .mockResolvedValueOnce({ + sessionKey: "agent:codex:acp:session-1", + backend: "acpx", + runtimeSessionName: "runtime-2", + acpxRecordId: "record-1", + backendSessionId: "acpx-session-2", + agentSessionId: "agent-session-2", + }); + hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ + id: "acpx", + runtime: runtimeState.runtime, + }); + let currentMeta = readySessionMeta({ + runtimeSessionName: "runtime-1", + identity: { + state: "resolved", + acpxRecordId: "record-1", + acpxSessionId: "acpx-session-1", + agentSessionId: "agent-session-1", + source: "status", + lastUpdatedAt: Date.now(), + }, + }); + hoisted.readAcpSessionEntryMock.mockImplementation(() => ({ + sessionKey: "agent:codex:acp:session-1", + storeSessionKey: "agent:codex:acp:session-1", + acp: currentMeta, + })); + + const manager = new AcpSessionManager(); + await manager.runTurn({ + cfg: baseCfg, + sessionKey: "agent:codex:acp:session-1", + text: "first", + mode: "prompt", + requestId: "r1", + }); + + currentMeta = readySessionMeta({ + runtimeSessionName: "runtime-2", + identity: { + state: "resolved", + acpxRecordId: "record-1", + acpxSessionId: "acpx-session-2", + agentSessionId: "agent-session-2", + source: "status", + lastUpdatedAt: Date.now(), + }, + }); + + await manager.runTurn({ + cfg: baseCfg, + sessionKey: "agent:codex:acp:session-1", + text: "second", + mode: "prompt", + requestId: "r2", + }); + + expect(runtimeState.ensureSession).toHaveBeenCalledTimes(2); + expect(runtimeState.runTurn).toHaveBeenCalledTimes(2); + }); + it("rehydrates runtime handles after a manager restart", async () => { const runtimeState = createRuntime(); hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ @@ -1325,6 +1408,208 @@ describe("AcpSessionManager", () => { }); }); + it("clears persisted resume identity when close discards persistent state", async () => { + const runtimeState = createRuntime(); + const sessionKey = "agent:claude:acp:binding:discord:default:9373ab192b2317f4"; + const entry = { + sessionKey, + storeSessionKey: sessionKey, + acp: readySessionMeta({ + agent: "claude", + state: "running", + lastError: "stale failure", + identity: { + state: "resolved", + acpxRecordId: sessionKey, + acpxSessionId: "acpx-session-1", + agentSessionId: "agent-session-1", + source: "status", + lastUpdatedAt: 1, + }, + }), + }; + hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ + id: "acpx", + runtime: runtimeState.runtime, + }); + hoisted.readAcpSessionEntryMock.mockImplementation(() => entry); + hoisted.upsertAcpSessionMetaMock.mockImplementation(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { + mutate: ( + current: SessionAcpMeta | undefined, + entry: { acp?: SessionAcpMeta } | undefined, + ) => SessionAcpMeta | null | undefined; + }; + const next = params.mutate(entry.acp, entry); + if (next === null) { + return null; + } + if (next) { + entry.acp = next; + } + return entry; + }); + + const manager = new AcpSessionManager(); + const result = await manager.closeSession({ + cfg: baseCfg, + sessionKey, + reason: "new-in-place-reset", + discardPersistentState: true, + clearMeta: false, + allowBackendUnavailable: true, + }); + + expect(result.runtimeClosed).toBe(true); + expect(entry.acp?.state).toBe("idle"); + expect(entry.acp?.lastError).toBeUndefined(); + expect(entry.acp?.identity).toMatchObject({ + state: "pending", + acpxRecordId: sessionKey, + source: "status", + }); + expect(entry.acp?.identity).not.toHaveProperty("acpxSessionId"); + expect(entry.acp?.identity).not.toHaveProperty("agentSessionId"); + }); + + it("prepares a fresh persistent session before ensure when metadata has no stable session id", async () => { + const runtimeState = createRuntime(); + const sessionKey = "agent:claude:acp:binding:discord:default:9373ab192b2317f4"; + runtimeState.ensureSession.mockResolvedValue({ + sessionKey, + backend: "acpx", + runtimeSessionName: "runtime-fresh", + acpxRecordId: sessionKey, + backendSessionId: "acpx-session-fresh", + }); + hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ + id: "acpx", + runtime: runtimeState.runtime, + }); + + let currentMeta: SessionAcpMeta = readySessionMeta({ + agent: "claude", + identity: { + state: "pending", + acpxRecordId: sessionKey, + source: "status", + lastUpdatedAt: Date.now(), + }, + }); + hoisted.readAcpSessionEntryMock.mockImplementation((paramsUnknown: unknown) => { + const key = (paramsUnknown as { sessionKey?: string }).sessionKey ?? sessionKey; + return { + sessionKey: key, + storeSessionKey: key, + acp: currentMeta, + }; + }); + hoisted.upsertAcpSessionMetaMock.mockImplementation(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { + mutate: ( + current: SessionAcpMeta | undefined, + entry: { acp?: SessionAcpMeta } | undefined, + ) => SessionAcpMeta | null | undefined; + }; + const next = params.mutate(currentMeta, { acp: currentMeta }); + if (next) { + currentMeta = next; + } + return { + sessionId: "session-1", + updatedAt: Date.now(), + acp: currentMeta, + }; + }); + + const manager = new AcpSessionManager(); + await expect( + manager.runTurn({ + cfg: baseCfg, + sessionKey, + text: "who are you?", + mode: "prompt", + requestId: "r-fresh", + }), + ).resolves.toBeUndefined(); + + expect(runtimeState.prepareFreshSession).toHaveBeenCalledWith({ + sessionKey, + }); + expect(runtimeState.ensureSession).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey, + }), + ); + expect(runtimeState.prepareFreshSession.mock.invocationCallOrder[0]).toBeLessThan( + runtimeState.ensureSession.mock.invocationCallOrder[0], + ); + }); + + it("skips runtime re-ensure when discarding a pending persistent session", async () => { + const runtimeState = createRuntime(); + const sessionKey = "agent:claude:acp:binding:discord:default:9373ab192b2317f4"; + hoisted.getAcpRuntimeBackendMock.mockReturnValue({ + id: "acpx", + runtime: runtimeState.runtime, + }); + + const entry = { + sessionKey, + storeSessionKey: sessionKey, + acp: readySessionMeta({ + agent: "claude", + identity: { + state: "pending", + acpxRecordId: sessionKey, + source: "ensure", + lastUpdatedAt: Date.now(), + }, + }), + }; + hoisted.readAcpSessionEntryMock.mockImplementation(() => entry); + hoisted.upsertAcpSessionMetaMock.mockImplementation(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { + mutate: ( + current: SessionAcpMeta | undefined, + entry: { acp?: SessionAcpMeta } | undefined, + ) => SessionAcpMeta | null | undefined; + }; + const next = params.mutate(entry.acp, entry); + if (next === null) { + return null; + } + if (next) { + entry.acp = next; + } + return entry; + }); + + const manager = new AcpSessionManager(); + const result = await manager.closeSession({ + cfg: baseCfg, + sessionKey, + reason: "new-in-place-reset", + discardPersistentState: true, + clearMeta: false, + allowBackendUnavailable: true, + }); + + expect(result.runtimeClosed).toBe(false); + expect(runtimeState.prepareFreshSession).toHaveBeenCalledWith({ + sessionKey, + }); + expect(runtimeState.ensureSession).not.toHaveBeenCalled(); + expect(runtimeState.close).not.toHaveBeenCalled(); + expect(entry.acp?.identity).toMatchObject({ + state: "pending", + acpxRecordId: sessionKey, + source: "ensure", + }); + expect(entry.acp?.identity).not.toHaveProperty("acpxSessionId"); + expect(entry.acp?.identity).not.toHaveProperty("agentSessionId"); + }); + it("evicts idle cached runtimes before enforcing max concurrent limits", async () => { vi.useFakeTimers(); try { @@ -2063,6 +2348,57 @@ describe("AcpSessionManager", () => { expect(currentMeta.identity?.agentSessionId).toBe("agent-session-1"); }); + it("skips startup reconcile for pending identities without stable runtime ids", async () => { + const runtimeState = createRuntime(); + hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ + id: "acpx", + runtime: runtimeState.runtime, + }); + + const sessionKey = "agent:claude:acp:binding:discord:default:9373ab192b2317f4"; + hoisted.listAcpSessionEntriesMock.mockResolvedValue([ + { + cfg: baseCfg, + storePath: "/tmp/sessions-acp.json", + sessionKey, + storeSessionKey: sessionKey, + entry: { + sessionId: "session-1", + updatedAt: Date.now(), + acp: { + ...readySessionMeta({ + agent: "claude", + }), + identity: { + state: "pending", + acpxRecordId: sessionKey, + source: "status", + lastUpdatedAt: Date.now(), + }, + }, + }, + acp: { + ...readySessionMeta({ + agent: "claude", + }), + identity: { + state: "pending", + acpxRecordId: sessionKey, + source: "status", + lastUpdatedAt: Date.now(), + }, + }, + }, + ]); + + const manager = new AcpSessionManager(); + const result = await manager.reconcilePendingSessionIdentities({ cfg: baseCfg }); + + expect(result).toEqual({ checked: 0, resolved: 0, failed: 0 }); + expect(runtimeState.ensureSession).not.toHaveBeenCalled(); + expect(runtimeState.getStatus).not.toHaveBeenCalled(); + }); + it("reconciles prompt-learned agent session IDs even when runtime status omits them", async () => { const runtimeState = createRuntime(); runtimeState.ensureSession.mockResolvedValue({ @@ -2471,6 +2807,43 @@ describe("AcpSessionManager", () => { expect(result.metaCleared).toBe(false); }); + it("prepares a fresh session during reset recovery even when the backend is unhealthy", async () => { + const runtimeState = createRuntime(); + hoisted.readAcpSessionEntryMock.mockReturnValue({ + sessionKey: "agent:claude:acp:session-1", + storeSessionKey: "agent:claude:acp:session-1", + acp: readySessionMeta({ + agent: "claude", + }), + }); + hoisted.requireAcpRuntimeBackendMock.mockImplementation(() => { + throw new AcpRuntimeError( + "ACP_BACKEND_UNAVAILABLE", + "ACP runtime backend is currently unavailable. Try again in a moment.", + ); + }); + hoisted.getAcpRuntimeBackendMock.mockReturnValue({ + id: "acpx", + runtime: runtimeState.runtime, + }); + + const manager = new AcpSessionManager(); + const result = await manager.closeSession({ + cfg: baseCfg, + sessionKey: "agent:claude:acp:session-1", + reason: "new-in-place-reset", + discardPersistentState: true, + allowBackendUnavailable: true, + clearMeta: false, + }); + + expect(result.runtimeClosed).toBe(false); + expect(result.runtimeNotice).toContain("currently unavailable"); + expect(runtimeState.prepareFreshSession).toHaveBeenCalledWith({ + sessionKey: "agent:claude:acp:session-1", + }); + }); + it("surfaces metadata clear errors during closeSession", async () => { hoisted.readAcpSessionEntryMock.mockReturnValue({ sessionKey: "agent:codex:acp:session-1", diff --git a/src/acp/control-plane/manager.types.ts b/src/acp/control-plane/manager.types.ts index d5d929bc328..b9fbf87c9db 100644 --- a/src/acp/control-plane/manager.types.ts +++ b/src/acp/control-plane/manager.types.ts @@ -6,7 +6,7 @@ import type { SessionEntry, } from "../../config/sessions/types.js"; import type { AcpRuntimeError } from "../runtime/errors.js"; -import { requireAcpRuntimeBackend } from "../runtime/registry.js"; +import { getAcpRuntimeBackend, requireAcpRuntimeBackend } from "../runtime/registry.js"; import { listAcpSessionEntries, readAcpSessionEntry, @@ -136,6 +136,7 @@ export type AcpSessionManagerDeps = { listAcpSessions: typeof listAcpSessionEntries; readSessionEntry: typeof readAcpSessionEntry; upsertSessionMeta: typeof upsertAcpSessionMeta; + getRuntimeBackend: typeof getAcpRuntimeBackend; requireRuntimeBackend: typeof requireAcpRuntimeBackend; }; @@ -143,6 +144,7 @@ export const DEFAULT_DEPS: AcpSessionManagerDeps = { listAcpSessions: listAcpSessionEntries, readSessionEntry: readAcpSessionEntry, upsertSessionMeta: upsertAcpSessionMeta, + getRuntimeBackend: getAcpRuntimeBackend, requireRuntimeBackend: requireAcpRuntimeBackend, }; diff --git a/src/acp/persistent-bindings.lifecycle.test.ts b/src/acp/persistent-bindings.lifecycle.test.ts index 6beab1b5fe9..21897a87df2 100644 --- a/src/acp/persistent-bindings.lifecycle.test.ts +++ b/src/acp/persistent-bindings.lifecycle.test.ts @@ -1,7 +1,9 @@ import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; +import { buildConfiguredAcpSessionKey } from "./persistent-bindings.types.js"; const managerMocks = vi.hoisted(() => ({ + resolveSession: vi.fn(), closeSession: vi.fn(), initializeSession: vi.fn(), updateSessionRuntimeOptions: vi.fn(), @@ -17,6 +19,7 @@ const resolveMocks = vi.hoisted(() => ({ vi.mock("./control-plane/manager.js", () => ({ getAcpSessionManager: () => ({ + resolveSession: managerMocks.resolveSession, closeSession: managerMocks.closeSession, initializeSession: managerMocks.initializeSession, updateSessionRuntimeOptions: managerMocks.updateSessionRuntimeOptions, @@ -45,6 +48,7 @@ beforeAll(async () => { }); beforeEach(() => { + managerMocks.resolveSession.mockReset().mockReturnValue({ kind: "none" }); managerMocks.closeSession.mockReset().mockResolvedValue({ runtimeClosed: true, metaCleared: false, @@ -56,8 +60,18 @@ beforeEach(() => { }); describe("resetAcpSessionInPlace", () => { - it("does not resolve configured bindings when ACP metadata already exists", async () => { - const sessionKey = "agent:claude:acp:binding:demo-binding:default:9373ab192b2317f4"; + it("clears configured bindings and lets the next turn recreate them", async () => { + const spec = { + channel: "demo-binding", + accountId: "default", + conversationId: "9373ab192b2317f4", + agentId: "claude", + mode: "persistent", + backend: "acpx", + cwd: "/home/bob/clawd", + } as const; + const sessionKey = buildConfiguredAcpSessionKey(spec); + resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey.mockReturnValue(spec); sessionMetaMocks.readAcpSessionEntry.mockReturnValue({ acp: { agent: "claude", @@ -66,9 +80,6 @@ describe("resetAcpSessionInPlace", () => { runtimeOptions: { cwd: "/home/bob/clawd" }, }, }); - resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey.mockImplementation(() => { - throw new Error("configured binding resolution should be skipped"); - }); const result = await resetAcpSessionInPlace({ cfg: baseCfg, @@ -77,15 +88,93 @@ describe("resetAcpSessionInPlace", () => { }); expect(result).toEqual({ ok: true }); - expect(resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey).not.toHaveBeenCalled(); + expect(resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey).toHaveBeenCalledTimes(1); expect(managerMocks.closeSession).toHaveBeenCalledWith( expect.objectContaining({ sessionKey, discardPersistentState: true, - clearMeta: false, + clearMeta: true, }), ); expect(managerMocks.initializeSession).not.toHaveBeenCalled(); expect(managerMocks.updateSessionRuntimeOptions).not.toHaveBeenCalled(); }); + + it("falls back to close-only resets when no configured binding exists", async () => { + const sessionKey = "agent:claude:acp:binding:demo-binding:default:9373ab192b2317f4"; + sessionMetaMocks.readAcpSessionEntry.mockReturnValue({ + acp: { + agent: "claude", + mode: "persistent", + backend: "acpx", + }, + }); + + const result = await resetAcpSessionInPlace({ + cfg: baseCfg, + sessionKey, + reason: "reset", + }); + + expect(result).toEqual({ ok: true }); + expect(resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey).toHaveBeenCalledTimes(1); + expect(managerMocks.closeSession).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey, + clearMeta: false, + }), + ); + expect(managerMocks.initializeSession).not.toHaveBeenCalled(); + }); + + it("can force metadata clearing for bound ACP targets outside the configured registry", async () => { + const sessionKey = "agent:claude:acp:binding:demo-binding:default:9373ab192b2317f4"; + sessionMetaMocks.readAcpSessionEntry.mockReturnValue({ + acp: { + agent: "claude", + mode: "persistent", + backend: "acpx", + }, + }); + + const result = await resetAcpSessionInPlace({ + cfg: baseCfg, + sessionKey, + reason: "new", + clearMeta: true, + }); + + expect(result).toEqual({ ok: true }); + expect(resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey).toHaveBeenCalledTimes(1); + expect(managerMocks.closeSession).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey, + clearMeta: true, + }), + ); + }); + + it("treats configured bindings with no ACP metadata as already reset", async () => { + const spec = { + channel: "demo-binding", + accountId: "default", + conversationId: "9373ab192b2317f4", + agentId: "claude", + mode: "persistent", + backend: "acpx", + cwd: "/home/bob/clawd", + } as const; + const sessionKey = buildConfiguredAcpSessionKey(spec); + resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey.mockReturnValue(spec); + + const result = await resetAcpSessionInPlace({ + cfg: baseCfg, + sessionKey, + reason: "new", + }); + + expect(result).toEqual({ ok: true }); + expect(managerMocks.closeSession).not.toHaveBeenCalled(); + expect(managerMocks.initializeSession).not.toHaveBeenCalled(); + }); }); diff --git a/src/acp/persistent-bindings.lifecycle.ts b/src/acp/persistent-bindings.lifecycle.ts index f9141924e5d..7e031282f97 100644 --- a/src/acp/persistent-bindings.lifecycle.ts +++ b/src/acp/persistent-bindings.lifecycle.ts @@ -139,6 +139,7 @@ export async function resetAcpSessionInPlace(params: { cfg: OpenClawConfig; sessionKey: string; reason: "new" | "reset"; + clearMeta?: boolean; }): Promise<{ ok: true } | { ok: false; skipped?: boolean; error?: string }> { const sessionKey = params.sessionKey.trim(); if (!sessionKey) { @@ -152,26 +153,14 @@ export async function resetAcpSessionInPlace(params: { cfg: params.cfg, sessionKey, })?.acp; - const configuredBinding = - !meta || !normalizeText(meta.agent) - ? resolveConfiguredAcpBindingSpecBySessionKey({ - cfg: params.cfg, - sessionKey, - }) - : null; + const configuredBinding = resolveConfiguredAcpBindingSpecBySessionKey({ + cfg: params.cfg, + sessionKey, + }); + const clearMeta = params.clearMeta ?? Boolean(configuredBinding); if (!meta) { - if (configuredBinding) { - const ensured = await ensureConfiguredAcpBindingSession({ - cfg: params.cfg, - spec: configuredBinding, - }); - if (ensured.ok) { - return { ok: true }; - } - return { - ok: false, - error: ensured.error, - }; + if (clearMeta) { + return { ok: true }; } return { ok: false, @@ -187,14 +176,11 @@ export async function resetAcpSessionInPlace(params: { sessionKey, reason: `${params.reason}-in-place-reset`, discardPersistentState: true, - clearMeta: false, + clearMeta, allowBackendUnavailable: true, requireAcpSession: false, }); - // Bound ACP /new and /reset should return as soon as the previous - // runtime state is discarded. The fresh session can be recreated lazily - // on the next turn through the normal binding readiness path. return { ok: true }; } catch (error) { const message = formatErrorMessage(error); diff --git a/src/acp/persistent-bindings.test.ts b/src/acp/persistent-bindings.test.ts index 4ea017f9e88..5f72ed8f147 100644 --- a/src/acp/persistent-bindings.test.ts +++ b/src/acp/persistent-bindings.test.ts @@ -444,6 +444,7 @@ beforeEach(() => { ]), ); managerMocks.resolveSession.mockReset(); + managerMocks.resolveSession.mockReturnValue({ kind: "none" }); managerMocks.closeSession.mockReset().mockResolvedValue({ runtimeClosed: true, metaCleared: true, @@ -968,7 +969,7 @@ describe("ensureConfiguredAcpBindingSession", () => { }); describe("resetAcpSessionInPlace", () => { - it("reinitializes from configured binding when ACP metadata is missing", async () => { + it("treats configured bindings without ACP metadata as already reset", async () => { const cfg = createCfgWithBindings([ createDiscordBinding({ agentId: "claude", @@ -996,18 +997,28 @@ describe("resetAcpSessionInPlace", () => { }); expect(result).toEqual({ ok: true }); - expect(managerMocks.initializeSession).toHaveBeenCalledWith( - expect.objectContaining({ - sessionKey, - agent: "claude", - mode: "persistent", - backendId: "acpx", - }), - ); + expect(managerMocks.initializeSession).not.toHaveBeenCalled(); }); - it("preserves ACP metadata while discarding runtime state for existing sessions", async () => { - const sessionKey = "agent:claude:acp:binding:discord:default:9373ab192b2317f4"; + it("clears existing configured ACP sessions and lets the next turn recreate them", async () => { + const cfg = createCfgWithBindings([ + createDiscordBinding({ + agentId: "claude", + conversationId: "1478844424791396446", + acp: { + mode: "persistent", + backend: "acpx", + }, + }), + ]); + const sessionKey = buildConfiguredAcpSessionKey({ + channel: "discord", + accountId: "default", + conversationId: "1478844424791396446", + agentId: "claude", + mode: "persistent", + backend: "acpx", + }); sessionMetaMocks.readAcpSessionEntry.mockReturnValue({ acp: { agent: "claude", @@ -1018,7 +1029,7 @@ describe("resetAcpSessionInPlace", () => { }); const result = await persistentBindings.resetAcpSessionInPlace({ - cfg: baseCfg, + cfg, sessionKey, reason: "reset", }); @@ -1027,7 +1038,7 @@ describe("resetAcpSessionInPlace", () => { expect(managerMocks.closeSession).toHaveBeenCalledWith( expect.objectContaining({ sessionKey, - clearMeta: false, + clearMeta: true, }), ); expect(managerMocks.initializeSession).not.toHaveBeenCalled(); @@ -1092,14 +1103,27 @@ describe("resetAcpSessionInPlace", () => { ); }); - it("does not eagerly reinitialize harness agent sessions during in-place reset", async () => { + it("clears configured harness agent sessions during in-place reset", async () => { const cfg = { ...baseCfg, + bindings: [ + createDiscordBinding({ + agentId: "coding", + conversationId: "1478844424791396446", + }), + ], agents: { list: [{ id: "main" }, { id: "coding" }], }, } satisfies OpenClawConfig; - const sessionKey = "agent:coding:acp:binding:discord:default:9373ab192b2317f4"; + const sessionKey = buildConfiguredAcpSessionKey({ + channel: "discord", + accountId: "default", + conversationId: "1478844424791396446", + agentId: "coding", + mode: "persistent", + backend: "acpx", + }); sessionMetaMocks.readAcpSessionEntry.mockReturnValue({ acp: { agent: "codex", @@ -1118,7 +1142,7 @@ describe("resetAcpSessionInPlace", () => { expect(managerMocks.initializeSession).not.toHaveBeenCalled(); }); - it("does not eagerly reinitialize configured ACP agent overrides when metadata omits the agent", async () => { + it("clears configured ACP agent overrides even when metadata omits the agent", async () => { const cfg = createCfgWithBindings( [ createDiscordBinding({ diff --git a/src/auto-reply/reply/commands-reset-hooks.test.ts b/src/auto-reply/reply/commands-reset-hooks.test.ts index fcacf2d46fc..9511a15ea7a 100644 --- a/src/auto-reply/reply/commands-reset-hooks.test.ts +++ b/src/auto-reply/reply/commands-reset-hooks.test.ts @@ -6,9 +6,9 @@ import type { HandleCommandsParams } from "./commands-types.js"; import { parseInlineDirectives } from "./directive-handling.parse.js"; const triggerInternalHookMock = vi.hoisted(() => vi.fn().mockResolvedValue(undefined)); - -vi.mock("../../channels/plugins/binding-targets.js", () => ({ - resetConfiguredBindingTargetInPlace: vi.fn().mockResolvedValue({ ok: false, skipped: true }), +const resetMocks = vi.hoisted(() => ({ + resetConfiguredBindingTargetInPlace: vi.fn().mockResolvedValue({ ok: true as const }), + resolveBoundAcpThreadSessionKey: vi.fn(() => undefined as string | undefined), })); vi.mock("../../hooks/internal-hooks.js", () => ({ @@ -37,8 +37,12 @@ vi.mock("../commands-registry.js", () => ({ shouldHandleTextCommands: () => true, })); +vi.mock("../../channels/plugins/binding-targets.js", () => ({ + resetConfiguredBindingTargetInPlace: resetMocks.resetConfiguredBindingTargetInPlace, +})); + vi.mock("./commands-acp/targets.js", () => ({ - resolveBoundAcpThreadSessionKey: vi.fn(() => undefined), + resolveBoundAcpThreadSessionKey: resetMocks.resolveBoundAcpThreadSessionKey, })); vi.mock("./commands-handlers.runtime.js", () => ({ @@ -96,6 +100,8 @@ function buildResetParams( describe("handleCommands reset hooks", () => { beforeEach(() => { vi.clearAllMocks(); + resetMocks.resetConfiguredBindingTargetInPlace.mockResolvedValue({ ok: true }); + resetMocks.resolveBoundAcpThreadSessionKey.mockReturnValue(undefined); }); it("triggers hooks for /new commands", async () => { @@ -149,4 +155,62 @@ describe("handleCommands reset hooks", () => { triggerInternalHookMock.mockClear(); } }); + + it("uses gateway session reset for bound ACP sessions", async () => { + resetMocks.resolveBoundAcpThreadSessionKey.mockReturnValue( + "agent:claude:acp:binding:discord:default:9373ab192b2317f4", + ); + const params = buildResetParams( + "/reset", + { + commands: { text: true }, + channels: { discord: { allowFrom: ["*"] } }, + } as OpenClawConfig, + { + Provider: "discord", + Surface: "discord", + CommandSource: "native", + }, + ); + + const result = await maybeHandleResetCommand(params); + + expect(resetMocks.resetConfiguredBindingTargetInPlace).toHaveBeenCalledWith({ + cfg: expect.any(Object), + sessionKey: "agent:claude:acp:binding:discord:default:9373ab192b2317f4", + reason: "reset", + commandSource: "discord:native", + }); + expect(result).toEqual({ + shouldContinue: false, + reply: { text: "✅ ACP session reset in place." }, + }); + expect(triggerInternalHookMock).not.toHaveBeenCalled(); + expect(params.command.resetHookTriggered).toBe(true); + }); + + it("keeps tail dispatch after a bound ACP reset", async () => { + resetMocks.resolveBoundAcpThreadSessionKey.mockReturnValue( + "agent:claude:acp:binding:discord:default:9373ab192b2317f4", + ); + const params = buildResetParams( + "/new who are you", + { + commands: { text: true }, + channels: { discord: { allowFrom: ["*"] } }, + } as OpenClawConfig, + { + Provider: "discord", + Surface: "discord", + CommandSource: "native", + }, + ); + + const result = await maybeHandleResetCommand(params); + + expect(result).toEqual({ shouldContinue: false }); + expect(params.ctx.Body).toBe("who are you"); + expect(params.ctx.CommandBody).toBe("who are you"); + expect(params.ctx.AcpDispatchTailAfterReset).toBe(true); + }); }); diff --git a/src/auto-reply/reply/commands-reset.ts b/src/auto-reply/reply/commands-reset.ts index 3de28216b14..7fad691209c 100644 --- a/src/auto-reply/reply/commands-reset.ts +++ b/src/auto-reply/reply/commands-reset.ts @@ -16,29 +16,6 @@ function applyAcpResetTailContext(ctx: HandleCommandsParams["ctx"], resetTail: s mutableCtx.AcpDispatchTailAfterReset = true; } -function resolveSessionEntryForHookSessionKey( - sessionStore: HandleCommandsParams["sessionStore"] | undefined, - sessionKey: string, -): HandleCommandsParams["sessionEntry"] | undefined { - if (!sessionStore) { - return undefined; - } - const directEntry = sessionStore[sessionKey]; - if (directEntry) { - return directEntry; - } - const normalizedTarget = sessionKey.trim().toLowerCase(); - if (!normalizedTarget) { - return undefined; - } - for (const [candidateKey, candidateEntry] of Object.entries(sessionStore)) { - if (candidateKey.trim().toLowerCase() === normalizedTarget) { - return candidateEntry; - } - } - return undefined; -} - export async function maybeHandleResetCommand( params: HandleCommandsParams, ): Promise { @@ -65,31 +42,13 @@ export async function maybeHandleResetCommand( cfg: params.cfg, sessionKey: boundAcpKey, reason: commandAction, + commandSource: `${params.command.surface}:${params.ctx.CommandSource ?? "text"}`, }); - if (!resetResult.ok && !resetResult.skipped) { - logVerbose( - `acp reset-in-place failed for ${boundAcpKey}: ${resetResult.error ?? "unknown error"}`, - ); + if (!resetResult.ok) { + logVerbose(`acp reset failed for ${boundAcpKey}: ${resetResult.error ?? "unknown error"}`); } if (resetResult.ok) { - const hookSessionEntry = - boundAcpKey === params.sessionKey - ? params.sessionEntry - : resolveSessionEntryForHookSessionKey(params.sessionStore, boundAcpKey); - const hookPreviousSessionEntry = - boundAcpKey === params.sessionKey - ? params.previousSessionEntry - : resolveSessionEntryForHookSessionKey(params.sessionStore, boundAcpKey); - await emitResetCommandHooks({ - action: commandAction, - ctx: params.ctx, - cfg: params.cfg, - command: params.command, - sessionKey: boundAcpKey, - sessionEntry: hookSessionEntry, - previousSessionEntry: hookPreviousSessionEntry, - workspaceDir: params.workspaceDir, - }); + params.command.resetHookTriggered = true; if (resetTail) { applyAcpResetTailContext(params.ctx, resetTail); if (params.rootCtx && params.rootCtx !== params.ctx) { @@ -102,14 +61,6 @@ export async function maybeHandleResetCommand( reply: { text: "✅ ACP session reset in place." }, }; } - if (resetResult.skipped) { - return { - shouldContinue: false, - reply: { - text: "⚠️ ACP session reset unavailable for this bound conversation. Rebind with /acp bind or /acp spawn.", - }, - }; - } return { shouldContinue: false, reply: { text: "⚠️ ACP session reset failed. Check /acp status and try again." }, diff --git a/src/auto-reply/reply/dispatch-acp-delivery.test.ts b/src/auto-reply/reply/dispatch-acp-delivery.test.ts index 162635d0941..e56b73c5d9f 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.test.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.test.ts @@ -286,6 +286,26 @@ describe("createAcpDispatchDeliveryCoordinator", () => { expect(onReplyStart).toHaveBeenCalledTimes(1); }); + it("does not block delivery when reply lifecycle startup hangs", async () => { + const onReplyStart = vi.fn( + async () => + await new Promise(() => { + // Intentionally never resolve to simulate a stuck typing/reaction side effect. + }), + ); + const coordinator = createCoordinator(onReplyStart); + + const delivered = await Promise.race([ + coordinator.deliver("final", { text: "hello" }).then(() => "delivered"), + new Promise((resolve) => { + setTimeout(() => resolve("timed-out"), 50); + }), + ]); + + expect(delivered).toBe("delivered"); + expect(onReplyStart).toHaveBeenCalledTimes(1); + }); + it("does not start reply lifecycle for empty payload delivery", async () => { const onReplyStart = vi.fn(async () => {}); const coordinator = createCoordinator(onReplyStart); diff --git a/src/auto-reply/reply/dispatch-acp-delivery.ts b/src/auto-reply/reply/dispatch-acp-delivery.ts index 44603408200..a158c4e70f9 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.ts @@ -215,7 +215,11 @@ export function createAcpDispatchDeliveryCoordinator(params: { return; } state.startedReplyLifecycle = true; - await params.onReplyStart?.(); + void Promise.resolve(params.onReplyStart?.()).catch((error) => { + logVerbose( + `dispatch-acp: reply lifecycle start failed: ${error instanceof Error ? error.message : String(error)}`, + ); + }); }; const tryEditToolMessage = async ( diff --git a/src/channels/plugins/acp-stateful-target-driver.test.ts b/src/channels/plugins/acp-stateful-target-driver.test.ts new file mode 100644 index 00000000000..0a341e47ae6 --- /dev/null +++ b/src/channels/plugins/acp-stateful-target-driver.test.ts @@ -0,0 +1,77 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const resetMocks = vi.hoisted(() => ({ + performGatewaySessionReset: vi.fn(async () => ({ + ok: true as const, + key: "agent:claude:acp:binding:discord:default:9373ab192b2317f4", + entry: { sessionId: "next-session", updatedAt: 1 }, + })), +})); +const sessionMetaMocks = vi.hoisted(() => ({ + readAcpSessionEntry: vi.fn(() => null), +})); +const resolveMocks = vi.hoisted(() => ({ + resolveConfiguredAcpBindingSpecBySessionKey: vi.fn(() => null), +})); + +vi.mock("../../acp/persistent-bindings.lifecycle.js", () => ({ + ensureConfiguredAcpBindingReady: vi.fn(), + ensureConfiguredAcpBindingSession: vi.fn(), +})); +vi.mock("./acp-stateful-target-reset.runtime.js", () => ({ + performGatewaySessionReset: resetMocks.performGatewaySessionReset, +})); +vi.mock("../../acp/runtime/session-meta.js", () => ({ + readAcpSessionEntry: sessionMetaMocks.readAcpSessionEntry, +})); +vi.mock("../../acp/persistent-bindings.resolve.js", () => ({ + resolveConfiguredAcpBindingSpecBySessionKey: + resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey, +})); + +import { acpStatefulBindingTargetDriver } from "./acp-stateful-target-driver.js"; + +describe("acpStatefulBindingTargetDriver", () => { + beforeEach(() => { + resetMocks.performGatewaySessionReset.mockClear(); + sessionMetaMocks.readAcpSessionEntry.mockClear(); + resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey.mockClear(); + }); + + it("delegates bound resets to the gateway session reset authority", async () => { + await expect( + acpStatefulBindingTargetDriver.resetInPlace?.({ + cfg: {} as never, + sessionKey: "agent:claude:acp:binding:discord:default:9373ab192b2317f4", + reason: "new", + commandSource: "discord:native", + bindingTarget: { + kind: "stateful", + driverId: "acp", + sessionKey: "agent:claude:acp:binding:discord:default:9373ab192b2317f4", + agentId: "claude", + }, + }), + ).resolves.toEqual({ ok: true }); + + expect(resetMocks.performGatewaySessionReset).toHaveBeenCalledWith({ + key: "agent:claude:acp:binding:discord:default:9373ab192b2317f4", + reason: "new", + commandSource: "discord:native", + }); + }); + + it("keeps ACP reset available when metadata has already been cleared", () => { + expect( + acpStatefulBindingTargetDriver.resolveTargetBySessionKey?.({ + cfg: {} as never, + sessionKey: "agent:claude:acp:binding:discord:default:9373ab192b2317f4", + }), + ).toEqual({ + kind: "stateful", + driverId: "acp", + sessionKey: "agent:claude:acp:binding:discord:default:9373ab192b2317f4", + agentId: "claude", + }); + }); +}); diff --git a/src/channels/plugins/acp-stateful-target-driver.ts b/src/channels/plugins/acp-stateful-target-driver.ts index 787013fc5b0..4c805813f75 100644 --- a/src/channels/plugins/acp-stateful-target-driver.ts +++ b/src/channels/plugins/acp-stateful-target-driver.ts @@ -1,12 +1,13 @@ import { ensureConfiguredAcpBindingReady, ensureConfiguredAcpBindingSession, - resetAcpSessionInPlace, } from "../../acp/persistent-bindings.lifecycle.js"; import { resolveConfiguredAcpBindingSpecBySessionKey } from "../../acp/persistent-bindings.resolve.js"; import { resolveConfiguredAcpBindingSpecFromRecord } from "../../acp/persistent-bindings.types.js"; import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { isAcpSessionKey, resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; +import { performGatewaySessionReset } from "./acp-stateful-target-reset.runtime.js"; import type { ConfiguredBindingResolution, StatefulBindingTargetDescriptor, @@ -22,24 +23,45 @@ function toAcpStatefulBindingTargetDescriptor(params: { cfg: OpenClawConfig; sessionKey: string; }): StatefulBindingTargetDescriptor | null { - const meta = readAcpSessionEntry(params)?.acp; + const sessionKey = params.sessionKey.trim(); + if (!sessionKey) { + return null; + } + const meta = readAcpSessionEntry({ + ...params, + sessionKey, + })?.acp; const metaAgentId = meta?.agent?.trim(); if (metaAgentId) { return { kind: "stateful", driverId: "acp", - sessionKey: params.sessionKey, + sessionKey, agentId: metaAgentId, }; } - const spec = resolveConfiguredAcpBindingSpecBySessionKey(params); + const spec = resolveConfiguredAcpBindingSpecBySessionKey({ + ...params, + sessionKey, + }); if (!spec) { - return null; + if (!isAcpSessionKey(sessionKey)) { + return null; + } + // Bound ACP sessions can intentionally clear their ACP metadata after a + // reset. The native /reset path still needs to recognize the ACP session + // key as resettable while that metadata is absent. + return { + kind: "stateful", + driverId: "acp", + sessionKey, + agentId: resolveAgentIdFromSessionKey(sessionKey), + }; } return { kind: "stateful", driverId: "acp", - sessionKey: params.sessionKey, + sessionKey, agentId: spec.agentId, ...(spec.label ? { label: spec.label } : {}), }; @@ -88,9 +110,22 @@ async function ensureAcpTargetSession(params: { async function resetAcpTargetInPlace(params: { cfg: OpenClawConfig; sessionKey: string; + bindingTarget: StatefulBindingTargetDescriptor; reason: "new" | "reset"; + commandSource?: string; }): Promise { - return await resetAcpSessionInPlace(params); + const result = await performGatewaySessionReset({ + key: params.sessionKey, + reason: params.reason, + commandSource: params.commandSource ?? "stateful-target:acp-reset-in-place", + }); + if (result.ok) { + return { ok: true }; + } + return { + ok: false, + error: result.error.message, + }; } export const acpStatefulBindingTargetDriver: StatefulBindingTargetDriver = { diff --git a/src/channels/plugins/acp-stateful-target-reset.runtime.ts b/src/channels/plugins/acp-stateful-target-reset.runtime.ts new file mode 100644 index 00000000000..f3297e53197 --- /dev/null +++ b/src/channels/plugins/acp-stateful-target-reset.runtime.ts @@ -0,0 +1 @@ +export { performGatewaySessionReset } from "../../gateway/session-reset-service.js"; diff --git a/src/channels/plugins/binding-targets.test.ts b/src/channels/plugins/binding-targets.test.ts index 28d6b2ea9fe..88c4e18d406 100644 --- a/src/channels/plugins/binding-targets.test.ts +++ b/src/channels/plugins/binding-targets.test.ts @@ -166,6 +166,7 @@ describe("binding target drivers", () => { cfg: {} as never, sessionKey: "agent:codex:test-driver", reason: "reset", + commandSource: "discord:native", }), ).resolves.toEqual({ ok: true }); @@ -174,6 +175,7 @@ describe("binding target drivers", () => { cfg: {} as never, sessionKey: "agent:codex:test-driver", reason: "reset", + commandSource: "discord:native", bindingTarget: { kind: "stateful", driverId: "test-driver", diff --git a/src/channels/plugins/binding-targets.ts b/src/channels/plugins/binding-targets.ts index 13c9d6c6d7f..f146cfafd8d 100644 --- a/src/channels/plugins/binding-targets.ts +++ b/src/channels/plugins/binding-targets.ts @@ -31,6 +31,7 @@ export async function resetConfiguredBindingTargetInPlace(params: { cfg: OpenClawConfig; sessionKey: string; reason: "new" | "reset"; + commandSource?: string; }): Promise<{ ok: true } | { ok: false; skipped?: boolean; error?: string }> { await ensureStatefulTargetBuiltinsRegistered(); const resolved = resolveStatefulBindingTargetBySessionKey({ diff --git a/src/channels/plugins/stateful-target-drivers.ts b/src/channels/plugins/stateful-target-drivers.ts index ede52472c57..408c0a19f3c 100644 --- a/src/channels/plugins/stateful-target-drivers.ts +++ b/src/channels/plugins/stateful-target-drivers.ts @@ -31,6 +31,7 @@ export type StatefulBindingTargetDriver = { sessionKey: string; bindingTarget: StatefulBindingTargetDescriptor; reason: "new" | "reset"; + commandSource?: string; }) => Promise; }; diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts index 6a04155f96a..7a5504da1a5 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts @@ -2169,6 +2169,7 @@ describe("gateway server sessions", () => { expect(acpManagerMocks.closeSession).toHaveBeenCalledWith({ allowBackendUnavailable: true, cfg: expect.any(Object), + discardPersistentState: true, requireAcpSession: false, reason: "session-delete", sessionKey: "agent:main:discord:group:dev", @@ -2411,6 +2412,13 @@ describe("gateway server sessions", () => { test("sessions.reset closes ACP runtime handles for ACP sessions", async () => { const { dir, storePath } = await createSessionStoreDir(); await writeSingleLineSession(dir, "sess-main", "hello"); + const prepareFreshSession = vi.fn(async () => {}); + acpRuntimeMocks.getAcpRuntimeBackend.mockReturnValue({ + id: "acpx", + runtime: { + prepareFreshSession, + }, + }); await writeSessionStore({ entries: { @@ -2421,6 +2429,13 @@ describe("gateway server sessions", () => { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:reset", + identity: { + state: "resolved", + acpxRecordId: "agent:main:main", + acpxSessionId: "backend-session-1", + source: "status", + lastUpdatedAt: Date.now(), + }, mode: "persistent", runtimeOptions: { runtimeMode: "auto", @@ -2442,6 +2457,11 @@ describe("gateway server sessions", () => { backend?: string; agent?: string; runtimeSessionName?: string; + identity?: { + state?: string; + acpxRecordId?: string; + acpxSessionId?: string; + }; mode?: string; runtimeOptions?: { runtimeMode?: string; @@ -2459,6 +2479,10 @@ describe("gateway server sessions", () => { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:reset", + identity: { + state: "pending", + acpxRecordId: "agent:main:main", + }, mode: "persistent", runtimeOptions: { runtimeMode: "auto", @@ -2467,13 +2491,18 @@ describe("gateway server sessions", () => { cwd: "/tmp/acp-session", state: "idle", }); + expect(reset.payload?.entry.acp?.identity?.acpxSessionId).toBeUndefined(); expect(acpManagerMocks.closeSession).toHaveBeenCalledWith({ allowBackendUnavailable: true, cfg: expect.any(Object), + discardPersistentState: true, requireAcpSession: false, reason: "session-reset", sessionKey: "agent:main:main", }); + expect(prepareFreshSession).toHaveBeenCalledWith({ + sessionKey: "agent:main:main", + }); const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< string, { @@ -2481,6 +2510,11 @@ describe("gateway server sessions", () => { backend?: string; agent?: string; runtimeSessionName?: string; + identity?: { + state?: string; + acpxRecordId?: string; + acpxSessionId?: string; + }; mode?: string; runtimeOptions?: { runtimeMode?: string; @@ -2495,6 +2529,10 @@ describe("gateway server sessions", () => { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:reset", + identity: { + state: "pending", + acpxRecordId: "agent:main:main", + }, mode: "persistent", runtimeOptions: { runtimeMode: "auto", @@ -2503,6 +2541,7 @@ describe("gateway server sessions", () => { cwd: "/tmp/acp-session", state: "idle", }); + expect(store["agent:main:main"]?.acp?.identity?.acpxSessionId).toBeUndefined(); ws.close(); }); diff --git a/src/gateway/session-reset-service.ts b/src/gateway/session-reset-service.ts index bb9cb83e33b..8d5036402a5 100644 --- a/src/gateway/session-reset-service.ts +++ b/src/gateway/session-reset-service.ts @@ -3,6 +3,8 @@ import fs from "node:fs"; import path from "node:path"; import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; import { getAcpSessionManager } from "../acp/control-plane/manager.js"; +import { getAcpRuntimeBackend } from "../acp/runtime/registry.js"; +import { readAcpSessionEntry, upsertAcpSessionMeta } from "../acp/runtime/session-meta.js"; import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js"; import { clearBootstrapSnapshot } from "../agents/bootstrap-cache.js"; import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../agents/pi-embedded.js"; @@ -19,6 +21,7 @@ import { updateSessionStore, } from "../config/sessions.js"; import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../config/sessions/paths.js"; +import type { SessionAcpMeta } from "../config/sessions/types.js"; import { logVerbose } from "../globals.js"; import { createInternalHookEvent, triggerInternalHook } from "../hooks/internal-hooks.js"; import { getSessionBindingService } from "../infra/outbound/session-binding-service.js"; @@ -284,6 +287,7 @@ async function closeAcpRuntimeForSession(params: { cfg: params.cfg, sessionKey: params.sessionKey, reason: params.reason, + discardPersistentState: true, requireAcpSession: false, allowBackendUnavailable: true, }); @@ -300,9 +304,84 @@ async function closeAcpRuntimeForSession(params: { `sessions.${params.reason}: ACP runtime close failed for ${params.sessionKey}: ${String(closeOutcome.error)}`, ); } + await ensureFreshAcpResetState({ + cfg: params.cfg, + sessionKey: params.sessionKey, + reason: params.reason, + entry: params.entry, + }); return undefined; } +function buildPendingAcpMeta(base: SessionAcpMeta, now: number): SessionAcpMeta { + const currentIdentity = base.identity; + const nextIdentity = currentIdentity + ? { + state: "pending" as const, + ...(currentIdentity.acpxRecordId ? { acpxRecordId: currentIdentity.acpxRecordId } : {}), + source: currentIdentity.source, + lastUpdatedAt: now, + } + : undefined; + return { + backend: base.backend, + agent: base.agent, + runtimeSessionName: base.runtimeSessionName, + ...(nextIdentity ? { identity: nextIdentity } : {}), + mode: base.mode, + ...(base.runtimeOptions ? { runtimeOptions: base.runtimeOptions } : {}), + ...(base.cwd ? { cwd: base.cwd } : {}), + state: "idle", + lastActivityAt: now, + }; +} + +async function ensureFreshAcpResetState(params: { + cfg: ReturnType; + sessionKey: string; + reason: "session-reset" | "session-delete"; + entry?: SessionEntry; +}): Promise { + if (params.reason !== "session-reset" || !params.entry?.acp) { + return; + } + const latestMeta = readAcpSessionEntry({ + cfg: params.cfg, + sessionKey: params.sessionKey, + })?.acp; + if ( + !latestMeta?.identity || + latestMeta.identity.state !== "resolved" || + (!latestMeta.identity.acpxSessionId && !latestMeta.identity.agentSessionId) + ) { + return; + } + + const backendId = (latestMeta.backend || params.cfg.acp?.backend || "").trim() || undefined; + try { + await getAcpRuntimeBackend(backendId)?.runtime.prepareFreshSession?.({ + sessionKey: params.sessionKey, + }); + } catch (error) { + logVerbose( + `sessions.${params.reason}: ACP prepareFreshSession failed for ${params.sessionKey}: ${String(error)}`, + ); + } + + const now = Date.now(); + await upsertAcpSessionMeta({ + cfg: params.cfg, + sessionKey: params.sessionKey, + mutate: (current, entry) => { + const base = current ?? entry?.acp; + if (!base) { + return null; + } + return buildPendingAcpMeta(base, now); + }, + }); +} + export async function cleanupSessionBeforeMutation(params: { cfg: ReturnType; key: string; diff --git a/src/infra/path-env.test.ts b/src/infra/path-env.test.ts index 56665fd9e0a..c568665f6c6 100644 --- a/src/infra/path-env.test.ts +++ b/src/infra/path-env.test.ts @@ -131,6 +131,26 @@ describe("ensureOpenClawCliOnPath", () => { expect(updated[0]).toBe(appBinDir); }); + it("keeps the current runtime directory ahead of system PATH hardening", () => { + const tmp = abs("/tmp/openclaw-path/case-runtime-dir"); + const nodeBinDir = path.join(tmp, "node-bin"); + const nodeExec = path.join(nodeBinDir, "node"); + setDir(tmp); + setDir(nodeBinDir); + setExe(nodeExec); + + resetBootstrapEnv("/usr/bin:/bin"); + + const updated = bootstrapPath({ + execPath: nodeExec, + cwd: tmp, + homeDir: tmp, + platform: "linux", + }); + expect(updated[0]).toBe(nodeBinDir); + expect(updated.indexOf(nodeBinDir)).toBeLessThan(updated.indexOf("/usr/bin")); + }); + it("is idempotent", () => { process.env.PATH = "/bin"; process.env.OPENCLAW_PATH_BOOTSTRAPPED = "1"; diff --git a/src/infra/path-env.ts b/src/infra/path-env.ts index 3efe0084c09..2d1701097c4 100644 --- a/src/infra/path-env.ts +++ b/src/infra/path-env.ts @@ -58,6 +58,17 @@ function candidateBinDirs(opts: EnsureOpenClawPathOpts): { prepend: string[]; ap const prepend: string[] = []; const append: string[] = []; + // Keep the active runtime directory ahead of PATH hardening so shebang-based + // subprocesses keep using the same Node/Bun the current OpenClaw process is on. + try { + const execDir = path.dirname(execPath); + if (isExecutable(execPath)) { + prepend.push(execDir); + } + } catch { + // ignore + } + // Bundled macOS app: `openclaw` lives next to the executable (process.execPath). try { const execDir = path.dirname(execPath); diff --git a/test/vitest-scoped-config.test.ts b/test/vitest-scoped-config.test.ts index d6ce32ccb60..d212b7b30a2 100644 --- a/test/vitest-scoped-config.test.ts +++ b/test/vitest-scoped-config.test.ts @@ -8,6 +8,7 @@ import { createAutoReplyCoreVitestConfig } from "../vitest.auto-reply-core.confi import { createAutoReplyReplyVitestConfig } from "../vitest.auto-reply-reply.config.ts"; import { createAutoReplyTopLevelVitestConfig } from "../vitest.auto-reply-top-level.config.ts"; import { createAutoReplyVitestConfig } from "../vitest.auto-reply.config.ts"; +import bundledVitestConfig from "../vitest.bundled.config.ts"; import { createChannelsVitestConfig } from "../vitest.channels.config.ts"; import { createCliVitestConfig } from "../vitest.cli.config.ts"; import { createCommandsLightVitestConfig } from "../vitest.commands-light.config.ts"; @@ -49,6 +50,7 @@ import { createTasksVitestConfig } from "../vitest.tasks.config.ts"; import { createToolingVitestConfig } from "../vitest.tooling.config.ts"; import { createTuiVitestConfig } from "../vitest.tui.config.ts"; import { createUiVitestConfig } from "../vitest.ui.config.ts"; +import { bundledPluginDependentUnitTestFiles } from "../vitest.unit-paths.mjs"; import { createUtilsVitestConfig } from "../vitest.utils.config.ts"; import { createWizardVitestConfig } from "../vitest.wizard.config.ts"; import { BUNDLED_PLUGIN_TEST_GLOB, bundledPluginFile } from "./helpers/bundled-plugin-paths.js"; @@ -56,6 +58,17 @@ import { cleanupTempDirs, makeTempDir } from "./helpers/temp-dir.js"; const EXTENSIONS_CHANNEL_GLOB = ["extensions", "channel", "**"].join("/"); +function bundledExcludePatternCouldMatchFile(pattern: string, file: string): boolean { + if (pattern === file) { + return true; + } + if (pattern.endsWith("/**")) { + const prefix = pattern.slice(0, -3); + return file === prefix || file.startsWith(`${prefix}/`); + } + return false; +} + describe("resolveVitestIsolation", () => { it("defaults shared scoped configs to the non-isolated runner", () => { expect(resolveVitestIsolation({})).toBe(false); @@ -138,6 +151,15 @@ describe("createScopedVitestConfig", () => { "test/setup-openclaw-runtime.ts", ]); }); + + it("keeps bundled unit test includes out of the bundled exclude list", () => { + const excludePatterns = bundledVitestConfig.test?.exclude ?? []; + for (const file of bundledPluginDependentUnitTestFiles) { + expect( + excludePatterns.some((pattern) => bundledExcludePatternCouldMatchFile(pattern, file)), + ).toBe(false); + } + }); }); describe("scoped vitest configs", () => { diff --git a/vitest.bundled.config.ts b/vitest.bundled.config.ts index 0f668eec73e..cc6b91e363a 100644 --- a/vitest.bundled.config.ts +++ b/vitest.bundled.config.ts @@ -5,8 +5,28 @@ import { } from "./vitest.unit-paths.mjs"; import { createUnitVitestConfigWithOptions } from "./vitest.unit.config.ts"; +function normalizeGlobCandidate(value: string): string { + return value.split(path.sep).join("/"); +} + +function excludePatternCouldMatchFile(pattern: string, file: string): boolean { + const normalizedPattern = normalizeGlobCandidate(pattern); + const normalizedFile = normalizeGlobCandidate(file); + if (normalizedPattern === normalizedFile) { + return true; + } + if (normalizedPattern.endsWith("/**")) { + const prefix = normalizedPattern.slice(0, -3); + return normalizedFile === prefix || normalizedFile.startsWith(`${prefix}/`); + } + return path.matchesGlob(normalizedFile, normalizedPattern); +} + const bundledUnitExcludePatterns = unitTestAdditionalExcludePatterns.filter( - (pattern) => !bundledPluginDependentUnitTestFiles.some((file) => path.matchesGlob(file, pattern)), + (pattern) => + !bundledPluginDependentUnitTestFiles.some((file) => + excludePatternCouldMatchFile(pattern, file), + ), ); export default createUnitVitestConfigWithOptions(process.env, {