mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 11:30:43 +00:00
fix(codex/app-server): stable mirror idempotency to prevent transcript loss (#77046)
* fix(codex/app-server): stable mirror idempotency to prevent transcript loss * Changelog: note codex/app-server transcript mirror dedupe stabilization (#77046)
This commit is contained in:
@@ -546,6 +546,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Plugins/update: keep externalized bundled npm bridge updates on the normal plugin security scanner path instead of granting source-linked official trust without artifact provenance. (#76765) Thanks @Lucenx9.
|
||||
- Agents/reply context: label replied-to messages as the current user message target in model-visible metadata, so short replies are grounded to their explicit reply target instead of nearby chat history. (#76817) Thanks @obviyus.
|
||||
- Doctor/plugins: install configured missing official plugins such as Discord and Brave during doctor/update repair, auto-enable repaired provider plugins, preserve config when a download fails, and stop auto-enable from inventing plugin entries when no manifest declares a configured channel. Fixes #76872. Thanks @jack-stormentswe.
|
||||
- Codex/app-server: stabilize transcript mirror dedupe across re-mirrored turns so reordered snapshots no longer drop reasoning entries or duplicate the assistant reply. Refs #77012. (#77046) Thanks @openperf.
|
||||
|
||||
## 2026.5.2
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ import {
|
||||
type JsonValue,
|
||||
} from "./protocol.js";
|
||||
import { readCodexMirroredSessionHistoryMessages } from "./session-history.js";
|
||||
import { attachCodexMirrorIdentity } from "./transcript-mirror.js";
|
||||
|
||||
export type CodexAppServerToolTelemetry = {
|
||||
didSendViaMessagingTool: boolean;
|
||||
@@ -185,23 +186,47 @@ export class CodexAppServerEventProjector {
|
||||
assistantTexts.length > 0
|
||||
? this.createAssistantMessage(assistantTexts.join("\n\n"))
|
||||
: undefined;
|
||||
// Each snapshot entry is tagged with a stable mirror identity of the
|
||||
// shape `${turnId}:${kind}`. The mirror's idempotency key is derived
|
||||
// from this identity rather than from snapshot position or content
|
||||
// hash, so:
|
||||
// - Re-mirror of the same turn (retry) → same identity → no-op.
|
||||
// - Re-emit of a prior turn's entry into a later turn's snapshot
|
||||
// (the cross-turn drift mode named in #77012) → original identity
|
||||
// is preserved → on-disk key still matches → also a no-op.
|
||||
// - Two distinct turns where the user repeats verbatim content →
|
||||
// distinct turnIds → distinct identities → both kept.
|
||||
const turnId = this.turnId;
|
||||
const messagesSnapshot: AgentMessage[] = [
|
||||
{
|
||||
role: "user",
|
||||
content: this.params.prompt,
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
attachCodexMirrorIdentity(
|
||||
{
|
||||
role: "user",
|
||||
content: this.params.prompt,
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
`${turnId}:prompt`,
|
||||
),
|
||||
];
|
||||
// Codex owns the canonical thread. These mirror records keep enough local
|
||||
// context for OpenClaw history, search, and future harness switching.
|
||||
if (reasoningText) {
|
||||
messagesSnapshot.push(this.createAssistantMirrorMessage("Codex reasoning", reasoningText));
|
||||
messagesSnapshot.push(
|
||||
attachCodexMirrorIdentity(
|
||||
this.createAssistantMirrorMessage("Codex reasoning", reasoningText),
|
||||
`${turnId}:reasoning`,
|
||||
),
|
||||
);
|
||||
}
|
||||
if (planText) {
|
||||
messagesSnapshot.push(this.createAssistantMirrorMessage("Codex plan", planText));
|
||||
messagesSnapshot.push(
|
||||
attachCodexMirrorIdentity(
|
||||
this.createAssistantMirrorMessage("Codex plan", planText),
|
||||
`${turnId}:plan`,
|
||||
),
|
||||
);
|
||||
}
|
||||
if (lastAssistant) {
|
||||
messagesSnapshot.push(lastAssistant);
|
||||
messagesSnapshot.push(attachCodexMirrorIdentity(lastAssistant, `${turnId}:assistant`));
|
||||
}
|
||||
const turnFailed = this.completedTurn?.status === "failed";
|
||||
const turnInterrupted = this.completedTurn?.status === "interrupted";
|
||||
|
||||
@@ -1822,7 +1822,14 @@ async function mirrorTranscriptBestEffort(params: {
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
messages: params.result.messagesSnapshot,
|
||||
idempotencyScope: `codex-app-server:${params.threadId}:${params.turnId}`,
|
||||
// Scope is thread-stable. Each entry in `messagesSnapshot` is tagged
|
||||
// with a per-turn `attachCodexMirrorIdentity` value carrying its own
|
||||
// turnId, so distinct turns produce distinct dedupe keys via the
|
||||
// identity (not via the scope). Dropping `turnId` from the scope
|
||||
// here is what lets a re-emitted prior-turn entry — which still
|
||||
// carries its original `${turnId}:${kind}` identity — collide with
|
||||
// its existing on-disk key and be a true no-op.
|
||||
idempotencyScope: `codex-app-server:${params.threadId}`,
|
||||
config: params.params.config,
|
||||
});
|
||||
} catch (error) {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { AgentMessage } from "openclaw/plugin-sdk/agent-harness-runtime";
|
||||
import {
|
||||
initializeGlobalHookRunner,
|
||||
resetGlobalHookRunner,
|
||||
@@ -12,7 +14,16 @@ import {
|
||||
makeAgentUserMessage,
|
||||
} from "openclaw/plugin-sdk/test-fixtures";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
|
||||
import { attachCodexMirrorIdentity, mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
|
||||
|
||||
type MirroredAgentMessage = Extract<AgentMessage, { role: "user" | "assistant" }>;
|
||||
|
||||
// Mirrors transcript-mirror.ts's fallback fingerprint exactly so test
|
||||
// expectations stay in sync without exposing the helper publicly.
|
||||
function expectedFingerprint(message: MirroredAgentMessage): string {
|
||||
const payload = JSON.stringify({ role: message.role, content: message.content });
|
||||
return createHash("sha256").update(payload).digest("hex").slice(0, 16);
|
||||
}
|
||||
|
||||
const tempDirs: string[] = [];
|
||||
|
||||
@@ -38,20 +49,19 @@ async function makeRoot(prefix: string): Promise<string> {
|
||||
describe("mirrorCodexAppServerTranscript", () => {
|
||||
it("mirrors user and assistant messages into the Pi transcript", async () => {
|
||||
const sessionFile = await createTempSessionFile();
|
||||
const userMessage = makeAgentUserMessage({
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
const assistantMessage = makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "hi there" }],
|
||||
timestamp: Date.now() + 1,
|
||||
});
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "session-1",
|
||||
messages: [
|
||||
makeAgentUserMessage({
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "hi there" }],
|
||||
timestamp: Date.now() + 1,
|
||||
}),
|
||||
],
|
||||
messages: [userMessage, assistantMessage],
|
||||
idempotencyScope: "scope-1",
|
||||
});
|
||||
|
||||
@@ -60,8 +70,10 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
expect(raw).toContain('"content":[{"type":"text","text":"hello"}]');
|
||||
expect(raw).toContain('"role":"assistant"');
|
||||
expect(raw).toContain('"content":[{"type":"text","text":"hi there"}]');
|
||||
expect(raw).toContain('"idempotencyKey":"scope-1:user:0"');
|
||||
expect(raw).toContain('"idempotencyKey":"scope-1:assistant:1"');
|
||||
expect(raw).toContain(`"idempotencyKey":"scope-1:user:${expectedFingerprint(userMessage)}"`);
|
||||
expect(raw).toContain(
|
||||
`"idempotencyKey":"scope-1:assistant:${expectedFingerprint(assistantMessage)}"`,
|
||||
);
|
||||
});
|
||||
|
||||
it("creates the transcript directory on first mirror", async () => {
|
||||
@@ -134,22 +146,25 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
]),
|
||||
);
|
||||
const sessionFile = await createTempSessionFile();
|
||||
const sourceMessage = makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "session-1",
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
],
|
||||
messages: [sourceMessage],
|
||||
idempotencyScope: "scope-1",
|
||||
});
|
||||
|
||||
const raw = await fs.readFile(sessionFile, "utf8");
|
||||
expect(raw).toContain('"content":[{"type":"text","text":"hello [hooked]"}]');
|
||||
expect(raw).toContain('"idempotencyKey":"scope-1:assistant:0"');
|
||||
// The idempotency fingerprint is derived from the pre-hook message so a
|
||||
// hook rewrite cannot bypass dedupe by reshaping content on every retry.
|
||||
expect(raw).toContain(
|
||||
`"idempotencyKey":"scope-1:assistant:${expectedFingerprint(sourceMessage)}"`,
|
||||
);
|
||||
});
|
||||
|
||||
it("preserves the computed idempotency key when hooks rewrite message keys", async () => {
|
||||
@@ -167,21 +182,22 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
]),
|
||||
);
|
||||
const sessionFile = await createTempSessionFile();
|
||||
const sourceMessage = makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "session-1",
|
||||
messages: [
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
],
|
||||
messages: [sourceMessage],
|
||||
idempotencyScope: "scope-1",
|
||||
});
|
||||
|
||||
const raw = await fs.readFile(sessionFile, "utf8");
|
||||
expect(raw).toContain('"idempotencyKey":"scope-1:assistant:0"');
|
||||
expect(raw).toContain(
|
||||
`"idempotencyKey":"scope-1:assistant:${expectedFingerprint(sourceMessage)}"`,
|
||||
);
|
||||
expect(raw).not.toContain("hook-rewritten-key");
|
||||
});
|
||||
|
||||
@@ -262,4 +278,226 @@ describe("mirrorCodexAppServerTranscript", () => {
|
||||
expect(records[0]).toMatchObject({ id: "legacy-user", parentId: null });
|
||||
expect(records[1]).toMatchObject({ parentId: "legacy-user" });
|
||||
});
|
||||
|
||||
// Helpers for the identity-based regression tests below.
|
||||
//
|
||||
// The mirror dedupe key is now `${idempotencyScope}:${identity}`, where
|
||||
// `identity` is either an explicit `attachCodexMirrorIdentity` tag (the
|
||||
// production path; event-projector emits `${turnId}:${kind}`) or the
|
||||
// role/content fingerprint fallback (legacy callers).
|
||||
type FileMessage = {
|
||||
type?: string;
|
||||
message?: { role?: string; content?: Array<{ text?: string }> };
|
||||
};
|
||||
function readFileMessages(raw: string): Array<{ role?: string; text?: string }> {
|
||||
return raw
|
||||
.trim()
|
||||
.split("\n")
|
||||
.filter(Boolean)
|
||||
.map((line) => JSON.parse(line) as FileMessage)
|
||||
.filter((record) => record.type === "message")
|
||||
.map((record) => ({
|
||||
role: record.message?.role,
|
||||
text: record.message?.content?.[0]?.text,
|
||||
}));
|
||||
}
|
||||
|
||||
// Regression for #77012 (within-turn snapshot reordering). When mirror is
|
||||
// invoked twice under the same scope/turn but the second snapshot inserts
|
||||
// a reasoning record between the user prompt and the assistant reply,
|
||||
// every assistant-role record after the inserted slot shifts. With the
|
||||
// previous `:role:index` key, the second call's reasoning record collided
|
||||
// with the first call's assistant key (both `:assistant:1`) — the
|
||||
// legitimately-new reasoning entry was silently dropped, and the
|
||||
// assistant content was re-appended under `:assistant:2`, producing a
|
||||
// duplicate assistant entry. The identity-based key (event-projector
|
||||
// tags `${turnId}:reasoning` and `${turnId}:assistant`) makes each kind
|
||||
// its own dedupe slot.
|
||||
it("dedupes mirrored messages despite snapshot positional shifts", async () => {
|
||||
const sessionFile = await createTempSessionFile();
|
||||
const userMessage = attachCodexMirrorIdentity(
|
||||
makeAgentUserMessage({
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
"turn-1:prompt",
|
||||
);
|
||||
const assistantMessage = attachCodexMirrorIdentity(
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "hi there" }],
|
||||
timestamp: Date.now() + 1,
|
||||
}),
|
||||
"turn-1:assistant",
|
||||
);
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "session-1",
|
||||
messages: [userMessage, assistantMessage],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
});
|
||||
const reasoningMessage = attachCodexMirrorIdentity(
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "[Codex reasoning] thinking" }],
|
||||
timestamp: Date.now() + 2,
|
||||
}),
|
||||
"turn-1:reasoning",
|
||||
);
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "session-1",
|
||||
messages: [userMessage, reasoningMessage, assistantMessage],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
});
|
||||
|
||||
const messageTexts = readFileMessages(await fs.readFile(sessionFile, "utf8")).map(
|
||||
(m) => m.text,
|
||||
);
|
||||
expect(messageTexts).toEqual(["hello", "hi there", "[Codex reasoning] thinking"]);
|
||||
});
|
||||
|
||||
// Two distinct turns where the user types the same thing must not collapse:
|
||||
// each entry carries its own `${turnId}:${kind}` identity so the dedupe
|
||||
// key differs even when role+content match. (Prior content-fingerprint-only
|
||||
// designs would have collapsed the second user turn here.)
|
||||
it("keeps repeated same-content turns distinct", async () => {
|
||||
const sessionFile = await createTempSessionFile();
|
||||
const userTurn1 = attachCodexMirrorIdentity(
|
||||
makeAgentUserMessage({
|
||||
content: [{ type: "text", text: "yes" }],
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
"turn-1:prompt",
|
||||
);
|
||||
const assistantTurn1 = attachCodexMirrorIdentity(
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "ok 1" }],
|
||||
timestamp: Date.now() + 1,
|
||||
}),
|
||||
"turn-1:assistant",
|
||||
);
|
||||
const userTurn2 = attachCodexMirrorIdentity(
|
||||
makeAgentUserMessage({
|
||||
content: [{ type: "text", text: "yes" }],
|
||||
timestamp: Date.now() + 2,
|
||||
}),
|
||||
"turn-2:prompt",
|
||||
);
|
||||
const assistantTurn2 = attachCodexMirrorIdentity(
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "ok 2" }],
|
||||
timestamp: Date.now() + 3,
|
||||
}),
|
||||
"turn-2:assistant",
|
||||
);
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "session-1",
|
||||
messages: [userTurn1, assistantTurn1],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
});
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "session-1",
|
||||
messages: [userTurn2, assistantTurn2],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
});
|
||||
|
||||
expect(readFileMessages(await fs.readFile(sessionFile, "utf8"))).toEqual([
|
||||
{ role: "user", text: "yes" },
|
||||
{ role: "assistant", text: "ok 1" },
|
||||
{ role: "user", text: "yes" },
|
||||
{ role: "assistant", text: "ok 2" },
|
||||
]);
|
||||
});
|
||||
|
||||
// Cross-turn re-emit: an entry first written under turn 1 may be re-emitted
|
||||
// as part of a later turn's snapshot (e.g. a context-engine flow that
|
||||
// bundles prior history). Because every entry carries its own original
|
||||
// `${turnId}:${kind}` identity, the re-emitted entries collide with their
|
||||
// existing on-disk keys and become true no-ops — instead of being
|
||||
// appended again on a sibling branch (the on-disk symptom in #77012).
|
||||
it("dedupes prior-turn entries re-emitted into a later turn's snapshot", async () => {
|
||||
const sessionFile = await createTempSessionFile();
|
||||
const userTurn1 = attachCodexMirrorIdentity(
|
||||
makeAgentUserMessage({
|
||||
content: [{ type: "text", text: "msg1" }],
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
"turn-1:prompt",
|
||||
);
|
||||
const assistantTurn1 = attachCodexMirrorIdentity(
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "reply1" }],
|
||||
timestamp: Date.now() + 1,
|
||||
}),
|
||||
"turn-1:assistant",
|
||||
);
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "session-1",
|
||||
messages: [userTurn1, assistantTurn1],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
});
|
||||
|
||||
const userTurn2 = attachCodexMirrorIdentity(
|
||||
makeAgentUserMessage({
|
||||
content: [{ type: "text", text: "msg2" }],
|
||||
timestamp: Date.now() + 2,
|
||||
}),
|
||||
"turn-2:prompt",
|
||||
);
|
||||
const assistantTurn2 = attachCodexMirrorIdentity(
|
||||
makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "reply2" }],
|
||||
timestamp: Date.now() + 3,
|
||||
}),
|
||||
"turn-2:assistant",
|
||||
);
|
||||
// Buggy upstream: snapshot for turn 2 also includes the just-completed
|
||||
// turn 1's entries (with their original identities preserved).
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "session-1",
|
||||
messages: [userTurn1, assistantTurn1, userTurn2, assistantTurn2],
|
||||
idempotencyScope: "codex-app-server:thread-X",
|
||||
});
|
||||
|
||||
expect(readFileMessages(await fs.readFile(sessionFile, "utf8"))).toEqual([
|
||||
{ role: "user", text: "msg1" },
|
||||
{ role: "assistant", text: "reply1" },
|
||||
{ role: "user", text: "msg2" },
|
||||
{ role: "assistant", text: "reply2" },
|
||||
]);
|
||||
});
|
||||
|
||||
// Backward-compat: callers that do not tag messages with a mirror identity
|
||||
// (e.g. third-party harnesses or tests routed through the legacy path)
|
||||
// still get the role/content fingerprint key. Distinct turns are then
|
||||
// distinguished by the caller's idempotency scope.
|
||||
it("falls back to the role+content fingerprint when no identity is attached", async () => {
|
||||
const sessionFile = await createTempSessionFile();
|
||||
const userMessage = makeAgentUserMessage({
|
||||
content: [{ type: "text", text: "hello" }],
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
const assistantMessage = makeAgentAssistantMessage({
|
||||
content: [{ type: "text", text: "hi there" }],
|
||||
timestamp: Date.now() + 1,
|
||||
});
|
||||
|
||||
await mirrorCodexAppServerTranscript({
|
||||
sessionFile,
|
||||
sessionKey: "session-1",
|
||||
messages: [userMessage, assistantMessage],
|
||||
idempotencyScope: "scope-1",
|
||||
});
|
||||
|
||||
const raw = await fs.readFile(sessionFile, "utf8");
|
||||
expect(raw).toContain(`"idempotencyKey":"scope-1:user:${expectedFingerprint(userMessage)}"`);
|
||||
expect(raw).toContain(
|
||||
`"idempotencyKey":"scope-1:assistant:${expectedFingerprint(assistantMessage)}"`,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
@@ -9,6 +10,61 @@ import {
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
} from "openclaw/plugin-sdk/agent-harness-runtime";
|
||||
|
||||
type MirroredAgentMessage = Extract<AgentMessage, { role: "user" | "assistant" }>;
|
||||
|
||||
const MIRROR_IDENTITY_META_KEY = "mirrorIdentity" as const;
|
||||
|
||||
/**
|
||||
* Tag a message with a stable logical identity for mirror dedupe. Callers
|
||||
* should use a value that is invariant for the same logical message across
|
||||
* re-emits (e.g. `${turnId}:prompt`, `${turnId}:assistant`) but distinct
|
||||
* for genuinely-distinct messages (different turns, different kinds). When
|
||||
* present this identity replaces the role/content fingerprint in the
|
||||
* idempotency key, so the dedupe survives caller-scope rotation without
|
||||
* collapsing distinct same-content turns.
|
||||
*/
|
||||
export function attachCodexMirrorIdentity<T extends AgentMessage>(message: T, identity: string): T {
|
||||
const record = message as unknown as Record<string, unknown>;
|
||||
const existing = record.__openclaw;
|
||||
const baseMeta =
|
||||
existing && typeof existing === "object" && !Array.isArray(existing)
|
||||
? (existing as Record<string, unknown>)
|
||||
: {};
|
||||
return {
|
||||
...record,
|
||||
__openclaw: { ...baseMeta, [MIRROR_IDENTITY_META_KEY]: identity },
|
||||
} as unknown as T;
|
||||
}
|
||||
|
||||
function readMirrorIdentity(message: MirroredAgentMessage): string | undefined {
|
||||
const record = message as unknown as { __openclaw?: unknown };
|
||||
const meta = record.__openclaw;
|
||||
if (!meta || typeof meta !== "object" || Array.isArray(meta)) {
|
||||
return undefined;
|
||||
}
|
||||
const id = (meta as Record<string, unknown>)[MIRROR_IDENTITY_META_KEY];
|
||||
return typeof id === "string" && id.length > 0 ? id : undefined;
|
||||
}
|
||||
|
||||
// Fallback content fingerprint for callers that did not tag the message
|
||||
// with a stable mirror identity. Only role and content participate; volatile
|
||||
// metadata (timestamps, usage, etc.) is intentionally excluded so the
|
||||
// fingerprint survives snapshot reordering inside a fixed scope. Distinct
|
||||
// same-content turns are still distinguished by the caller's idempotency
|
||||
// scope when callers route through this fallback.
|
||||
function fingerprintMirrorMessageContent(message: MirroredAgentMessage): string {
|
||||
const payload = JSON.stringify({ role: message.role, content: message.content });
|
||||
return createHash("sha256").update(payload).digest("hex").slice(0, 16);
|
||||
}
|
||||
|
||||
function buildMirrorDedupeIdentity(message: MirroredAgentMessage): string {
|
||||
const explicit = readMirrorIdentity(message);
|
||||
if (explicit) {
|
||||
return explicit;
|
||||
}
|
||||
return `${message.role}:${fingerprintMirrorMessageContent(message)}`;
|
||||
}
|
||||
|
||||
export async function mirrorCodexAppServerTranscript(params: {
|
||||
sessionFile: string;
|
||||
sessionKey?: string;
|
||||
@@ -18,7 +74,8 @@ export async function mirrorCodexAppServerTranscript(params: {
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<void> {
|
||||
const messages = params.messages.filter(
|
||||
(message) => message.role === "user" || message.role === "assistant",
|
||||
(message): message is MirroredAgentMessage =>
|
||||
message.role === "user" || message.role === "assistant",
|
||||
);
|
||||
if (messages.length === 0) {
|
||||
return;
|
||||
@@ -30,9 +87,10 @@ export async function mirrorCodexAppServerTranscript(params: {
|
||||
});
|
||||
try {
|
||||
const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile);
|
||||
for (const [index, message] of messages.entries()) {
|
||||
for (const message of messages) {
|
||||
const dedupeIdentity = buildMirrorDedupeIdentity(message);
|
||||
const idempotencyKey = params.idempotencyScope
|
||||
? `${params.idempotencyScope}:${message.role}:${index}`
|
||||
? `${params.idempotencyScope}:${dedupeIdentity}`
|
||||
: undefined;
|
||||
if (idempotencyKey && existingIdempotencyKeys.has(idempotencyKey)) {
|
||||
continue;
|
||||
|
||||
Reference in New Issue
Block a user