Discord/ACP: forward abort signals into ACP turns (#52148)

* Discord/ACP: forward abort signals into ACP turns

* ACP: abort queued turns before actor start
This commit is contained in:
Bob
2026-03-22 10:04:32 +01:00
committed by GitHub
parent 8ac3e41cdf
commit aa6b962a3a
6 changed files with 414 additions and 198 deletions

View File

@@ -233,6 +233,7 @@ Docs: https://docs.openclaw.ai
- make `openclaw update status` explicitly say `up to date` when the local version already matches npm latest, while keeping the availability logic unchanged. (#51409) Thanks @dongzhenye.
- Android/canvas: recycle captured and scaled snapshot bitmaps so repeated canvas snapshots do not leak native image memory. (#41889) Thanks @Kaneki-x.
- Android/theme: switch status bar icon contrast with the active system theme so Android light mode no longer leaves unreadable light icons over the app header. (#51098) Thanks @goweii.
- Discord/ACP: forward worker abort signals into ACP turns so timed-out Discord jobs cancel the running turn instead of silently leaving the bound ACP session working in the background.
### Breaking

View File

@@ -609,215 +609,219 @@ export class AcpSessionManager {
throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP session key is required.");
}
await this.evictIdleRuntimeHandles({ cfg: input.cfg });
await this.withSessionActor(sessionKey, async () => {
const turnStartedAt = Date.now();
const actorKey = normalizeActorKey(sessionKey);
for (let attempt = 0; attempt < 2; attempt += 1) {
const resolution = this.resolveSession({
cfg: input.cfg,
sessionKey,
});
const resolvedMeta = requireReadySessionMeta(resolution);
let runtime: AcpRuntime | undefined;
let handle: AcpRuntimeHandle | undefined;
let meta: SessionAcpMeta | undefined;
let activeTurn: ActiveTurnState | undefined;
let internalAbortController: AbortController | undefined;
let onCallerAbort: (() => void) | undefined;
let activeTurnStarted = false;
let sawTurnOutput = false;
let retryFreshHandle = false;
let skipPostTurnCleanup = false;
try {
const ensured = await this.ensureRuntimeHandle({
await this.withSessionActor(
sessionKey,
async () => {
const turnStartedAt = Date.now();
const actorKey = normalizeActorKey(sessionKey);
for (let attempt = 0; attempt < 2; attempt += 1) {
const resolution = this.resolveSession({
cfg: input.cfg,
sessionKey,
meta: resolvedMeta,
});
runtime = ensured.runtime;
handle = ensured.handle;
meta = ensured.meta;
await this.applyRuntimeControls({
sessionKey,
runtime,
handle,
meta,
});
await this.setSessionState({
cfg: input.cfg,
sessionKey,
state: "running",
clearLastError: true,
});
internalAbortController = new AbortController();
onCallerAbort = () => {
internalAbortController?.abort();
};
if (input.signal?.aborted) {
internalAbortController.abort();
} else if (input.signal) {
input.signal.addEventListener("abort", onCallerAbort, { once: true });
}
activeTurn = {
runtime,
handle,
abortController: internalAbortController,
};
this.activeTurnBySession.set(actorKey, activeTurn);
activeTurnStarted = true;
let streamError: AcpRuntimeError | null = null;
const combinedSignal =
input.signal && typeof AbortSignal.any === "function"
? AbortSignal.any([input.signal, internalAbortController.signal])
: internalAbortController.signal;
const eventGate = { open: true };
const turnPromise = (async () => {
for await (const event of runtime.runTurn({
handle,
text: input.text,
attachments: input.attachments,
mode: input.mode,
requestId: input.requestId,
signal: combinedSignal,
})) {
if (!eventGate.open) {
continue;
}
if (event.type === "error") {
streamError = new AcpRuntimeError(
normalizeAcpErrorCode(event.code),
event.message?.trim() || "ACP turn failed before completion.",
);
} else if (event.type === "text_delta" || event.type === "tool_call") {
sawTurnOutput = true;
}
if (input.onEvent) {
await input.onEvent(event);
}
}
if (eventGate.open && streamError) {
throw streamError;
}
})();
const turnTimeoutMs = this.resolveTurnTimeoutMs({
cfg: input.cfg,
meta,
});
const sessionMode = meta.mode;
await this.awaitTurnWithTimeout({
sessionKey,
turnPromise,
timeoutMs: turnTimeoutMs + ACP_TURN_TIMEOUT_GRACE_MS,
timeoutLabelMs: turnTimeoutMs,
onTimeout: async () => {
eventGate.open = false;
skipPostTurnCleanup = true;
if (!activeTurn) {
return;
}
await this.cleanupTimedOutTurn({
sessionKey,
activeTurn,
mode: sessionMode,
});
},
});
if (streamError) {
throw streamError;
}
this.recordTurnCompletion({
startedAt: turnStartedAt,
});
await this.setSessionState({
cfg: input.cfg,
sessionKey,
state: "idle",
clearLastError: true,
});
return;
} catch (error) {
const acpError = toAcpRuntimeError({
error,
fallbackCode: activeTurnStarted ? "ACP_TURN_FAILED" : "ACP_SESSION_INIT_FAILED",
fallbackMessage: activeTurnStarted
? "ACP turn failed before completion."
: "Could not initialize ACP session runtime.",
});
retryFreshHandle = this.shouldRetryTurnWithFreshHandle({
attempt,
sessionKey,
error: acpError,
sawTurnOutput,
});
if (retryFreshHandle) {
continue;
}
this.recordTurnCompletion({
startedAt: turnStartedAt,
errorCode: acpError.code,
});
await this.setSessionState({
cfg: input.cfg,
sessionKey,
state: "error",
lastError: acpError.message,
});
throw acpError;
} finally {
if (input.signal && onCallerAbort) {
input.signal.removeEventListener("abort", onCallerAbort);
}
if (activeTurn && this.activeTurnBySession.get(actorKey) === activeTurn) {
this.activeTurnBySession.delete(actorKey);
}
if (
!retryFreshHandle &&
!skipPostTurnCleanup &&
runtime &&
handle &&
meta &&
meta.mode !== "oneshot"
) {
({ handle } = await this.reconcileRuntimeSessionIdentifiers({
const resolvedMeta = requireReadySessionMeta(resolution);
let runtime: AcpRuntime | undefined;
let handle: AcpRuntimeHandle | undefined;
let meta: SessionAcpMeta | undefined;
let activeTurn: ActiveTurnState | undefined;
let internalAbortController: AbortController | undefined;
let onCallerAbort: (() => void) | undefined;
let activeTurnStarted = false;
let sawTurnOutput = false;
let retryFreshHandle = false;
let skipPostTurnCleanup = false;
try {
const ensured = await this.ensureRuntimeHandle({
cfg: input.cfg,
sessionKey,
meta: resolvedMeta,
});
runtime = ensured.runtime;
handle = ensured.handle;
meta = ensured.meta;
await this.applyRuntimeControls({
sessionKey,
runtime,
handle,
meta,
failOnStatusError: false,
}));
}
if (
!retryFreshHandle &&
!skipPostTurnCleanup &&
runtime &&
handle &&
meta &&
meta.mode === "oneshot"
) {
try {
await runtime.close({
});
await this.setSessionState({
cfg: input.cfg,
sessionKey,
state: "running",
clearLastError: true,
});
internalAbortController = new AbortController();
onCallerAbort = () => {
internalAbortController?.abort();
};
if (input.signal?.aborted) {
internalAbortController.abort();
} else if (input.signal) {
input.signal.addEventListener("abort", onCallerAbort, { once: true });
}
activeTurn = {
runtime,
handle,
abortController: internalAbortController,
};
this.activeTurnBySession.set(actorKey, activeTurn);
activeTurnStarted = true;
let streamError: AcpRuntimeError | null = null;
const combinedSignal =
input.signal && typeof AbortSignal.any === "function"
? AbortSignal.any([input.signal, internalAbortController.signal])
: internalAbortController.signal;
const eventGate = { open: true };
const turnPromise = (async () => {
for await (const event of runtime.runTurn({
handle,
reason: "oneshot-complete",
});
} catch (error) {
logVerbose(
`acp-manager: ACP oneshot close failed for ${sessionKey}: ${String(error)}`,
);
} finally {
this.clearCachedRuntimeState(sessionKey);
text: input.text,
attachments: input.attachments,
mode: input.mode,
requestId: input.requestId,
signal: combinedSignal,
})) {
if (!eventGate.open) {
continue;
}
if (event.type === "error") {
streamError = new AcpRuntimeError(
normalizeAcpErrorCode(event.code),
event.message?.trim() || "ACP turn failed before completion.",
);
} else if (event.type === "text_delta" || event.type === "tool_call") {
sawTurnOutput = true;
}
if (input.onEvent) {
await input.onEvent(event);
}
}
if (eventGate.open && streamError) {
throw streamError;
}
})();
const turnTimeoutMs = this.resolveTurnTimeoutMs({
cfg: input.cfg,
meta,
});
const sessionMode = meta.mode;
await this.awaitTurnWithTimeout({
sessionKey,
turnPromise,
timeoutMs: turnTimeoutMs + ACP_TURN_TIMEOUT_GRACE_MS,
timeoutLabelMs: turnTimeoutMs,
onTimeout: async () => {
eventGate.open = false;
skipPostTurnCleanup = true;
if (!activeTurn) {
return;
}
await this.cleanupTimedOutTurn({
sessionKey,
activeTurn,
mode: sessionMode,
});
},
});
if (streamError) {
throw streamError;
}
this.recordTurnCompletion({
startedAt: turnStartedAt,
});
await this.setSessionState({
cfg: input.cfg,
sessionKey,
state: "idle",
clearLastError: true,
});
return;
} catch (error) {
const acpError = toAcpRuntimeError({
error,
fallbackCode: activeTurnStarted ? "ACP_TURN_FAILED" : "ACP_SESSION_INIT_FAILED",
fallbackMessage: activeTurnStarted
? "ACP turn failed before completion."
: "Could not initialize ACP session runtime.",
});
retryFreshHandle = this.shouldRetryTurnWithFreshHandle({
attempt,
sessionKey,
error: acpError,
sawTurnOutput,
});
if (retryFreshHandle) {
continue;
}
this.recordTurnCompletion({
startedAt: turnStartedAt,
errorCode: acpError.code,
});
await this.setSessionState({
cfg: input.cfg,
sessionKey,
state: "error",
lastError: acpError.message,
});
throw acpError;
} finally {
if (input.signal && onCallerAbort) {
input.signal.removeEventListener("abort", onCallerAbort);
}
if (activeTurn && this.activeTurnBySession.get(actorKey) === activeTurn) {
this.activeTurnBySession.delete(actorKey);
}
if (
!retryFreshHandle &&
!skipPostTurnCleanup &&
runtime &&
handle &&
meta &&
meta.mode !== "oneshot"
) {
({ handle } = await this.reconcileRuntimeSessionIdentifiers({
cfg: input.cfg,
sessionKey,
runtime,
handle,
meta,
failOnStatusError: false,
}));
}
if (
!retryFreshHandle &&
!skipPostTurnCleanup &&
runtime &&
handle &&
meta &&
meta.mode === "oneshot"
) {
try {
await runtime.close({
handle,
reason: "oneshot-complete",
});
} catch (error) {
logVerbose(
`acp-manager: ACP oneshot close failed for ${sessionKey}: ${String(error)}`,
);
} finally {
this.clearCachedRuntimeState(sessionKey);
}
}
}
if (retryFreshHandle) {
continue;
}
}
if (retryFreshHandle) {
continue;
}
}
});
},
input.signal,
);
}
private resolveTurnTimeoutMs(params: { cfg: OpenClawConfig; meta: SessionAcpMeta }): number {
@@ -1632,10 +1636,56 @@ export class AcpSessionManager {
signal?: AbortSignal,
): Promise<T> {
const actorKey = normalizeActorKey(sessionKey);
return await this.actorQueue.run(actorKey, async () => {
this.throwIfAborted(signal);
let actorStarted = false;
const queued = this.actorQueue.run(actorKey, async () => {
actorStarted = true;
this.throwIfAborted(signal);
return await op();
});
if (!signal) {
return await queued;
}
return await new Promise<T>((resolve, reject) => {
let settled = false;
const cleanup = () => {
signal.removeEventListener("abort", onAbort);
};
const settleValue = (value: T) => {
if (settled) {
return;
}
settled = true;
cleanup();
resolve(value);
};
const settleError = (error: unknown) => {
if (settled) {
return;
}
settled = true;
cleanup();
reject(error);
};
const onAbort = () => {
if (actorStarted) {
return;
}
try {
this.throwIfAborted(signal);
} catch (error) {
settleError(error);
}
};
signal.addEventListener("abort", onAbort, { once: true });
queued.then(settleValue, settleError);
if (signal.aborted) {
onAbort();
}
});
}
private throwIfAborted(signal?: AbortSignal): void {

View File

@@ -268,6 +268,81 @@ describe("AcpSessionManager", () => {
expect(runtimeState.runTurn).toHaveBeenCalledTimes(2);
});
it("rejects a queued turn promptly when its caller aborts before the actor is free", async () => {
const runtimeState = createRuntime();
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
hoisted.readAcpSessionEntryMock.mockReturnValue({
sessionKey: "agent:codex:acp:session-1",
storeSessionKey: "agent:codex:acp:session-1",
acp: readySessionMeta(),
});
let firstTurnStarted = false;
let releaseFirstTurn: (() => void) | undefined;
runtimeState.runTurn.mockImplementation(async function* (input: { requestId: string }) {
if (input.requestId === "r1") {
firstTurnStarted = true;
await new Promise<void>((resolve) => {
releaseFirstTurn = resolve;
});
}
yield { type: "done" as const };
});
const manager = new AcpSessionManager();
const first = manager.runTurn({
cfg: baseCfg,
sessionKey: "agent:codex:acp:session-1",
text: "first",
mode: "prompt",
requestId: "r1",
});
await vi.waitFor(() => {
expect(firstTurnStarted).toBe(true);
});
const abortController = new AbortController();
const second = manager.runTurn({
cfg: baseCfg,
sessionKey: "agent:codex:acp:session-1",
text: "second",
mode: "prompt",
requestId: "r2",
signal: abortController.signal,
});
abortController.abort();
const secondOutcome = await Promise.race([
second.then(
() => ({ status: "resolved" as const }),
(error) => ({ status: "rejected" as const, error }),
),
new Promise<{ status: "pending" }>((resolve) => {
setTimeout(() => resolve({ status: "pending" }), 100);
}),
]);
releaseFirstTurn?.();
await first;
await vi.waitFor(() => {
expect(manager.getObservabilitySnapshot(baseCfg).turns.queueDepth).toBe(0);
});
expect(secondOutcome.status).toBe("rejected");
if (secondOutcome.status !== "rejected") {
return;
}
expect(secondOutcome.error).toBeInstanceOf(AcpRuntimeError);
expect(secondOutcome.error).toMatchObject({
code: "ACP_TURN_FAILED",
message: "ACP operation aborted.",
});
expect(runtimeState.runTurn).toHaveBeenCalledTimes(1);
});
it("times out a hung persistent turn without closing the session and lets queued work continue", async () => {
vi.useFakeTimers();
try {

View File

@@ -189,6 +189,7 @@ export async function tryDispatchAcpReply(params: {
cfg: OpenClawConfig;
dispatcher: ReplyDispatcher;
sessionKey?: string;
abortSignal?: AbortSignal;
inboundAudio: boolean;
sessionTtsAuto?: TtsAutoMode;
ttsChannel?: string;
@@ -308,6 +309,7 @@ export async function tryDispatchAcpReply(params: {
attachments: attachments.length > 0 ? attachments : undefined,
mode: "prompt",
requestId: resolveAcpRequestId(params.ctx),
...(params.abortSignal ? { signal: params.abortSignal } : {}),
onEvent: async (event) => await projector.onEvent(event),
});

View File

@@ -980,6 +980,92 @@ describe("dispatchReplyFromConfig", () => {
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("aborts ACP dispatch promptly when the caller abort signal fires", async () => {
setNoAbort();
let releaseTurn: (() => void) | undefined;
const releasePromise = new Promise<void>((resolve) => {
releaseTurn = resolve;
});
const runtime = {
ensureSession: vi.fn(
async (input: { sessionKey: string; mode: string; agent: string }) =>
({
sessionKey: input.sessionKey,
backend: "acpx",
runtimeSessionName: `${input.sessionKey}:${input.mode}`,
}) as { sessionKey: string; backend: string; runtimeSessionName: string },
),
runTurn: vi.fn(async function* (params: { signal?: AbortSignal }) {
await new Promise<void>((resolve) => {
if (params.signal?.aborted) {
resolve();
return;
}
const onAbort = () => resolve();
params.signal?.addEventListener("abort", onAbort, { once: true });
void releasePromise.then(resolve);
});
yield { type: "done" };
}),
cancel: vi.fn(async () => {}),
close: vi.fn(async () => {}),
};
acpMocks.readAcpSessionEntry.mockReturnValue({
sessionKey: "agent:codex-acp:session-1",
storeSessionKey: "agent:codex-acp:session-1",
cfg: {},
storePath: "/tmp/mock-sessions.json",
entry: {},
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
});
acpMocks.requireAcpRuntimeBackend.mockReturnValue({
id: "acpx",
runtime,
});
const abortController = new AbortController();
const cfg = {
acp: {
enabled: true,
dispatch: { enabled: true },
},
} as OpenClawConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "write a test",
});
const dispatchPromise = dispatchReplyFromConfig({
ctx,
cfg,
dispatcher,
replyOptions: { abortSignal: abortController.signal },
});
await vi.waitFor(() => {
expect(runtime.runTurn).toHaveBeenCalledTimes(1);
});
abortController.abort();
const outcome = await Promise.race([
dispatchPromise.then(() => "settled" as const),
new Promise<"pending">((resolve) => {
setTimeout(() => resolve("pending"), 100);
}),
]);
releaseTurn?.();
await dispatchPromise;
expect(outcome).toBe("settled");
});
it("posts a one-time resolved-session-id notice in thread after the first ACP turn", async () => {
setNoAbort();
const runtime = createAcpRuntime([{ type: "text_delta", text: "hello" }, { type: "done" }]);

View File

@@ -486,6 +486,7 @@ export async function dispatchReplyFromConfig(params: {
cfg,
dispatcher,
sessionKey: acpDispatchSessionKey,
abortSignal: params.replyOptions?.abortSignal,
inboundAudio,
sessionTtsAuto,
ttsChannel,
@@ -621,6 +622,7 @@ export async function dispatchReplyFromConfig(params: {
cfg,
dispatcher,
sessionKey: acpDispatchSessionKey,
abortSignal: params.replyOptions?.abortSignal,
inboundAudio,
sessionTtsAuto,
ttsChannel,