From 5dcf526a4344340220d2d8e91a3c59b92391b0ce Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Wed, 15 Apr 2026 13:06:18 -0700 Subject: [PATCH] fix: dedupe replayed exec.finished node events (#67281) * docs: add async exec duplicate completion investigation Add an internal refactor note tracing the node exec completion to system event to heartbeat to transcript path for duplicate async exec injections. Document the most likely gateway-side gap as missing idempotency for replayed exec.finished events, and note why plain outbound delivery retry is a weaker fit for duplicate user turns. Regeneration-Prompt: | Investigate a live duplicate async exec completion that appeared as two identical user turns in an OpenClaw session. Trace the completion path from exec producers into enqueueSystemEvent, heartbeat wake scheduling, prompt assembly, and embedded transcript persistence. Decide whether duplicate wake handling, outbound delivery retry, or duplicate completion event ingestion is the more likely cause, cite the exact code locations, and capture the smallest plausible fix seam without making runtime changes. * fix: dedupe replayed exec finished node events Add a narrow idempotency guard in the gateway node-event handler for repeated exec.finished events with the same canonical session key and runId. This blocks replayed async exec completions from being enqueued and heartbeated twice into the parent session. Also only request a heartbeat when the system event was actually queued, and add a regression test for duplicate runId injection. Regeneration-Prompt: | Prevent duplicate async exec completion events from being injected twice into the parent session. Keep the scope tight around the highest-confidence path: node exec.finished events entering gateway server-node-events and becoming system-event-driven heartbeat prompts. 
Add a small idempotency guard keyed by canonical session plus exec runId, avoid broader delivery or retry changes unless needed, and add regression coverage that fails if the same exec.finished replay is enqueued and woken twice. * fix: note exec finished replay dedupe --- CHANGELOG.md | 1 + ...exec-duplicate-completion-investigation.md | 122 ++++++++++++++++++ src/gateway/server-node-events.test.ts | 35 ++++- src/gateway/server-node-events.ts | 67 +++++++++- 4 files changed, 219 insertions(+), 6 deletions(-) create mode 100644 docs/refactor/async-exec-duplicate-completion-investigation.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cd784fc2a7..310436981b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -158,6 +158,7 @@ Docs: https://docs.openclaw.ai - Media/store: honor configured agent media limits when saving generated media and persisting outbound reply media, so the store no longer hard-stops those flows at 5 MB before the configured limit applies. (#66229) Thanks @neeravmakwana and @vincentkoc. - Plugins/setup-entry: preserve separate setup-entry secrets exports when loading bundled setup-runtime channels, so setup-mode flows keep the channel secret contract for split plugin + secrets entrypoints. (#66261) Thanks @hxy91819. - CLI/update: prune stale packaged `dist` chunks after npm upgrades, verify installed package inventory, and keep downgrade/update verification working across older releases. (#66959) Thanks @obviyus. +- Gateway/exec events: dedupe replayed `exec.finished` node events by canonical session key plus `runId` so duplicate async completion replays no longer inject duplicate completion turns into the parent session transcript. (#67281) Thanks @jalehman. 
## 2026.4.12 diff --git a/docs/refactor/async-exec-duplicate-completion-investigation.md b/docs/refactor/async-exec-duplicate-completion-investigation.md new file mode 100644 index 00000000000..622b34cb297 --- /dev/null +++ b/docs/refactor/async-exec-duplicate-completion-investigation.md @@ -0,0 +1,122 @@ +# Async Exec Duplicate Completion Investigation + +## Scope + +- Session: `agent:main:telegram:group:-1003774691294:topic:1` +- Symptom: the same async exec completion for session/run `keen-nexus` was recorded twice in LCM as user turns. +- Goal: identify whether this is most likely duplicate session injection or plain outbound delivery retry. + +## Conclusion + +Most likely this is **duplicate session injection**, not a pure outbound delivery retry. + +The strongest gateway-side gap is in the **node exec completion path**: + +1. A node-side exec finish emits `exec.finished` with the full `runId`. +2. Gateway `server-node-events` converts that into a system event and requests a heartbeat. +3. The heartbeat run injects the drained system event block into the agent prompt. +4. The embedded runner persists that prompt as a new user turn in the session transcript. + +If the same `exec.finished` reaches the gateway twice for the same `runId` for any reason (replay, reconnect duplicate, upstream resend, duplicated producer), OpenClaw currently has **no idempotency check keyed by `runId`/`contextKey`** on this path. The second copy will become a second user message with the same content. + +## Exact Code Path + +### 1. Producer: node exec completion event + +- `src/node-host/invoke.ts:340-360` + - `sendExecFinishedEvent(...)` emits `node.event` with event `exec.finished`. + - Payload includes `sessionKey` and full `runId`. + +### 2. Gateway event ingestion + +- `src/gateway/server-node-events.ts:574-640` + - Handles `exec.finished`. 
+ - Builds text: + - `Exec finished (node=..., id=<runId>, code ...)` + - Enqueues it via: + - `enqueueSystemEvent(text, { sessionKey, contextKey: runId ? \`exec:${runId}\` : "exec", trusted: false })` + - Immediately requests a wake: + - `requestHeartbeatNow(scopedHeartbeatWakeOptions(sessionKey, { reason: "exec-event" }))` + +### 3. System event dedupe weakness + +- `src/infra/system-events.ts:90-115` + - `enqueueSystemEvent(...)` only suppresses **consecutive duplicate text**: + - `if (entry.lastText === cleaned) return false` + - It stores `contextKey`, but does **not** use `contextKey` for idempotency. + - After drain, duplicate suppression resets. + +This means a replayed `exec.finished` with the same `runId` can be accepted again later, even though the code already had a stable idempotency candidate (`exec:<runId>`). + +### 4. Wake handling is not the primary duplicator + +- `src/infra/heartbeat-wake.ts:79-117` + - Wakes are coalesced by `(agentId, sessionKey)`. + - Duplicate wake requests for the same target collapse to one pending wake entry. + +This makes **duplicate wake handling alone** a weaker explanation than duplicate event ingestion. + +### 5. Heartbeat consumes the event and turns it into prompt input + +- `src/infra/heartbeat-runner.ts:535-574` + - Preflight peeks pending system events and classifies exec-event runs. +- `src/auto-reply/reply/session-system-events.ts:86-90` + - `drainFormattedSystemEvents(...)` drains the queue for the session. +- `src/auto-reply/reply/get-reply-run.ts:400-427` + - The drained system event block is prepended into the agent prompt body. + +### 6. Transcript injection point + +- `src/agents/pi-embedded-runner/run/attempt.ts:2000-2017` + - `activeSession.prompt(effectivePrompt)` submits the full prompt to the embedded PI session. + - That is the point where the completion-derived prompt becomes a persisted user turn. + +So once the same system event is rebuilt into the prompt twice, duplicate LCM user messages are expected. 
+ +## Why plain outbound delivery retry is less likely + +There is a real outbound failure path in the heartbeat runner: + +- `src/infra/heartbeat-runner.ts:1194-1242` + - The reply is generated first. + - Outbound delivery happens later via `deliverOutboundPayloads(...)`. + - Failure there returns `{ status: "failed" }`. + +However, for the same system event queue entry, this alone is **not sufficient** to explain the duplicate user turns: + +- `src/auto-reply/reply/session-system-events.ts:86-90` + - The system event queue is already drained before outbound delivery. + +So a channel send retry by itself would not recreate the exact same queued event. It could explain missing/failed external delivery, but not by itself a second identical session user message. + +## Secondary, lower-confidence possibility + +There is a full-run retry loop in the agent runner: + +- `src/auto-reply/reply/agent-runner-execution.ts:741-1473` + - Certain transient failures can retry the whole run and resubmit the same `commandBody`. + +That can duplicate a persisted user prompt **within the same reply execution** if the prompt was already appended before the retry condition triggered. + +I rank this lower than duplicate `exec.finished` ingestion because: + +- the observed gap was around 51 seconds, which looks more like a second wake/turn than an in-process retry; +- the report already mentions repeated message send failures, which points more toward a separate later turn than an immediate model/runtime retry. + +## Root Cause Hypothesis + +Highest-confidence hypothesis: + +- The `keen-nexus` completion came through the **node exec event path**. +- The same `exec.finished` was delivered to `server-node-events` twice. +- Gateway accepted both because `enqueueSystemEvent(...)` does not dedupe by `contextKey` / `runId`. +- Each accepted event triggered a heartbeat and was injected as a user turn into the PI transcript. 
+ +## Proposed Tiny Surgical Fix + +If a fix is wanted, the smallest high-value change is: + +- make exec/system-event idempotency honor `contextKey` for a short horizon, at least for exact `(sessionKey, contextKey, text)` repeats; +- or add a dedicated dedupe in `server-node-events` for `exec.finished` keyed by `(sessionKey, runId, event kind)`. + +That would directly block replayed `exec.finished` duplicates before they become session turns. diff --git a/src/gateway/server-node-events.test.ts b/src/gateway/server-node-events.test.ts index b09f44e6c10..e96ed7d0db5 100644 --- a/src/gateway/server-node-events.test.ts +++ b/src/gateway/server-node-events.test.ts @@ -135,7 +135,7 @@ vi.mock("./server-node-events.runtime.js", () => runtimeMocks); import type { CliDeps } from "../cli/deps.js"; import type { HealthSummary } from "../commands/health.js"; import type { NodeEventContext } from "./server-node-events-types.js"; -import { handleNodeEvent } from "./server-node-events.js"; +import { handleNodeEvent, resetNodeEventDeduplicationForTests } from "./server-node-events.js"; const enqueueSystemEventMock = runtimeMocks.enqueueSystemEvent; const requestHeartbeatNowMock = runtimeMocks.requestHeartbeatNow; @@ -171,7 +171,9 @@ function buildCtx(): NodeEventContext { describe("node exec events", () => { beforeEach(() => { + resetNodeEventDeduplicationForTests(); enqueueSystemEventMock.mockClear(); + enqueueSystemEventMock.mockReturnValue(true); requestHeartbeatNowMock.mockClear(); registerApnsRegistrationVi.mockClear(); loadOrCreateDeviceIdentityMock.mockClear(); @@ -220,6 +222,37 @@ describe("node exec events", () => { expect(requestHeartbeatNowMock).toHaveBeenCalledWith({ reason: "exec-event" }); }); + it("dedupes duplicate exec.finished events for the same runId on the same session", async () => { + const ctx = buildCtx(); + const payloadJSON = JSON.stringify({ + sessionKey: "agent:main:main", + runId: "run-dup-finished", + exitCode: 0, + timedOut: false, + output: 
"done", + }); + + await handleNodeEvent(ctx, "node-2", { + event: "exec.finished", + payloadJSON, + }); + await handleNodeEvent(ctx, "node-2", { + event: "exec.finished", + payloadJSON, + }); + + expect(enqueueSystemEventMock).toHaveBeenCalledTimes(1); + expect(requestHeartbeatNowMock).toHaveBeenCalledTimes(1); + expect(enqueueSystemEventMock).toHaveBeenCalledWith( + "Exec finished (node=node-2 id=run-dup-finished, code 0)\ndone", + { + sessionKey: "agent:main:main", + contextKey: "exec:run-dup-finished", + trusted: false, + }, + ); + }); + it("canonicalizes exec session key before enqueue and wake", async () => { loadSessionEntryMock.mockReturnValueOnce({ ...buildSessionLookup("node-node-2"), diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index 8f184f4e85a..545f0d26a72 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -39,8 +39,11 @@ const MAX_EXEC_EVENT_OUTPUT_CHARS = 180; const MAX_NOTIFICATION_EVENT_TEXT_CHARS = 120; const VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS = 1500; const MAX_RECENT_VOICE_TRANSCRIPTS = 200; +const EXEC_FINISHED_RUN_DEDUPE_WINDOW_MS = 10 * 60 * 1000; +const MAX_RECENT_EXEC_FINISHED_RUNS = 2000; const recentVoiceTranscripts = new Map(); +const recentExecFinishedRuns = new Map(); function normalizeFiniteInteger(value: unknown): number | null { return typeof value === "number" && Number.isFinite(value) ? 
Math.trunc(value) : null; @@ -116,6 +119,48 @@ function shouldDropDuplicateVoiceTranscript(params: { return false; } +function shouldDropDuplicateExecFinished(params: { + sessionKey: string; + runId: string; + now: number; +}): boolean { + const fingerprint = `${params.sessionKey}::${params.runId}`; + const previousTs = recentExecFinishedRuns.get(fingerprint); + if ( + typeof previousTs === "number" && + params.now - previousTs <= EXEC_FINISHED_RUN_DEDUPE_WINDOW_MS + ) { + return true; + } + + recentExecFinishedRuns.set(fingerprint, params.now); + if (recentExecFinishedRuns.size > MAX_RECENT_EXEC_FINISHED_RUNS) { + const cutoff = params.now - EXEC_FINISHED_RUN_DEDUPE_WINDOW_MS; + for (const [key, ts] of recentExecFinishedRuns) { + if (ts < cutoff) { + recentExecFinishedRuns.delete(key); + } + if (recentExecFinishedRuns.size <= MAX_RECENT_EXEC_FINISHED_RUNS) { + break; + } + } + while (recentExecFinishedRuns.size > MAX_RECENT_EXEC_FINISHED_RUNS) { + const oldestKey = recentExecFinishedRuns.keys().next().value; + if (oldestKey === undefined) { + break; + } + recentExecFinishedRuns.delete(oldestKey); + } + } + + return false; +} + +export function resetNodeEventDeduplicationForTests() { + recentVoiceTranscripts.clear(); + recentExecFinishedRuns.clear(); +} + function compactExecEventOutput(raw: string) { const normalized = raw.replace(/\s+/g, " ").trim(); if (!normalized) { @@ -618,6 +663,16 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt if (!shouldNotify) { return; } + if ( + runId && + shouldDropDuplicateExecFinished({ + sessionKey, + runId, + now: Date.now(), + }) + ) { + return; + } text = `Exec finished (node=${nodeId}${runId ? ` id=${runId}` : ""}, ${exitLabel})`; if (compactOutput) { text += `\n${compactOutput}`; @@ -629,15 +684,17 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt } } - enqueueSystemEvent(text, { + const queued = enqueueSystemEvent(text, { sessionKey, contextKey: runId ? 
`exec:${runId}` : "exec", trusted: false, }); - // Scope wakes only for canonical agent sessions. Synthetic node-* fallback - // keys should keep legacy unscoped behavior so enabled non-main heartbeat - // agents still run when no explicit agent session is provided. - requestHeartbeatNow(scopedHeartbeatWakeOptions(sessionKey, { reason: "exec-event" })); + if (queued) { + // Scope wakes only for canonical agent sessions. Synthetic node-* fallback + // keys should keep legacy unscoped behavior so enabled non-main heartbeat + // agents still run when no explicit agent session is provided. + requestHeartbeatNow(scopedHeartbeatWakeOptions(sessionKey, { reason: "exec-event" })); + } return; } case "push.apns.register": {