ACP: harden Discord recovery and reset flow (#62132)

* ACP: harden Discord recovery and reset flow

* CI: harden bundled vitest excludes

* ACP: fix Claude launch and reset recovery

* Discord: use follow-up replies after slash defer

* ACP: route bound resets through gateway service

* ACP: unify bound reset authority

* ACPX: update OpenClaw branch to 0.5.2

* ACP: fix rebuilt branch replay fallout

* ACP: fix CI regressions after ACPX 0.5.2 update

---------

Co-authored-by: Onur <2453968+osolmaz@users.noreply.github.com>
This commit is contained in:
Bob
2026-04-07 12:23:50 +02:00
committed by GitHub
parent 4fa7931b1b
commit f6124f3e17
35 changed files with 1294 additions and 289 deletions

View File

@@ -4,7 +4,7 @@
"description": "OpenClaw ACP runtime backend",
"type": "module",
"dependencies": {
"acpx": "0.5.1"
"acpx": "0.5.2"
},
"devDependencies": {
"@openclaw/plugin-sdk": "workspace:*"

View File

@@ -45,7 +45,11 @@ declare module "acpx/runtime" {
setMode(input: { handle: AcpRuntimeHandle; mode: string }): Promise<void>;
setConfigOption(input: { handle: AcpRuntimeHandle; key: string; value: string }): Promise<void>;
cancel(input: { handle: AcpRuntimeHandle; reason?: string }): Promise<void>;
close(input: { handle: AcpRuntimeHandle; reason?: string }): Promise<void>;
close(input: {
handle: AcpRuntimeHandle;
reason?: string;
discardPersistentState?: boolean;
}): Promise<void>;
}
export function createAcpRuntime(...args: unknown[]): AcpxRuntime;

View File

@@ -1,74 +1,37 @@
import type { AcpRuntimeHandle, AcpRuntimeOptions, AcpSessionStore } from "acpx/runtime";
import type { AcpSessionStore } from "acpx/runtime";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { AcpRuntime } from "../runtime-api.js";
import { AcpxRuntime } from "./runtime.js";
const mocks = vi.hoisted(() => {
const state = {
capturedStore: undefined as AcpSessionStore | undefined,
};
class MockAcpxRuntime {
constructor(options: AcpRuntimeOptions) {
state.capturedStore = options.sessionStore;
}
isHealthy() {
return true;
}
async probeAvailability() {}
async doctor() {
return { ok: true, message: "ok" };
}
async ensureSession() {
return {
sessionKey: "agent:codex:acp:binding:test",
backend: "acpx",
runtimeSessionName: "agent:codex:acp:binding:test",
} satisfies AcpRuntimeHandle;
}
async *runTurn() {}
getCapabilities() {
return { controls: [] };
}
async getStatus() {
return {};
}
async setMode() {}
async setConfigOption() {}
async cancel() {}
async close() {}
}
function makeRuntime(baseStore: AcpSessionStore): {
runtime: AcpxRuntime;
wrappedStore: AcpSessionStore & { markFresh: (sessionKey: string) => void };
delegate: { close: AcpRuntime["close"] };
} {
const runtime = new AcpxRuntime({
cwd: "/tmp",
sessionStore: baseStore,
agentRegistry: {
resolve: () => "codex",
list: () => ["codex"],
},
permissionMode: "approve-reads",
});
return {
state,
MockAcpxRuntime,
runtime,
wrappedStore: (
runtime as unknown as {
sessionStore: AcpSessionStore & { markFresh: (sessionKey: string) => void };
}
).sessionStore,
delegate: (runtime as unknown as { delegate: { close: AcpRuntime["close"] } }).delegate,
};
});
vi.mock("acpx/runtime", () => ({
ACPX_BACKEND_ID: "acpx",
AcpxRuntime: mocks.MockAcpxRuntime,
createAcpRuntime: vi.fn(),
createAgentRegistry: vi.fn(),
createFileSessionStore: vi.fn(),
decodeAcpxRuntimeHandleState: vi.fn(),
encodeAcpxRuntimeHandleState: vi.fn(),
}));
import { AcpxRuntime } from "./runtime.js";
}
describe("AcpxRuntime fresh reset wrapper", () => {
beforeEach(() => {
mocks.state.capturedStore = undefined;
vi.restoreAllMocks();
});
it("keeps stale persistent loads hidden until a fresh record is saved", async () => {
@@ -77,20 +40,9 @@ describe("AcpxRuntime fresh reset wrapper", () => {
save: vi.fn(async () => {}),
};
const runtime = new AcpxRuntime({
cwd: "/tmp",
sessionStore: baseStore,
agentRegistry: {
resolve: () => "codex",
list: () => ["codex"],
},
permissionMode: "approve-reads",
});
const { runtime, wrappedStore } = makeRuntime(baseStore);
const wrappedStore = mocks.state.capturedStore;
expect(wrappedStore).toBeDefined();
expect(await wrappedStore?.load("agent:codex:acp:binding:test")).toEqual({
expect(await wrappedStore.load("agent:codex:acp:binding:test")).toEqual({
acpxRecordId: "stale",
});
expect(baseStore.load).toHaveBeenCalledTimes(1);
@@ -99,17 +51,17 @@ describe("AcpxRuntime fresh reset wrapper", () => {
sessionKey: "agent:codex:acp:binding:test",
});
expect(await wrappedStore?.load("agent:codex:acp:binding:test")).toBeUndefined();
expect(await wrappedStore.load("agent:codex:acp:binding:test")).toBeUndefined();
expect(baseStore.load).toHaveBeenCalledTimes(1);
expect(await wrappedStore?.load("agent:codex:acp:binding:test")).toBeUndefined();
expect(await wrappedStore.load("agent:codex:acp:binding:test")).toBeUndefined();
expect(baseStore.load).toHaveBeenCalledTimes(1);
await wrappedStore?.save({
await wrappedStore.save({
acpxRecordId: "fresh-record",
name: "agent:codex:acp:binding:test",
} as never);
expect(await wrappedStore?.load("agent:codex:acp:binding:test")).toEqual({
expect(await wrappedStore.load("agent:codex:acp:binding:test")).toEqual({
acpxRecordId: "stale",
});
expect(baseStore.load).toHaveBeenCalledTimes(2);
@@ -121,18 +73,8 @@ describe("AcpxRuntime fresh reset wrapper", () => {
save: vi.fn(async () => {}),
};
const runtime = new AcpxRuntime({
cwd: "/tmp",
sessionStore: baseStore,
agentRegistry: {
resolve: () => "codex",
list: () => ["codex"],
},
permissionMode: "approve-reads",
});
const wrappedStore = mocks.state.capturedStore;
expect(wrappedStore).toBeDefined();
const { runtime, wrappedStore, delegate } = makeRuntime(baseStore);
const close = vi.spyOn(delegate, "close").mockResolvedValue(undefined);
await runtime.close({
handle: {
@@ -144,7 +86,16 @@ describe("AcpxRuntime fresh reset wrapper", () => {
discardPersistentState: true,
});
expect(await wrappedStore?.load("agent:codex:acp:binding:test")).toBeUndefined();
expect(close).toHaveBeenCalledWith({
handle: {
sessionKey: "agent:codex:acp:binding:test",
backend: "acpx",
runtimeSessionName: "agent:codex:acp:binding:test",
},
reason: "new-in-place-reset",
discardPersistentState: true,
});
expect(await wrappedStore.load("agent:codex:acp:binding:test")).toBeUndefined();
expect(baseStore.load).not.toHaveBeenCalled();
});
});

View File

@@ -131,6 +131,7 @@ export class AcpxRuntime implements AcpxRuntimeLike {
.close({
handle: input.handle,
reason: input.reason,
discardPersistentState: input.discardPersistentState,
})
.then(() => {
if (input.discardPersistentState) {

View File

@@ -30,6 +30,7 @@ import { createNonExitingRuntime, logVerbose } from "openclaw/plugin-sdk/runtime
import { resolveOpenProviderRuntimeGroupPolicy } from "openclaw/plugin-sdk/runtime-group-policy";
import { logDebug, logError } from "openclaw/plugin-sdk/text-runtime";
import { resolveDiscordMaxLinesPerMessage } from "../accounts.js";
import { createDiscordRestClient } from "../client.js";
import {
parseDiscordComponentCustomIdForCarbon,
parseDiscordModalCustomIdForCarbon,
@@ -512,6 +513,11 @@ async function dispatchDiscordComponentEvent(params: {
fallbackLimit: 2000,
});
const token = ctx.token ?? "";
const feedbackRest = createDiscordRestClient({
cfg: ctx.cfg,
token,
accountId,
}).rest;
const mediaLocalRoots = getAgentScopedMediaLocalRoots(ctx.cfg, agentId);
const replyToMode =
ctx.discordConfig?.replyToMode ?? ctx.cfg.channels?.discord?.replyToMode ?? "off";
@@ -554,7 +560,7 @@ async function dispatchDiscordComponentEvent(params: {
onReplyStart: async () => {
try {
const { sendTyping } = await loadTypingRuntime();
await sendTyping({ client: interaction.client, channelId: typingChannelId });
await sendTyping({ rest: feedbackRest, channelId: typingChannelId });
} catch (err) {
logVerbose(`discord: typing failed for component reply: ${String(err)}`);
}

View File

@@ -158,6 +158,9 @@ vi.spyOn(configRuntimeModule, "resolveStorePath").mockImplementation(
) => configSessionsMocks.resolveStorePath(path, opts) as never) as never,
);
const clientModule = await import("../client.js");
const createDiscordRestClientSpy = vi.spyOn(clientModule, "createDiscordRestClient");
const BASE_CHANNEL_ROUTE = {
agentId: "main",
channel: "discord",
@@ -214,6 +217,7 @@ beforeEach(() => {
recordInboundSession.mockClear();
readSessionUpdatedAt.mockClear();
resolveStorePath.mockClear();
createDiscordRestClientSpy.mockClear();
dispatchInboundMessage.mockResolvedValue(createNoQueuedDispatchResult());
recordInboundSession.mockResolvedValue(undefined);
readSessionUpdatedAt.mockReturnValue(undefined);
@@ -278,7 +282,7 @@ function expectAckReactionRuntimeOptions(params?: {
messages.removeAckAfterReply = params.removeAckAfterReply;
}
return expect.objectContaining({
rest: {},
rest: expect.anything(),
...(Object.keys(messages).length > 0
? { cfg: expect.objectContaining({ messages: expect.objectContaining(messages) }) }
: {}),
@@ -337,7 +341,7 @@ function expectSinglePreviewEdit() {
"c1",
"preview-1",
{ content: "Hello\nWorld" },
{ rest: {} },
expect.objectContaining({ rest: expect.anything() }),
);
expect(deliverDiscordReply).not.toHaveBeenCalled();
}
@@ -397,6 +401,39 @@ describe("processDiscordMessage ack reactions", () => {
});
});
it("uses separate REST clients for feedback and reply delivery", async () => {
const feedbackRest = { post: vi.fn(async () => undefined) };
const deliveryRest = { post: vi.fn(async () => undefined) };
createDiscordRestClientSpy
.mockReturnValueOnce({
token: "feedback-token",
rest: feedbackRest as never,
account: { config: {} } as never,
})
.mockReturnValueOnce({
token: "delivery-token",
rest: deliveryRest as never,
account: { config: {} } as never,
});
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendFinalReply({ text: "hello" });
return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } };
});
const ctx = await createBaseContext();
await runProcessDiscordMessage(ctx);
expect(sendMocks.reactMessageDiscord).toHaveBeenCalled();
expect(sendMocks.reactMessageDiscord.mock.calls[0]?.[3]).toEqual(
expect.objectContaining({ rest: feedbackRest }),
);
expect(deliverDiscordReply).toHaveBeenCalledWith(
expect.objectContaining({ rest: deliveryRest }),
);
expect(feedbackRest).not.toBe(deliveryRest);
});
it("debounces intermediate phase reactions and jumps to done for short runs", async () => {
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.replyOptions?.onReasoningStream?.();
@@ -733,7 +770,7 @@ describe("processDiscordMessage draft streaming", () => {
"c1",
"preview-1",
{ content: longReply },
{ rest: {} },
expect.objectContaining({ rest: expect.anything() }),
);
expect(deliverDiscordReply).not.toHaveBeenCalled();
});

View File

@@ -45,6 +45,7 @@ import {
} from "openclaw/plugin-sdk/text-runtime";
import { resolveDiscordMaxLinesPerMessage } from "../accounts.js";
import { chunkDiscordTextWithMode } from "../chunk.js";
import { createDiscordRestClient } from "../client.js";
import { resolveDiscordDraftStreamingChunking } from "../draft-chunking.js";
import { createDiscordDraftStream } from "../draft-stream.js";
import { resolveDiscordPreviewStreamMode } from "../preview-streaming.js";
@@ -209,9 +210,19 @@ export async function processDiscordMessage(
const shouldSendAckReaction = shouldAckReaction();
const statusReactionsEnabled =
shouldSendAckReaction && cfg.messages?.statusReactions?.enabled !== false;
const feedbackRest = createDiscordRestClient({
cfg,
token,
accountId,
}).rest as unknown as RequestClient;
const deliveryRest = createDiscordRestClient({
cfg,
token,
accountId,
}).rest as unknown as RequestClient;
// Discord outbound helpers expect Carbon's request client shape explicitly.
const ackReactionContext = createDiscordAckReactionContext({
rest: client.rest as unknown as RequestClient,
rest: feedbackRest,
cfg,
accountId,
});
@@ -522,7 +533,7 @@ export async function processDiscordMessage(
channel: "discord",
accountId: route.accountId,
typing: {
start: () => sendTyping({ client, channelId: typingChannelId }),
start: () => sendTyping({ rest: feedbackRest, channelId: typingChannelId }),
onStartError: (err) => {
logTypingFailure({
log: logVerbose,
@@ -560,7 +571,7 @@ export async function processDiscordMessage(
: messageChannelId;
const draftStream = canStreamDraft
? createDiscordDraftStream({
rest: client.rest,
rest: deliveryRest,
channelId: deliverChannelId,
maxChars: draftMaxChars,
replyToMessageId: draftReplyToMessageId,
@@ -746,7 +757,7 @@ export async function processDiscordMessage(
deliverChannelId,
previewMessageId,
{ content: previewFinalText },
{ rest: client.rest },
{ rest: deliveryRest },
);
finalizedViaPreviewMessage = true;
replyReference.markSent();
@@ -779,7 +790,7 @@ export async function processDiscordMessage(
deliverChannelId,
messageIdAfterStop,
{ content: previewFinalText },
{ rest: client.rest },
{ rest: deliveryRest },
);
finalizedViaPreviewMessage = true;
replyReference.markSent();
@@ -812,7 +823,7 @@ export async function processDiscordMessage(
target: deliverTarget,
token,
accountId,
rest: client.rest,
rest: deliveryRest,
runtime,
replyToId,
replyToMode,

View File

@@ -95,18 +95,19 @@ async function runGuildSlashCommand(params?: {
}
function expectNotUnauthorizedReply(interaction: MockCommandInteraction) {
expect(interaction.reply).not.toHaveBeenCalledWith(
expect(interaction.followUp).not.toHaveBeenCalledWith(
expect.objectContaining({ content: "You are not authorized to use this command." }),
);
}
function expectUnauthorizedReply(interaction: MockCommandInteraction) {
expect(interaction.reply).toHaveBeenCalledWith(
expect(interaction.followUp).toHaveBeenCalledWith(
expect.objectContaining({
content: "You are not authorized to use this command.",
ephemeral: true,
}),
);
expect(interaction.reply).not.toHaveBeenCalled();
}
describe("Discord native slash commands with commands.allowFrom", () => {
@@ -279,8 +280,10 @@ describe("Discord native slash commands with commands.allowFrom", () => {
| undefined;
await dispatchCall?.dispatcherOptions.deliver({ text: longReply }, { kind: "final" });
expect(interaction.reply).toHaveBeenCalledWith(expect.objectContaining({ content: longReply }));
expect(interaction.followUp).not.toHaveBeenCalled();
expect(interaction.followUp).toHaveBeenCalledWith(
expect.objectContaining({ content: longReply }),
);
expect(interaction.reply).not.toHaveBeenCalled();
});
it("swallows expired slash interactions before dispatch when defer returns Unknown interaction", async () => {

View File

@@ -282,9 +282,10 @@ async function expectPairCommandReply(params: {
);
expect(dispatchSpy).not.toHaveBeenCalled();
expect(params.interaction.reply).toHaveBeenCalledWith(
expect(params.interaction.followUp).toHaveBeenCalledWith(
expect.objectContaining({ content: "paired:now" }),
);
expect(params.interaction.reply).not.toHaveBeenCalled();
}
async function createStatusCommand(cfg: OpenClawConfig) {
@@ -465,12 +466,13 @@ describe("Discord native plugin command dispatch", () => {
expect(executeSpy).not.toHaveBeenCalled();
expect(dispatchSpy).not.toHaveBeenCalled();
expect(interaction.reply).toHaveBeenCalledWith(
expect(interaction.followUp).toHaveBeenCalledWith(
expect.objectContaining({
content: "You are not authorized to use this command.",
ephemeral: true,
}),
);
expect(interaction.reply).not.toHaveBeenCalled();
});
it("rejects group DM slash commands outside dm.groupChannels before dispatch", async () => {
@@ -501,11 +503,12 @@ describe("Discord native plugin command dispatch", () => {
await (command as { run: (interaction: unknown) => Promise<void> }).run(interaction as unknown);
expect(dispatchSpy).not.toHaveBeenCalled();
expect(interaction.reply).toHaveBeenCalledWith(
expect(interaction.followUp).toHaveBeenCalledWith(
expect.objectContaining({
content: "This group DM is not allowed.",
}),
);
expect(interaction.reply).not.toHaveBeenCalled();
});
it("executes matched plugin commands directly without invoking the agent dispatcher", async () => {
@@ -540,9 +543,10 @@ describe("Discord native plugin command dispatch", () => {
expect(executeSpy).toHaveBeenCalledTimes(1);
expect(dispatchSpy).not.toHaveBeenCalled();
expect(interaction.reply).toHaveBeenCalledWith(
expect(interaction.followUp).toHaveBeenCalledWith(
expect.objectContaining({ content: "direct plugin output" }),
);
expect(interaction.reply).not.toHaveBeenCalled();
});
it("forwards Discord thread metadata into direct plugin command execution", async () => {

View File

@@ -715,7 +715,9 @@ export function createDiscordNativeCommand(params: {
discordConfig,
accountId,
sessionPrefix,
preferFollowUp: false,
// Slash commands are deferred up front, so all later responses must use
// follow-up/edit semantics instead of the initial reply endpoint.
preferFollowUp: true,
threadBindings,
});
}

View File

@@ -0,0 +1,42 @@
import { Routes } from "discord-api-types/v10";
import { describe, expect, it, vi } from "vitest";
import { sendTyping } from "./typing.js";
describe("sendTyping", () => {
it("uses the direct Discord typing REST endpoint", async () => {
const rest = {
post: vi.fn(async () => {}),
};
await sendTyping({
// @ts-expect-error test stub only needs rest.post
rest,
channelId: "12345",
});
expect(rest.post).toHaveBeenCalledTimes(1);
expect(rest.post).toHaveBeenCalledWith(Routes.channelTyping("12345"));
});
it("times out when the typing endpoint hangs", async () => {
vi.useFakeTimers();
try {
const rest = {
post: vi.fn(() => new Promise(() => {})),
};
const promise = sendTyping({
// @ts-expect-error test stub only needs rest.post
rest,
channelId: "12345",
});
const rejection = expect(promise).rejects.toThrow("discord typing start timed out");
await vi.advanceTimersByTimeAsync(5_000);
await rejection;
} finally {
vi.useRealTimers();
}
});
});

View File

@@ -1,11 +1,23 @@
import type { Client } from "@buape/carbon";
import type { RequestClient } from "@buape/carbon";
import { Routes } from "discord-api-types/v10";
export async function sendTyping(params: { client: Client; channelId: string }) {
const channel = await params.client.fetchChannel(params.channelId);
if (!channel) {
return;
}
if ("triggerTyping" in channel && typeof channel.triggerTyping === "function") {
await channel.triggerTyping();
const DISCORD_TYPING_START_TIMEOUT_MS = 5_000;
export async function sendTyping(params: { rest: RequestClient; channelId: string }) {
let timer: NodeJS.Timeout | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => {
reject(
new Error(`discord typing start timed out after ${DISCORD_TYPING_START_TIMEOUT_MS}ms`),
);
}, DISCORD_TYPING_START_TIMEOUT_MS);
timer.unref?.();
});
try {
await Promise.race([params.rest.post(Routes.channelTyping(params.channelId)), timeoutPromise]);
} finally {
if (timer) {
clearTimeout(timer);
}
}
}

10
pnpm-lock.yaml generated
View File

@@ -267,8 +267,8 @@ importers:
extensions/acpx:
dependencies:
acpx:
specifier: 0.5.1
version: 0.5.1
specifier: 0.5.2
version: 0.5.2
devDependencies:
'@openclaw/plugin-sdk':
specifier: workspace:*
@@ -4081,8 +4081,8 @@ packages:
engines: {node: '>=0.4.0'}
hasBin: true
acpx@0.5.1:
resolution: {integrity: sha512-r2sWGsztSwsO8JGJAswltQkMnRkKNmTH9faxwRWS9Ad28y2jZcyt7jR7auyCk0zwvDr+Zm/H1byVkWFpJWqzQQ==}
acpx@0.5.2:
resolution: {integrity: sha512-c+jibFqgK2WzUt+hO8e007W6655ROpTfvxAUkC6a/jRK1oaPmsxXmgySCq1/x1uBIbb61qhOvfCS6lW7gZeRgg==}
engines: {node: '>=22.12.0'}
hasBin: true
@@ -10637,7 +10637,7 @@ snapshots:
acorn@8.16.0: {}
acpx@0.5.1:
acpx@0.5.2:
dependencies:
'@agentclientprotocol/sdk': 0.17.1(zod@4.3.6)
commander: 14.0.3

View File

@@ -18,6 +18,7 @@ import {
} from "../runtime/errors.js";
import {
createIdentityFromEnsure,
identityHasStableSessionId,
identityEquals,
isSessionIdentityPending,
mergeSessionIdentity,
@@ -246,7 +247,10 @@ export class AcpSessionManager {
continue;
}
const currentIdentity = resolveSessionIdentityFromMeta(session.acp);
if (!isSessionIdentityPending(currentIdentity)) {
if (
!isSessionIdentityPending(currentIdentity) ||
!identityHasStableSessionId(currentIdentity)
) {
continue;
}
@@ -1247,65 +1251,96 @@ export class AcpSessionManager {
};
}
const meta = requireReadySessionMeta(resolution);
const currentIdentity = resolveSessionIdentityFromMeta(meta);
const shouldSkipRuntimeClose =
input.discardPersistentState &&
currentIdentity != null &&
!identityHasStableSessionId(currentIdentity);
let runtimeClosed = false;
let runtimeNotice: string | undefined;
try {
const { runtime: ensuredRuntime, handle } = await this.ensureRuntimeHandle({
cfg: input.cfg,
sessionKey,
meta,
});
await withAcpRuntimeErrorBoundary({
run: async () =>
await ensuredRuntime.close({
handle,
reason: input.reason,
discardPersistentState: input.discardPersistentState,
}),
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP close failed before completion.",
});
runtimeClosed = true;
this.clearCachedRuntimeState(sessionKey);
} catch (error) {
const acpError = toAcpRuntimeError({
error,
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP close failed before completion.",
});
if (
input.allowBackendUnavailable &&
(acpError.code === "ACP_BACKEND_MISSING" ||
acpError.code === "ACP_BACKEND_UNAVAILABLE" ||
(input.discardPersistentState && acpError.code === "ACP_SESSION_INIT_FAILED") ||
this.isRecoverableAcpxExitError(acpError.message))
) {
if (input.discardPersistentState) {
const configuredBackend = (meta.backend || input.cfg.acp?.backend || "").trim();
try {
const runtimeBackend = this.deps.requireRuntimeBackend(
configuredBackend || undefined,
);
await runtimeBackend.runtime.prepareFreshSession?.({
if (shouldSkipRuntimeClose) {
if (input.discardPersistentState) {
const configuredBackend = (meta.backend || input.cfg.acp?.backend || "").trim();
try {
await this.deps
.getRuntimeBackend(configuredBackend || undefined)
?.runtime.prepareFreshSession?.({
sessionKey,
});
} catch (recoveryError) {
logVerbose(
`acp close recovery: unable to prepare fresh session for ${sessionKey}: ${formatErrorMessage(recoveryError)}`,
);
}
} catch (error) {
logVerbose(
`acp close fast-reset: unable to prepare fresh session for ${sessionKey}: ${error instanceof Error ? error.message : String(error)}`,
);
}
// Treat unavailable backends as terminal for this cached handle so it
// cannot continue counting against maxConcurrentSessions.
}
this.clearCachedRuntimeState(sessionKey);
} else {
try {
const { runtime: ensuredRuntime, handle } = await this.ensureRuntimeHandle({
cfg: input.cfg,
sessionKey,
meta,
});
await withAcpRuntimeErrorBoundary({
run: async () =>
await ensuredRuntime.close({
handle,
reason: input.reason,
discardPersistentState: input.discardPersistentState,
}),
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP close failed before completion.",
});
runtimeClosed = true;
this.clearCachedRuntimeState(sessionKey);
runtimeNotice = acpError.message;
} else {
throw acpError;
} catch (error) {
const acpError = toAcpRuntimeError({
error,
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP close failed before completion.",
});
if (
input.allowBackendUnavailable &&
(acpError.code === "ACP_BACKEND_MISSING" ||
acpError.code === "ACP_BACKEND_UNAVAILABLE" ||
(input.discardPersistentState && acpError.code === "ACP_SESSION_INIT_FAILED") ||
this.isRecoverableAcpxExitError(acpError.message))
) {
if (input.discardPersistentState) {
const configuredBackend = (meta.backend || input.cfg.acp?.backend || "").trim();
try {
const runtimeBackend = this.deps.getRuntimeBackend(configuredBackend || undefined);
if (!runtimeBackend) {
throw acpError;
}
await runtimeBackend.runtime.prepareFreshSession?.({
sessionKey,
});
} catch (recoveryError) {
logVerbose(
`acp close recovery: unable to prepare fresh session for ${sessionKey}: ${recoveryError instanceof Error ? recoveryError.message : String(recoveryError)}`,
);
}
}
// Treat unavailable backends as terminal for this cached handle so it
// cannot continue counting against maxConcurrentSessions.
this.clearCachedRuntimeState(sessionKey);
runtimeNotice = acpError.message;
} else {
throw acpError;
}
}
}
let metaCleared = false;
if (input.discardPersistentState && !input.clearMeta) {
await this.discardPersistedRuntimeState({
cfg: input.cfg,
sessionKey,
});
}
if (input.clearMeta) {
await this.writeSessionMeta({
cfg: input.cfg,
@@ -1346,11 +1381,16 @@ export class AcpSessionManager {
const agentMatches = cached.agent === agent;
const modeMatches = cached.mode === mode;
const cwdMatches = (cached.cwd ?? "") === (cwd ?? "");
const handleMatchesMeta = this.runtimeHandleMatchesMeta({
handle: cached.handle,
meta: params.meta,
});
if (
backendMatches &&
agentMatches &&
modeMatches &&
cwdMatches &&
handleMatchesMeta &&
(await this.isCachedRuntimeHandleReusable({
sessionKey: params.sessionKey,
runtime: cached.runtime,
@@ -1378,6 +1418,10 @@ export class AcpSessionManager {
let identityForEnsure = previousIdentity;
const persistedResumeSessionId =
mode === "persistent" ? resolveRuntimeResumeSessionId(previousIdentity) : undefined;
const shouldPrepareFreshPersistentSession =
mode === "persistent" &&
previousIdentity != null &&
!identityHasStableSessionId(previousIdentity);
const ensureSession = async (resumeSessionId?: string) =>
await withAcpRuntimeErrorBoundary({
run: async () =>
@@ -1392,6 +1436,11 @@ export class AcpSessionManager {
fallbackMessage: "Could not initialize ACP session runtime.",
});
let ensured: AcpRuntimeHandle;
if (shouldPrepareFreshPersistentSession) {
await runtime.prepareFreshSession?.({
sessionKey: params.sessionKey,
});
}
if (persistedResumeSessionId) {
try {
ensured = await ensureSession(persistedResumeSessionId);
@@ -1739,6 +1788,49 @@ export class AcpSessionManager {
return true;
}
private async discardPersistedRuntimeState(params: {
cfg: OpenClawConfig;
sessionKey: string;
}): Promise<void> {
const now = Date.now();
await this.writeSessionMeta({
cfg: params.cfg,
sessionKey: params.sessionKey,
mutate: (current, entry) => {
if (!entry) {
return null;
}
const base = current ?? entry.acp;
if (!base) {
return null;
}
const currentIdentity = resolveSessionIdentityFromMeta(base);
const nextIdentity = currentIdentity
? {
state: "pending" as const,
...(currentIdentity.acpxRecordId
? { acpxRecordId: currentIdentity.acpxRecordId }
: {}),
source: currentIdentity.source,
lastUpdatedAt: now,
}
: undefined;
return {
backend: base.backend,
agent: base.agent,
runtimeSessionName: base.runtimeSessionName,
...(nextIdentity ? { identity: nextIdentity } : {}),
mode: base.mode,
...(base.runtimeOptions ? { runtimeOptions: base.runtimeOptions } : {}),
...(base.cwd ? { cwd: base.cwd } : {}),
state: "idle",
lastActivityAt: now,
};
},
failOnError: true,
});
}
private async evictIdleRuntimeHandles(params: { cfg: OpenClawConfig }): Promise<void> {
const idleTtlMs = resolveRuntimeIdleTtlMs(params.cfg);
if (idleTtlMs <= 0 || this.runtimeCache.size() === 0) {
@@ -1994,6 +2086,25 @@ export class AcpSessionManager {
);
}
private runtimeHandleMatchesMeta(params: {
handle: AcpRuntimeHandle;
meta: SessionAcpMeta;
}): boolean {
const identity = resolveSessionIdentityFromMeta(params.meta);
const expectedHandleIds = resolveRuntimeHandleIdentifiersFromIdentity(identity);
if ((params.handle.backendSessionId ?? "") !== (expectedHandleIds.backendSessionId ?? "")) {
return false;
}
if ((params.handle.agentSessionId ?? "") !== (expectedHandleIds.agentSessionId ?? "")) {
return false;
}
const expectedAcpxRecordId = identity?.acpxRecordId ?? "";
const actualAcpxRecordId =
normalizeText((params.handle as { acpxRecordId?: unknown }).acpxRecordId) ?? "";
return actualAcpxRecordId === expectedAcpxRecordId;
}
private resolveBackgroundTaskContext(params: {
cfg: OpenClawConfig;
sessionKey: string;

View File

@@ -12,11 +12,13 @@ const hoisted = vi.hoisted(() => {
const listAcpSessionEntriesMock = vi.fn();
const readAcpSessionEntryMock = vi.fn();
const upsertAcpSessionMetaMock = vi.fn();
const getAcpRuntimeBackendMock = vi.fn();
const requireAcpRuntimeBackendMock = vi.fn();
return {
listAcpSessionEntriesMock,
readAcpSessionEntryMock,
upsertAcpSessionMetaMock,
getAcpRuntimeBackendMock,
requireAcpRuntimeBackendMock,
};
});
@@ -32,6 +34,7 @@ vi.mock("../runtime/registry.js", async () => {
await vi.importActual<typeof import("../runtime/registry.js")>("../runtime/registry.js");
return {
...actual,
getAcpRuntimeBackend: (backendId?: string) => hoisted.getAcpRuntimeBackendMock(backendId),
requireAcpRuntimeBackend: (backendId?: string) =>
hoisted.requireAcpRuntimeBackendMock(backendId),
};
@@ -217,6 +220,13 @@ describe("AcpSessionManager", () => {
hoisted.readAcpSessionEntryMock.mockReset();
hoisted.upsertAcpSessionMetaMock.mockReset().mockResolvedValue(null);
hoisted.requireAcpRuntimeBackendMock.mockReset();
hoisted.getAcpRuntimeBackendMock.mockReset().mockImplementation((backendId?: string) => {
try {
return hoisted.requireAcpRuntimeBackendMock(backendId);
} catch {
return null;
}
});
});
afterEach(() => {
@@ -789,6 +799,79 @@ describe("AcpSessionManager", () => {
expect(runtimeState.runTurn).toHaveBeenCalledTimes(2);
});
it("re-ensures cached runtime handles when persisted ACP session identity changes", async () => {
const runtimeState = createRuntime();
runtimeState.ensureSession
.mockResolvedValueOnce({
sessionKey: "agent:codex:acp:session-1",
backend: "acpx",
runtimeSessionName: "runtime-1",
acpxRecordId: "record-1",
backendSessionId: "acpx-session-1",
agentSessionId: "agent-session-1",
})
.mockResolvedValueOnce({
sessionKey: "agent:codex:acp:session-1",
backend: "acpx",
runtimeSessionName: "runtime-2",
acpxRecordId: "record-1",
backendSessionId: "acpx-session-2",
agentSessionId: "agent-session-2",
});
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
let currentMeta = readySessionMeta({
runtimeSessionName: "runtime-1",
identity: {
state: "resolved",
acpxRecordId: "record-1",
acpxSessionId: "acpx-session-1",
agentSessionId: "agent-session-1",
source: "status",
lastUpdatedAt: Date.now(),
},
});
hoisted.readAcpSessionEntryMock.mockImplementation(() => ({
sessionKey: "agent:codex:acp:session-1",
storeSessionKey: "agent:codex:acp:session-1",
acp: currentMeta,
}));
const manager = new AcpSessionManager();
await manager.runTurn({
cfg: baseCfg,
sessionKey: "agent:codex:acp:session-1",
text: "first",
mode: "prompt",
requestId: "r1",
});
currentMeta = readySessionMeta({
runtimeSessionName: "runtime-2",
identity: {
state: "resolved",
acpxRecordId: "record-1",
acpxSessionId: "acpx-session-2",
agentSessionId: "agent-session-2",
source: "status",
lastUpdatedAt: Date.now(),
},
});
await manager.runTurn({
cfg: baseCfg,
sessionKey: "agent:codex:acp:session-1",
text: "second",
mode: "prompt",
requestId: "r2",
});
expect(runtimeState.ensureSession).toHaveBeenCalledTimes(2);
expect(runtimeState.runTurn).toHaveBeenCalledTimes(2);
});
it("rehydrates runtime handles after a manager restart", async () => {
const runtimeState = createRuntime();
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
@@ -1325,6 +1408,208 @@ describe("AcpSessionManager", () => {
});
});
it("clears persisted resume identity when close discards persistent state", async () => {
const runtimeState = createRuntime();
const sessionKey = "agent:claude:acp:binding:discord:default:9373ab192b2317f4";
const entry = {
sessionKey,
storeSessionKey: sessionKey,
acp: readySessionMeta({
agent: "claude",
state: "running",
lastError: "stale failure",
identity: {
state: "resolved",
acpxRecordId: sessionKey,
acpxSessionId: "acpx-session-1",
agentSessionId: "agent-session-1",
source: "status",
lastUpdatedAt: 1,
},
}),
};
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
hoisted.readAcpSessionEntryMock.mockImplementation(() => entry);
hoisted.upsertAcpSessionMetaMock.mockImplementation(async (paramsUnknown: unknown) => {
const params = paramsUnknown as {
mutate: (
current: SessionAcpMeta | undefined,
entry: { acp?: SessionAcpMeta } | undefined,
) => SessionAcpMeta | null | undefined;
};
const next = params.mutate(entry.acp, entry);
if (next === null) {
return null;
}
if (next) {
entry.acp = next;
}
return entry;
});
const manager = new AcpSessionManager();
const result = await manager.closeSession({
cfg: baseCfg,
sessionKey,
reason: "new-in-place-reset",
discardPersistentState: true,
clearMeta: false,
allowBackendUnavailable: true,
});
expect(result.runtimeClosed).toBe(true);
expect(entry.acp?.state).toBe("idle");
expect(entry.acp?.lastError).toBeUndefined();
expect(entry.acp?.identity).toMatchObject({
state: "pending",
acpxRecordId: sessionKey,
source: "status",
});
expect(entry.acp?.identity).not.toHaveProperty("acpxSessionId");
expect(entry.acp?.identity).not.toHaveProperty("agentSessionId");
});
it("prepares a fresh persistent session before ensure when metadata has no stable session id", async () => {
const runtimeState = createRuntime();
const sessionKey = "agent:claude:acp:binding:discord:default:9373ab192b2317f4";
runtimeState.ensureSession.mockResolvedValue({
sessionKey,
backend: "acpx",
runtimeSessionName: "runtime-fresh",
acpxRecordId: sessionKey,
backendSessionId: "acpx-session-fresh",
});
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
let currentMeta: SessionAcpMeta = readySessionMeta({
agent: "claude",
identity: {
state: "pending",
acpxRecordId: sessionKey,
source: "status",
lastUpdatedAt: Date.now(),
},
});
hoisted.readAcpSessionEntryMock.mockImplementation((paramsUnknown: unknown) => {
const key = (paramsUnknown as { sessionKey?: string }).sessionKey ?? sessionKey;
return {
sessionKey: key,
storeSessionKey: key,
acp: currentMeta,
};
});
hoisted.upsertAcpSessionMetaMock.mockImplementation(async (paramsUnknown: unknown) => {
const params = paramsUnknown as {
mutate: (
current: SessionAcpMeta | undefined,
entry: { acp?: SessionAcpMeta } | undefined,
) => SessionAcpMeta | null | undefined;
};
const next = params.mutate(currentMeta, { acp: currentMeta });
if (next) {
currentMeta = next;
}
return {
sessionId: "session-1",
updatedAt: Date.now(),
acp: currentMeta,
};
});
const manager = new AcpSessionManager();
await expect(
manager.runTurn({
cfg: baseCfg,
sessionKey,
text: "who are you?",
mode: "prompt",
requestId: "r-fresh",
}),
).resolves.toBeUndefined();
expect(runtimeState.prepareFreshSession).toHaveBeenCalledWith({
sessionKey,
});
expect(runtimeState.ensureSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
}),
);
expect(runtimeState.prepareFreshSession.mock.invocationCallOrder[0]).toBeLessThan(
runtimeState.ensureSession.mock.invocationCallOrder[0],
);
});
it("skips runtime re-ensure when discarding a pending persistent session", async () => {
const runtimeState = createRuntime();
const sessionKey = "agent:claude:acp:binding:discord:default:9373ab192b2317f4";
hoisted.getAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
const entry = {
sessionKey,
storeSessionKey: sessionKey,
acp: readySessionMeta({
agent: "claude",
identity: {
state: "pending",
acpxRecordId: sessionKey,
source: "ensure",
lastUpdatedAt: Date.now(),
},
}),
};
hoisted.readAcpSessionEntryMock.mockImplementation(() => entry);
hoisted.upsertAcpSessionMetaMock.mockImplementation(async (paramsUnknown: unknown) => {
const params = paramsUnknown as {
mutate: (
current: SessionAcpMeta | undefined,
entry: { acp?: SessionAcpMeta } | undefined,
) => SessionAcpMeta | null | undefined;
};
const next = params.mutate(entry.acp, entry);
if (next === null) {
return null;
}
if (next) {
entry.acp = next;
}
return entry;
});
const manager = new AcpSessionManager();
const result = await manager.closeSession({
cfg: baseCfg,
sessionKey,
reason: "new-in-place-reset",
discardPersistentState: true,
clearMeta: false,
allowBackendUnavailable: true,
});
expect(result.runtimeClosed).toBe(false);
expect(runtimeState.prepareFreshSession).toHaveBeenCalledWith({
sessionKey,
});
expect(runtimeState.ensureSession).not.toHaveBeenCalled();
expect(runtimeState.close).not.toHaveBeenCalled();
expect(entry.acp?.identity).toMatchObject({
state: "pending",
acpxRecordId: sessionKey,
source: "ensure",
});
expect(entry.acp?.identity).not.toHaveProperty("acpxSessionId");
expect(entry.acp?.identity).not.toHaveProperty("agentSessionId");
});
it("evicts idle cached runtimes before enforcing max concurrent limits", async () => {
vi.useFakeTimers();
try {
@@ -2063,6 +2348,57 @@ describe("AcpSessionManager", () => {
expect(currentMeta.identity?.agentSessionId).toBe("agent-session-1");
});
it("skips startup reconcile for pending identities without stable runtime ids", async () => {
const runtimeState = createRuntime();
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
const sessionKey = "agent:claude:acp:binding:discord:default:9373ab192b2317f4";
hoisted.listAcpSessionEntriesMock.mockResolvedValue([
{
cfg: baseCfg,
storePath: "/tmp/sessions-acp.json",
sessionKey,
storeSessionKey: sessionKey,
entry: {
sessionId: "session-1",
updatedAt: Date.now(),
acp: {
...readySessionMeta({
agent: "claude",
}),
identity: {
state: "pending",
acpxRecordId: sessionKey,
source: "status",
lastUpdatedAt: Date.now(),
},
},
},
acp: {
...readySessionMeta({
agent: "claude",
}),
identity: {
state: "pending",
acpxRecordId: sessionKey,
source: "status",
lastUpdatedAt: Date.now(),
},
},
},
]);
const manager = new AcpSessionManager();
const result = await manager.reconcilePendingSessionIdentities({ cfg: baseCfg });
expect(result).toEqual({ checked: 0, resolved: 0, failed: 0 });
expect(runtimeState.ensureSession).not.toHaveBeenCalled();
expect(runtimeState.getStatus).not.toHaveBeenCalled();
});
it("reconciles prompt-learned agent session IDs even when runtime status omits them", async () => {
const runtimeState = createRuntime();
runtimeState.ensureSession.mockResolvedValue({
@@ -2471,6 +2807,43 @@ describe("AcpSessionManager", () => {
expect(result.metaCleared).toBe(false);
});
it("prepares a fresh session during reset recovery even when the backend is unhealthy", async () => {
const runtimeState = createRuntime();
hoisted.readAcpSessionEntryMock.mockReturnValue({
sessionKey: "agent:claude:acp:session-1",
storeSessionKey: "agent:claude:acp:session-1",
acp: readySessionMeta({
agent: "claude",
}),
});
hoisted.requireAcpRuntimeBackendMock.mockImplementation(() => {
throw new AcpRuntimeError(
"ACP_BACKEND_UNAVAILABLE",
"ACP runtime backend is currently unavailable. Try again in a moment.",
);
});
hoisted.getAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
const manager = new AcpSessionManager();
const result = await manager.closeSession({
cfg: baseCfg,
sessionKey: "agent:claude:acp:session-1",
reason: "new-in-place-reset",
discardPersistentState: true,
allowBackendUnavailable: true,
clearMeta: false,
});
expect(result.runtimeClosed).toBe(false);
expect(result.runtimeNotice).toContain("currently unavailable");
expect(runtimeState.prepareFreshSession).toHaveBeenCalledWith({
sessionKey: "agent:claude:acp:session-1",
});
});
it("surfaces metadata clear errors during closeSession", async () => {
hoisted.readAcpSessionEntryMock.mockReturnValue({
sessionKey: "agent:codex:acp:session-1",

View File

@@ -6,7 +6,7 @@ import type {
SessionEntry,
} from "../../config/sessions/types.js";
import type { AcpRuntimeError } from "../runtime/errors.js";
import { requireAcpRuntimeBackend } from "../runtime/registry.js";
import { getAcpRuntimeBackend, requireAcpRuntimeBackend } from "../runtime/registry.js";
import {
listAcpSessionEntries,
readAcpSessionEntry,
@@ -136,6 +136,7 @@ export type AcpSessionManagerDeps = {
listAcpSessions: typeof listAcpSessionEntries;
readSessionEntry: typeof readAcpSessionEntry;
upsertSessionMeta: typeof upsertAcpSessionMeta;
getRuntimeBackend: typeof getAcpRuntimeBackend;
requireRuntimeBackend: typeof requireAcpRuntimeBackend;
};
@@ -143,6 +144,7 @@ export const DEFAULT_DEPS: AcpSessionManagerDeps = {
listAcpSessions: listAcpSessionEntries,
readSessionEntry: readAcpSessionEntry,
upsertSessionMeta: upsertAcpSessionMeta,
getRuntimeBackend: getAcpRuntimeBackend,
requireRuntimeBackend: requireAcpRuntimeBackend,
};

View File

@@ -1,7 +1,9 @@
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { buildConfiguredAcpSessionKey } from "./persistent-bindings.types.js";
const managerMocks = vi.hoisted(() => ({
resolveSession: vi.fn(),
closeSession: vi.fn(),
initializeSession: vi.fn(),
updateSessionRuntimeOptions: vi.fn(),
@@ -17,6 +19,7 @@ const resolveMocks = vi.hoisted(() => ({
vi.mock("./control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
resolveSession: managerMocks.resolveSession,
closeSession: managerMocks.closeSession,
initializeSession: managerMocks.initializeSession,
updateSessionRuntimeOptions: managerMocks.updateSessionRuntimeOptions,
@@ -45,6 +48,7 @@ beforeAll(async () => {
});
beforeEach(() => {
managerMocks.resolveSession.mockReset().mockReturnValue({ kind: "none" });
managerMocks.closeSession.mockReset().mockResolvedValue({
runtimeClosed: true,
metaCleared: false,
@@ -56,8 +60,18 @@ beforeEach(() => {
});
describe("resetAcpSessionInPlace", () => {
it("does not resolve configured bindings when ACP metadata already exists", async () => {
const sessionKey = "agent:claude:acp:binding:demo-binding:default:9373ab192b2317f4";
it("clears configured bindings and lets the next turn recreate them", async () => {
const spec = {
channel: "demo-binding",
accountId: "default",
conversationId: "9373ab192b2317f4",
agentId: "claude",
mode: "persistent",
backend: "acpx",
cwd: "/home/bob/clawd",
} as const;
const sessionKey = buildConfiguredAcpSessionKey(spec);
resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey.mockReturnValue(spec);
sessionMetaMocks.readAcpSessionEntry.mockReturnValue({
acp: {
agent: "claude",
@@ -66,9 +80,6 @@ describe("resetAcpSessionInPlace", () => {
runtimeOptions: { cwd: "/home/bob/clawd" },
},
});
resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey.mockImplementation(() => {
throw new Error("configured binding resolution should be skipped");
});
const result = await resetAcpSessionInPlace({
cfg: baseCfg,
@@ -77,15 +88,93 @@ describe("resetAcpSessionInPlace", () => {
});
expect(result).toEqual({ ok: true });
expect(resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey).not.toHaveBeenCalled();
expect(resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey).toHaveBeenCalledTimes(1);
expect(managerMocks.closeSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
discardPersistentState: true,
clearMeta: false,
clearMeta: true,
}),
);
expect(managerMocks.initializeSession).not.toHaveBeenCalled();
expect(managerMocks.updateSessionRuntimeOptions).not.toHaveBeenCalled();
});
it("falls back to close-only resets when no configured binding exists", async () => {
const sessionKey = "agent:claude:acp:binding:demo-binding:default:9373ab192b2317f4";
sessionMetaMocks.readAcpSessionEntry.mockReturnValue({
acp: {
agent: "claude",
mode: "persistent",
backend: "acpx",
},
});
const result = await resetAcpSessionInPlace({
cfg: baseCfg,
sessionKey,
reason: "reset",
});
expect(result).toEqual({ ok: true });
expect(resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey).toHaveBeenCalledTimes(1);
expect(managerMocks.closeSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
clearMeta: false,
}),
);
expect(managerMocks.initializeSession).not.toHaveBeenCalled();
});
it("can force metadata clearing for bound ACP targets outside the configured registry", async () => {
const sessionKey = "agent:claude:acp:binding:demo-binding:default:9373ab192b2317f4";
sessionMetaMocks.readAcpSessionEntry.mockReturnValue({
acp: {
agent: "claude",
mode: "persistent",
backend: "acpx",
},
});
const result = await resetAcpSessionInPlace({
cfg: baseCfg,
sessionKey,
reason: "new",
clearMeta: true,
});
expect(result).toEqual({ ok: true });
expect(resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey).toHaveBeenCalledTimes(1);
expect(managerMocks.closeSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
clearMeta: true,
}),
);
});
it("treats configured bindings with no ACP metadata as already reset", async () => {
const spec = {
channel: "demo-binding",
accountId: "default",
conversationId: "9373ab192b2317f4",
agentId: "claude",
mode: "persistent",
backend: "acpx",
cwd: "/home/bob/clawd",
} as const;
const sessionKey = buildConfiguredAcpSessionKey(spec);
resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey.mockReturnValue(spec);
const result = await resetAcpSessionInPlace({
cfg: baseCfg,
sessionKey,
reason: "new",
});
expect(result).toEqual({ ok: true });
expect(managerMocks.closeSession).not.toHaveBeenCalled();
expect(managerMocks.initializeSession).not.toHaveBeenCalled();
});
});

View File

@@ -139,6 +139,7 @@ export async function resetAcpSessionInPlace(params: {
cfg: OpenClawConfig;
sessionKey: string;
reason: "new" | "reset";
clearMeta?: boolean;
}): Promise<{ ok: true } | { ok: false; skipped?: boolean; error?: string }> {
const sessionKey = params.sessionKey.trim();
if (!sessionKey) {
@@ -152,26 +153,14 @@ export async function resetAcpSessionInPlace(params: {
cfg: params.cfg,
sessionKey,
})?.acp;
const configuredBinding =
!meta || !normalizeText(meta.agent)
? resolveConfiguredAcpBindingSpecBySessionKey({
cfg: params.cfg,
sessionKey,
})
: null;
const configuredBinding = resolveConfiguredAcpBindingSpecBySessionKey({
cfg: params.cfg,
sessionKey,
});
const clearMeta = params.clearMeta ?? Boolean(configuredBinding);
if (!meta) {
if (configuredBinding) {
const ensured = await ensureConfiguredAcpBindingSession({
cfg: params.cfg,
spec: configuredBinding,
});
if (ensured.ok) {
return { ok: true };
}
return {
ok: false,
error: ensured.error,
};
if (clearMeta) {
return { ok: true };
}
return {
ok: false,
@@ -187,14 +176,11 @@ export async function resetAcpSessionInPlace(params: {
sessionKey,
reason: `${params.reason}-in-place-reset`,
discardPersistentState: true,
clearMeta: false,
clearMeta,
allowBackendUnavailable: true,
requireAcpSession: false,
});
// Bound ACP /new and /reset should return as soon as the previous
// runtime state is discarded. The fresh session can be recreated lazily
// on the next turn through the normal binding readiness path.
return { ok: true };
} catch (error) {
const message = formatErrorMessage(error);

View File

@@ -444,6 +444,7 @@ beforeEach(() => {
]),
);
managerMocks.resolveSession.mockReset();
managerMocks.resolveSession.mockReturnValue({ kind: "none" });
managerMocks.closeSession.mockReset().mockResolvedValue({
runtimeClosed: true,
metaCleared: true,
@@ -968,7 +969,7 @@ describe("ensureConfiguredAcpBindingSession", () => {
});
describe("resetAcpSessionInPlace", () => {
it("reinitializes from configured binding when ACP metadata is missing", async () => {
it("treats configured bindings without ACP metadata as already reset", async () => {
const cfg = createCfgWithBindings([
createDiscordBinding({
agentId: "claude",
@@ -996,18 +997,28 @@ describe("resetAcpSessionInPlace", () => {
});
expect(result).toEqual({ ok: true });
expect(managerMocks.initializeSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
agent: "claude",
mode: "persistent",
backendId: "acpx",
}),
);
expect(managerMocks.initializeSession).not.toHaveBeenCalled();
});
it("preserves ACP metadata while discarding runtime state for existing sessions", async () => {
const sessionKey = "agent:claude:acp:binding:discord:default:9373ab192b2317f4";
it("clears existing configured ACP sessions and lets the next turn recreate them", async () => {
const cfg = createCfgWithBindings([
createDiscordBinding({
agentId: "claude",
conversationId: "1478844424791396446",
acp: {
mode: "persistent",
backend: "acpx",
},
}),
]);
const sessionKey = buildConfiguredAcpSessionKey({
channel: "discord",
accountId: "default",
conversationId: "1478844424791396446",
agentId: "claude",
mode: "persistent",
backend: "acpx",
});
sessionMetaMocks.readAcpSessionEntry.mockReturnValue({
acp: {
agent: "claude",
@@ -1018,7 +1029,7 @@ describe("resetAcpSessionInPlace", () => {
});
const result = await persistentBindings.resetAcpSessionInPlace({
cfg: baseCfg,
cfg,
sessionKey,
reason: "reset",
});
@@ -1027,7 +1038,7 @@ describe("resetAcpSessionInPlace", () => {
expect(managerMocks.closeSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
clearMeta: false,
clearMeta: true,
}),
);
expect(managerMocks.initializeSession).not.toHaveBeenCalled();
@@ -1092,14 +1103,27 @@ describe("resetAcpSessionInPlace", () => {
);
});
it("does not eagerly reinitialize harness agent sessions during in-place reset", async () => {
it("clears configured harness agent sessions during in-place reset", async () => {
const cfg = {
...baseCfg,
bindings: [
createDiscordBinding({
agentId: "coding",
conversationId: "1478844424791396446",
}),
],
agents: {
list: [{ id: "main" }, { id: "coding" }],
},
} satisfies OpenClawConfig;
const sessionKey = "agent:coding:acp:binding:discord:default:9373ab192b2317f4";
const sessionKey = buildConfiguredAcpSessionKey({
channel: "discord",
accountId: "default",
conversationId: "1478844424791396446",
agentId: "coding",
mode: "persistent",
backend: "acpx",
});
sessionMetaMocks.readAcpSessionEntry.mockReturnValue({
acp: {
agent: "codex",
@@ -1118,7 +1142,7 @@ describe("resetAcpSessionInPlace", () => {
expect(managerMocks.initializeSession).not.toHaveBeenCalled();
});
it("does not eagerly reinitialize configured ACP agent overrides when metadata omits the agent", async () => {
it("clears configured ACP agent overrides even when metadata omits the agent", async () => {
const cfg = createCfgWithBindings(
[
createDiscordBinding({

View File

@@ -6,9 +6,9 @@ import type { HandleCommandsParams } from "./commands-types.js";
import { parseInlineDirectives } from "./directive-handling.parse.js";
const triggerInternalHookMock = vi.hoisted(() => vi.fn().mockResolvedValue(undefined));
vi.mock("../../channels/plugins/binding-targets.js", () => ({
resetConfiguredBindingTargetInPlace: vi.fn().mockResolvedValue({ ok: false, skipped: true }),
const resetMocks = vi.hoisted(() => ({
resetConfiguredBindingTargetInPlace: vi.fn().mockResolvedValue({ ok: true as const }),
resolveBoundAcpThreadSessionKey: vi.fn(() => undefined as string | undefined),
}));
vi.mock("../../hooks/internal-hooks.js", () => ({
@@ -37,8 +37,12 @@ vi.mock("../commands-registry.js", () => ({
shouldHandleTextCommands: () => true,
}));
vi.mock("../../channels/plugins/binding-targets.js", () => ({
resetConfiguredBindingTargetInPlace: resetMocks.resetConfiguredBindingTargetInPlace,
}));
vi.mock("./commands-acp/targets.js", () => ({
resolveBoundAcpThreadSessionKey: vi.fn(() => undefined),
resolveBoundAcpThreadSessionKey: resetMocks.resolveBoundAcpThreadSessionKey,
}));
vi.mock("./commands-handlers.runtime.js", () => ({
@@ -96,6 +100,8 @@ function buildResetParams(
describe("handleCommands reset hooks", () => {
beforeEach(() => {
vi.clearAllMocks();
resetMocks.resetConfiguredBindingTargetInPlace.mockResolvedValue({ ok: true });
resetMocks.resolveBoundAcpThreadSessionKey.mockReturnValue(undefined);
});
it("triggers hooks for /new commands", async () => {
@@ -149,4 +155,62 @@ describe("handleCommands reset hooks", () => {
triggerInternalHookMock.mockClear();
}
});
it("uses gateway session reset for bound ACP sessions", async () => {
resetMocks.resolveBoundAcpThreadSessionKey.mockReturnValue(
"agent:claude:acp:binding:discord:default:9373ab192b2317f4",
);
const params = buildResetParams(
"/reset",
{
commands: { text: true },
channels: { discord: { allowFrom: ["*"] } },
} as OpenClawConfig,
{
Provider: "discord",
Surface: "discord",
CommandSource: "native",
},
);
const result = await maybeHandleResetCommand(params);
expect(resetMocks.resetConfiguredBindingTargetInPlace).toHaveBeenCalledWith({
cfg: expect.any(Object),
sessionKey: "agent:claude:acp:binding:discord:default:9373ab192b2317f4",
reason: "reset",
commandSource: "discord:native",
});
expect(result).toEqual({
shouldContinue: false,
reply: { text: "✅ ACP session reset in place." },
});
expect(triggerInternalHookMock).not.toHaveBeenCalled();
expect(params.command.resetHookTriggered).toBe(true);
});
it("keeps tail dispatch after a bound ACP reset", async () => {
resetMocks.resolveBoundAcpThreadSessionKey.mockReturnValue(
"agent:claude:acp:binding:discord:default:9373ab192b2317f4",
);
const params = buildResetParams(
"/new who are you",
{
commands: { text: true },
channels: { discord: { allowFrom: ["*"] } },
} as OpenClawConfig,
{
Provider: "discord",
Surface: "discord",
CommandSource: "native",
},
);
const result = await maybeHandleResetCommand(params);
expect(result).toEqual({ shouldContinue: false });
expect(params.ctx.Body).toBe("who are you");
expect(params.ctx.CommandBody).toBe("who are you");
expect(params.ctx.AcpDispatchTailAfterReset).toBe(true);
});
});

View File

@@ -16,29 +16,6 @@ function applyAcpResetTailContext(ctx: HandleCommandsParams["ctx"], resetTail: s
mutableCtx.AcpDispatchTailAfterReset = true;
}
function resolveSessionEntryForHookSessionKey(
sessionStore: HandleCommandsParams["sessionStore"] | undefined,
sessionKey: string,
): HandleCommandsParams["sessionEntry"] | undefined {
if (!sessionStore) {
return undefined;
}
const directEntry = sessionStore[sessionKey];
if (directEntry) {
return directEntry;
}
const normalizedTarget = sessionKey.trim().toLowerCase();
if (!normalizedTarget) {
return undefined;
}
for (const [candidateKey, candidateEntry] of Object.entries(sessionStore)) {
if (candidateKey.trim().toLowerCase() === normalizedTarget) {
return candidateEntry;
}
}
return undefined;
}
export async function maybeHandleResetCommand(
params: HandleCommandsParams,
): Promise<CommandHandlerResult | null> {
@@ -65,31 +42,13 @@ export async function maybeHandleResetCommand(
cfg: params.cfg,
sessionKey: boundAcpKey,
reason: commandAction,
commandSource: `${params.command.surface}:${params.ctx.CommandSource ?? "text"}`,
});
if (!resetResult.ok && !resetResult.skipped) {
logVerbose(
`acp reset-in-place failed for ${boundAcpKey}: ${resetResult.error ?? "unknown error"}`,
);
if (!resetResult.ok) {
logVerbose(`acp reset failed for ${boundAcpKey}: ${resetResult.error ?? "unknown error"}`);
}
if (resetResult.ok) {
const hookSessionEntry =
boundAcpKey === params.sessionKey
? params.sessionEntry
: resolveSessionEntryForHookSessionKey(params.sessionStore, boundAcpKey);
const hookPreviousSessionEntry =
boundAcpKey === params.sessionKey
? params.previousSessionEntry
: resolveSessionEntryForHookSessionKey(params.sessionStore, boundAcpKey);
await emitResetCommandHooks({
action: commandAction,
ctx: params.ctx,
cfg: params.cfg,
command: params.command,
sessionKey: boundAcpKey,
sessionEntry: hookSessionEntry,
previousSessionEntry: hookPreviousSessionEntry,
workspaceDir: params.workspaceDir,
});
params.command.resetHookTriggered = true;
if (resetTail) {
applyAcpResetTailContext(params.ctx, resetTail);
if (params.rootCtx && params.rootCtx !== params.ctx) {
@@ -102,14 +61,6 @@ export async function maybeHandleResetCommand(
reply: { text: "✅ ACP session reset in place." },
};
}
if (resetResult.skipped) {
return {
shouldContinue: false,
reply: {
text: "⚠️ ACP session reset unavailable for this bound conversation. Rebind with /acp bind or /acp spawn.",
},
};
}
return {
shouldContinue: false,
reply: { text: "⚠️ ACP session reset failed. Check /acp status and try again." },

View File

@@ -286,6 +286,26 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
expect(onReplyStart).toHaveBeenCalledTimes(1);
});
it("does not block delivery when reply lifecycle startup hangs", async () => {
const onReplyStart = vi.fn(
async () =>
await new Promise<void>(() => {
// Intentionally never resolve to simulate a stuck typing/reaction side effect.
}),
);
const coordinator = createCoordinator(onReplyStart);
const delivered = await Promise.race([
coordinator.deliver("final", { text: "hello" }).then(() => "delivered"),
new Promise<string>((resolve) => {
setTimeout(() => resolve("timed-out"), 50);
}),
]);
expect(delivered).toBe("delivered");
expect(onReplyStart).toHaveBeenCalledTimes(1);
});
it("does not start reply lifecycle for empty payload delivery", async () => {
const onReplyStart = vi.fn(async () => {});
const coordinator = createCoordinator(onReplyStart);

View File

@@ -215,7 +215,11 @@ export function createAcpDispatchDeliveryCoordinator(params: {
return;
}
state.startedReplyLifecycle = true;
await params.onReplyStart?.();
void Promise.resolve(params.onReplyStart?.()).catch((error) => {
logVerbose(
`dispatch-acp: reply lifecycle start failed: ${error instanceof Error ? error.message : String(error)}`,
);
});
};
const tryEditToolMessage = async (

View File

@@ -0,0 +1,77 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const resetMocks = vi.hoisted(() => ({
performGatewaySessionReset: vi.fn(async () => ({
ok: true as const,
key: "agent:claude:acp:binding:discord:default:9373ab192b2317f4",
entry: { sessionId: "next-session", updatedAt: 1 },
})),
}));
const sessionMetaMocks = vi.hoisted(() => ({
readAcpSessionEntry: vi.fn(() => null),
}));
const resolveMocks = vi.hoisted(() => ({
resolveConfiguredAcpBindingSpecBySessionKey: vi.fn(() => null),
}));
vi.mock("../../acp/persistent-bindings.lifecycle.js", () => ({
ensureConfiguredAcpBindingReady: vi.fn(),
ensureConfiguredAcpBindingSession: vi.fn(),
}));
vi.mock("./acp-stateful-target-reset.runtime.js", () => ({
performGatewaySessionReset: resetMocks.performGatewaySessionReset,
}));
vi.mock("../../acp/runtime/session-meta.js", () => ({
readAcpSessionEntry: sessionMetaMocks.readAcpSessionEntry,
}));
vi.mock("../../acp/persistent-bindings.resolve.js", () => ({
resolveConfiguredAcpBindingSpecBySessionKey:
resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey,
}));
import { acpStatefulBindingTargetDriver } from "./acp-stateful-target-driver.js";
describe("acpStatefulBindingTargetDriver", () => {
beforeEach(() => {
resetMocks.performGatewaySessionReset.mockClear();
sessionMetaMocks.readAcpSessionEntry.mockClear();
resolveMocks.resolveConfiguredAcpBindingSpecBySessionKey.mockClear();
});
it("delegates bound resets to the gateway session reset authority", async () => {
await expect(
acpStatefulBindingTargetDriver.resetInPlace?.({
cfg: {} as never,
sessionKey: "agent:claude:acp:binding:discord:default:9373ab192b2317f4",
reason: "new",
commandSource: "discord:native",
bindingTarget: {
kind: "stateful",
driverId: "acp",
sessionKey: "agent:claude:acp:binding:discord:default:9373ab192b2317f4",
agentId: "claude",
},
}),
).resolves.toEqual({ ok: true });
expect(resetMocks.performGatewaySessionReset).toHaveBeenCalledWith({
key: "agent:claude:acp:binding:discord:default:9373ab192b2317f4",
reason: "new",
commandSource: "discord:native",
});
});
it("keeps ACP reset available when metadata has already been cleared", () => {
expect(
acpStatefulBindingTargetDriver.resolveTargetBySessionKey?.({
cfg: {} as never,
sessionKey: "agent:claude:acp:binding:discord:default:9373ab192b2317f4",
}),
).toEqual({
kind: "stateful",
driverId: "acp",
sessionKey: "agent:claude:acp:binding:discord:default:9373ab192b2317f4",
agentId: "claude",
});
});
});

View File

@@ -1,12 +1,13 @@
import {
ensureConfiguredAcpBindingReady,
ensureConfiguredAcpBindingSession,
resetAcpSessionInPlace,
} from "../../acp/persistent-bindings.lifecycle.js";
import { resolveConfiguredAcpBindingSpecBySessionKey } from "../../acp/persistent-bindings.resolve.js";
import { resolveConfiguredAcpBindingSpecFromRecord } from "../../acp/persistent-bindings.types.js";
import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js";
import type { OpenClawConfig } from "../../config/config.js";
import { isAcpSessionKey, resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { performGatewaySessionReset } from "./acp-stateful-target-reset.runtime.js";
import type {
ConfiguredBindingResolution,
StatefulBindingTargetDescriptor,
@@ -22,24 +23,45 @@ function toAcpStatefulBindingTargetDescriptor(params: {
cfg: OpenClawConfig;
sessionKey: string;
}): StatefulBindingTargetDescriptor | null {
const meta = readAcpSessionEntry(params)?.acp;
const sessionKey = params.sessionKey.trim();
if (!sessionKey) {
return null;
}
const meta = readAcpSessionEntry({
...params,
sessionKey,
})?.acp;
const metaAgentId = meta?.agent?.trim();
if (metaAgentId) {
return {
kind: "stateful",
driverId: "acp",
sessionKey: params.sessionKey,
sessionKey,
agentId: metaAgentId,
};
}
const spec = resolveConfiguredAcpBindingSpecBySessionKey(params);
const spec = resolveConfiguredAcpBindingSpecBySessionKey({
...params,
sessionKey,
});
if (!spec) {
return null;
if (!isAcpSessionKey(sessionKey)) {
return null;
}
// Bound ACP sessions can intentionally clear their ACP metadata after a
// reset. The native /reset path still needs to recognize the ACP session
// key as resettable while that metadata is absent.
return {
kind: "stateful",
driverId: "acp",
sessionKey,
agentId: resolveAgentIdFromSessionKey(sessionKey),
};
}
return {
kind: "stateful",
driverId: "acp",
sessionKey: params.sessionKey,
sessionKey,
agentId: spec.agentId,
...(spec.label ? { label: spec.label } : {}),
};
@@ -88,9 +110,22 @@ async function ensureAcpTargetSession(params: {
async function resetAcpTargetInPlace(params: {
cfg: OpenClawConfig;
sessionKey: string;
bindingTarget: StatefulBindingTargetDescriptor;
reason: "new" | "reset";
commandSource?: string;
}): Promise<StatefulBindingTargetResetResult> {
return await resetAcpSessionInPlace(params);
const result = await performGatewaySessionReset({
key: params.sessionKey,
reason: params.reason,
commandSource: params.commandSource ?? "stateful-target:acp-reset-in-place",
});
if (result.ok) {
return { ok: true };
}
return {
ok: false,
error: result.error.message,
};
}
export const acpStatefulBindingTargetDriver: StatefulBindingTargetDriver = {

View File

@@ -0,0 +1 @@
export { performGatewaySessionReset } from "../../gateway/session-reset-service.js";

View File

@@ -166,6 +166,7 @@ describe("binding target drivers", () => {
cfg: {} as never,
sessionKey: "agent:codex:test-driver",
reason: "reset",
commandSource: "discord:native",
}),
).resolves.toEqual({ ok: true });
@@ -174,6 +175,7 @@ describe("binding target drivers", () => {
cfg: {} as never,
sessionKey: "agent:codex:test-driver",
reason: "reset",
commandSource: "discord:native",
bindingTarget: {
kind: "stateful",
driverId: "test-driver",

View File

@@ -31,6 +31,7 @@ export async function resetConfiguredBindingTargetInPlace(params: {
cfg: OpenClawConfig;
sessionKey: string;
reason: "new" | "reset";
commandSource?: string;
}): Promise<{ ok: true } | { ok: false; skipped?: boolean; error?: string }> {
await ensureStatefulTargetBuiltinsRegistered();
const resolved = resolveStatefulBindingTargetBySessionKey({

View File

@@ -31,6 +31,7 @@ export type StatefulBindingTargetDriver = {
sessionKey: string;
bindingTarget: StatefulBindingTargetDescriptor;
reason: "new" | "reset";
commandSource?: string;
}) => Promise<StatefulBindingTargetResetResult>;
};

View File

@@ -2169,6 +2169,7 @@ describe("gateway server sessions", () => {
expect(acpManagerMocks.closeSession).toHaveBeenCalledWith({
allowBackendUnavailable: true,
cfg: expect.any(Object),
discardPersistentState: true,
requireAcpSession: false,
reason: "session-delete",
sessionKey: "agent:main:discord:group:dev",
@@ -2411,6 +2412,13 @@ describe("gateway server sessions", () => {
test("sessions.reset closes ACP runtime handles for ACP sessions", async () => {
const { dir, storePath } = await createSessionStoreDir();
await writeSingleLineSession(dir, "sess-main", "hello");
const prepareFreshSession = vi.fn(async () => {});
acpRuntimeMocks.getAcpRuntimeBackend.mockReturnValue({
id: "acpx",
runtime: {
prepareFreshSession,
},
});
await writeSessionStore({
entries: {
@@ -2421,6 +2429,13 @@ describe("gateway server sessions", () => {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:reset",
identity: {
state: "resolved",
acpxRecordId: "agent:main:main",
acpxSessionId: "backend-session-1",
source: "status",
lastUpdatedAt: Date.now(),
},
mode: "persistent",
runtimeOptions: {
runtimeMode: "auto",
@@ -2442,6 +2457,11 @@ describe("gateway server sessions", () => {
backend?: string;
agent?: string;
runtimeSessionName?: string;
identity?: {
state?: string;
acpxRecordId?: string;
acpxSessionId?: string;
};
mode?: string;
runtimeOptions?: {
runtimeMode?: string;
@@ -2459,6 +2479,10 @@ describe("gateway server sessions", () => {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:reset",
identity: {
state: "pending",
acpxRecordId: "agent:main:main",
},
mode: "persistent",
runtimeOptions: {
runtimeMode: "auto",
@@ -2467,13 +2491,18 @@ describe("gateway server sessions", () => {
cwd: "/tmp/acp-session",
state: "idle",
});
expect(reset.payload?.entry.acp?.identity?.acpxSessionId).toBeUndefined();
expect(acpManagerMocks.closeSession).toHaveBeenCalledWith({
allowBackendUnavailable: true,
cfg: expect.any(Object),
discardPersistentState: true,
requireAcpSession: false,
reason: "session-reset",
sessionKey: "agent:main:main",
});
expect(prepareFreshSession).toHaveBeenCalledWith({
sessionKey: "agent:main:main",
});
const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
string,
{
@@ -2481,6 +2510,11 @@ describe("gateway server sessions", () => {
backend?: string;
agent?: string;
runtimeSessionName?: string;
identity?: {
state?: string;
acpxRecordId?: string;
acpxSessionId?: string;
};
mode?: string;
runtimeOptions?: {
runtimeMode?: string;
@@ -2495,6 +2529,10 @@ describe("gateway server sessions", () => {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:reset",
identity: {
state: "pending",
acpxRecordId: "agent:main:main",
},
mode: "persistent",
runtimeOptions: {
runtimeMode: "auto",
@@ -2503,6 +2541,7 @@ describe("gateway server sessions", () => {
cwd: "/tmp/acp-session",
state: "idle",
});
expect(store["agent:main:main"]?.acp?.identity?.acpxSessionId).toBeUndefined();
ws.close();
});

View File

@@ -3,6 +3,8 @@ import fs from "node:fs";
import path from "node:path";
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
import { getAcpSessionManager } from "../acp/control-plane/manager.js";
import { getAcpRuntimeBackend } from "../acp/runtime/registry.js";
import { readAcpSessionEntry, upsertAcpSessionMeta } from "../acp/runtime/session-meta.js";
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js";
import { clearBootstrapSnapshot } from "../agents/bootstrap-cache.js";
import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../agents/pi-embedded.js";
@@ -19,6 +21,7 @@ import {
updateSessionStore,
} from "../config/sessions.js";
import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../config/sessions/paths.js";
import type { SessionAcpMeta } from "../config/sessions/types.js";
import { logVerbose } from "../globals.js";
import { createInternalHookEvent, triggerInternalHook } from "../hooks/internal-hooks.js";
import { getSessionBindingService } from "../infra/outbound/session-binding-service.js";
@@ -284,6 +287,7 @@ async function closeAcpRuntimeForSession(params: {
cfg: params.cfg,
sessionKey: params.sessionKey,
reason: params.reason,
discardPersistentState: true,
requireAcpSession: false,
allowBackendUnavailable: true,
});
@@ -300,9 +304,84 @@ async function closeAcpRuntimeForSession(params: {
`sessions.${params.reason}: ACP runtime close failed for ${params.sessionKey}: ${String(closeOutcome.error)}`,
);
}
await ensureFreshAcpResetState({
cfg: params.cfg,
sessionKey: params.sessionKey,
reason: params.reason,
entry: params.entry,
});
return undefined;
}
function buildPendingAcpMeta(base: SessionAcpMeta, now: number): SessionAcpMeta {
const currentIdentity = base.identity;
const nextIdentity = currentIdentity
? {
state: "pending" as const,
...(currentIdentity.acpxRecordId ? { acpxRecordId: currentIdentity.acpxRecordId } : {}),
source: currentIdentity.source,
lastUpdatedAt: now,
}
: undefined;
return {
backend: base.backend,
agent: base.agent,
runtimeSessionName: base.runtimeSessionName,
...(nextIdentity ? { identity: nextIdentity } : {}),
mode: base.mode,
...(base.runtimeOptions ? { runtimeOptions: base.runtimeOptions } : {}),
...(base.cwd ? { cwd: base.cwd } : {}),
state: "idle",
lastActivityAt: now,
};
}
async function ensureFreshAcpResetState(params: {
cfg: ReturnType<typeof loadConfig>;
sessionKey: string;
reason: "session-reset" | "session-delete";
entry?: SessionEntry;
}): Promise<void> {
if (params.reason !== "session-reset" || !params.entry?.acp) {
return;
}
const latestMeta = readAcpSessionEntry({
cfg: params.cfg,
sessionKey: params.sessionKey,
})?.acp;
if (
!latestMeta?.identity ||
latestMeta.identity.state !== "resolved" ||
(!latestMeta.identity.acpxSessionId && !latestMeta.identity.agentSessionId)
) {
return;
}
const backendId = (latestMeta.backend || params.cfg.acp?.backend || "").trim() || undefined;
try {
await getAcpRuntimeBackend(backendId)?.runtime.prepareFreshSession?.({
sessionKey: params.sessionKey,
});
} catch (error) {
logVerbose(
`sessions.${params.reason}: ACP prepareFreshSession failed for ${params.sessionKey}: ${String(error)}`,
);
}
const now = Date.now();
await upsertAcpSessionMeta({
cfg: params.cfg,
sessionKey: params.sessionKey,
mutate: (current, entry) => {
const base = current ?? entry?.acp;
if (!base) {
return null;
}
return buildPendingAcpMeta(base, now);
},
});
}
export async function cleanupSessionBeforeMutation(params: {
cfg: ReturnType<typeof loadConfig>;
key: string;

View File

@@ -131,6 +131,26 @@ describe("ensureOpenClawCliOnPath", () => {
expect(updated[0]).toBe(appBinDir);
});
it("keeps the current runtime directory ahead of system PATH hardening", () => {
const tmp = abs("/tmp/openclaw-path/case-runtime-dir");
const nodeBinDir = path.join(tmp, "node-bin");
const nodeExec = path.join(nodeBinDir, "node");
setDir(tmp);
setDir(nodeBinDir);
setExe(nodeExec);
resetBootstrapEnv("/usr/bin:/bin");
const updated = bootstrapPath({
execPath: nodeExec,
cwd: tmp,
homeDir: tmp,
platform: "linux",
});
expect(updated[0]).toBe(nodeBinDir);
expect(updated.indexOf(nodeBinDir)).toBeLessThan(updated.indexOf("/usr/bin"));
});
it("is idempotent", () => {
process.env.PATH = "/bin";
process.env.OPENCLAW_PATH_BOOTSTRAPPED = "1";

View File

@@ -58,6 +58,17 @@ function candidateBinDirs(opts: EnsureOpenClawPathOpts): { prepend: string[]; ap
const prepend: string[] = [];
const append: string[] = [];
// Keep the active runtime directory ahead of PATH hardening so shebang-based
// subprocesses keep using the same Node/Bun the current OpenClaw process is on.
try {
const execDir = path.dirname(execPath);
if (isExecutable(execPath)) {
prepend.push(execDir);
}
} catch {
// ignore
}
// Bundled macOS app: `openclaw` lives next to the executable (process.execPath).
try {
const execDir = path.dirname(execPath);

View File

@@ -8,6 +8,7 @@ import { createAutoReplyCoreVitestConfig } from "../vitest.auto-reply-core.confi
import { createAutoReplyReplyVitestConfig } from "../vitest.auto-reply-reply.config.ts";
import { createAutoReplyTopLevelVitestConfig } from "../vitest.auto-reply-top-level.config.ts";
import { createAutoReplyVitestConfig } from "../vitest.auto-reply.config.ts";
import bundledVitestConfig from "../vitest.bundled.config.ts";
import { createChannelsVitestConfig } from "../vitest.channels.config.ts";
import { createCliVitestConfig } from "../vitest.cli.config.ts";
import { createCommandsLightVitestConfig } from "../vitest.commands-light.config.ts";
@@ -49,6 +50,7 @@ import { createTasksVitestConfig } from "../vitest.tasks.config.ts";
import { createToolingVitestConfig } from "../vitest.tooling.config.ts";
import { createTuiVitestConfig } from "../vitest.tui.config.ts";
import { createUiVitestConfig } from "../vitest.ui.config.ts";
import { bundledPluginDependentUnitTestFiles } from "../vitest.unit-paths.mjs";
import { createUtilsVitestConfig } from "../vitest.utils.config.ts";
import { createWizardVitestConfig } from "../vitest.wizard.config.ts";
import { BUNDLED_PLUGIN_TEST_GLOB, bundledPluginFile } from "./helpers/bundled-plugin-paths.js";
@@ -56,6 +58,17 @@ import { cleanupTempDirs, makeTempDir } from "./helpers/temp-dir.js";
const EXTENSIONS_CHANNEL_GLOB = ["extensions", "channel", "**"].join("/");
function bundledExcludePatternCouldMatchFile(pattern: string, file: string): boolean {
if (pattern === file) {
return true;
}
if (pattern.endsWith("/**")) {
const prefix = pattern.slice(0, -3);
return file === prefix || file.startsWith(`${prefix}/`);
}
return false;
}
describe("resolveVitestIsolation", () => {
it("defaults shared scoped configs to the non-isolated runner", () => {
expect(resolveVitestIsolation({})).toBe(false);
@@ -138,6 +151,15 @@ describe("createScopedVitestConfig", () => {
"test/setup-openclaw-runtime.ts",
]);
});
it("keeps bundled unit test includes out of the bundled exclude list", () => {
const excludePatterns = bundledVitestConfig.test?.exclude ?? [];
for (const file of bundledPluginDependentUnitTestFiles) {
expect(
excludePatterns.some((pattern) => bundledExcludePatternCouldMatchFile(pattern, file)),
).toBe(false);
}
});
});
describe("scoped vitest configs", () => {

View File

@@ -5,8 +5,28 @@ import {
} from "./vitest.unit-paths.mjs";
import { createUnitVitestConfigWithOptions } from "./vitest.unit.config.ts";
function normalizeGlobCandidate(value: string): string {
return value.split(path.sep).join("/");
}
function excludePatternCouldMatchFile(pattern: string, file: string): boolean {
const normalizedPattern = normalizeGlobCandidate(pattern);
const normalizedFile = normalizeGlobCandidate(file);
if (normalizedPattern === normalizedFile) {
return true;
}
if (normalizedPattern.endsWith("/**")) {
const prefix = normalizedPattern.slice(0, -3);
return normalizedFile === prefix || normalizedFile.startsWith(`${prefix}/`);
}
return path.matchesGlob(normalizedFile, normalizedPattern);
}
const bundledUnitExcludePatterns = unitTestAdditionalExcludePatterns.filter(
(pattern) => !bundledPluginDependentUnitTestFiles.some((file) => path.matchesGlob(file, pattern)),
(pattern) =>
!bundledPluginDependentUnitTestFiles.some((file) =>
excludePatternCouldMatchFile(pattern, file),
),
);
export default createUnitVitestConfigWithOptions(process.env, {