diff --git a/CHANGELOG.md b/CHANGELOG.md index 69d449f6c1c..66539eaf41d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -233,6 +233,7 @@ Docs: https://docs.openclaw.ai - make `openclaw update status` explicitly say `up to date` when the local version already matches npm latest, while keeping the availability logic unchanged. (#51409) Thanks @dongzhenye. - Android/canvas: recycle captured and scaled snapshot bitmaps so repeated canvas snapshots do not leak native image memory. (#41889) Thanks @Kaneki-x. - Android/theme: switch status bar icon contrast with the active system theme so Android light mode no longer leaves unreadable light icons over the app header. (#51098) Thanks @goweii. +- Discord/ACP: forward worker abort signals into ACP turns so timed-out Discord jobs cancel the running turn instead of silently leaving the bound ACP session working in the background. ### Breaking diff --git a/src/acp/control-plane/manager.core.ts b/src/acp/control-plane/manager.core.ts index b4ee4c7d0fb..69eed5d5264 100644 --- a/src/acp/control-plane/manager.core.ts +++ b/src/acp/control-plane/manager.core.ts @@ -609,215 +609,219 @@ export class AcpSessionManager { throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP session key is required."); } await this.evictIdleRuntimeHandles({ cfg: input.cfg }); - await this.withSessionActor(sessionKey, async () => { - const turnStartedAt = Date.now(); - const actorKey = normalizeActorKey(sessionKey); - for (let attempt = 0; attempt < 2; attempt += 1) { - const resolution = this.resolveSession({ - cfg: input.cfg, - sessionKey, - }); - const resolvedMeta = requireReadySessionMeta(resolution); - let runtime: AcpRuntime | undefined; - let handle: AcpRuntimeHandle | undefined; - let meta: SessionAcpMeta | undefined; - let activeTurn: ActiveTurnState | undefined; - let internalAbortController: AbortController | undefined; - let onCallerAbort: (() => void) | undefined; - let activeTurnStarted = false; - let sawTurnOutput = false; - let retryFreshHandle = false; - let skipPostTurnCleanup = false; - try { - const ensured = await this.ensureRuntimeHandle({ + await this.withSessionActor( + sessionKey, + async () => { + const turnStartedAt = Date.now(); + const actorKey = normalizeActorKey(sessionKey); + for (let attempt = 0; attempt < 2; attempt += 1) { + const resolution = this.resolveSession({ cfg: input.cfg, sessionKey, - meta: resolvedMeta, }); - runtime = ensured.runtime; - handle = ensured.handle; - meta = ensured.meta; - await this.applyRuntimeControls({ - sessionKey, - runtime, - handle, - meta, - }); - - await this.setSessionState({ - cfg: input.cfg, - sessionKey, - state: "running", - clearLastError: true, - }); - - internalAbortController = new AbortController(); - onCallerAbort = () => { - internalAbortController?.abort(); - }; - if (input.signal?.aborted) { - internalAbortController.abort(); - } else if (input.signal) { - input.signal.addEventListener("abort", onCallerAbort, { once: true }); - } - - activeTurn = { - runtime, - handle, - abortController: internalAbortController, - }; - this.activeTurnBySession.set(actorKey, activeTurn); - activeTurnStarted = true; - - let streamError: AcpRuntimeError | null = null; - const combinedSignal = - input.signal && typeof AbortSignal.any === "function" - ? AbortSignal.any([input.signal, internalAbortController.signal]) - : internalAbortController.signal; - const eventGate = { open: true }; - const turnPromise = (async () => { - for await (const event of runtime.runTurn({ - handle, - text: input.text, - attachments: input.attachments, - mode: input.mode, - requestId: input.requestId, - signal: combinedSignal, - })) { - if (!eventGate.open) { - continue; - } - if (event.type === "error") { - streamError = new AcpRuntimeError( - normalizeAcpErrorCode(event.code), - event.message?.trim() || "ACP turn failed before completion.", - ); - } else if (event.type === "text_delta" || event.type === "tool_call") { - sawTurnOutput = true; - } - if (input.onEvent) { - await input.onEvent(event); - } - } - if (eventGate.open && streamError) { - throw streamError; - } - })(); - const turnTimeoutMs = this.resolveTurnTimeoutMs({ - cfg: input.cfg, - meta, - }); - const sessionMode = meta.mode; - await this.awaitTurnWithTimeout({ - sessionKey, - turnPromise, - timeoutMs: turnTimeoutMs + ACP_TURN_TIMEOUT_GRACE_MS, - timeoutLabelMs: turnTimeoutMs, - onTimeout: async () => { - eventGate.open = false; - skipPostTurnCleanup = true; - if (!activeTurn) { - return; - } - await this.cleanupTimedOutTurn({ - sessionKey, - activeTurn, - mode: sessionMode, - }); - }, - }); - if (streamError) { - throw streamError; - } - this.recordTurnCompletion({ - startedAt: turnStartedAt, - }); - await this.setSessionState({ - cfg: input.cfg, - sessionKey, - state: "idle", - clearLastError: true, - }); - return; - } catch (error) { - const acpError = toAcpRuntimeError({ - error, - fallbackCode: activeTurnStarted ? "ACP_TURN_FAILED" : "ACP_SESSION_INIT_FAILED", - fallbackMessage: activeTurnStarted - ? "ACP turn failed before completion." - : "Could not initialize ACP session runtime.", - }); - retryFreshHandle = this.shouldRetryTurnWithFreshHandle({ - attempt, - sessionKey, - error: acpError, - sawTurnOutput, - }); - if (retryFreshHandle) { - continue; - } - this.recordTurnCompletion({ - startedAt: turnStartedAt, - errorCode: acpError.code, - }); - await this.setSessionState({ - cfg: input.cfg, - sessionKey, - state: "error", - lastError: acpError.message, - }); - throw acpError; - } finally { - if (input.signal && onCallerAbort) { - input.signal.removeEventListener("abort", onCallerAbort); - } - if (activeTurn && this.activeTurnBySession.get(actorKey) === activeTurn) { - this.activeTurnBySession.delete(actorKey); - } - if ( - !retryFreshHandle && - !skipPostTurnCleanup && - runtime && - handle && - meta && - meta.mode !== "oneshot" - ) { - ({ handle } = await this.reconcileRuntimeSessionIdentifiers({ + const resolvedMeta = requireReadySessionMeta(resolution); + let runtime: AcpRuntime | undefined; + let handle: AcpRuntimeHandle | undefined; + let meta: SessionAcpMeta | undefined; + let activeTurn: ActiveTurnState | undefined; + let internalAbortController: AbortController | undefined; + let onCallerAbort: (() => void) | undefined; + let activeTurnStarted = false; + let sawTurnOutput = false; + let retryFreshHandle = false; + let skipPostTurnCleanup = false; + try { + const ensured = await this.ensureRuntimeHandle({ cfg: input.cfg, + sessionKey, + meta: resolvedMeta, + }); + runtime = ensured.runtime; + handle = ensured.handle; + meta = ensured.meta; + await this.applyRuntimeControls({ sessionKey, runtime, handle, meta, - failOnStatusError: false, - })); - } - if ( - !retryFreshHandle && - !skipPostTurnCleanup && - runtime && - handle && - meta && - meta.mode === "oneshot" - ) { - try { - await runtime.close({ + }); + + await this.setSessionState({ + cfg: input.cfg, + sessionKey, + state: "running", + clearLastError: true, + }); + + internalAbortController = new AbortController(); + onCallerAbort = () => { + internalAbortController?.abort(); + }; + if (input.signal?.aborted) { + internalAbortController.abort(); + } else if (input.signal) { + input.signal.addEventListener("abort", onCallerAbort, { once: true }); + } + + activeTurn = { + runtime, + handle, + abortController: internalAbortController, + }; + this.activeTurnBySession.set(actorKey, activeTurn); + activeTurnStarted = true; + + let streamError: AcpRuntimeError | null = null; + const combinedSignal = + input.signal && typeof AbortSignal.any === "function" + ? AbortSignal.any([input.signal, internalAbortController.signal]) + : internalAbortController.signal; + const eventGate = { open: true }; + const turnPromise = (async () => { + for await (const event of runtime.runTurn({ handle, - reason: "oneshot-complete", - }); - } catch (error) { - logVerbose( - `acp-manager: ACP oneshot close failed for ${sessionKey}: ${String(error)}`, - ); - } finally { - this.clearCachedRuntimeState(sessionKey); + text: input.text, + attachments: input.attachments, + mode: input.mode, + requestId: input.requestId, + signal: combinedSignal, + })) { + if (!eventGate.open) { + continue; + } + if (event.type === "error") { + streamError = new AcpRuntimeError( + normalizeAcpErrorCode(event.code), + event.message?.trim() || "ACP turn failed before completion.", + ); + } else if (event.type === "text_delta" || event.type === "tool_call") { + sawTurnOutput = true; + } + if (input.onEvent) { + await input.onEvent(event); + } + } + if (eventGate.open && streamError) { + throw streamError; + } + })(); + const turnTimeoutMs = this.resolveTurnTimeoutMs({ + cfg: input.cfg, + meta, + }); + const sessionMode = meta.mode; + await this.awaitTurnWithTimeout({ + sessionKey, + turnPromise, + timeoutMs: turnTimeoutMs + ACP_TURN_TIMEOUT_GRACE_MS, + timeoutLabelMs: turnTimeoutMs, + onTimeout: async () => { + eventGate.open = false; + skipPostTurnCleanup = true; + if (!activeTurn) { + return; + } + await this.cleanupTimedOutTurn({ + sessionKey, + activeTurn, + mode: sessionMode, + }); + }, + }); + if (streamError) { + throw streamError; + } + this.recordTurnCompletion({ + startedAt: turnStartedAt, + }); + await this.setSessionState({ + cfg: input.cfg, + sessionKey, + state: "idle", + clearLastError: true, + }); + return; + } catch (error) { + const acpError = toAcpRuntimeError({ + error, + fallbackCode: activeTurnStarted ? "ACP_TURN_FAILED" : "ACP_SESSION_INIT_FAILED", + fallbackMessage: activeTurnStarted + ? "ACP turn failed before completion." + : "Could not initialize ACP session runtime.", + }); + retryFreshHandle = this.shouldRetryTurnWithFreshHandle({ + attempt, + sessionKey, + error: acpError, + sawTurnOutput, + }); + if (retryFreshHandle) { + continue; + } + this.recordTurnCompletion({ + startedAt: turnStartedAt, + errorCode: acpError.code, + }); + await this.setSessionState({ + cfg: input.cfg, + sessionKey, + state: "error", + lastError: acpError.message, + }); + throw acpError; + } finally { + if (input.signal && onCallerAbort) { + input.signal.removeEventListener("abort", onCallerAbort); + } + if (activeTurn && this.activeTurnBySession.get(actorKey) === activeTurn) { + this.activeTurnBySession.delete(actorKey); + } + if ( + !retryFreshHandle && + !skipPostTurnCleanup && + runtime && + handle && + meta && + meta.mode !== "oneshot" + ) { + ({ handle } = await this.reconcileRuntimeSessionIdentifiers({ + cfg: input.cfg, + sessionKey, + runtime, + handle, + meta, + failOnStatusError: false, + })); + } + if ( + !retryFreshHandle && + !skipPostTurnCleanup && + runtime && + handle && + meta && + meta.mode === "oneshot" + ) { + try { + await runtime.close({ + handle, + reason: "oneshot-complete", + }); + } catch (error) { + logVerbose( + `acp-manager: ACP oneshot close failed for ${sessionKey}: ${String(error)}`, + ); + } finally { + this.clearCachedRuntimeState(sessionKey); + } } } + if (retryFreshHandle) { + continue; + } } - if (retryFreshHandle) { - continue; - } - } - }); + }, + input.signal, + ); } private resolveTurnTimeoutMs(params: { cfg: OpenClawConfig; meta: SessionAcpMeta }): number { @@ -1632,10 +1636,56 @@ export class AcpSessionManager { signal?: AbortSignal, ): Promise { const actorKey = normalizeActorKey(sessionKey); - return await this.actorQueue.run(actorKey, async () => { + this.throwIfAborted(signal); + + let actorStarted = false; + const queued = this.actorQueue.run(actorKey, async () => { + actorStarted = true; this.throwIfAborted(signal); return await op(); }); + if (!signal) { + return await queued; + } + + return await new Promise((resolve, reject) => { + let settled = false; + const cleanup = () => { + signal.removeEventListener("abort", onAbort); + }; + const settleValue = (value: T) => { + if (settled) { + return; + } + settled = true; + cleanup(); + resolve(value); + }; + const settleError = (error: unknown) => { + if (settled) { + return; + } + settled = true; + cleanup(); + reject(error); + }; + const onAbort = () => { + if (actorStarted) { + return; + } + try { + this.throwIfAborted(signal); + } catch (error) { + settleError(error); + } + }; + + signal.addEventListener("abort", onAbort, { once: true }); + queued.then(settleValue, settleError); + if (signal.aborted) { + onAbort(); + } + }); } private throwIfAborted(signal?: AbortSignal): void { diff --git a/src/acp/control-plane/manager.test.ts b/src/acp/control-plane/manager.test.ts index 5cba6f1b849..8e900ce4800 100644 --- a/src/acp/control-plane/manager.test.ts +++ b/src/acp/control-plane/manager.test.ts @@ -268,6 +268,81 @@ describe("AcpSessionManager", () => { expect(runtimeState.runTurn).toHaveBeenCalledTimes(2); }); + it("rejects a queued turn promptly when its caller aborts before the actor is free", async () => { + const runtimeState = createRuntime(); + hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ + id: "acpx", + runtime: runtimeState.runtime, + }); + hoisted.readAcpSessionEntryMock.mockReturnValue({ + sessionKey: "agent:codex:acp:session-1", + storeSessionKey: "agent:codex:acp:session-1", + acp: readySessionMeta(), + }); + + let firstTurnStarted = false; + let releaseFirstTurn: (() => void) | undefined; + runtimeState.runTurn.mockImplementation(async function* (input: { requestId: string }) { + if (input.requestId === "r1") { + firstTurnStarted = true; + await new Promise((resolve) => { + releaseFirstTurn = resolve; + }); + } + yield { type: "done" as const }; + }); + + const manager = new AcpSessionManager(); + const first = manager.runTurn({ + cfg: baseCfg, + sessionKey: "agent:codex:acp:session-1", + text: "first", + mode: "prompt", + requestId: "r1", + }); + await vi.waitFor(() => { + expect(firstTurnStarted).toBe(true); + }); + + const abortController = new AbortController(); + const second = manager.runTurn({ + cfg: baseCfg, + sessionKey: "agent:codex:acp:session-1", + text: "second", + mode: "prompt", + requestId: "r2", + signal: abortController.signal, + }); + abortController.abort(); + + const secondOutcome = await Promise.race([ + second.then( + () => ({ status: "resolved" as const }), + (error) => ({ status: "rejected" as const, error }), + ), + new Promise<{ status: "pending" }>((resolve) => { + setTimeout(() => resolve({ status: "pending" }), 100); + }), + ]); + + releaseFirstTurn?.(); + await first; + await vi.waitFor(() => { + expect(manager.getObservabilitySnapshot(baseCfg).turns.queueDepth).toBe(0); + }); + + expect(secondOutcome.status).toBe("rejected"); + if (secondOutcome.status !== "rejected") { + return; + } + expect(secondOutcome.error).toBeInstanceOf(AcpRuntimeError); + expect(secondOutcome.error).toMatchObject({ + code: "ACP_TURN_FAILED", + message: "ACP operation aborted.", + }); + expect(runtimeState.runTurn).toHaveBeenCalledTimes(1); + }); + it("times out a hung persistent turn without closing the session and lets queued work continue", async () => { vi.useFakeTimers(); try { diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 8fc7110fc4c..4b89e0f8807 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -189,6 +189,7 @@ export async function tryDispatchAcpReply(params: { cfg: OpenClawConfig; dispatcher: ReplyDispatcher; sessionKey?: string; + abortSignal?: AbortSignal; inboundAudio: boolean; sessionTtsAuto?: TtsAutoMode; ttsChannel?: string; @@ -308,6 +309,7 @@ export async function tryDispatchAcpReply(params: { attachments: attachments.length > 0 ? attachments : undefined, mode: "prompt", requestId: resolveAcpRequestId(params.ctx), + ...(params.abortSignal ? { signal: params.abortSignal } : {}), onEvent: async (event) => await projector.onEvent(event), }); diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 162f40613d1..fe359313bcd 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -980,6 +980,92 @@ describe("dispatchReplyFromConfig", () => { expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); + it("aborts ACP dispatch promptly when the caller abort signal fires", async () => { + setNoAbort(); + let releaseTurn: (() => void) | undefined; + const releasePromise = new Promise((resolve) => { + releaseTurn = resolve; + }); + const runtime = { + ensureSession: vi.fn( + async (input: { sessionKey: string; mode: string; agent: string }) => + ({ + sessionKey: input.sessionKey, + backend: "acpx", + runtimeSessionName: `${input.sessionKey}:${input.mode}`, + }) as { sessionKey: string; backend: string; runtimeSessionName: string }, + ), + runTurn: vi.fn(async function* (params: { signal?: AbortSignal }) { + await new Promise((resolve) => { + if (params.signal?.aborted) { + resolve(); + return; + } + const onAbort = () => resolve(); + params.signal?.addEventListener("abort", onAbort, { once: true }); + void releasePromise.then(resolve); + }); + yield { type: "done" }; + }), + cancel: vi.fn(async () => {}), + close: vi.fn(async () => {}), + }; + acpMocks.readAcpSessionEntry.mockReturnValue({ + sessionKey: "agent:codex-acp:session-1", + storeSessionKey: "agent:codex-acp:session-1", + cfg: {}, + storePath: "/tmp/mock-sessions.json", + entry: {}, + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime:1", + mode: "persistent", + state: "idle", + lastActivityAt: Date.now(), + }, + }); + acpMocks.requireAcpRuntimeBackend.mockReturnValue({ + id: "acpx", + runtime, + }); + + const abortController = new AbortController(); + const cfg = { + acp: { + enabled: true, + dispatch: { enabled: true }, + }, + } as OpenClawConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + BodyForAgent: "write a test", + }); + const dispatchPromise = dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher, + replyOptions: { abortSignal: abortController.signal }, + }); + await vi.waitFor(() => { + expect(runtime.runTurn).toHaveBeenCalledTimes(1); + }); + abortController.abort(); + const outcome = await Promise.race([ + dispatchPromise.then(() => "settled" as const), + new Promise<"pending">((resolve) => { + setTimeout(() => resolve("pending"), 100); + }), + ]); + releaseTurn?.(); + await dispatchPromise; + + expect(outcome).toBe("settled"); + }); + it("posts a one-time resolved-session-id notice in thread after the first ACP turn", async () => { setNoAbort(); const runtime = createAcpRuntime([{ type: "text_delta", text: "hello" }, { type: "done" }]); diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 9f603b30863..341f0d009a3 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -486,6 +486,7 @@ export async function dispatchReplyFromConfig(params: { cfg, dispatcher, sessionKey: acpDispatchSessionKey, + abortSignal: params.replyOptions?.abortSignal, inboundAudio, sessionTtsAuto, ttsChannel, @@ -621,6 +622,7 @@ export async function dispatchReplyFromConfig(params: { cfg, dispatcher, sessionKey: acpDispatchSessionKey, + abortSignal: params.replyOptions?.abortSignal, inboundAudio, sessionTtsAuto, ttsChannel,