fix: prune replay control messages

This commit is contained in:
Peter Steinberger
2026-05-15 18:33:36 +01:00
parent c7dcf79585
commit daef8e73fc
13 changed files with 279 additions and 40 deletions

View File

@@ -1,3 +1,4 @@
import { sanitizePendingFinalDeliveryText } from "../auto-reply/reply/pending-final-delivery.js";
import {
formatThinkingLevels,
isThinkingLevelSupported,
@@ -1323,10 +1324,12 @@ async function agentCommandInternal(
!isSubagentSessionKey(sessionKey)
) {
const now = Date.now();
const combinedPayload = payloads
.map((p) => (typeof p.text === "string" ? p.text : ""))
.filter(Boolean)
.join("\n\n");
const combinedPayload = sanitizePendingFinalDeliveryText(
payloads
.map((p) => (typeof p.text === "string" ? p.text : ""))
.filter(Boolean)
.join("\n\n"),
);
if (combinedPayload) {
const entry = sessionStore[sessionKey] ?? sessionEntry;

View File

@@ -4,6 +4,10 @@ import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { loadSessionStore, type SessionEntry } from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import {
INTERNAL_RUNTIME_CONTEXT_BEGIN,
INTERNAL_RUNTIME_CONTEXT_END,
} from "./internal-runtime-context.js";
import {
markRestartAbortedMainSessionsFromLocks,
recoverRestartAbortedMainSessions,
@@ -330,6 +334,47 @@ describe("main-session-restart-recovery", () => {
);
});
it("sanitizes durable pending final delivery payloads before resume prompts", async () => {
const sessionsDir = await makeSessionsDir();
const pendingPayload = [
"The final answer is 42.",
INTERNAL_RUNTIME_CONTEXT_BEGIN,
"internal recovery detail",
INTERNAL_RUNTIME_CONTEXT_END,
"",
"Conversation info (untrusted metadata):",
"```json",
'{"message_id":"msg-1"}',
"```",
].join("\n");
await writeStore(sessionsDir, {
"agent:main:main": {
sessionId: "main-session",
updatedAt: Date.now() - 10_000,
status: "running",
abortedLastRun: true,
pendingFinalDelivery: true,
pendingFinalDeliveryText: pendingPayload,
pendingFinalDeliveryCreatedAt: Date.now() - 5_000,
},
});
await writeTranscript(sessionsDir, "main-session", [
{ role: "user", content: "calculate the answer" },
{ role: "assistant", content: [{ type: "toolCall", id: "call-1", name: "calc" }] },
{ role: "toolResult", content: "42" },
]);
const result = await recoverRestartAbortedMainSessions({ stateDir: tmpDir });
expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
expect(firstGatewayParams().message).toContain("The final answer is 42.");
expect(firstGatewayParams().message).not.toContain(INTERNAL_RUNTIME_CONTEXT_BEGIN);
expect(firstGatewayParams().message).not.toContain("Conversation info");
const store = loadSessionStore(path.join(sessionsDir, "sessions.json"));
expect(store["agent:main:main"]?.pendingFinalDeliveryText).toBe("The final answer is 42.");
});
it("does not scan ordinary running sessions without the restart-aborted marker", async () => {
const sessionsDir = await makeSessionsDir();
await writeStore(sessionsDir, {

View File

@@ -5,6 +5,7 @@
import crypto from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import { sanitizePendingFinalDeliveryText } from "../auto-reply/reply/pending-final-delivery.js";
import { resolveStateDir } from "../config/paths.js";
import {
type SessionEntry,
@@ -121,8 +122,12 @@ function buildResumeMessage(pendingFinalDeliveryText?: string | null): string {
"[System] Your previous turn was interrupted by a gateway restart while " +
"OpenClaw was waiting on tool/model work. Continue from the existing " +
"transcript and finish the interrupted response.";
if (pendingFinalDeliveryText) {
return `${base}\n\nNote: The interrupted final reply was captured: "${pendingFinalDeliveryText}"`;
const sanitizedPendingText =
typeof pendingFinalDeliveryText === "string"
? sanitizePendingFinalDeliveryText(pendingFinalDeliveryText)
: "";
if (sanitizedPendingText) {
return `${base}\n\nNote: The interrupted final reply was captured: "${sanitizedPendingText}"`;
}
return base;
}
@@ -162,11 +167,15 @@ async function resumeMainSession(params: {
sessionKey: string;
pendingFinalDeliveryText?: string | null;
}): Promise<boolean> {
const sanitizedPendingText =
typeof params.pendingFinalDeliveryText === "string"
? sanitizePendingFinalDeliveryText(params.pendingFinalDeliveryText)
: "";
try {
await callGateway<{ runId: string }>({
method: "agent",
params: {
message: buildResumeMessage(params.pendingFinalDeliveryText),
message: buildResumeMessage(sanitizedPendingText),
sessionKey: params.sessionKey,
idempotencyKey: crypto.randomUUID(),
deliver: false,
@@ -185,10 +194,21 @@ async function resumeMainSession(params: {
entry.abortedLastRun = false;
entry.updatedAt = now;
if (entry.pendingFinalDelivery || entry.pendingFinalDeliveryText) {
entry.pendingFinalDeliveryLastAttemptAt = now;
entry.pendingFinalDeliveryAttemptCount =
(entry.pendingFinalDeliveryAttemptCount ?? 0) + 1;
entry.pendingFinalDeliveryLastError = null;
if (sanitizedPendingText) {
entry.pendingFinalDeliveryLastAttemptAt = now;
entry.pendingFinalDeliveryAttemptCount =
(entry.pendingFinalDeliveryAttemptCount ?? 0) + 1;
entry.pendingFinalDeliveryLastError = null;
entry.pendingFinalDeliveryText = sanitizedPendingText;
} else {
entry.pendingFinalDelivery = undefined;
entry.pendingFinalDeliveryText = undefined;
entry.pendingFinalDeliveryCreatedAt = undefined;
entry.pendingFinalDeliveryLastAttemptAt = undefined;
entry.pendingFinalDeliveryAttemptCount = undefined;
entry.pendingFinalDeliveryLastError = undefined;
entry.pendingFinalDeliveryContext = undefined;
}
}
store[params.sessionKey] = entry;
},
@@ -196,7 +216,7 @@ async function resumeMainSession(params: {
);
log.info(
`resumed interrupted main session: ${params.sessionKey}${
params.pendingFinalDeliveryText ? " (with pending payload)" : ""
sanitizedPendingText ? " (with pending payload)" : ""
}`,
);
return true;

View File

@@ -1,5 +1,11 @@
import type { AgentMessage } from "@earendil-works/pi-agent-core";
import { describe, expect, it } from "vitest";
import {
INTERNAL_RUNTIME_CONTEXT_BEGIN,
INTERNAL_RUNTIME_CONTEXT_END,
OPENCLAW_NEXT_TURN_RUNTIME_CONTEXT_HEADER,
OPENCLAW_RUNTIME_CONTEXT_NOTICE,
} from "../internal-runtime-context.js";
import { normalizeAssistantReplayContent } from "./replay-history.js";
const FALLBACK_TEXT = "[assistant turn failed before producing content]";
@@ -152,6 +158,36 @@ describe("normalizeAssistantReplayContent", () => {
expect(JSON.stringify(out)).not.toContain("assistant copied inbound metadata omitted");
});
it("drops standalone silent assistant replay text", () => {
const messages = [userMessage("first"), bedrockAssistant("NO_REPLY"), userMessage("second")];
const out = normalizeAssistantReplayContent(messages);
expect(out).toEqual([messages[0], messages[2]]);
});
it("strips copied runtime context from assistant replay text", () => {
const messages = [
userMessage("first"),
bedrockAssistant([
{
type: "text",
text: [
"Visible before",
INTERNAL_RUNTIME_CONTEXT_BEGIN,
"keep this internal",
INTERNAL_RUNTIME_CONTEXT_END,
OPENCLAW_NEXT_TURN_RUNTIME_CONTEXT_HEADER,
OPENCLAW_RUNTIME_CONTEXT_NOTICE,
"",
"Visible after",
].join("\n"),
},
]),
];
const out = normalizeAssistantReplayContent(messages);
const normalized = out[1] as AgentMessage & { content: unknown[] };
expect(normalized.content).toEqual([{ type: "text", text: "Visible before\n\nVisible after" }]);
});
it("drops metadata-only assistant text blocks without fabricating placeholder output", () => {
const toolCall = { type: "toolCall", id: "call_1", name: "read", arguments: {} };
const messages = [

View File

@@ -1,6 +1,7 @@
import type { AgentMessage } from "@earendil-works/pi-agent-core";
import type { SessionManager } from "@earendil-works/pi-coding-agent";
import { stripInboundMetadata } from "../../auto-reply/reply/strip-inbound-meta.js";
import { stripInternalMetadataForDisplay } from "../../auto-reply/reply/display-text-sanitize.js";
import { isSilentReplyPayloadText, SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import type { ProviderRuntimeModel } from "../../plugins/provider-runtime-model.types.js";
import {
@@ -282,8 +283,9 @@ function isTranscriptOnlyOpenclawAssistant(message: AgentMessage): boolean {
}
function normalizeAssistantReplayTextContent(message: AgentMessage, replayContent: string) {
const strippedText = stripInboundMetadata(replayContent);
if (!strippedText.trim()) {
const strippedText = stripInternalMetadataForDisplay(replayContent);
const trimmed = strippedText.trim();
if (!trimmed || isSilentReplyPayloadText(trimmed, SILENT_REPLY_TOKEN)) {
return null;
}
return {
@@ -305,13 +307,18 @@ function normalizeAssistantReplayBlockContent(message: AgentMessage, replayConte
sanitizedContent.push(block);
continue;
}
const strippedText = stripInboundMetadata(text);
const strippedText = stripInternalMetadataForDisplay(text);
if (strippedText === text) {
sanitizedContent.push(block);
if (!isSilentReplyPayloadText(text.trim(), SILENT_REPLY_TOKEN)) {
sanitizedContent.push(block);
} else {
touched = true;
}
continue;
}
touched = true;
if (strippedText.trim()) {
const trimmed = strippedText.trim();
if (trimmed && !isSilentReplyPayloadText(trimmed, SILENT_REPLY_TOKEN)) {
sanitizedContent.push({ ...block, text: strippedText });
}
}

View File

@@ -80,6 +80,7 @@ import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js";
import { createFollowupRunner } from "./followup-runner.js";
import { REPLY_RUN_STILL_SHUTTING_DOWN_TEXT } from "./get-reply-run-queue.js";
import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js";
import { sanitizePendingFinalDeliveryText } from "./pending-final-delivery.js";
import { drainPendingToolTasks } from "./pending-tool-task-drain.js";
import { readPostCompactionContext } from "./post-compaction-context.js";
import { resolveActiveRunQueueAction } from "./queue-policy.js";
@@ -922,11 +923,12 @@ function joinCommitmentAssistantText(payloads: ReplyPayload[]): string {
}
function buildPendingFinalDeliveryText(payloads: ReplyPayload[]): string {
return payloads
const text = payloads
.filter((payload) => payload.isReasoning !== true)
.map((payload) => payload.text)
.filter((text): text is string => Boolean(text))
.join("\n\n");
return sanitizePendingFinalDeliveryText(text);
}
function enqueueCommitmentExtractionForTurn(params: {

View File

@@ -0,0 +1,11 @@
import { stripInternalRuntimeContext } from "../../agents/internal-runtime-context.js";
import { stripEnvelope, stripMessageIdHints } from "../../shared/chat-envelope.js";
import { stripInboundMetadata } from "./strip-inbound-meta.js";
export function stripInternalMetadataForDisplay(text: string): string {
return stripInboundMetadata(stripInternalRuntimeContext(text));
}
export function stripUserEnvelopeForDisplay(text: string): string {
return stripMessageIdHints(stripEnvelope(stripInternalMetadataForDisplay(text)));
}

View File

@@ -2,6 +2,10 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import {
INTERNAL_RUNTIME_CONTEXT_BEGIN,
INTERNAL_RUNTIME_CONTEXT_END,
} from "../../agents/internal-runtime-context.js";
import type { OpenClawConfig } from "../../config/config.js";
import {
buildFastReplyCommandContext,
@@ -266,6 +270,48 @@ describe("getReplyFromConfig fast test bootstrap", () => {
expect(stored.pendingFinalDeliveryAttemptCount).toBe(1);
});
it("sanitizes stale heartbeat pending delivery before replay", async () => {
const home = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-heartbeat-pending-sanitize-"));
const storePath = path.join(home, "sessions.json");
const sessionKey = "agent:main:telegram:123";
await fs.writeFile(
storePath,
JSON.stringify({
[sessionKey]: {
sessionId: "pending-dirty-remainder",
updatedAt: Date.now(),
pendingFinalDelivery: true,
pendingFinalDeliveryText: [
"HEARTBEAT_OK",
INTERNAL_RUNTIME_CONTEXT_BEGIN,
"internal recovery detail",
INTERNAL_RUNTIME_CONTEXT_END,
"notify the user",
].join("\n"),
},
}),
"utf8",
);
const cfg = withFastReplyConfig({
agents: {
defaults: {
model: "openai/gpt-5.5",
workspace: home,
heartbeat: { ackMaxChars: 0 },
},
},
session: { store: storePath },
} as OpenClawConfig);
await expect(
getReplyFromConfig(buildGetReplyCtx(), { isHeartbeat: true }, cfg),
).resolves.toEqual({ text: "notify the user" });
const stored = JSON.parse(await fs.readFile(storePath, "utf8"))[sessionKey];
expect(stored.pendingFinalDeliveryText).toBe("notify the user");
expect(stored.pendingFinalDeliveryAttemptCount).toBe(1);
});
it("handles native /status before workspace bootstrap", async () => {
const home = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-native-status-fast-"));
const targetSessionKey = "agent:main:telegram:123";

View File

@@ -45,6 +45,7 @@ import { finalizeInboundContext } from "./inbound-context.js";
import { hasInboundMedia } from "./inbound-media.js";
import { emitPreAgentMessageHooks } from "./message-preprocess-hooks.js";
import { createFastTestModelSelectionState } from "./model-selection.js";
import { sanitizePendingFinalDeliveryText } from "./pending-final-delivery.js";
import { initSessionState } from "./session.js";
import {
isStaleHeartbeatAutoFallbackOverride,
@@ -386,7 +387,7 @@ export async function getReplyFromConfig(
} = sessionState;
if (sessionEntry?.pendingFinalDelivery && sessionEntry.pendingFinalDeliveryText) {
const text = sessionEntry.pendingFinalDeliveryText;
const text = sanitizePendingFinalDeliveryText(sessionEntry.pendingFinalDeliveryText);
// If it's a heartbeat, we definitely want to try delivering the lost reply now.
// If it's a user message, we deliver the lost reply first, then continue.
@@ -429,7 +430,8 @@ export async function getReplyFromConfig(
sessionEntry.pendingFinalDeliveryLastAttemptAt = updatedAt;
sessionEntry.pendingFinalDeliveryAttemptCount = attemptCount;
sessionEntry.pendingFinalDeliveryLastError = null;
sessionEntry.pendingFinalDeliveryText = heartbeatPending.replayText;
const replayText = sanitizePendingFinalDeliveryText(heartbeatPending.replayText);
sessionEntry.pendingFinalDeliveryText = replayText;
sessionEntry.updatedAt = updatedAt;
if (sessionKey && sessionStore) {
sessionStore[sessionKey] = sessionEntry;
@@ -440,7 +442,7 @@ export async function getReplyFromConfig(
storePath,
sessionKey,
update: async () => ({
pendingFinalDeliveryText: heartbeatPending.replayText,
pendingFinalDeliveryText: replayText,
pendingFinalDeliveryLastAttemptAt: updatedAt,
pendingFinalDeliveryAttemptCount: attemptCount,
pendingFinalDeliveryLastError: null,
@@ -448,7 +450,7 @@ export async function getReplyFromConfig(
}),
});
}
return { text: heartbeatPending.replayText };
return { text: replayText };
}
}
}

View File

@@ -0,0 +1,40 @@
import { describe, expect, it } from "vitest";
import {
INTERNAL_RUNTIME_CONTEXT_BEGIN,
INTERNAL_RUNTIME_CONTEXT_END,
} from "../../agents/internal-runtime-context.js";
import { sanitizePendingFinalDeliveryText } from "./pending-final-delivery.js";
describe("sanitizePendingFinalDeliveryText", () => {
it("strips internal metadata from durable pending delivery text", () => {
const text = [
"Visible reply",
INTERNAL_RUNTIME_CONTEXT_BEGIN,
"internal detail",
INTERNAL_RUNTIME_CONTEXT_END,
"",
"Conversation info (untrusted metadata):",
"```json",
'{"message_id":"msg-1"}',
"```",
].join("\n");
expect(sanitizePendingFinalDeliveryText(text)).toBe("Visible reply");
});
it("drops silent reply sentinel payloads", () => {
expect(sanitizePendingFinalDeliveryText(" NO_REPLY ")).toBe("");
expect(sanitizePendingFinalDeliveryText('{"action":"NO_REPLY"}')).toBe("");
});
it("strips mixed silent reply sentinels like normal delivery", () => {
expect(sanitizePendingFinalDeliveryText("NO_REPLYThe user is saying hello")).toBe(
"The user is saying hello",
);
expect(sanitizePendingFinalDeliveryText("HEARTBEAT_OK NO_REPLY")).toBe("HEARTBEAT_OK");
});
it("preserves heartbeat ack text for ack-aware classification", () => {
expect(sanitizePendingFinalDeliveryText("HEARTBEAT_OK short")).toBe("HEARTBEAT_OK short");
});
});

View File

@@ -0,0 +1,32 @@
import {
isSilentReplyPayloadText,
isSilentReplyText,
SILENT_REPLY_TOKEN,
startsWithSilentToken,
stripLeadingSilentToken,
stripSilentToken,
} from "../tokens.js";
import { stripInternalMetadataForDisplay } from "./display-text-sanitize.js";
export function sanitizePendingFinalDeliveryText(text: string): string {
let stripped = stripInternalMetadataForDisplay(text).trim();
if (isSilentReplyPayloadText(stripped, SILENT_REPLY_TOKEN)) {
return "";
}
if (stripped && !isSilentReplyText(stripped, SILENT_REPLY_TOKEN)) {
const hasLeadingSilentToken = startsWithSilentToken(stripped, SILENT_REPLY_TOKEN);
if (hasLeadingSilentToken) {
stripped = stripLeadingSilentToken(stripped, SILENT_REPLY_TOKEN);
}
if (
hasLeadingSilentToken ||
stripped.toLowerCase().includes(SILENT_REPLY_TOKEN.toLowerCase())
) {
stripped = stripSilentToken(stripped, SILENT_REPLY_TOKEN);
}
}
if (!stripped.trim()) {
return "";
}
return isSilentReplyPayloadText(stripped, SILENT_REPLY_TOKEN) ? "" : stripped.trim();
}

View File

@@ -1,9 +1,9 @@
import { stripInternalRuntimeContext } from "../agents/internal-runtime-context.js";
import {
extractInboundSenderLabel,
stripInboundMetadata,
} from "../auto-reply/reply/strip-inbound-meta.js";
import { stripEnvelope, stripMessageIdHints } from "../shared/chat-envelope.js";
stripInternalMetadataForDisplay,
stripUserEnvelopeForDisplay,
} from "../auto-reply/reply/display-text-sanitize.js";
import { extractInboundSenderLabel } from "../auto-reply/reply/strip-inbound-meta.js";
import { stripEnvelope } from "../shared/chat-envelope.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
export { stripEnvelope };
@@ -49,11 +49,9 @@ function stripEnvelopeFromContentWithRole(
if (entry.type !== "text" || typeof entry.text !== "string") {
return item;
}
const runtimeStripped = stripInternalRuntimeContext(entry.text);
const inboundStripped = stripInboundMetadata(runtimeStripped);
const stripped = stripUserEnvelope
? stripMessageIdHints(stripEnvelope(inboundStripped))
: inboundStripped;
? stripUserEnvelopeForDisplay(entry.text)
: stripInternalMetadataForDisplay(entry.text);
if (stripped === entry.text) {
return item;
}
@@ -83,11 +81,9 @@ export function stripEnvelopeFromMessage(message: unknown): unknown {
}
if (typeof entry.content === "string") {
const runtimeStripped = stripInternalRuntimeContext(entry.content);
const inboundStripped = stripInboundMetadata(runtimeStripped);
const stripped = stripUserEnvelope
? stripMessageIdHints(stripEnvelope(inboundStripped))
: inboundStripped;
? stripUserEnvelopeForDisplay(entry.content)
: stripInternalMetadataForDisplay(entry.content);
if (stripped !== entry.content) {
next.content = stripped;
changed = true;
@@ -99,11 +95,9 @@ export function stripEnvelopeFromMessage(message: unknown): unknown {
changed = true;
}
} else if (typeof entry.text === "string") {
const runtimeStripped = stripInternalRuntimeContext(entry.text);
const inboundStripped = stripInboundMetadata(runtimeStripped);
const stripped = stripUserEnvelope
? stripMessageIdHints(stripEnvelope(inboundStripped))
: inboundStripped;
? stripUserEnvelopeForDisplay(entry.text)
: stripInternalMetadataForDisplay(entry.text);
if (stripped !== entry.text) {
next.text = stripped;
changed = true;