From 91f45d9c8a109cbeb5be2dcc9ea8dd6718e59613 Mon Sep 17 00:00:00 2001 From: Zennn <89544177+udaymanish6@users.noreply.github.com> Date: Sat, 16 May 2026 17:39:26 -0400 Subject: [PATCH] fix(gateway): dedupe exec followup continuations (#82717) Co-authored-by: Miya --- CHANGELOG.md | 1 + .../bash-tools.exec-approval-followup.test.ts | 54 +- .../bash-tools.exec-approval-followup.ts | 97 +- src/gateway/server-maintenance.test.ts | 29 + src/gateway/server-maintenance.ts | 26 +- src/gateway/server-methods/agent.test.ts | 233 +++ src/gateway/server-methods/agent.ts | 1534 +++++++++-------- 7 files changed, 1229 insertions(+), 745 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 07f152b6958..ed83080a03b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - CLI/config: show concise human config-write output with an indented backup path instead of printing checksum-heavy overwrite audit details by default. - CLI/docs: call the canonical lowercase docs MCP search tool and surface MCP errors instead of returning empty search results. Fixes #82702. (#82704) Thanks @hclsys. - QA-Lab: ignore heartbeat-only operational transcripts when capturing runtime parity cells so background checks cannot replace the scenario reply. (#80323) Thanks @100yenadmin. +- Gateway/exec approvals: wait for accepted async approval follow-up runs instead of direct-fallback sending duplicate completions when retries use different nonce keys. Fixes #82711. (#82717) Thanks @udaymanish6. - CLI/config: add `--dry-run` support to `openclaw config unset`, with `--json` output and allow-exec validation parity with `config set`/`config patch` dry-run handling. (#81895) Thanks @giodl73-repo. - Memory-core: retry disabled dreaming cron cleanup until cron is available after startup, so persisted managed dreaming jobs are removed after restart. Fixes #82383. (#82389) Thanks @neeravmakwana. - Providers/xAI: keep retired Grok 3, Grok 4 Fast, Grok 4.1 Fast, and Grok Code slugs out of model pickers while preserving compatibility resolution for existing configs. diff --git a/src/agents/bash-tools.exec-approval-followup.test.ts b/src/agents/bash-tools.exec-approval-followup.test.ts index da42ba65b09..92c9217f91e 100644 --- a/src/agents/bash-tools.exec-approval-followup.test.ts +++ b/src/agents/bash-tools.exec-approval-followup.test.ts @@ -1,7 +1,7 @@ import { afterEach, describe, expect, it, vi } from "vitest"; vi.mock("./tools/gateway.js", () => ({ - callGatewayTool: vi.fn(async () => ({ ok: true })), + callGatewayTool: vi.fn(async () => ({ status: "ok" })), })); vi.mock("../infra/outbound/message.js", () => ({ @@ -42,7 +42,22 @@ function expectGatewayAgentFollowup(expected: Record) { for (const [key, value] of Object.entries(expected)) { expect(params[key]).toBe(value); } - expect(call[3]).toEqual({ expectFinal: true }); + expect(call[3]).toBeUndefined(); + return params; +} + +function expectGatewayAgentWait(expected: Record) { + const call = (callGatewayTool as { mock?: { calls?: unknown[][] } }).mock?.calls?.[1]; + if (!call) { + throw new Error("expected agent.wait call"); + } + expect(call[0]).toBe("agent.wait"); + requireRecord(call[1], "gateway wait context"); + const params = requireRecord(call[2], "gateway wait params"); + for (const [key, value] of Object.entries(expected)) { + expect(params[key]).toBe(value); + } + expect(call[3]).toBeUndefined(); return params; } @@ -134,6 +149,41 @@ describe("exec approval followup", () => { expect(sendMessage).not.toHaveBeenCalled(); }); + it("waits for accepted agent followups without direct fallback", async () => { + vi.mocked(callGatewayTool) + .mockResolvedValueOnce({ + runId: "exec-approval-followup:req-wait:nonce:nonce-wait", + status: "accepted", + }) + .mockResolvedValueOnce({ + runId: "exec-approval-followup:req-wait:nonce:nonce-wait", + status: "ok", + }); + + await sendExecApprovalFollowup({ + approvalId: "req-wait", + sessionKey: "agent:main:telegram:direct:123", + turnSourceChannel: "telegram", + turnSourceTo: "123", + turnSourceAccountId: "default", + resultText: "Exec finished (gateway id=req-wait, session=sess_1, code 0)\nall good", + idempotencyKey: "exec-approval-followup:req-wait:nonce:nonce-wait", + }); + + expectGatewayAgentFollowup({ + sessionKey: "agent:main:telegram:direct:123", + deliver: true, + channel: "telegram", + to: "123", + idempotencyKey: "exec-approval-followup:req-wait:nonce:nonce-wait", + }); + expectGatewayAgentWait({ + runId: "exec-approval-followup:req-wait:nonce:nonce-wait", + timeoutMs: 60_000, + }); + expect(sendMessage).not.toHaveBeenCalled(); + }); + it("falls back to sanitized direct external delivery only when no session exists", async () => { await sendExecApprovalFollowup({ approvalId: "req-no-session", diff --git a/src/agents/bash-tools.exec-approval-followup.ts b/src/agents/bash-tools.exec-approval-followup.ts index 9e4190bf980..a6b84601ecc 100644 --- a/src/agents/bash-tools.exec-approval-followup.ts +++ b/src/agents/bash-tools.exec-approval-followup.ts @@ -4,7 +4,10 @@ import { } from "../infra/outbound/best-effort-delivery.js"; import { sendMessage } from "../infra/outbound/message.js"; import { isCronSessionKey, isSubagentSessionKey } from "../sessions/session-key-utils.js"; -import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; +import { + normalizeLowercaseStringOrEmpty, + normalizeOptionalString, +} from "../shared/string-coerce.js"; import { isGatewayMessageChannel, normalizeMessageChannel } from "../utils/message-channel.js"; import { buildExecApprovalFollowupIdempotencyKey } from "./bash-tools.exec-approval-followup-state.js"; import { @@ -124,6 +127,51 @@ function buildSessionResumeFallbackPrefix(): string { return "Automatic session resume failed, so sending the status directly.\n\n"; } +function readGatewayStatus(value: unknown): string | undefined { + return value && typeof value === "object" && !Array.isArray(value) + ? normalizeOptionalString((value as { status?: unknown }).status) + : undefined; +} + +function readGatewayRunId(value: unknown): string | undefined { + return value && typeof value === "object" && !Array.isArray(value) + ? normalizeOptionalString((value as { runId?: unknown }).runId) + : undefined; +} + +function buildFollowupWaitError(params: { status?: string; error?: unknown }): Error { + const suffix = + typeof params.error === "string" && params.error.trim() + ? `: ${params.error.trim()}` + : params.status + ? `: ${params.status}` + : ""; + return new Error(`exec approval followup session resume failed${suffix}`); +} + +function isSuccessfulFollowupStatus(status: string | undefined): boolean { + return status === "ok"; +} + +async function waitForAgentFollowupRun(params: { + runId: string; + timeoutMs: number; +}): Promise { + const wait = await callGatewayTool( + "agent.wait", + { timeoutMs: params.timeoutMs + 2_000 }, + { + runId: params.runId, + timeoutMs: params.timeoutMs, + }, + ); + const status = readGatewayStatus(wait); + if (isSuccessfulFollowupStatus(status)) { + return; + } + throw buildFollowupWaitError({ status, error: wait.error }); +} + function shouldPrefixDirectFollowupWithSessionResumeFailure(params: { resultText: string; sessionError: unknown; @@ -249,25 +297,34 @@ export async function sendExecApprovalFollowup( if (sessionKey && params.direct !== true) { try { - await callGatewayTool( - "agent", - { timeoutMs: 60_000 }, - buildAgentFollowupArgs({ - approvalId: params.approvalId, - sessionKey, - resultText, - deliveryTarget, - sessionOnlyOriginChannel, - turnSourceChannel: params.turnSourceChannel, - turnSourceTo: params.turnSourceTo, - turnSourceAccountId: params.turnSourceAccountId, - turnSourceThreadId: params.turnSourceThreadId, - internalRuntimeHandoffId: params.internalRuntimeHandoffId, - idempotencyKey: params.idempotencyKey, - }), - { expectFinal: true }, - ); - return true; + const agentArgs = buildAgentFollowupArgs({ + approvalId: params.approvalId, + sessionKey, + resultText, + deliveryTarget, + sessionOnlyOriginChannel, + turnSourceChannel: params.turnSourceChannel, + turnSourceTo: params.turnSourceTo, + turnSourceAccountId: params.turnSourceAccountId, + turnSourceThreadId: params.turnSourceThreadId, + internalRuntimeHandoffId: params.internalRuntimeHandoffId, + idempotencyKey: params.idempotencyKey, + }); + const accepted = await callGatewayTool("agent", { timeoutMs: 60_000 }, agentArgs); + const status = readGatewayStatus(accepted); + if (isSuccessfulFollowupStatus(status)) { + return true; + } + if (status === "accepted" || status === "in_flight" || status === "pending") { + const runId = + readGatewayRunId(accepted) ?? normalizeOptionalString(agentArgs.idempotencyKey); + if (!runId) { + throw buildFollowupWaitError({ status: "missing-run-id" }); + } + await waitForAgentFollowupRun({ runId, timeoutMs: 60_000 }); + return true; + } + throw buildFollowupWaitError({ status, error: accepted.error }); } catch (err) { sessionError = err; } diff --git a/src/gateway/server-maintenance.test.ts b/src/gateway/server-maintenance.test.ts index c9c6f071fec..3c0c0925069 100644 --- a/src/gateway/server-maintenance.test.ts +++ b/src/gateway/server-maintenance.test.ts @@ -306,6 +306,35 @@ describe("startGatewayMaintenanceTimers", () => { stopMaintenanceTimers(timers); }); + it("keeps active exec approval dedupe aliases past the normal ttl", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const now = Date.now(); + const runId = "exec-approval-followup:req-active:nonce:retry-1"; + deps.chatAbortControllers.set(runId, createActiveRun("agent:main:main", "agent")); + deps.dedupe.set("agent:exec-approval-followup:req-active", { + ts: now - DEDUPE_TTL_MS - 1, + ok: true, + payload: { runId, status: "accepted" }, + }); + deps.dedupe.set("agent:exec-approval-followup:req-stale", { + ts: now - DEDUPE_TTL_MS - 1, + ok: true, + payload: { runId: "exec-approval-followup:req-stale:nonce:retry-1", status: "accepted" }, + }); + + const timers = startGatewayMaintenanceTimers(deps); + + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.dedupe.has("agent:exec-approval-followup:req-active")).toBe(true); + expect(deps.dedupe.has("agent:exec-approval-followup:req-stale")).toBe(false); + + stopMaintenanceTimers(timers); + }); + it("evicts dedupe overflow by oldest timestamp even after reinsertion", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index 3cef87e179a..011bbd9599b 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -88,11 +88,29 @@ export function startGatewayMaintenanceTimers(params: { const dedupeCleanup = setInterval(() => { const AGENT_RUN_SEQ_MAX = 10_000; const now = Date.now(); - const isActiveRunDedupeKey = (key: string) => { + const resolveDedupeRunId = (key: string, entry: DedupeEntry) => { + if (!key.startsWith("agent:") && !key.startsWith("chat:")) { + return undefined; + } + const keyRunId = key.slice(key.indexOf(":") + 1); + if (keyRunId) { + const directEntry = params.chatAbortControllers.get(keyRunId); + if (directEntry) { + return keyRunId; + } + } + const payload = entry.payload; + return payload && typeof payload === "object" && !Array.isArray(payload) + ? typeof (payload as { runId?: unknown }).runId === "string" + ? (payload as { runId: string }).runId.trim() || undefined + : undefined + : undefined; + }; + const isActiveRunDedupeKey = (key: string, dedupeEntry: DedupeEntry) => { if (!key.startsWith("agent:") && !key.startsWith("chat:")) { return false; } - const runId = key.slice(key.indexOf(":") + 1); + const runId = resolveDedupeRunId(key, dedupeEntry); const entry = runId ? params.chatAbortControllers.get(runId) : undefined; if (!entry) { return false; @@ -100,7 +118,7 @@ export function startGatewayMaintenanceTimers(params: { return key.startsWith("agent:") ? entry.kind === "agent" : entry.kind !== "agent"; }; for (const [k, v] of params.dedupe) { - if (isActiveRunDedupeKey(k)) { + if (isActiveRunDedupeKey(k, v)) { continue; } if (now - v.ts > DEDUPE_TTL_MS) { @@ -110,7 +128,7 @@ export function startGatewayMaintenanceTimers(params: { if (params.dedupe.size > DEDUPE_MAX) { const excess = params.dedupe.size - DEDUPE_MAX; const oldestKeys = [...params.dedupe.entries()] - .filter(([key]) => !isActiveRunDedupeKey(key)) + .filter(([key, entry]) => !isActiveRunDedupeKey(key, entry)) .toSorted(([, left], [, right]) => left.ts - right.ts) .slice(0, excess) .map(([key]) => key); diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index e7ff6a7fe72..81ba93bcd72 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -1696,6 +1696,239 @@ describe("gateway agent handler", () => { expect(callArgs.bashElevated).toEqual(bashElevated); }); + it("dedupes elevated exec approval followups across nonce idempotency keys", async () => { + const bashElevated = { + enabled: true, + allowed: true, + defaultLevel: "on" as const, + }; + const firstRegistration = registerExecApprovalFollowupRuntimeHandoff({ + approvalId: "req-elevated-duplicate", + sessionKey: "agent:main:telegram:direct:123", + bashElevated, + }); + const secondRegistration = registerExecApprovalFollowupRuntimeHandoff({ + approvalId: "req-elevated-duplicate", + sessionKey: "agent:main:telegram:direct:123", + bashElevated, + }); + if (!firstRegistration || !secondRegistration) { + throw new Error("expected runtime handoff ids"); + } + mockMainSessionEntry({ + sessionId: "existing-session-id", + lastChannel: "telegram", + lastTo: "123", + }); + mocks.agentCommand.mockImplementation(() => new Promise(() => {})); + const context = makeContext(); + const agentCommandCallsBefore = mocks.agentCommand.mock.calls.length; + + await invokeAgent( + { + message: "exec followup", + sessionKey: "agent:main:telegram:direct:123", + channel: "telegram", + idempotencyKey: firstRegistration.idempotencyKey, + internalRuntimeHandoffId: firstRegistration.handoffId, + }, + { reqId: "exec-followup-duplicate-1", client: backendGatewayClient(), context }, + ); + expect(mocks.agentCommand).toHaveBeenCalledTimes(agentCommandCallsBefore + 1); + + const secondRespond = await invokeAgent( + { + message: "exec followup duplicate", + sessionKey: "agent:main:telegram:direct:123", + channel: "telegram", + idempotencyKey: secondRegistration.idempotencyKey, + internalRuntimeHandoffId: secondRegistration.handoffId, + }, + { + reqId: "exec-followup-duplicate-2", + client: backendGatewayClient(), + context, + flushDispatch: false, + }, + ); + await flushScheduledDispatchStep(); + await flushScheduledDispatchStep(); + + expect(mocks.agentCommand).toHaveBeenCalledTimes(agentCommandCallsBefore + 1); + expect(mockCallArg(secondRespond, 0, 3)).toEqual({ cached: true }); + }); + + it("reserves exec approval followup dedupe before awaited session work", async () => { + const bashElevated = { + enabled: true, + allowed: true, + defaultLevel: "on" as const, + }; + const firstRegistration = registerExecApprovalFollowupRuntimeHandoff({ + approvalId: "req-elevated-overlap", + sessionKey: "agent:main:telegram:direct:123", + bashElevated, + }); + const secondRegistration = registerExecApprovalFollowupRuntimeHandoff({ + approvalId: "req-elevated-overlap", + sessionKey: "agent:main:telegram:direct:123", + bashElevated, + }); + if (!firstRegistration || !secondRegistration) { + throw new Error("expected runtime handoff ids"); + } + mockMainSessionEntry({ + sessionId: "existing-session-id", + lastChannel: "telegram", + lastTo: "123", + }); + let releaseFirstSessionWrite: (() => void) | undefined; + let sessionWriteCalls = 0; + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + sessionWriteCalls += 1; + if (sessionWriteCalls === 1) { + await new Promise((resolve) => { + releaseFirstSessionWrite = resolve; + }); + } + const store = { + "agent:main:main": buildExistingMainStoreEntry({ + lastChannel: "telegram", + lastTo: "123", + }), + }; + return await updater(store); + }); + mocks.agentCommand.mockImplementation(() => new Promise(() => {})); + const context = makeContext(); + const agentCommandCallsBefore = mocks.agentCommand.mock.calls.length; + + const first = invokeAgent( + { + message: "exec followup", + sessionKey: "agent:main:telegram:direct:123", + channel: "telegram", + idempotencyKey: firstRegistration.idempotencyKey, + internalRuntimeHandoffId: firstRegistration.handoffId, + }, + { + reqId: "exec-followup-overlap-1", + client: backendGatewayClient(), + context, + flushDispatch: false, + }, + ); + await waitForAssertion(() => expect(sessionWriteCalls).toBe(1)); + + const secondRespond = await invokeAgent( + { + message: "exec followup duplicate", + sessionKey: "agent:main:telegram:direct:123", + channel: "telegram", + idempotencyKey: secondRegistration.idempotencyKey, + internalRuntimeHandoffId: secondRegistration.handoffId, + }, + { + reqId: "exec-followup-overlap-2", + client: backendGatewayClient(), + context, + flushDispatch: false, + }, + ); + + expect(mocks.agentCommand).toHaveBeenCalledTimes(agentCommandCallsBefore); + expect(sessionWriteCalls).toBe(1); + expect(mockCallArg(secondRespond, 0, 1)).toMatchObject({ + runId: firstRegistration.idempotencyKey, + status: "accepted", + }); + expect(mockCallArg(secondRespond, 0, 3)).toEqual({ cached: true }); + + releaseFirstSessionWrite?.(); + await first; + await flushScheduledDispatchStep(); + await flushScheduledDispatchStep(); + + expect(mocks.agentCommand).toHaveBeenCalledTimes(agentCommandCallsBefore + 1); + }); + + it("clears reserved exec approval dedupe when pre-run session work fails", async () => { + const bashElevated = { + enabled: true, + allowed: true, + defaultLevel: "on" as const, + }; + const firstRegistration = registerExecApprovalFollowupRuntimeHandoff({ + approvalId: "req-elevated-pre-run-fail", + sessionKey: "agent:main:telegram:direct:123", + bashElevated, + }); + const secondRegistration = registerExecApprovalFollowupRuntimeHandoff({ + approvalId: "req-elevated-pre-run-fail", + sessionKey: "agent:main:telegram:direct:123", + bashElevated, + }); + if (!firstRegistration || !secondRegistration) { + throw new Error("expected runtime handoff ids"); + } + mockMainSessionEntry({ + sessionId: "existing-session-id", + lastChannel: "telegram", + lastTo: "123", + }); + const context = makeContext(); + const agentCommandCallsBefore = mocks.agentCommand.mock.calls.length; + mocks.updateSessionStore.mockRejectedValueOnce(new Error("session write failed")); + + await expect( + invokeAgent( + { + message: "exec followup", + sessionKey: "agent:main:telegram:direct:123", + channel: "telegram", + idempotencyKey: firstRegistration.idempotencyKey, + internalRuntimeHandoffId: firstRegistration.handoffId, + }, + { + reqId: "exec-followup-pre-run-fail-1", + client: backendGatewayClient(), + context, + flushDispatch: false, + }, + ), + ).rejects.toThrow("session write failed"); + + expect(context.dedupe.get(`agent:${firstRegistration.idempotencyKey}`)).toBeUndefined(); + expect( + context.dedupe.get("agent:exec-approval-followup:req-elevated-pre-run-fail"), + ).toBeUndefined(); + expect(mocks.agentCommand).toHaveBeenCalledTimes(agentCommandCallsBefore); + + const secondRespond = await invokeAgent( + { + message: "exec followup retry", + sessionKey: "agent:main:telegram:direct:123", + channel: "telegram", + idempotencyKey: secondRegistration.idempotencyKey, + internalRuntimeHandoffId: secondRegistration.handoffId, + }, + { + reqId: "exec-followup-pre-run-fail-2", + client: backendGatewayClient(), + context, + flushDispatch: false, + }, + ); + + expect(mockCallArg(secondRespond, 0, 1)).toMatchObject({ + runId: secondRegistration.idempotencyKey, + status: "accepted", + }); + await flushScheduledDispatchStep(); + await flushScheduledDispatchStep(); + expect(mocks.agentCommand).toHaveBeenCalledTimes(agentCommandCallsBefore + 1); + }); + it("does not consume exec approval runtime handoffs from non-backend callers", async () => { const bashElevated = { enabled: true, diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index ac99ca7de94..9366926e16a 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -421,10 +421,58 @@ function tryFinalizeTrackedAgentTask(params: { } } +function resolveAgentDedupeKeys(params: { + idempotencyKey: string; + execApprovalFollowupApprovalId?: string; +}): string[] { + const keys = [`agent:${params.idempotencyKey}`]; + const approvalId = params.execApprovalFollowupApprovalId?.trim(); + if (approvalId) { + keys.push(`agent:exec-approval-followup:${approvalId}`); + } + return [...new Set(keys)]; +} + +function readGatewayDedupeEntry(params: { + dedupe: GatewayRequestContext["dedupe"]; + keys: readonly string[]; +}) { + for (const key of params.keys) { + const entry = params.dedupe.get(key); + if (entry) { + return entry; + } + } + return undefined; +} + +function setGatewayDedupeEntries(params: { + dedupe: GatewayRequestContext["dedupe"]; + keys: readonly string[]; + entry: Parameters[0]["entry"]; +}) { + for (const key of params.keys) { + setGatewayDedupeEntry({ + dedupe: params.dedupe, + key, + entry: params.entry, + }); + } +} + +function deleteGatewayDedupeEntries(params: { + dedupe: GatewayRequestContext["dedupe"]; + keys: readonly string[]; +}) { + for (const key of params.keys) { + params.dedupe.delete(key); + } +} + function dispatchAgentRunFromGateway(params: { ingressOpts: Parameters[0]; runId: string; - idempotencyKey: string; + dedupeKeys: readonly string[]; /** * Controller whose signal is wired into `ingressOpts.abortSignal`. Used on * completion to drop the matching `chatAbortControllers` entry without @@ -477,9 +525,9 @@ function dispatchAgentRunFromGateway(params: { ...(aborted ? { stopReason: result?.meta?.stopReason ?? "rpc" } : {}), result, }; - setGatewayDedupeEntry({ + setGatewayDedupeEntries({ dedupe: params.context.dedupe, - key: `agent:${params.idempotencyKey}`, + keys: params.dedupeKeys, entry: { ts: Date.now(), ok: true, @@ -508,9 +556,9 @@ function dispatchAgentRunFromGateway(params: { summary: aborted ? "aborted" : renderedErr, ...(aborted ? { stopReason: "rpc" } : {}), }; - setGatewayDedupeEntry({ + setGatewayDedupeEntries({ dedupe: params.context.dedupe, - key: `agent:${params.idempotencyKey}`, + keys: params.dedupeKeys, entry: { ts: Date.now(), ok: aborted, @@ -634,6 +682,7 @@ export const agentHandlers: GatewayRequestHandlers = { const modelOverride = allowModelOverride ? request.model : undefined; const cfg = context.getRuntimeConfig(); const idem = request.idempotencyKey; + const runId = idem; const execApprovalFollowupApprovalId = parseExecApprovalFollowupApprovalId(idem); if (execApprovalFollowupApprovalId && !canUseInternalRuntimeHandoff) { respond( @@ -656,13 +705,51 @@ export const agentHandlers: GatewayRequestHandlers = { let resolvedGroupSpace: string | undefined = normalizedSpawned.groupSpace; let spawnedByValue: string | undefined; const inputProvenance = normalizeInputProvenance(request.inputProvenance); - const cached = context.dedupe.get(`agent:${idem}`); + const agentDedupeKeys = resolveAgentDedupeKeys({ + idempotencyKey: idem, + execApprovalFollowupApprovalId, + }); + const cached = readGatewayDedupeEntry({ + dedupe: context.dedupe, + keys: agentDedupeKeys, + }); if (cached) { respond(cached.ok, cached.payload, cached.error, { cached: true, }); return; } + let agentDedupeReserved = false; + let agentRunAccepted = false; + const reserveExecApprovalFollowupDedupe = () => { + if (agentDedupeReserved || !execApprovalFollowupApprovalId) { + return; + } + setGatewayDedupeEntries({ + dedupe: context.dedupe, + keys: agentDedupeKeys, + entry: { + ts: Date.now(), + ok: true, + payload: { + runId, + status: "accepted" as const, + acceptedAt: Date.now(), + }, + }, + }); + agentDedupeReserved = true; + }; + const clearUnacceptedExecApprovalFollowupDedupe = () => { + if (!agentDedupeReserved || agentRunAccepted) { + return; + } + deleteGatewayDedupeEntries({ + dedupe: context.dedupe, + keys: agentDedupeKeys, + }); + agentDedupeReserved = false; + }; const normalizedAttachments = normalizeRpcAttachmentsToChatAttachments(request.attachments); const requestedBestEffortDeliver = typeof request.bestEffortDeliver === "boolean" ? request.bestEffortDeliver : undefined; @@ -805,769 +892,778 @@ export const agentHandlers: GatewayRequestHandlers = { return; } } - const voiceWakeTrigger = normalizeOptionalString(request.voiceWakeTrigger) ?? ""; - const replyTo = normalizeOptionalString(request.replyTo) ?? ""; - const to = normalizeOptionalString(request.to) ?? ""; - const explicitVoiceWakeSessionTarget = - !agentId && requestedSessionKeyRaw - ? (() => { - const { cfg: sessionCfg, canonicalKey } = loadSessionEntry(requestedSessionKeyRaw); - const routedAgentId = resolveAgentIdFromSessionKey(canonicalKey); - const defaultAgentId = normalizeAgentId(resolveDefaultAgentId(sessionCfg)); - if (routedAgentId !== defaultAgentId) { - return true; - } - const mainSessionKey = resolveAgentMainSessionKey({ - cfg: sessionCfg, - agentId: routedAgentId, - }); - return canonicalKey !== mainSessionKey; - })() - : false; - const canAutoRouteVoiceWake = - !agentId && !explicitVoiceWakeSessionTarget && !requestedSessionId && !replyTo && !to; - const hasVoiceWakeTriggerField = Object.prototype.hasOwnProperty.call( - request, - "voiceWakeTrigger", - ); - if (hasVoiceWakeTriggerField && canAutoRouteVoiceWake) { - try { - const routingConfig = await loadVoiceWakeRoutingConfig(); - const route = resolveVoiceWakeRouteByTrigger({ - trigger: voiceWakeTrigger || undefined, - config: routingConfig, - }); - if ("agentId" in route) { - if (knownAgents.includes(route.agentId)) { - agentId = route.agentId; - requestedSessionKey = resolveExplicitAgentSessionKey({ - cfg, - agentId, - }); - } else { - context.logGateway.warn( - `voicewake routing ignored unknown agentId="${route.agentId}" trigger="${voiceWakeTrigger}"`, - ); - } - } else if ("sessionKey" in route) { - if (classifySessionKeyShape(route.sessionKey) !== "malformed_agent") { - const canonicalRouteSession = loadSessionEntry(route.sessionKey).canonicalKey; - const routedAgentId = resolveAgentIdFromSessionKey(canonicalRouteSession); - if (knownAgents.includes(routedAgentId)) { - requestedSessionKey = canonicalRouteSession; - agentId = routedAgentId; + // Exec approval followups can retry with a fresh nonce for the same approval id. + // Reserve the stable alias before awaited session/delivery work so overlaps dedupe. + reserveExecApprovalFollowupDedupe(); + try { + const voiceWakeTrigger = normalizeOptionalString(request.voiceWakeTrigger) ?? ""; + const replyTo = normalizeOptionalString(request.replyTo) ?? ""; + const to = normalizeOptionalString(request.to) ?? ""; + const explicitVoiceWakeSessionTarget = + !agentId && requestedSessionKeyRaw + ? (() => { + const { cfg: sessionCfg, canonicalKey } = loadSessionEntry(requestedSessionKeyRaw); + const routedAgentId = resolveAgentIdFromSessionKey(canonicalKey); + const defaultAgentId = normalizeAgentId(resolveDefaultAgentId(sessionCfg)); + if (routedAgentId !== defaultAgentId) { + return true; + } + const mainSessionKey = resolveAgentMainSessionKey({ + cfg: sessionCfg, + agentId: routedAgentId, + }); + return canonicalKey !== mainSessionKey; + })() + : false; + const canAutoRouteVoiceWake = + !agentId && !explicitVoiceWakeSessionTarget && !requestedSessionId && !replyTo && !to; + const hasVoiceWakeTriggerField = Object.prototype.hasOwnProperty.call( + request, + "voiceWakeTrigger", + ); + if (hasVoiceWakeTriggerField && canAutoRouteVoiceWake) { + try { + const routingConfig = await loadVoiceWakeRoutingConfig(); + const route = resolveVoiceWakeRouteByTrigger({ + trigger: voiceWakeTrigger || undefined, + config: routingConfig, + }); + if ("agentId" in route) { + if (knownAgents.includes(route.agentId)) { + agentId = route.agentId; + requestedSessionKey = resolveExplicitAgentSessionKey({ + cfg, + agentId, + }); } else { context.logGateway.warn( - `voicewake routing ignored unknown session agent="${routedAgentId}" sessionKey="${canonicalRouteSession}" trigger="${voiceWakeTrigger}"`, + `voicewake routing ignored unknown agentId="${route.agentId}" trigger="${voiceWakeTrigger}"`, + ); + } + } else if ("sessionKey" in route) { + if (classifySessionKeyShape(route.sessionKey) !== "malformed_agent") { + const canonicalRouteSession = loadSessionEntry(route.sessionKey).canonicalKey; + const routedAgentId = resolveAgentIdFromSessionKey(canonicalRouteSession); + if (knownAgents.includes(routedAgentId)) { + requestedSessionKey = canonicalRouteSession; + agentId = routedAgentId; + } else { + context.logGateway.warn( + `voicewake routing ignored unknown session agent="${routedAgentId}" sessionKey="${canonicalRouteSession}" trigger="${voiceWakeTrigger}"`, + ); + } + } else { + context.logGateway.warn( + `voicewake routing ignored malformed sessionKey="${route.sessionKey}" trigger="${voiceWakeTrigger}"`, ); } - } else { - context.logGateway.warn( - `voicewake routing ignored malformed sessionKey="${route.sessionKey}" trigger="${voiceWakeTrigger}"`, - ); } - } - } catch (err) { - context.logGateway.warn(`voicewake routing load failed: ${formatForLog(err)}`); - } - } - let resolvedSessionId = requestedSessionId; - let sessionEntry: SessionEntry | undefined; - let bestEffortDeliver = requestedBestEffortDeliver ?? false; - let cfgForAgent: OpenClawConfig | undefined; - let resolvedSessionKey = requestedSessionKey; - let isNewSession = false; - let skipTimestampInjection = false; - let shouldPrependStartupContext = false; - - const resetCommandMatch = message.match(RESET_COMMAND_RE); - if (resetCommandMatch && requestedSessionKey) { - if (!canResetSession) { - respond( - false, - undefined, - errorShape(ErrorCodes.INVALID_REQUEST, `missing scope: ${ADMIN_SCOPE}`), - ); - return; - } - const resetReason = - normalizeOptionalLowercaseString(resetCommandMatch[1]) === "new" ? "new" : "reset"; - const resetResult = await runSessionResetFromAgent({ - key: requestedSessionKey, - reason: resetReason, - }); - if (!resetResult.ok) { - respond(false, undefined, resetResult.error); - return; - } - requestedSessionKey = resetResult.key; - resolvedSessionId = resetResult.sessionId ?? resolvedSessionId; - const postResetMessage = normalizeOptionalString(resetCommandMatch[2]) ?? ""; - if (postResetMessage) { - message = postResetMessage; - } else { - const resetLoadedSession = loadSessionEntry(requestedSessionKey); - const resetCfg = resetLoadedSession?.cfg ?? cfg; - const resetSessionEntry = resetLoadedSession?.entry; - const resetSpawnedBy = canonicalizeSpawnedByForAgent( - resetCfg, - resolveAgentIdFromSessionKey(requestedSessionKey), - resetSessionEntry?.spawnedBy, - ); - const { runtimeWorkspaceDir, isCanonicalWorkspace } = resolveSessionRuntimeWorkspace({ - cfg: resetCfg, - sessionKey: requestedSessionKey, - sessionEntry: resetSessionEntry, - spawnedBy: resetSpawnedBy, - }); - const resetSessionAgentId = resolveAgentIdFromSessionKey(requestedSessionKey); - const resetBaseModelRef = resolveSessionModelRef( - resetCfg, - resetSessionEntry, - resetSessionAgentId, - ); - const resetEffectiveModelRef = { - provider: providerOverride || resetBaseModelRef.provider, - model: modelOverride || resetBaseModelRef.model, - }; - const bareResetPromptState = await resolveBareSessionResetPromptState({ - cfg: resetCfg, - workspaceDir: runtimeWorkspaceDir, - isPrimaryRun: - !isSubagentSessionKey(requestedSessionKey) && !isAcpSessionKey(requestedSessionKey), - isCanonicalWorkspace, - hasBootstrapFileAccess: resolveBareResetBootstrapFileAccess({ - cfg: resetCfg, - agentId: resetSessionAgentId, - sessionKey: requestedSessionKey, - workspaceDir: runtimeWorkspaceDir, - modelProvider: resetEffectiveModelRef.provider, - modelId: resetEffectiveModelRef.model, - }), - }); - // Keep bare /new and /reset behavior aligned with chat.send: - // reset first, then run a fresh-session greeting prompt in-place. - // Date is embedded in the prompt so agents read the correct daily - // memory files; skip further timestamp injection to avoid duplication. - message = bareResetPromptState.prompt; - skipTimestampInjection = true; - shouldPrependStartupContext = - bareResetPromptState.shouldPrependStartupContext && - shouldApplyStartupContext({ cfg, action: resetReason }); - } - } - - // Inject timestamp into user-authored messages that don't already have one. - // Channel messages (Discord, Telegram, etc.) get timestamps via envelope - // formatting in a separate code path — they never reach this handler. - // See: https://github.com/openclaw/openclaw/issues/3658 - if (!skipTimestampInjection && !isRawModelRun && inputProvenance?.kind !== "inter_session") { - message = injectTimestamp(message, timestampOptsFromConfig(cfg)); - } - - if (requestedSessionKey) { - const { cfg, storePath, entry, canonicalKey } = loadSessionEntry(requestedSessionKey); - cfgForAgent = cfg; - const now = Date.now(); - const resetPolicy = resolveSessionResetPolicy({ - sessionCfg: cfg.session, - resetType: resolveSessionResetType({ sessionKey: canonicalKey }), - resetOverride: resolveChannelResetConfig({ - sessionCfg: cfg.session, - channel: entry?.lastChannel ?? entry?.channel ?? request.channel, - }), - }); - const freshness = entry - ? evaluateSessionFreshness({ - updatedAt: entry.updatedAt, - ...resolveSessionLifecycleTimestamps({ - entry, - storePath, - agentId: resolveAgentIdFromSessionKey(canonicalKey), - }), - now, - policy: resetPolicy, - }) - : undefined; - const canReuseSession = Boolean(entry?.sessionId) && (freshness?.fresh ?? false); - const usableRequestedSessionId = - requestedSessionId && (!entry?.sessionId || canReuseSession) - ? requestedSessionId - : undefined; - const sessionId = usableRequestedSessionId - ? usableRequestedSessionId - : ((canReuseSession ? entry?.sessionId : undefined) ?? randomUUID()); - isNewSession = - !entry || - (!canReuseSession && !usableRequestedSessionId) || - Boolean(usableRequestedSessionId && entry?.sessionId !== usableRequestedSessionId); - const touchInteraction = - request.bootstrapContextRunKind !== "cron" && - request.bootstrapContextRunKind !== "heartbeat" && - !request.internalEvents?.length; - const labelValue = normalizeOptionalString(request.label) || entry?.label; - const pluginOwnerId = - entry === undefined - ? normalizeOptionalString(client?.internal?.pluginRuntimeOwnerId) - : normalizeOptionalString(entry.pluginOwnerId); - const sessionAgent = resolveAgentIdFromSessionKey(canonicalKey); - spawnedByValue = canonicalizeSpawnedByForAgent(cfg, sessionAgent, entry?.spawnedBy); - const storedGroup = normalizeTrustedGroupMetadata(entry); - let inheritedGroup: TrustedGroupMetadata | undefined; - if ( - spawnedByValue && - (!storedGroup.groupId || !storedGroup.groupChannel || !storedGroup.groupSpace) - ) { - try { - const parentEntry = loadSessionEntry(spawnedByValue)?.entry; - inheritedGroup = normalizeTrustedGroupMetadata({ - groupId: parentEntry?.groupId, - groupChannel: parentEntry?.groupChannel, - groupSpace: parentEntry?.space, - }); - } catch { - inheritedGroup = undefined; + } catch (err) { + context.logGateway.warn(`voicewake routing load failed: ${formatForLog(err)}`); } } - const trustedGroup = resolveTrustedGroupMetadata({ - sessionKey: canonicalKey, - spawnedBy: spawnedByValue, - stored: storedGroup, - inherited: inheritedGroup, - }); - const validatedGroup = trustedGroup.groupId - ? resolveTrustedGroupId({ - groupId: trustedGroup.groupId, - sessionKey: canonicalKey, - spawnedBy: spawnedByValue, - }) - : undefined; - if (validatedGroup?.dropped) { - resolvedGroupId = undefined; - resolvedGroupChannel = undefined; - resolvedGroupSpace = undefined; - } else { - const trustRequestSelectors = - Boolean(trustedGroup.groupId) && - requestGroupMatchesTrusted({ - requestGroupId: normalizedSpawned.groupId, - trustedGroupId: trustedGroup.groupId, - }); - resolvedGroupId = trustedGroup.groupId; - resolvedGroupChannel = - trustedGroup.groupChannel ?? - (trustRequestSelectors ? normalizedSpawned.groupChannel : undefined); - resolvedGroupSpace = - trustedGroup.groupSpace ?? - (trustRequestSelectors ? normalizedSpawned.groupSpace : undefined); - } - const deliveryFields = normalizeSessionDeliveryFields(entry); - // When the session has no delivery context yet (e.g. a freshly-spawned subagent - // with deliver: false), seed it from the request's channel/to/threadId params. - // Without this, subagent sessions end up with a channel-only deliveryContext - // and no `to`/`threadId`, which causes announce delivery to either target the - // wrong channel (when the parent's lastTo drifts) or fail entirely. - const requestDeliveryHint = normalizeDeliveryContext({ - channel: request.channel?.trim(), - to: request.to?.trim(), - accountId: request.accountId?.trim(), - // Pass threadId directly — normalizeDeliveryContext handles both - // string and numeric threadIds (e.g., Matrix uses integers). - threadId: request.threadId, - }); - const effectiveDelivery = mergeDeliveryContext( - deliveryFields.deliveryContext, - requestDeliveryHint, - ); - const effectiveDeliveryFields = normalizeSessionDeliveryFields({ - deliveryContext: effectiveDelivery, - }); - const nextEntryPatch: SessionEntry = { - sessionId, - updatedAt: now, - sessionStartedAt: isNewSession - ? now - : (entry?.sessionStartedAt ?? - resolveSessionLifecycleTimestamps({ - entry, - storePath, - agentId: resolveAgentIdFromSessionKey(canonicalKey), - }).sessionStartedAt), - lastInteractionAt: touchInteraction ? now : entry?.lastInteractionAt, - thinkingLevel: entry?.thinkingLevel, - fastMode: entry?.fastMode, - verboseLevel: entry?.verboseLevel, - traceLevel: entry?.traceLevel, - reasoningLevel: entry?.reasoningLevel, - systemSent: entry?.systemSent, - sendPolicy: entry?.sendPolicy, - skillsSnapshot: entry?.skillsSnapshot, - deliveryContext: effectiveDeliveryFields.deliveryContext, - lastChannel: effectiveDeliveryFields.lastChannel ?? entry?.lastChannel, - lastTo: effectiveDeliveryFields.lastTo ?? entry?.lastTo, - lastAccountId: effectiveDeliveryFields.lastAccountId ?? entry?.lastAccountId, - lastThreadId: effectiveDeliveryFields.lastThreadId ?? entry?.lastThreadId, - modelOverride: entry?.modelOverride, - providerOverride: entry?.providerOverride, - label: labelValue, - spawnedBy: spawnedByValue, - spawnedWorkspaceDir: entry?.spawnedWorkspaceDir, - spawnDepth: entry?.spawnDepth, - channel: entry?.channel ?? request.channel?.trim(), - groupId: resolvedGroupId, - groupChannel: resolvedGroupChannel, - space: resolvedGroupSpace, - ...(pluginOwnerId ? { pluginOwnerId } : {}), - sessionFile: - entry?.sessionId && entry.sessionId !== sessionId ? undefined : entry?.sessionFile, - cliSessionIds: entry?.cliSessionIds, - cliSessionBindings: entry?.cliSessionBindings, - claudeCliSessionId: entry?.claudeCliSessionId, - }; - sessionEntry = mergeSessionEntry(entry, nextEntryPatch); - if (request.deliver === true) { - const sendPolicy = resolveSendPolicy({ - cfg, - entry: sessionEntry, - sessionKey: canonicalKey, - channel: sessionEntry?.channel, - chatType: sessionEntry?.chatType, - }); - if (sendPolicy === "deny") { + let resolvedSessionId = requestedSessionId; + let sessionEntry: SessionEntry | undefined; + let bestEffortDeliver = requestedBestEffortDeliver ?? false; + let cfgForAgent: OpenClawConfig | undefined; + let resolvedSessionKey = requestedSessionKey; + let isNewSession = false; + let skipTimestampInjection = false; + let shouldPrependStartupContext = false; + + const resetCommandMatch = message.match(RESET_COMMAND_RE); + if (resetCommandMatch && requestedSessionKey) { + if (!canResetSession) { respond( false, undefined, - errorShape(ErrorCodes.INVALID_REQUEST, "send blocked by session policy"), + errorShape(ErrorCodes.INVALID_REQUEST, `missing scope: ${ADMIN_SCOPE}`), ); return; } - } - resolvedSessionId = sessionId; - const canonicalSessionKey = canonicalKey; - resolvedSessionKey = canonicalSessionKey; - const agentId = resolveAgentIdFromSessionKey(canonicalSessionKey); - const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId }); - if (storePath) { - const persisted = await updateSessionStore(storePath, (store) => { - const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({ - cfg, - key: requestedSessionKey, - store, + const resetReason = + normalizeOptionalLowercaseString(resetCommandMatch[1]) === "new" ? "new" : "reset"; + const resetResult = await runSessionResetFromAgent({ + key: requestedSessionKey, + reason: resetReason, + }); + if (!resetResult.ok) { + respond(false, undefined, resetResult.error); + return; + } + requestedSessionKey = resetResult.key; + resolvedSessionId = resetResult.sessionId ?? resolvedSessionId; + const postResetMessage = normalizeOptionalString(resetCommandMatch[2]) ?? ""; + if (postResetMessage) { + message = postResetMessage; + } else { + const resetLoadedSession = loadSessionEntry(requestedSessionKey); + const resetCfg = resetLoadedSession?.cfg ?? cfg; + const resetSessionEntry = resetLoadedSession?.entry; + const resetSpawnedBy = canonicalizeSpawnedByForAgent( + resetCfg, + resolveAgentIdFromSessionKey(requestedSessionKey), + resetSessionEntry?.spawnedBy, + ); + const { runtimeWorkspaceDir, isCanonicalWorkspace } = resolveSessionRuntimeWorkspace({ + cfg: resetCfg, + sessionKey: requestedSessionKey, + sessionEntry: resetSessionEntry, + spawnedBy: resetSpawnedBy, }); - const merged = mergeSessionEntry(store[primaryKey], nextEntryPatch); - store[primaryKey] = merged; - return merged; - }); - sessionEntry = persisted; - } - if (canonicalSessionKey === mainSessionKey || canonicalSessionKey === "global") { - context.addChatRun(idem, { - sessionKey: canonicalSessionKey, - clientRunId: idem, - }); - if (requestedBestEffortDeliver === undefined) { - bestEffortDeliver = true; + const resetSessionAgentId = resolveAgentIdFromSessionKey(requestedSessionKey); + const resetBaseModelRef = resolveSessionModelRef( + resetCfg, + resetSessionEntry, + resetSessionAgentId, + ); + const resetEffectiveModelRef = { + provider: providerOverride || resetBaseModelRef.provider, + model: modelOverride || resetBaseModelRef.model, + }; + const bareResetPromptState = await resolveBareSessionResetPromptState({ + cfg: resetCfg, + workspaceDir: runtimeWorkspaceDir, + isPrimaryRun: + !isSubagentSessionKey(requestedSessionKey) && !isAcpSessionKey(requestedSessionKey), + isCanonicalWorkspace, + hasBootstrapFileAccess: resolveBareResetBootstrapFileAccess({ + cfg: resetCfg, + agentId: resetSessionAgentId, + sessionKey: requestedSessionKey, + workspaceDir: runtimeWorkspaceDir, + modelProvider: resetEffectiveModelRef.provider, + modelId: resetEffectiveModelRef.model, + }), + }); + // Keep bare /new and /reset behavior aligned with chat.send: + // reset first, then run a fresh-session greeting prompt in-place. + // Date is embedded in the prompt so agents read the correct daily + // memory files; skip further timestamp injection to avoid duplication. + message = bareResetPromptState.prompt; + skipTimestampInjection = true; + shouldPrependStartupContext = + bareResetPromptState.shouldPrependStartupContext && + shouldApplyStartupContext({ cfg, action: resetReason }); } } - registerAgentRunContext(idem, { sessionKey: canonicalSessionKey }); - } - const runId = idem; - const connId = typeof client?.connId === "string" ? client.connId : undefined; - const wantsToolEvents = hasGatewayClientCap( - client?.connect?.caps, - GATEWAY_CLIENT_CAPS.TOOL_EVENTS, - ); - if (connId && wantsToolEvents) { - context.registerToolEventRecipient(runId, connId); - // Register for any other active runs *in the same session* so - // late-joining clients (e.g. page refresh mid-response) receive - // in-progress tool events without leaking cross-session data. - for (const [activeRunId, active] of context.chatAbortControllers) { - if (activeRunId !== runId && active.sessionKey === requestedSessionKey) { - context.registerToolEventRecipient(activeRunId, connId); - } + // Inject timestamp into user-authored messages that don't already have one. + // Channel messages (Discord, Telegram, etc.) get timestamps via envelope + // formatting in a separate code path — they never reach this handler. + // See: https://github.com/openclaw/openclaw/issues/3658 + if (!skipTimestampInjection && !isRawModelRun && inputProvenance?.kind !== "inter_session") { + message = injectTimestamp(message, timestampOptsFromConfig(cfg)); } - } - const wantsDelivery = request.deliver === true; - const explicitTo = - normalizeOptionalString(request.replyTo) ?? normalizeOptionalString(request.to); - const explicitThreadId = normalizeOptionalString(request.threadId); - const turnSourceChannel = normalizeOptionalString(request.channel); - const turnSourceTo = normalizeOptionalString(request.to); - const turnSourceAccountId = normalizeOptionalString(request.accountId); - const deliveryPlan = resolveAgentDeliveryPlan({ - sessionEntry, - requestedChannel: request.replyChannel ?? request.channel, - explicitTo, - explicitThreadId, - accountId: request.replyAccountId ?? request.accountId, - wantsDelivery, - turnSourceChannel, - turnSourceTo, - turnSourceAccountId, - turnSourceThreadId: explicitThreadId, - }); - - let resolvedChannel = deliveryPlan.resolvedChannel; - let deliveryTargetMode = deliveryPlan.deliveryTargetMode; - let resolvedAccountId = deliveryPlan.resolvedAccountId; - let resolvedTo = deliveryPlan.resolvedTo; - let effectivePlan = deliveryPlan; - let deliveryDowngradeReason: string | null = null; - let deliveryTargetResolutionError: Error | undefined; - - if (wantsDelivery && resolvedChannel === INTERNAL_MESSAGE_CHANNEL) { - const cfgResolved = cfgForAgent ?? cfg; - try { - const selection = await resolveMessageChannelSelection({ cfg: cfgResolved }); - resolvedChannel = selection.channel; - deliveryTargetMode = deliveryTargetMode ?? "implicit"; - effectivePlan = { - ...deliveryPlan, - resolvedChannel, - deliveryTargetMode, - resolvedAccountId, + if (requestedSessionKey) { + const { cfg, storePath, entry, canonicalKey } = loadSessionEntry(requestedSessionKey); + cfgForAgent = cfg; + const now = Date.now(); + const resetPolicy = resolveSessionResetPolicy({ + sessionCfg: cfg.session, + resetType: resolveSessionResetType({ sessionKey: canonicalKey }), + resetOverride: resolveChannelResetConfig({ + sessionCfg: cfg.session, + channel: entry?.lastChannel ?? entry?.channel ?? request.channel, + }), + }); + const freshness = entry + ? evaluateSessionFreshness({ + updatedAt: entry.updatedAt, + ...resolveSessionLifecycleTimestamps({ + entry, + storePath, + agentId: resolveAgentIdFromSessionKey(canonicalKey), + }), + now, + policy: resetPolicy, + }) + : undefined; + const canReuseSession = Boolean(entry?.sessionId) && (freshness?.fresh ?? false); + const usableRequestedSessionId = + requestedSessionId && (!entry?.sessionId || canReuseSession) + ? requestedSessionId + : undefined; + const sessionId = usableRequestedSessionId + ? usableRequestedSessionId + : ((canReuseSession ? entry?.sessionId : undefined) ?? randomUUID()); + isNewSession = + !entry || + (!canReuseSession && !usableRequestedSessionId) || + Boolean(usableRequestedSessionId && entry?.sessionId !== usableRequestedSessionId); + const touchInteraction = + request.bootstrapContextRunKind !== "cron" && + request.bootstrapContextRunKind !== "heartbeat" && + !request.internalEvents?.length; + const labelValue = normalizeOptionalString(request.label) || entry?.label; + const pluginOwnerId = + entry === undefined + ? normalizeOptionalString(client?.internal?.pluginRuntimeOwnerId) + : normalizeOptionalString(entry.pluginOwnerId); + const sessionAgent = resolveAgentIdFromSessionKey(canonicalKey); + spawnedByValue = canonicalizeSpawnedByForAgent(cfg, sessionAgent, entry?.spawnedBy); + const storedGroup = normalizeTrustedGroupMetadata(entry); + let inheritedGroup: TrustedGroupMetadata | undefined; + if ( + spawnedByValue && + (!storedGroup.groupId || !storedGroup.groupChannel || !storedGroup.groupSpace) + ) { + try { + const parentEntry = loadSessionEntry(spawnedByValue)?.entry; + inheritedGroup = normalizeTrustedGroupMetadata({ + groupId: parentEntry?.groupId, + groupChannel: parentEntry?.groupChannel, + groupSpace: parentEntry?.space, + }); + } catch { + inheritedGroup = undefined; + } + } + const trustedGroup = resolveTrustedGroupMetadata({ + sessionKey: canonicalKey, + spawnedBy: spawnedByValue, + stored: storedGroup, + inherited: inheritedGroup, + }); + const validatedGroup = trustedGroup.groupId + ? resolveTrustedGroupId({ + groupId: trustedGroup.groupId, + sessionKey: canonicalKey, + spawnedBy: spawnedByValue, + }) + : undefined; + if (validatedGroup?.dropped) { + resolvedGroupId = undefined; + resolvedGroupChannel = undefined; + resolvedGroupSpace = undefined; + } else { + const trustRequestSelectors = + Boolean(trustedGroup.groupId) && + requestGroupMatchesTrusted({ + requestGroupId: normalizedSpawned.groupId, + trustedGroupId: trustedGroup.groupId, + }); + resolvedGroupId = trustedGroup.groupId; + resolvedGroupChannel = + trustedGroup.groupChannel ?? + (trustRequestSelectors ? normalizedSpawned.groupChannel : undefined); + resolvedGroupSpace = + trustedGroup.groupSpace ?? + (trustRequestSelectors ? normalizedSpawned.groupSpace : undefined); + } + const deliveryFields = normalizeSessionDeliveryFields(entry); + // When the session has no delivery context yet (e.g. a freshly-spawned subagent + // with deliver: false), seed it from the request's channel/to/threadId params. + // Without this, subagent sessions end up with a channel-only deliveryContext + // and no `to`/`threadId`, which causes announce delivery to either target the + // wrong channel (when the parent's lastTo drifts) or fail entirely. + const requestDeliveryHint = normalizeDeliveryContext({ + channel: request.channel?.trim(), + to: request.to?.trim(), + accountId: request.accountId?.trim(), + // Pass threadId directly — normalizeDeliveryContext handles both + // string and numeric threadIds (e.g., Matrix uses integers). + threadId: request.threadId, + }); + const effectiveDelivery = mergeDeliveryContext( + deliveryFields.deliveryContext, + requestDeliveryHint, + ); + const effectiveDeliveryFields = normalizeSessionDeliveryFields({ + deliveryContext: effectiveDelivery, + }); + const nextEntryPatch: SessionEntry = { + sessionId, + updatedAt: now, + sessionStartedAt: isNewSession + ? now + : (entry?.sessionStartedAt ?? + resolveSessionLifecycleTimestamps({ + entry, + storePath, + agentId: resolveAgentIdFromSessionKey(canonicalKey), + }).sessionStartedAt), + lastInteractionAt: touchInteraction ? now : entry?.lastInteractionAt, + thinkingLevel: entry?.thinkingLevel, + fastMode: entry?.fastMode, + verboseLevel: entry?.verboseLevel, + traceLevel: entry?.traceLevel, + reasoningLevel: entry?.reasoningLevel, + systemSent: entry?.systemSent, + sendPolicy: entry?.sendPolicy, + skillsSnapshot: entry?.skillsSnapshot, + deliveryContext: effectiveDeliveryFields.deliveryContext, + lastChannel: effectiveDeliveryFields.lastChannel ?? entry?.lastChannel, + lastTo: effectiveDeliveryFields.lastTo ?? entry?.lastTo, + lastAccountId: effectiveDeliveryFields.lastAccountId ?? entry?.lastAccountId, + lastThreadId: effectiveDeliveryFields.lastThreadId ?? entry?.lastThreadId, + modelOverride: entry?.modelOverride, + providerOverride: entry?.providerOverride, + label: labelValue, + spawnedBy: spawnedByValue, + spawnedWorkspaceDir: entry?.spawnedWorkspaceDir, + spawnDepth: entry?.spawnDepth, + channel: entry?.channel ?? request.channel?.trim(), + groupId: resolvedGroupId, + groupChannel: resolvedGroupChannel, + space: resolvedGroupSpace, + ...(pluginOwnerId ? { pluginOwnerId } : {}), + sessionFile: + entry?.sessionId && entry.sessionId !== sessionId ? undefined : entry?.sessionFile, + cliSessionIds: entry?.cliSessionIds, + cliSessionBindings: entry?.cliSessionBindings, + claudeCliSessionId: entry?.claudeCliSessionId, }; - } catch (err) { + sessionEntry = mergeSessionEntry(entry, nextEntryPatch); + if (request.deliver === true) { + const sendPolicy = resolveSendPolicy({ + cfg, + entry: sessionEntry, + sessionKey: canonicalKey, + channel: sessionEntry?.channel, + chatType: sessionEntry?.chatType, + }); + if (sendPolicy === "deny") { + respond( + false, + undefined, + errorShape(ErrorCodes.INVALID_REQUEST, "send blocked by session policy"), + ); + return; + } + } + resolvedSessionId = sessionId; + const canonicalSessionKey = canonicalKey; + resolvedSessionKey = canonicalSessionKey; + const agentId = resolveAgentIdFromSessionKey(canonicalSessionKey); + const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId }); + if (storePath) { + const requestedStoreKey = requestedSessionKey; + const persisted = await updateSessionStore(storePath, (store) => { + const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({ + cfg, + key: requestedStoreKey, + store, + }); + const merged = mergeSessionEntry(store[primaryKey], nextEntryPatch); + store[primaryKey] = merged; + return merged; + }); + sessionEntry = persisted; + } + if (canonicalSessionKey === mainSessionKey || canonicalSessionKey === "global") { + context.addChatRun(idem, { + sessionKey: canonicalSessionKey, + clientRunId: idem, + }); + if (requestedBestEffortDeliver === undefined) { + bestEffortDeliver = true; + } + } + registerAgentRunContext(idem, { sessionKey: canonicalSessionKey }); + } + + const connId = typeof client?.connId === "string" ? client.connId : undefined; + const wantsToolEvents = hasGatewayClientCap( + client?.connect?.caps, + GATEWAY_CLIENT_CAPS.TOOL_EVENTS, + ); + if (connId && wantsToolEvents) { + context.registerToolEventRecipient(runId, connId); + // Register for any other active runs *in the same session* so + // late-joining clients (e.g. page refresh mid-response) receive + // in-progress tool events without leaking cross-session data. + for (const [activeRunId, active] of context.chatAbortControllers) { + if (activeRunId !== runId && active.sessionKey === requestedSessionKey) { + context.registerToolEventRecipient(activeRunId, connId); + } + } + } + + const wantsDelivery = request.deliver === true; + const explicitTo = + normalizeOptionalString(request.replyTo) ?? normalizeOptionalString(request.to); + const explicitThreadId = normalizeOptionalString(request.threadId); + const turnSourceChannel = normalizeOptionalString(request.channel); + const turnSourceTo = normalizeOptionalString(request.to); + const turnSourceAccountId = normalizeOptionalString(request.accountId); + const deliveryPlan = resolveAgentDeliveryPlan({ + sessionEntry, + requestedChannel: request.replyChannel ?? request.channel, + explicitTo, + explicitThreadId, + accountId: request.replyAccountId ?? request.accountId, + wantsDelivery, + turnSourceChannel, + turnSourceTo, + turnSourceAccountId, + turnSourceThreadId: explicitThreadId, + }); + + let resolvedChannel = deliveryPlan.resolvedChannel; + let deliveryTargetMode = deliveryPlan.deliveryTargetMode; + let resolvedAccountId = deliveryPlan.resolvedAccountId; + let resolvedTo = deliveryPlan.resolvedTo; + let effectivePlan = deliveryPlan; + let deliveryDowngradeReason: string | null = null; + let deliveryTargetResolutionError: Error | undefined; + + if (wantsDelivery && resolvedChannel === INTERNAL_MESSAGE_CHANNEL) { + const cfgResolved = cfgForAgent ?? cfg; + try { + const selection = await resolveMessageChannelSelection({ cfg: cfgResolved }); + resolvedChannel = selection.channel; + deliveryTargetMode = deliveryTargetMode ?? "implicit"; + effectivePlan = { + ...deliveryPlan, + resolvedChannel, + deliveryTargetMode, + resolvedAccountId, + }; + } catch (err) { + const shouldDowngrade = shouldDowngradeDeliveryToSessionOnly({ + wantsDelivery, + bestEffortDeliver, + resolvedChannel, + }); + if (!shouldDowngrade) { + respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, formatForLog(err))); + return; + } + deliveryDowngradeReason = String(err); + } + } + + if (!resolvedTo && isDeliverableMessageChannel(resolvedChannel)) { + const cfgResolved = cfgForAgent ?? cfg; + const fallback = resolveAgentOutboundTarget({ + cfg: cfgResolved, + plan: effectivePlan, + targetMode: deliveryTargetMode ?? "implicit", + validateExplicitTarget: false, + }); + if (fallback.resolvedTarget?.ok) { + resolvedTo = fallback.resolvedTo; + } else if (fallback.resolvedTarget && !fallback.resolvedTarget.ok) { + deliveryTargetResolutionError = fallback.resolvedTarget.error; + } + } + + if (wantsDelivery && isDeliverableMessageChannel(resolvedChannel) && !resolvedTo) { + if (!bestEffortDeliver) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + deliveryTargetResolutionError + ? String(deliveryTargetResolutionError) + : `delivery target is required for ${resolvedChannel}: pass --to/--reply-to or configure a default target`, + ), + ); + return; + } + context.logGateway.info( + deliveryTargetResolutionError + ? `agent delivery target missing (bestEffortDeliver): ${String(deliveryTargetResolutionError)}` + : "agent delivery target missing (bestEffortDeliver): no deliverable target", + ); + } + + if (wantsDelivery && resolvedChannel === INTERNAL_MESSAGE_CHANNEL) { const shouldDowngrade = shouldDowngradeDeliveryToSessionOnly({ wantsDelivery, bestEffortDeliver, resolvedChannel, }); if (!shouldDowngrade) { - respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, formatForLog(err))); + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + "delivery channel is required: pass --channel/--reply-channel or use a main session with a previous channel", + ), + ); return; } - deliveryDowngradeReason = String(err); - } - } - - if (!resolvedTo && isDeliverableMessageChannel(resolvedChannel)) { - const cfgResolved = cfgForAgent ?? cfg; - const fallback = resolveAgentOutboundTarget({ - cfg: cfgResolved, - plan: effectivePlan, - targetMode: deliveryTargetMode ?? "implicit", - validateExplicitTarget: false, - }); - if (fallback.resolvedTarget?.ok) { - resolvedTo = fallback.resolvedTo; - } else if (fallback.resolvedTarget && !fallback.resolvedTarget.ok) { - deliveryTargetResolutionError = fallback.resolvedTarget.error; - } - } - - if (wantsDelivery && isDeliverableMessageChannel(resolvedChannel) && !resolvedTo) { - if (!bestEffortDeliver) { - respond( - false, - undefined, - errorShape( - ErrorCodes.INVALID_REQUEST, - deliveryTargetResolutionError - ? String(deliveryTargetResolutionError) - : `delivery target is required for ${resolvedChannel}: pass --to/--reply-to or configure a default target`, - ), + context.logGateway.info( + deliveryDowngradeReason + ? `agent delivery downgraded to session-only (bestEffortDeliver): ${deliveryDowngradeReason}` + : "agent delivery downgraded to session-only (bestEffortDeliver): no deliverable channel", ); - return; } - context.logGateway.info( - deliveryTargetResolutionError - ? `agent delivery target missing (bestEffortDeliver): ${String(deliveryTargetResolutionError)}` - : "agent delivery target missing (bestEffortDeliver): no deliverable target", - ); - } - if (wantsDelivery && resolvedChannel === INTERNAL_MESSAGE_CHANNEL) { - const shouldDowngrade = shouldDowngradeDeliveryToSessionOnly({ - wantsDelivery, - bestEffortDeliver, - resolvedChannel, + const normalizedTurnSource = normalizeMessageChannel(turnSourceChannel); + const turnSourceMessageChannel = + normalizedTurnSource && isKnownGatewayChannel(normalizedTurnSource) + ? normalizedTurnSource + : undefined; + const originMessageChannel = + turnSourceMessageChannel ?? + (client?.connect && isWebchatConnect(client.connect) + ? INTERNAL_MESSAGE_CHANNEL + : resolvedChannel); + + const deliver = request.deliver === true && resolvedChannel !== INTERNAL_MESSAGE_CHANNEL; + + // Register before the accepted ack so an immediate chat.abort/sessions.abort + // cannot race the active-run entry. Agent RPC runs use the agent timeout; + // chat.send keeps the shorter chat cleanup cap. + const now = Date.now(); + const timeoutMs = resolveAgentTimeoutMs({ + cfg: cfgForAgent ?? cfg, + overrideSeconds: typeof request.timeout === "number" ? request.timeout : undefined, }); - if (!shouldDowngrade) { - respond( - false, - undefined, - errorShape( - ErrorCodes.INVALID_REQUEST, - "delivery channel is required: pass --channel/--reply-channel or use a main session with a previous channel", - ), - ); - return; - } - context.logGateway.info( - deliveryDowngradeReason - ? `agent delivery downgraded to session-only (bestEffortDeliver): ${deliveryDowngradeReason}` - : "agent delivery downgraded to session-only (bestEffortDeliver): no deliverable channel", - ); - } - - const normalizedTurnSource = normalizeMessageChannel(turnSourceChannel); - const turnSourceMessageChannel = - normalizedTurnSource && isKnownGatewayChannel(normalizedTurnSource) - ? normalizedTurnSource - : undefined; - const originMessageChannel = - turnSourceMessageChannel ?? - (client?.connect && isWebchatConnect(client.connect) - ? INTERNAL_MESSAGE_CHANNEL - : resolvedChannel); - - const deliver = request.deliver === true && resolvedChannel !== INTERNAL_MESSAGE_CHANNEL; - - // Register before the accepted ack so an immediate chat.abort/sessions.abort - // cannot race the active-run entry. Agent RPC runs use the agent timeout; - // chat.send keeps the shorter chat cleanup cap. - const now = Date.now(); - const timeoutMs = resolveAgentTimeoutMs({ - cfg: cfgForAgent ?? cfg, - overrideSeconds: typeof request.timeout === "number" ? request.timeout : undefined, - }); - const activeModelProvider = - providerOverride ?? - resolveSessionModelRef( - cfgForAgent ?? cfg, - sessionEntry, - resolvedSessionKey - ? resolveAgentIdFromSessionKey(resolvedSessionKey) - : (agentId ?? resolveDefaultAgentId(cfgForAgent ?? cfg)), - ).provider; - const activeAuthProvider = resolveProviderIdForAuth(activeModelProvider, { - config: cfgForAgent ?? cfg, - }); - const activeRunAbort = registerChatAbortController({ - chatAbortControllers: context.chatAbortControllers, - runId, - sessionId: resolvedSessionId ?? runId, - sessionKey: resolvedSessionKey, - timeoutMs, - now, - expiresAtMs: resolveAgentRunExpiresAtMs({ now, timeoutMs }), - ownerConnId: typeof client?.connId === "string" ? client.connId : undefined, - ownerDeviceId: - typeof client?.connect?.device?.id === "string" ? client.connect.device.id : undefined, - providerId: activeModelProvider, - authProviderId: activeAuthProvider, - kind: "agent", - }); - if (!activeRunAbort.registered && context.chatAbortControllers.has(runId)) { - respond(true, { runId, status: "in_flight" as const }, undefined, { - cached: true, + const activeModelProvider = + providerOverride ?? + resolveSessionModelRef( + cfgForAgent ?? cfg, + sessionEntry, + resolvedSessionKey + ? resolveAgentIdFromSessionKey(resolvedSessionKey) + : (agentId ?? resolveDefaultAgentId(cfgForAgent ?? cfg)), + ).provider; + const activeAuthProvider = resolveProviderIdForAuth(activeModelProvider, { + config: cfgForAgent ?? cfg, + }); + const activeRunAbort = registerChatAbortController({ + chatAbortControllers: context.chatAbortControllers, runId, + sessionId: resolvedSessionId ?? runId, + sessionKey: resolvedSessionKey, + timeoutMs, + now, + expiresAtMs: resolveAgentRunExpiresAtMs({ now, timeoutMs }), + ownerConnId: typeof client?.connId === "string" ? client.connId : undefined, + ownerDeviceId: + typeof client?.connect?.device?.id === "string" ? client.connect.device.id : undefined, + providerId: activeModelProvider, + authProviderId: activeAuthProvider, + kind: "agent", }); - return; - } + if (!activeRunAbort.registered && context.chatAbortControllers.has(runId)) { + agentRunAccepted = true; + respond(true, { runId, status: "in_flight" as const }, undefined, { + cached: true, + runId, + }); + return; + } - const accepted = { - runId, - status: "accepted" as const, - acceptedAt: Date.now(), - }; - // Store an in-flight ack so retries do not spawn a second run. - setGatewayDedupeEntry({ - dedupe: context.dedupe, - key: `agent:${idem}`, - entry: { - ts: Date.now(), - ok: true, - payload: accepted, - }, - }); - respond(true, accepted, undefined, { runId }); - // Give the accepted frame one event-loop turn to flush before the runner - // starts potentially heavy synchronous prompt/context setup. The dispatch - // is scheduled out of this request handler so immediate agent.wait calls - // can reach the gateway before the pre-turn runner monopolizes the loop. - void (async () => { - await yieldAfterAgentAcceptedAck(); + const accepted = { + runId, + status: "accepted" as const, + acceptedAt: Date.now(), + }; + agentRunAccepted = true; + // Store an in-flight ack so retries do not spawn a second run. + setGatewayDedupeEntries({ + dedupe: context.dedupe, + keys: agentDedupeKeys, + entry: { + ts: Date.now(), + ok: true, + payload: accepted, + }, + }); + respond(true, accepted, undefined, { runId }); + // Give the accepted frame one event-loop turn to flush before the runner + // starts potentially heavy synchronous prompt/context setup. The dispatch + // is scheduled out of this request handler so immediate agent.wait calls + // can reach the gateway before the pre-turn runner monopolizes the loop. + void (async () => { + await yieldAfterAgentAcceptedAck(); - let dispatched = false; - try { - if (resolvedSessionKey) { - await reactivateCompletedSubagentSession({ - sessionKey: resolvedSessionKey, - runId, - }); - } - - if (requestedSessionKey && resolvedSessionKey && isNewSession) { - emitSessionsChanged(context, { - sessionKey: resolvedSessionKey, - reason: "create", - }); - } - if (resolvedSessionKey) { - emitSessionsChanged(context, { - sessionKey: resolvedSessionKey, - reason: "send", - }); - } - - if (shouldPrependStartupContext && resolvedSessionKey) { - const startupCfg = cfgForAgent ?? cfg; - if ( - !shouldSkipStartupContextForSpawnedSandbox({ - cfg: startupCfg, + let dispatched = false; + try { + if (resolvedSessionKey) { + await reactivateCompletedSubagentSession({ sessionKey: resolvedSessionKey, - spawnedBy: spawnedByValue, - }) - ) { - const { runtimeWorkspaceDir } = resolveSessionRuntimeWorkspace({ - cfg: startupCfg, + runId, + }); + } + + if (requestedSessionKey && resolvedSessionKey && isNewSession) { + emitSessionsChanged(context, { sessionKey: resolvedSessionKey, - sessionEntry, - spawnedBy: spawnedByValue, + reason: "create", }); - const startupContextPrelude = await buildSessionStartupContextPrelude({ - workspaceDir: runtimeWorkspaceDir, - cfg: startupCfg, + } + if (resolvedSessionKey) { + emitSessionsChanged(context, { + sessionKey: resolvedSessionKey, + reason: "send", }); - if (startupContextPrelude) { - message = `${startupContextPrelude}\n\n${message}`; + } + + if (shouldPrependStartupContext && resolvedSessionKey) { + const startupCfg = cfgForAgent ?? cfg; + if ( + !shouldSkipStartupContextForSpawnedSandbox({ + cfg: startupCfg, + sessionKey: resolvedSessionKey, + spawnedBy: spawnedByValue, + }) + ) { + const { runtimeWorkspaceDir } = resolveSessionRuntimeWorkspace({ + cfg: startupCfg, + sessionKey: resolvedSessionKey, + sessionEntry, + spawnedBy: spawnedByValue, + }); + const startupContextPrelude = await buildSessionStartupContextPrelude({ + workspaceDir: runtimeWorkspaceDir, + cfg: startupCfg, + }); + if (startupContextPrelude) { + message = `${startupContextPrelude}\n\n${message}`; + } } } - } - if (!isRawModelRun) { - message = annotateInterSessionPromptText(message, inputProvenance); - } + if (!isRawModelRun) { + message = annotateInterSessionPromptText(message, inputProvenance); + } - const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId; - const ingressAgentId = - agentId && - (!resolvedSessionKey || resolveAgentIdFromSessionKey(resolvedSessionKey) === agentId) - ? agentId - : undefined; - let execApprovalFollowupRuntimeHandoff = - canUseInternalRuntimeHandoff && execApprovalFollowupApprovalId - ? consumeExecApprovalFollowupRuntimeHandoff({ - handoffId: request.internalRuntimeHandoffId, - approvalId: execApprovalFollowupApprovalId, - idempotencyKey: idem, - sessionKey: resolvedSessionKey, - }) - : undefined; - if ( - !execApprovalFollowupRuntimeHandoff && - canUseInternalRuntimeHandoff && - execApprovalFollowupApprovalId && - requestedSessionKeyRaw && - requestedSessionKeyRaw !== resolvedSessionKey - ) { - execApprovalFollowupRuntimeHandoff = consumeExecApprovalFollowupRuntimeHandoff({ - handoffId: request.internalRuntimeHandoffId, - approvalId: execApprovalFollowupApprovalId, - idempotencyKey: idem, - sessionKey: requestedSessionKeyRaw, - }); - } - const execApprovalFollowupElevatedDefaults = - execApprovalFollowupRuntimeHandoff?.bashElevated; + const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId; + const ingressAgentId = + agentId && + (!resolvedSessionKey || resolveAgentIdFromSessionKey(resolvedSessionKey) === agentId) + ? agentId + : undefined; + let execApprovalFollowupRuntimeHandoff = + canUseInternalRuntimeHandoff && execApprovalFollowupApprovalId + ? consumeExecApprovalFollowupRuntimeHandoff({ + handoffId: request.internalRuntimeHandoffId, + approvalId: execApprovalFollowupApprovalId, + idempotencyKey: idem, + sessionKey: resolvedSessionKey, + }) + : undefined; + if ( + !execApprovalFollowupRuntimeHandoff && + canUseInternalRuntimeHandoff && + execApprovalFollowupApprovalId && + requestedSessionKeyRaw && + requestedSessionKeyRaw !== resolvedSessionKey + ) { + execApprovalFollowupRuntimeHandoff = consumeExecApprovalFollowupRuntimeHandoff({ + handoffId: request.internalRuntimeHandoffId, + approvalId: execApprovalFollowupApprovalId, + idempotencyKey: idem, + sessionKey: requestedSessionKeyRaw, + }); + } + const execApprovalFollowupElevatedDefaults = + execApprovalFollowupRuntimeHandoff?.bashElevated; - dispatchAgentRunFromGateway({ - ingressOpts: { - message, - images, - imageOrder, - agentId: ingressAgentId, - provider: providerOverride, - model: modelOverride, - to: resolvedTo, - sessionId: resolvedSessionId, - sessionKey: resolvedSessionKey, - thinking: request.thinking, - deliver, - deliveryTargetMode, - channel: resolvedChannel, - accountId: resolvedAccountId, - threadId: resolvedThreadId, - runContext: { - messageChannel: originMessageChannel, + dispatchAgentRunFromGateway({ + ingressOpts: { + message, + images, + imageOrder, + agentId: ingressAgentId, + provider: providerOverride, + model: modelOverride, + to: resolvedTo, + sessionId: resolvedSessionId, + sessionKey: resolvedSessionKey, + thinking: request.thinking, + deliver, + deliveryTargetMode, + channel: resolvedChannel, accountId: resolvedAccountId, + threadId: resolvedThreadId, + runContext: { + messageChannel: originMessageChannel, + accountId: resolvedAccountId, + groupId: resolvedGroupId, + groupChannel: resolvedGroupChannel, + groupSpace: resolvedGroupSpace, + currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined, + }, + ...(execApprovalFollowupElevatedDefaults + ? { bashElevated: execApprovalFollowupElevatedDefaults } + : {}), groupId: resolvedGroupId, groupChannel: resolvedGroupChannel, groupSpace: resolvedGroupSpace, - currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined, - }, - ...(execApprovalFollowupElevatedDefaults - ? { bashElevated: execApprovalFollowupElevatedDefaults } - : {}), - groupId: resolvedGroupId, - groupChannel: resolvedGroupChannel, - groupSpace: resolvedGroupSpace, - spawnedBy: spawnedByValue, - timeout: request.timeout?.toString(), - bestEffortDeliver, - messageChannel: originMessageChannel, - runId, - lane: request.lane, - modelRun: request.modelRun === true, - promptMode: request.promptMode, - extraSystemPrompt: request.extraSystemPrompt, - bootstrapContextMode: request.bootstrapContextMode, - bootstrapContextRunKind: request.bootstrapContextRunKind, - acpTurnSource: request.acpTurnSource, - internalEvents: request.internalEvents, - inputProvenance, - sourceReplyDeliveryMode: request.sourceReplyDeliveryMode, - suppressPromptPersistence: shouldSuppressAgentPromptPersistence({ - inputProvenance, - internalEvents: request.internalEvents, - }), - cleanupBundleMcpOnRunEnd: request.cleanupBundleMcpOnRunEnd, - abortSignal: activeRunAbort.controller.signal, - onActiveModelSelected: ({ provider }) => { - updateChatRunProvider(context.chatAbortControllers, { - runId, - providerId: provider, - authProviderId: resolveProviderIdForAuth(provider, { - config: cfgForAgent ?? cfg, - }), - }); - }, - // Internal-only: allow workspace override for spawned subagent runs. - workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({ spawnedBy: spawnedByValue, - workspaceDir: sessionEntry?.spawnedWorkspaceDir, - }), - senderIsOwner, - allowModelOverride, - }, - runId, - idempotencyKey: idem, - abortController: activeRunAbort.controller, - respond, - context, - }); - dispatched = true; - } catch (err) { - const error = errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)); - const payload = { - runId, - status: "error" as const, - summary: formatForLog(err), - }; - setGatewayDedupeEntry({ - dedupe: context.dedupe, - key: `agent:${idem}`, - entry: { - ts: Date.now(), - ok: false, - payload, - error, - }, - }); - respond(false, payload, error, { - runId, - error: formatForLog(err), - }); - } finally { - if (!dispatched) { - activeRunAbort.cleanup(); + timeout: request.timeout?.toString(), + bestEffortDeliver, + messageChannel: originMessageChannel, + runId, + lane: request.lane, + modelRun: request.modelRun === true, + promptMode: request.promptMode, + extraSystemPrompt: request.extraSystemPrompt, + bootstrapContextMode: request.bootstrapContextMode, + bootstrapContextRunKind: request.bootstrapContextRunKind, + acpTurnSource: request.acpTurnSource, + internalEvents: request.internalEvents, + inputProvenance, + sourceReplyDeliveryMode: request.sourceReplyDeliveryMode, + suppressPromptPersistence: shouldSuppressAgentPromptPersistence({ + inputProvenance, + internalEvents: request.internalEvents, + }), + cleanupBundleMcpOnRunEnd: request.cleanupBundleMcpOnRunEnd, + abortSignal: activeRunAbort.controller.signal, + onActiveModelSelected: ({ provider }) => { + updateChatRunProvider(context.chatAbortControllers, { + runId, + providerId: provider, + authProviderId: resolveProviderIdForAuth(provider, { + config: cfgForAgent ?? cfg, + }), + }); + }, + // Internal-only: allow workspace override for spawned subagent runs. + workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({ + spawnedBy: spawnedByValue, + workspaceDir: sessionEntry?.spawnedWorkspaceDir, + }), + senderIsOwner, + allowModelOverride, + }, + runId, + dedupeKeys: agentDedupeKeys, + abortController: activeRunAbort.controller, + respond, + context, + }); + dispatched = true; + } catch (err) { + const error = errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)); + const payload = { + runId, + status: "error" as const, + summary: formatForLog(err), + }; + setGatewayDedupeEntries({ + dedupe: context.dedupe, + keys: agentDedupeKeys, + entry: { + ts: Date.now(), + ok: false, + payload, + error, + }, + }); + respond(false, payload, error, { + runId, + error: formatForLog(err), + }); + } finally { + if (!dispatched) { + activeRunAbort.cleanup(); + } } - } - })(); + })(); + } finally { + clearUnacceptedExecApprovalFollowupDedupe(); + } }, "agent.identity.get": ({ params, respond, context }) => { if (!validateAgentIdentityParams(params)) {