diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cd784fc2a7..310436981b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -158,6 +158,7 @@ Docs: https://docs.openclaw.ai - Media/store: honor configured agent media limits when saving generated media and persisting outbound reply media, so the store no longer hard-stops those flows at 5 MB before the configured limit applies. (#66229) Thanks @neeravmakwana and @vincentkoc. - Plugins/setup-entry: preserve separate setup-entry secrets exports when loading bundled setup-runtime channels, so setup-mode flows keep the channel secret contract for split plugin + secrets entrypoints. (#66261) Thanks @hxy91819. - CLI/update: prune stale packaged `dist` chunks after npm upgrades, verify installed package inventory, and keep downgrade/update verification working across older releases. (#66959) Thanks @obviyus. +- Gateway/exec events: dedupe replayed `exec.finished` node events by canonical session key plus `runId` so duplicate async completion replays no longer inject duplicate completion turns into the parent session transcript. (#67281) Thanks @jalehman. ## 2026.4.12 diff --git a/docs/refactor/async-exec-duplicate-completion-investigation.md b/docs/refactor/async-exec-duplicate-completion-investigation.md new file mode 100644 index 00000000000..622b34cb297 --- /dev/null +++ b/docs/refactor/async-exec-duplicate-completion-investigation.md @@ -0,0 +1,122 @@ +# Async Exec Duplicate Completion Investigation + +## Scope + +- Session: `agent:main:telegram:group:-1003774691294:topic:1` +- Symptom: the same async exec completion for session/run `keen-nexus` was recorded twice in LCM as user turns. +- Goal: identify whether this is most likely duplicate session injection or plain outbound delivery retry. + +## Conclusion + +Most likely this is **duplicate session injection**, not a pure outbound delivery retry. + +The strongest gateway-side gap is in the **node exec completion path**: + +1. 
A node-side exec finish emits `exec.finished` with the full `runId`. +2. Gateway `server-node-events` converts that into a system event and requests a heartbeat. +3. The heartbeat run injects the drained system event block into the agent prompt. +4. The embedded runner persists that prompt as a new user turn in the session transcript. + +If the same `exec.finished` reaches the gateway twice for the same `runId` for any reason (replay, reconnect duplicate, upstream resend, duplicated producer), OpenClaw currently has **no idempotency check keyed by `runId`/`contextKey`** on this path. The second copy will become a second user message with the same content. + +## Exact Code Path + +### 1. Producer: node exec completion event + +- `src/node-host/invoke.ts:340-360` + - `sendExecFinishedEvent(...)` emits `node.event` with event `exec.finished`. + - Payload includes `sessionKey` and full `runId`. + +### 2. Gateway event ingestion + +- `src/gateway/server-node-events.ts:574-640` + - Handles `exec.finished`. + - Builds text: + - `Exec finished (node=<nodeId> id=<runId>, code ...)` + - Enqueues it via: + - `enqueueSystemEvent(text, { sessionKey, contextKey: runId ? \`exec:${runId}\` : "exec", trusted: false })` + - Immediately requests a wake: + - `requestHeartbeatNow(scopedHeartbeatWakeOptions(sessionKey, { reason: "exec-event" }))` + +### 3. System event dedupe weakness + +- `src/infra/system-events.ts:90-115` + - `enqueueSystemEvent(...)` only suppresses **consecutive duplicate text**: + - `if (entry.lastText === cleaned) return false` + - It stores `contextKey`, but does **not** use `contextKey` for idempotency. + - After drain, duplicate suppression resets. + +This means a replayed `exec.finished` with the same `runId` can be accepted again later, even though the code already had a stable idempotency candidate (`exec:<runId>`). + +### 4. Wake handling is not the primary duplicator + +- `src/infra/heartbeat-wake.ts:79-117` + - Wakes are coalesced by `(agentId, sessionKey)`. 
+ - Duplicate wake requests for the same target collapse to one pending wake entry. + +This makes **duplicate wake handling alone** a weaker explanation than duplicate event ingestion. + +### 5. Heartbeat consumes the event and turns it into prompt input + +- `src/infra/heartbeat-runner.ts:535-574` + - Preflight peeks pending system events and classifies exec-event runs. +- `src/auto-reply/reply/session-system-events.ts:86-90` + - `drainFormattedSystemEvents(...)` drains the queue for the session. +- `src/auto-reply/reply/get-reply-run.ts:400-427` + - The drained system event block is prepended into the agent prompt body. + +### 6. Transcript injection point + +- `src/agents/pi-embedded-runner/run/attempt.ts:2000-2017` + - `activeSession.prompt(effectivePrompt)` submits the full prompt to the embedded PI session. + - That is the point where the completion-derived prompt becomes a persisted user turn. + +So once the same system event is rebuilt into the prompt twice, duplicate LCM user messages are expected. + +## Why plain outbound delivery retry is less likely + +There is a real outbound failure path in the heartbeat runner: + +- `src/infra/heartbeat-runner.ts:1194-1242` + - The reply is generated first. + - Outbound delivery happens later via `deliverOutboundPayloads(...)`. + - Failure there returns `{ status: "failed" }`. + +However, for the same system event queue entry, this alone is **not sufficient** to explain the duplicate user turns: + +- `src/auto-reply/reply/session-system-events.ts:86-90` + - The system event queue is already drained before outbound delivery. + +So a channel send retry by itself would not recreate the exact same queued event. It could explain missing/failed external delivery, but not by itself a second identical session user message. 
+ +## Secondary, lower-confidence possibility + +There is a full-run retry loop in the agent runner: + +- `src/auto-reply/reply/agent-runner-execution.ts:741-1473` + - Certain transient failures can retry the whole run and resubmit the same `commandBody`. + +That can duplicate a persisted user prompt **within the same reply execution** if the prompt was already appended before the retry condition triggered. + +I rank this lower than duplicate `exec.finished` ingestion because: + +- the observed gap was around 51 seconds, which looks more like a second wake/turn than an in-process retry; +- the report already mentions repeated message send failures, which points more toward a separate later turn than an immediate model/runtime retry. + +## Root Cause Hypothesis + +Highest-confidence hypothesis: + +- The `keen-nexus` completion came through the **node exec event path**. +- The same `exec.finished` was delivered to `server-node-events` twice. +- Gateway accepted both because `enqueueSystemEvent(...)` does not dedupe by `contextKey` / `runId`. +- Each accepted event triggered a heartbeat and was injected as a user turn into the PI transcript. + +## Proposed Tiny Surgical Fix + +If a fix is wanted, the smallest high-value change is: + +- make exec/system-event idempotency honor `contextKey` for a short horizon, at least for exact `(sessionKey, contextKey, text)` repeats; +- or add a dedicated dedupe in `server-node-events` for `exec.finished` keyed by `(sessionKey, runId, event kind)`. + +That would directly block replayed `exec.finished` duplicates before they become session turns. 
diff --git a/src/gateway/server-node-events.test.ts b/src/gateway/server-node-events.test.ts index b09f44e6c10..e96ed7d0db5 100644 --- a/src/gateway/server-node-events.test.ts +++ b/src/gateway/server-node-events.test.ts @@ -135,7 +135,7 @@ vi.mock("./server-node-events.runtime.js", () => runtimeMocks); import type { CliDeps } from "../cli/deps.js"; import type { HealthSummary } from "../commands/health.js"; import type { NodeEventContext } from "./server-node-events-types.js"; -import { handleNodeEvent } from "./server-node-events.js"; +import { handleNodeEvent, resetNodeEventDeduplicationForTests } from "./server-node-events.js"; const enqueueSystemEventMock = runtimeMocks.enqueueSystemEvent; const requestHeartbeatNowMock = runtimeMocks.requestHeartbeatNow; @@ -171,7 +171,9 @@ function buildCtx(): NodeEventContext { describe("node exec events", () => { beforeEach(() => { + resetNodeEventDeduplicationForTests(); enqueueSystemEventMock.mockClear(); + enqueueSystemEventMock.mockReturnValue(true); requestHeartbeatNowMock.mockClear(); registerApnsRegistrationVi.mockClear(); loadOrCreateDeviceIdentityMock.mockClear(); @@ -220,6 +222,37 @@ describe("node exec events", () => { expect(requestHeartbeatNowMock).toHaveBeenCalledWith({ reason: "exec-event" }); }); + it("dedupes duplicate exec.finished events for the same runId on the same session", async () => { + const ctx = buildCtx(); + const payloadJSON = JSON.stringify({ + sessionKey: "agent:main:main", + runId: "run-dup-finished", + exitCode: 0, + timedOut: false, + output: "done", + }); + + await handleNodeEvent(ctx, "node-2", { + event: "exec.finished", + payloadJSON, + }); + await handleNodeEvent(ctx, "node-2", { + event: "exec.finished", + payloadJSON, + }); + + expect(enqueueSystemEventMock).toHaveBeenCalledTimes(1); + expect(requestHeartbeatNowMock).toHaveBeenCalledTimes(1); + expect(enqueueSystemEventMock).toHaveBeenCalledWith( + "Exec finished (node=node-2 id=run-dup-finished, code 0)\ndone", + { + sessionKey: 
"agent:main:main", + contextKey: "exec:run-dup-finished", + trusted: false, + }, + ); + }); + it("canonicalizes exec session key before enqueue and wake", async () => { loadSessionEntryMock.mockReturnValueOnce({ ...buildSessionLookup("node-node-2"), diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index 8f184f4e85a..545f0d26a72 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -39,8 +39,11 @@ const MAX_EXEC_EVENT_OUTPUT_CHARS = 180; const MAX_NOTIFICATION_EVENT_TEXT_CHARS = 120; const VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS = 1500; const MAX_RECENT_VOICE_TRANSCRIPTS = 200; +const EXEC_FINISHED_RUN_DEDUPE_WINDOW_MS = 10 * 60 * 1000; +const MAX_RECENT_EXEC_FINISHED_RUNS = 2000; const recentVoiceTranscripts = new Map(); +const recentExecFinishedRuns = new Map(); function normalizeFiniteInteger(value: unknown): number | null { return typeof value === "number" && Number.isFinite(value) ? Math.trunc(value) : null; @@ -116,6 +119,48 @@ function shouldDropDuplicateVoiceTranscript(params: { return false; } +function shouldDropDuplicateExecFinished(params: { + sessionKey: string; + runId: string; + now: number; +}): boolean { + const fingerprint = `${params.sessionKey}::${params.runId}`; + const previousTs = recentExecFinishedRuns.get(fingerprint); + if ( + typeof previousTs === "number" && + params.now - previousTs <= EXEC_FINISHED_RUN_DEDUPE_WINDOW_MS + ) { + return true; + } + + recentExecFinishedRuns.set(fingerprint, params.now); + if (recentExecFinishedRuns.size > MAX_RECENT_EXEC_FINISHED_RUNS) { + const cutoff = params.now - EXEC_FINISHED_RUN_DEDUPE_WINDOW_MS; + for (const [key, ts] of recentExecFinishedRuns) { + if (ts < cutoff) { + recentExecFinishedRuns.delete(key); + } + if (recentExecFinishedRuns.size <= MAX_RECENT_EXEC_FINISHED_RUNS) { + break; + } + } + while (recentExecFinishedRuns.size > MAX_RECENT_EXEC_FINISHED_RUNS) { + const oldestKey = recentExecFinishedRuns.keys().next().value; + if 
(oldestKey === undefined) { + break; + } + recentExecFinishedRuns.delete(oldestKey); + } + } + + return false; +} + +export function resetNodeEventDeduplicationForTests() { + recentVoiceTranscripts.clear(); + recentExecFinishedRuns.clear(); +} + function compactExecEventOutput(raw: string) { const normalized = raw.replace(/\s+/g, " ").trim(); if (!normalized) { @@ -618,6 +663,16 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt if (!shouldNotify) { return; } + if ( + runId && + shouldDropDuplicateExecFinished({ + sessionKey, + runId, + now: Date.now(), + }) + ) { + return; + } text = `Exec finished (node=${nodeId}${runId ? ` id=${runId}` : ""}, ${exitLabel})`; if (compactOutput) { text += `\n${compactOutput}`; @@ -629,15 +684,17 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt } } - enqueueSystemEvent(text, { + const queued = enqueueSystemEvent(text, { sessionKey, contextKey: runId ? `exec:${runId}` : "exec", trusted: false, }); - // Scope wakes only for canonical agent sessions. Synthetic node-* fallback - // keys should keep legacy unscoped behavior so enabled non-main heartbeat - // agents still run when no explicit agent session is provided. - requestHeartbeatNow(scopedHeartbeatWakeOptions(sessionKey, { reason: "exec-event" })); + if (queued) { + // Scope wakes only for canonical agent sessions. Synthetic node-* fallback + // keys should keep legacy unscoped behavior so enabled non-main heartbeat + // agents still run when no explicit agent session is provided. + requestHeartbeatNow(scopedHeartbeatWakeOptions(sessionKey, { reason: "exec-event" })); + } return; } case "push.apns.register": {