From 14aa98827ab6a03e407307665c89698cf769ac74 Mon Sep 17 00:00:00 2001 From: Chunyue Wang <80630709+openperf@users.noreply.github.com> Date: Tue, 5 May 2026 06:17:00 +0800 Subject: [PATCH] fix(codex/app-server): stable mirror idempotency to prevent transcript loss (#77046) * fix(codex/app-server): stable mirror idempotency to prevent transcript loss * Changelog: note codex/app-server transcript mirror dedupe stabilization (#77046) --- CHANGELOG.md | 1 + .../codex/src/app-server/event-projector.ts | 41 ++- .../codex/src/app-server/run-attempt.ts | 9 +- .../src/app-server/transcript-mirror.test.ts | 292 ++++++++++++++++-- .../codex/src/app-server/transcript-mirror.ts | 64 +++- 5 files changed, 368 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 722f0e6c56e..ed871041fbd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -546,6 +546,7 @@ Docs: https://docs.openclaw.ai - Plugins/update: keep externalized bundled npm bridge updates on the normal plugin security scanner path instead of granting source-linked official trust without artifact provenance. (#76765) Thanks @Lucenx9. - Agents/reply context: label replied-to messages as the current user message target in model-visible metadata, so short replies are grounded to their explicit reply target instead of nearby chat history. (#76817) Thanks @obviyus. - Doctor/plugins: install configured missing official plugins such as Discord and Brave during doctor/update repair, auto-enable repaired provider plugins, preserve config when a download fails, and stop auto-enable from inventing plugin entries when no manifest declares a configured channel. Fixes #76872. Thanks @jack-stormentswe. +- Codex/app-server: stabilize transcript mirror dedupe across re-mirrored turns so reordered snapshots no longer drop reasoning entries or duplicate the assistant reply. Refs #77012. (#77046) Thanks @openperf. ## 2026.5.2 diff --git a/extensions/codex/src/app-server/event-projector.ts b/extensions/codex/src/app-server/event-projector.ts index bae62e0ed02..d0f79c6a55f 100644 --- a/extensions/codex/src/app-server/event-projector.ts +++ b/extensions/codex/src/app-server/event-projector.ts @@ -28,6 +28,7 @@ import { type JsonValue, } from "./protocol.js"; import { readCodexMirroredSessionHistoryMessages } from "./session-history.js"; +import { attachCodexMirrorIdentity } from "./transcript-mirror.js"; export type CodexAppServerToolTelemetry = { didSendViaMessagingTool: boolean; @@ -185,23 +186,47 @@ export class CodexAppServerEventProjector { assistantTexts.length > 0 ? this.createAssistantMessage(assistantTexts.join("\n\n")) : undefined; + // Each snapshot entry is tagged with a stable mirror identity of the + // shape `${turnId}:${kind}`. The mirror's idempotency key is derived + // from this identity rather than from snapshot position or content + // hash, so: + // - Re-mirror of the same turn (retry) → same identity → no-op. + // - Re-emit of a prior turn's entry into a later turn's snapshot + // (the cross-turn drift mode named in #77012) → original identity + // is preserved → on-disk key still matches → also a no-op. + // - Two distinct turns where the user repeats verbatim content → + // distinct turnIds → distinct identities → both kept. + const turnId = this.turnId; const messagesSnapshot: AgentMessage[] = [ - { - role: "user", - content: this.params.prompt, - timestamp: Date.now(), - }, + attachCodexMirrorIdentity( + { + role: "user", + content: this.params.prompt, + timestamp: Date.now(), + }, + `${turnId}:prompt`, + ), ]; // Codex owns the canonical thread. These mirror records keep enough local // context for OpenClaw history, search, and future harness switching. if (reasoningText) { - messagesSnapshot.push(this.createAssistantMirrorMessage("Codex reasoning", reasoningText)); + messagesSnapshot.push( + attachCodexMirrorIdentity( + this.createAssistantMirrorMessage("Codex reasoning", reasoningText), + `${turnId}:reasoning`, + ), + ); } if (planText) { - messagesSnapshot.push(this.createAssistantMirrorMessage("Codex plan", planText)); + messagesSnapshot.push( + attachCodexMirrorIdentity( + this.createAssistantMirrorMessage("Codex plan", planText), + `${turnId}:plan`, + ), + ); } if (lastAssistant) { - messagesSnapshot.push(lastAssistant); + messagesSnapshot.push(attachCodexMirrorIdentity(lastAssistant, `${turnId}:assistant`)); } const turnFailed = this.completedTurn?.status === "failed"; const turnInterrupted = this.completedTurn?.status === "interrupted"; diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index a082ecb7325..3850b072220 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -1822,7 +1822,14 @@ async function mirrorTranscriptBestEffort(params: { agentId: params.agentId, sessionKey: params.sessionKey, messages: params.result.messagesSnapshot, - idempotencyScope: `codex-app-server:${params.threadId}:${params.turnId}`, + // Scope is thread-stable. Each entry in `messagesSnapshot` is tagged + // with a per-turn `attachCodexMirrorIdentity` value carrying its own + // turnId, so distinct turns produce distinct dedupe keys via the + // identity (not via the scope). Dropping `turnId` from the scope + // here is what lets a re-emitted prior-turn entry — which still + // carries its original `${turnId}:${kind}` identity — collide with + // its existing on-disk key and be a true no-op. + idempotencyScope: `codex-app-server:${params.threadId}`, config: params.params.config, }); } catch (error) { diff --git a/extensions/codex/src/app-server/transcript-mirror.test.ts b/extensions/codex/src/app-server/transcript-mirror.test.ts index 9415891fa69..db2cd35268c 100644 --- a/extensions/codex/src/app-server/transcript-mirror.test.ts +++ b/extensions/codex/src/app-server/transcript-mirror.test.ts @@ -1,6 +1,8 @@ +import { createHash } from "node:crypto"; import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; +import type { AgentMessage } from "openclaw/plugin-sdk/agent-harness-runtime"; import { initializeGlobalHookRunner, resetGlobalHookRunner, @@ -12,7 +14,16 @@ import { makeAgentUserMessage, } from "openclaw/plugin-sdk/test-fixtures"; import { afterEach, describe, expect, it } from "vitest"; -import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js"; +import { attachCodexMirrorIdentity, mirrorCodexAppServerTranscript } from "./transcript-mirror.js"; + +type MirroredAgentMessage = Extract; + +// Mirrors transcript-mirror.ts's fallback fingerprint exactly so test +// expectations stay in sync without exposing the helper publicly. +function expectedFingerprint(message: MirroredAgentMessage): string { + const payload = JSON.stringify({ role: message.role, content: message.content }); + return createHash("sha256").update(payload).digest("hex").slice(0, 16); +} const tempDirs: string[] = []; @@ -38,20 +49,19 @@ async function makeRoot(prefix: string): Promise { describe("mirrorCodexAppServerTranscript", () => { it("mirrors user and assistant messages into the Pi transcript", async () => { const sessionFile = await createTempSessionFile(); + const userMessage = makeAgentUserMessage({ + content: [{ type: "text", text: "hello" }], + timestamp: Date.now(), + }); + const assistantMessage = makeAgentAssistantMessage({ + content: [{ type: "text", text: "hi there" }], + timestamp: Date.now() + 1, + }); await mirrorCodexAppServerTranscript({ sessionFile, sessionKey: "session-1", - messages: [ - makeAgentUserMessage({ - content: [{ type: "text", text: "hello" }], - timestamp: Date.now(), - }), - makeAgentAssistantMessage({ - content: [{ type: "text", text: "hi there" }], - timestamp: Date.now() + 1, - }), - ], + messages: [userMessage, assistantMessage], idempotencyScope: "scope-1", }); @@ -60,8 +70,10 @@ describe("mirrorCodexAppServerTranscript", () => { expect(raw).toContain('"content":[{"type":"text","text":"hello"}]'); expect(raw).toContain('"role":"assistant"'); expect(raw).toContain('"content":[{"type":"text","text":"hi there"}]'); - expect(raw).toContain('"idempotencyKey":"scope-1:user:0"'); - expect(raw).toContain('"idempotencyKey":"scope-1:assistant:1"'); + expect(raw).toContain(`"idempotencyKey":"scope-1:user:${expectedFingerprint(userMessage)}"`); + expect(raw).toContain( + `"idempotencyKey":"scope-1:assistant:${expectedFingerprint(assistantMessage)}"`, + ); }); it("creates the transcript directory on first mirror", async () => { @@ -134,22 +146,25 @@ describe("mirrorCodexAppServerTranscript", () => { ]), ); const sessionFile = await createTempSessionFile(); + const sourceMessage = makeAgentAssistantMessage({ + content: [{ type: "text", text: "hello" }], + timestamp: Date.now(), + }); await mirrorCodexAppServerTranscript({ sessionFile, sessionKey: "session-1", - messages: [ - makeAgentAssistantMessage({ - content: [{ type: "text", text: "hello" }], - timestamp: Date.now(), - }), - ], + messages: [sourceMessage], idempotencyScope: "scope-1", }); const raw = await fs.readFile(sessionFile, "utf8"); expect(raw).toContain('"content":[{"type":"text","text":"hello [hooked]"}]'); - expect(raw).toContain('"idempotencyKey":"scope-1:assistant:0"'); + // The idempotency fingerprint is derived from the pre-hook message so a + // hook rewrite cannot bypass dedupe by reshaping content on every retry. + expect(raw).toContain( + `"idempotencyKey":"scope-1:assistant:${expectedFingerprint(sourceMessage)}"`, + ); }); it("preserves the computed idempotency key when hooks rewrite message keys", async () => { @@ -167,21 +182,22 @@ describe("mirrorCodexAppServerTranscript", () => { ]), ); const sessionFile = await createTempSessionFile(); + const sourceMessage = makeAgentAssistantMessage({ + content: [{ type: "text", text: "hello" }], + timestamp: Date.now(), + }); await mirrorCodexAppServerTranscript({ sessionFile, sessionKey: "session-1", - messages: [ - makeAgentAssistantMessage({ - content: [{ type: "text", text: "hello" }], - timestamp: Date.now(), - }), - ], + messages: [sourceMessage], idempotencyScope: "scope-1", }); const raw = await fs.readFile(sessionFile, "utf8"); - expect(raw).toContain('"idempotencyKey":"scope-1:assistant:0"'); + expect(raw).toContain( + `"idempotencyKey":"scope-1:assistant:${expectedFingerprint(sourceMessage)}"`, + ); expect(raw).not.toContain("hook-rewritten-key"); }); @@ -262,4 +278,226 @@ describe("mirrorCodexAppServerTranscript", () => { expect(records[0]).toMatchObject({ id: "legacy-user", parentId: null }); expect(records[1]).toMatchObject({ parentId: "legacy-user" }); }); + + // Helpers for the identity-based regression tests below. + // + // The mirror dedupe key is now `${idempotencyScope}:${identity}`, where + // `identity` is either an explicit `attachCodexMirrorIdentity` tag (the + // production path; event-projector emits `${turnId}:${kind}`) or the + // role/content fingerprint fallback (legacy callers). + type FileMessage = { + type?: string; + message?: { role?: string; content?: Array<{ text?: string }> }; + }; + function readFileMessages(raw: string): Array<{ role?: string; text?: string }> { + return raw + .trim() + .split("\n") + .filter(Boolean) + .map((line) => JSON.parse(line) as FileMessage) + .filter((record) => record.type === "message") + .map((record) => ({ + role: record.message?.role, + text: record.message?.content?.[0]?.text, + })); + } + + // Regression for #77012 (within-turn snapshot reordering). When mirror is + // invoked twice under the same scope/turn but the second snapshot inserts + // a reasoning record between the user prompt and the assistant reply, + // every assistant-role record after the inserted slot shifts. With the + // previous `:role:index` key, the second call's reasoning record collided + // with the first call's assistant key (both `:assistant:1`) — the + // legitimately-new reasoning entry was silently dropped, and the + // assistant content was re-appended under `:assistant:2`, producing a + // duplicate assistant entry. The identity-based key (event-projector + // tags `${turnId}:reasoning` and `${turnId}:assistant`) makes each kind + // its own dedupe slot. + it("dedupes mirrored messages despite snapshot positional shifts", async () => { + const sessionFile = await createTempSessionFile(); + const userMessage = attachCodexMirrorIdentity( + makeAgentUserMessage({ + content: [{ type: "text", text: "hello" }], + timestamp: Date.now(), + }), + "turn-1:prompt", + ); + const assistantMessage = attachCodexMirrorIdentity( + makeAgentAssistantMessage({ + content: [{ type: "text", text: "hi there" }], + timestamp: Date.now() + 1, + }), + "turn-1:assistant", + ); + + await mirrorCodexAppServerTranscript({ + sessionFile, + sessionKey: "session-1", + messages: [userMessage, assistantMessage], + idempotencyScope: "codex-app-server:thread-X", + }); + const reasoningMessage = attachCodexMirrorIdentity( + makeAgentAssistantMessage({ + content: [{ type: "text", text: "[Codex reasoning] thinking" }], + timestamp: Date.now() + 2, + }), + "turn-1:reasoning", + ); + await mirrorCodexAppServerTranscript({ + sessionFile, + sessionKey: "session-1", + messages: [userMessage, reasoningMessage, assistantMessage], + idempotencyScope: "codex-app-server:thread-X", + }); + + const messageTexts = readFileMessages(await fs.readFile(sessionFile, "utf8")).map( + (m) => m.text, + ); + expect(messageTexts).toEqual(["hello", "hi there", "[Codex reasoning] thinking"]); + }); + + // Two distinct turns where the user types the same thing must not collapse: + // each entry carries its own `${turnId}:${kind}` identity so the dedupe + // key differs even when role+content match. (Prior content-fingerprint-only + // designs would have collapsed the second user turn here.) + it("keeps repeated same-content turns distinct", async () => { + const sessionFile = await createTempSessionFile(); + const userTurn1 = attachCodexMirrorIdentity( + makeAgentUserMessage({ + content: [{ type: "text", text: "yes" }], + timestamp: Date.now(), + }), + "turn-1:prompt", + ); + const assistantTurn1 = attachCodexMirrorIdentity( + makeAgentAssistantMessage({ + content: [{ type: "text", text: "ok 1" }], + timestamp: Date.now() + 1, + }), + "turn-1:assistant", + ); + const userTurn2 = attachCodexMirrorIdentity( + makeAgentUserMessage({ + content: [{ type: "text", text: "yes" }], + timestamp: Date.now() + 2, + }), + "turn-2:prompt", + ); + const assistantTurn2 = attachCodexMirrorIdentity( + makeAgentAssistantMessage({ + content: [{ type: "text", text: "ok 2" }], + timestamp: Date.now() + 3, + }), + "turn-2:assistant", + ); + + await mirrorCodexAppServerTranscript({ + sessionFile, + sessionKey: "session-1", + messages: [userTurn1, assistantTurn1], + idempotencyScope: "codex-app-server:thread-X", + }); + await mirrorCodexAppServerTranscript({ + sessionFile, + sessionKey: "session-1", + messages: [userTurn2, assistantTurn2], + idempotencyScope: "codex-app-server:thread-X", + }); + + expect(readFileMessages(await fs.readFile(sessionFile, "utf8"))).toEqual([ + { role: "user", text: "yes" }, + { role: "assistant", text: "ok 1" }, + { role: "user", text: "yes" }, + { role: "assistant", text: "ok 2" }, + ]); + }); + + // Cross-turn re-emit: an entry first written under turn 1 may be re-emitted + // as part of a later turn's snapshot (e.g. a context-engine flow that + // bundles prior history). Because every entry carries its own original + // `${turnId}:${kind}` identity, the re-emitted entries collide with their + // existing on-disk keys and become true no-ops — instead of being + // appended again on a sibling branch (the on-disk symptom in #77012). + it("dedupes prior-turn entries re-emitted into a later turn's snapshot", async () => { + const sessionFile = await createTempSessionFile(); + const userTurn1 = attachCodexMirrorIdentity( + makeAgentUserMessage({ + content: [{ type: "text", text: "msg1" }], + timestamp: Date.now(), + }), + "turn-1:prompt", + ); + const assistantTurn1 = attachCodexMirrorIdentity( + makeAgentAssistantMessage({ + content: [{ type: "text", text: "reply1" }], + timestamp: Date.now() + 1, + }), + "turn-1:assistant", + ); + await mirrorCodexAppServerTranscript({ + sessionFile, + sessionKey: "session-1", + messages: [userTurn1, assistantTurn1], + idempotencyScope: "codex-app-server:thread-X", + }); + + const userTurn2 = attachCodexMirrorIdentity( + makeAgentUserMessage({ + content: [{ type: "text", text: "msg2" }], + timestamp: Date.now() + 2, + }), + "turn-2:prompt", + ); + const assistantTurn2 = attachCodexMirrorIdentity( + makeAgentAssistantMessage({ + content: [{ type: "text", text: "reply2" }], + timestamp: Date.now() + 3, + }), + "turn-2:assistant", + ); + // Buggy upstream: snapshot for turn 2 also includes the just-completed + // turn 1's entries (with their original identities preserved). + await mirrorCodexAppServerTranscript({ + sessionFile, + sessionKey: "session-1", + messages: [userTurn1, assistantTurn1, userTurn2, assistantTurn2], + idempotencyScope: "codex-app-server:thread-X", + }); + + expect(readFileMessages(await fs.readFile(sessionFile, "utf8"))).toEqual([ + { role: "user", text: "msg1" }, + { role: "assistant", text: "reply1" }, + { role: "user", text: "msg2" }, + { role: "assistant", text: "reply2" }, + ]); + }); + + // Backward-compat: callers that do not tag messages with a mirror identity + // (e.g. third-party harnesses or tests routed through the legacy path) + // still get the role/content fingerprint key. Distinct turns are then + // distinguished by the caller's idempotency scope. + it("falls back to the role+content fingerprint when no identity is attached", async () => { + const sessionFile = await createTempSessionFile(); + const userMessage = makeAgentUserMessage({ + content: [{ type: "text", text: "hello" }], + timestamp: Date.now(), + }); + const assistantMessage = makeAgentAssistantMessage({ + content: [{ type: "text", text: "hi there" }], + timestamp: Date.now() + 1, + }); + + await mirrorCodexAppServerTranscript({ + sessionFile, + sessionKey: "session-1", + messages: [userMessage, assistantMessage], + idempotencyScope: "scope-1", + }); + + const raw = await fs.readFile(sessionFile, "utf8"); + expect(raw).toContain(`"idempotencyKey":"scope-1:user:${expectedFingerprint(userMessage)}"`); + expect(raw).toContain( + `"idempotencyKey":"scope-1:assistant:${expectedFingerprint(assistantMessage)}"`, + ); + }); }); diff --git a/extensions/codex/src/app-server/transcript-mirror.ts b/extensions/codex/src/app-server/transcript-mirror.ts index 4b445cda106..a96cd0c3782 100644 --- a/extensions/codex/src/app-server/transcript-mirror.ts +++ b/extensions/codex/src/app-server/transcript-mirror.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import fs from "node:fs/promises"; import { acquireSessionWriteLock, @@ -9,6 +10,61 @@ import { type SessionWriteLockAcquireTimeoutConfig, } from "openclaw/plugin-sdk/agent-harness-runtime"; +type MirroredAgentMessage = Extract; + +const MIRROR_IDENTITY_META_KEY = "mirrorIdentity" as const; + +/** + * Tag a message with a stable logical identity for mirror dedupe. Callers + * should use a value that is invariant for the same logical message across + * re-emits (e.g. `${turnId}:prompt`, `${turnId}:assistant`) but distinct + * for genuinely-distinct messages (different turns, different kinds). When + * present this identity replaces the role/content fingerprint in the + * idempotency key, so the dedupe survives caller-scope rotation without + * collapsing distinct same-content turns. + */ +export function attachCodexMirrorIdentity(message: T, identity: string): T { + const record = message as unknown as Record; + const existing = record.__openclaw; + const baseMeta = + existing && typeof existing === "object" && !Array.isArray(existing) + ? (existing as Record) + : {}; + return { + ...record, + __openclaw: { ...baseMeta, [MIRROR_IDENTITY_META_KEY]: identity }, + } as unknown as T; +} + +function readMirrorIdentity(message: MirroredAgentMessage): string | undefined { + const record = message as unknown as { __openclaw?: unknown }; + const meta = record.__openclaw; + if (!meta || typeof meta !== "object" || Array.isArray(meta)) { + return undefined; + } + const id = (meta as Record)[MIRROR_IDENTITY_META_KEY]; + return typeof id === "string" && id.length > 0 ? id : undefined; +} + +// Fallback content fingerprint for callers that did not tag the message +// with a stable mirror identity. Only role and content participate; volatile +// metadata (timestamps, usage, etc.) is intentionally excluded so the +// fingerprint survives snapshot reordering inside a fixed scope. Distinct +// same-content turns are still distinguished by the caller's idempotency +// scope when callers route through this fallback. +function fingerprintMirrorMessageContent(message: MirroredAgentMessage): string { + const payload = JSON.stringify({ role: message.role, content: message.content }); + return createHash("sha256").update(payload).digest("hex").slice(0, 16); +} + +function buildMirrorDedupeIdentity(message: MirroredAgentMessage): string { + const explicit = readMirrorIdentity(message); + if (explicit) { + return explicit; + } + return `${message.role}:${fingerprintMirrorMessageContent(message)}`; +} + export async function mirrorCodexAppServerTranscript(params: { sessionFile: string; sessionKey?: string; @@ -18,7 +74,8 @@ export async function mirrorCodexAppServerTranscript(params: { config?: SessionWriteLockAcquireTimeoutConfig; }): Promise { const messages = params.messages.filter( - (message) => message.role === "user" || message.role === "assistant", + (message): message is MirroredAgentMessage => + message.role === "user" || message.role === "assistant", ); if (messages.length === 0) { return; @@ -30,9 +87,10 @@ export async function mirrorCodexAppServerTranscript(params: { }); try { const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile); - for (const [index, message] of messages.entries()) { + for (const message of messages) { + const dedupeIdentity = buildMirrorDedupeIdentity(message); const idempotencyKey = params.idempotencyScope - ? `${params.idempotencyScope}:${message.role}:${index}` + ? `${params.idempotencyScope}:${dedupeIdentity}` : undefined; if (idempotencyKey && existingIdempotencyKeys.has(idempotencyKey)) { continue;