From fcae3bf9433b98b22c1d995644e9e33f0621d8f8 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 12 Apr 2026 19:02:55 +0100 Subject: [PATCH] fix(agents): preserve active-turn queued user prompts (#65478) * fix(agents): preserve active-turn queued user prompts * Update src/agents/pi-embedded-runner/run/attempt.prompt-helpers.ts Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> * Update CHANGELOG.md * Update CHANGELOG.md --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- CHANGELOG.md | 1 + .../run/attempt.prompt-helpers.ts | 44 ++++++++++++++ .../pi-embedded-runner/run/attempt.test.ts | 58 ++++++++++++++++++- src/agents/pi-embedded-runner/run/attempt.ts | 11 +++- 4 files changed, 110 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 99bb4fbc242..c8984bd64d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai - Doctor/Discord: stop `openclaw doctor --fix` from rewriting legacy Discord preview-streaming config into the nested modern shape, so downgrades can still recover without hand-editing `channels.discord.streaming`. (#65035) Thanks @vincentkoc. - Gateway/auth: blank the shipped example gateway credential in `.env.example` and fail startup when a copied placeholder token or password is still configured, so operators cannot accidentally launch with a publicly known secret. (#64586) Thanks @navarrotech and @vincentkoc. - Memory/active-memory+dreaming: keep active-memory recall runs on the strongest resolved channel, consume managed dreaming heartbeat events exactly once, stop dreaming from re-ingesting its own narrative transcripts, and add explicit repair/dedupe recovery flows in CLI, doctor, and the Dreams UI. +- Agents/queueing: carry orphaned active-turn user text into the next prompt before repairing transcript ordering, so follow-up messages that arrive mid-run are no longer silently dropped. (#65388) Thanks @adminfedres and @vincentkoc. - Gateway/keepalive: stop marking WebSocket tick broadcasts as droppable so slow or backpressured clients do not self-disconnect with `tick timeout` while long-running work is still alive. (#65256) Thanks @100yenadmin and @vincentkoc. - Matrix/mentions: keep room mention gating strict while accepting visible `@displayName` Matrix URI labels, so `requireMention` works for non-OpenClaw Matrix clients again. (#64796) Thanks @hclsys. - Doctor: warn when on-disk agent directories still exist under `~/.openclaw/agents//agent` but the matching `agents.list[]` entries are missing from config. (#65113) Thanks @neeravmakwana. diff --git a/src/agents/pi-embedded-runner/run/attempt.prompt-helpers.ts b/src/agents/pi-embedded-runner/run/attempt.prompt-helpers.ts index 40c0c1d6981..767f496ff24 100644 --- a/src/agents/pi-embedded-runner/run/attempt.prompt-helpers.ts +++ b/src/agents/pi-embedded-runner/run/attempt.prompt-helpers.ts @@ -121,6 +121,50 @@ export function shouldWarnOnOrphanedUserRepair( return trigger === "user" || trigger === "manual"; } +function extractUserMessagePlainText(content: unknown): string | undefined { + if (typeof content === "string") { + const trimmed = content.trim(); + return trimmed || undefined; + } + if (!Array.isArray(content)) { + return undefined; + } + const text = content + .flatMap((part) => + part && typeof part === "object" && "type" in part && part.type === "text" + ? [typeof part.text === "string" ? part.text : ""] + : [], + ) + .join("\n") + .trim(); + return text || undefined; +} + +export function mergeOrphanedTrailingUserPrompt(params: { + prompt: string; + trigger: EmbeddedRunAttemptParams["trigger"]; + leafMessage: { content?: unknown }; +}): { prompt: string; merged: boolean } { + if (!shouldWarnOnOrphanedUserRepair(params.trigger)) { + return { prompt: params.prompt, merged: false }; + } + + const orphanText = extractUserMessagePlainText(params.leafMessage.content); + if (!orphanText || orphanText.length < 4 || params.prompt.includes(orphanText)) { + return { prompt: params.prompt, merged: false }; + } + + return { + prompt: [ + "[Queued user message that arrived while the previous turn was still active]", + orphanText, + "", + params.prompt, + ].join("\n"), + merged: true, + }; +} + export function resolveAttemptFsWorkspaceOnly(params: { config?: OpenClawConfig; sessionAgentId: string; diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index 17de57f5ef9..c073e102096 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -8,6 +8,7 @@ import { buildAfterTurnRuntimeContext, composeSystemPromptWithHookContext, decodeHtmlEntitiesInObject, + mergeOrphanedTrailingUserPrompt, prependSystemPromptAddition, resetEmbeddedAgentBaseStreamFnCacheForTest, resolveEmbeddedAgentBaseStreamFn, @@ -230,6 +231,55 @@ describe("shouldWarnOnOrphanedUserRepair", () => { }); }); +describe("mergeOrphanedTrailingUserPrompt", () => { + it("merges an orphaned user leaf into the next user-triggered prompt when missing", () => { + expect( + mergeOrphanedTrailingUserPrompt({ + prompt: "newest inbound message", + trigger: "user", + leafMessage: { + content: [{ type: "text", text: "older active-turn message" }], + } as never, + }), + ).toEqual({ + merged: true, + prompt: + "[Queued user message that arrived while the previous turn was still active]\n" + + "older active-turn message\n\nnewest inbound message", + }); + }); + + it("does not duplicate orphaned user text already present in the next prompt", () => { + expect( + mergeOrphanedTrailingUserPrompt({ + prompt: "summary\nolder active-turn message\nnewest inbound message", + trigger: "user", + leafMessage: { + content: "older active-turn message", + } as never, + }), + ).toEqual({ + merged: false, + prompt: "summary\nolder active-turn message\nnewest inbound message", + }); + }); + + it("skips orphan prompt merging for non-user triggers", () => { + expect( + mergeOrphanedTrailingUserPrompt({ + prompt: "HEARTBEAT_OK", + trigger: "heartbeat", + leafMessage: { + content: "older active-turn message", + } as never, + }), + ).toEqual({ + merged: false, + prompt: "HEARTBEAT_OK", + }); + }); +}); + describe("resolveEmbeddedAgentStreamFn", () => { it("reuses the session's original base stream across later wrapper mutations", () => { resetEmbeddedAgentBaseStreamFnCacheForTest(); @@ -1308,9 +1358,11 @@ describe("wrapStreamFnSanitizeMalformedToolCalls", () => { ); const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); - const stream = wrapped({ api: "google-gemini" } as never, { messages } as never, {} as never) as - | FakeWrappedStream - | Promise; + const stream = wrapped( + { api: "google-gemini" } as never, + { messages } as never, + {} as never, + ) as FakeWrappedStream | Promise; await Promise.resolve(stream); expect(baseFn).toHaveBeenCalledTimes(1); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 8100bd06db1..6ce4251ce76 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -179,6 +179,7 @@ import { } from "./attempt.context-engine-helpers.js"; import { buildAfterTurnRuntimeContext, + mergeOrphanedTrailingUserPrompt, prependSystemPromptAddition, resolveAttemptFsWorkspaceOnly, resolveAttemptPrependSystemContext, @@ -240,6 +241,7 @@ export { } from "./attempt.thread-helpers.js"; export { buildAfterTurnRuntimeContext, + mergeOrphanedTrailingUserPrompt, prependSystemPromptAddition, resolveAttemptFsWorkspaceOnly, resolveAttemptPrependSystemContext, @@ -1761,6 +1763,12 @@ export async function runEmbeddedAttempt( // Repair orphaned trailing user messages so new prompts don't violate role ordering. const leafEntry = sessionManager.getLeafEntry(); if (leafEntry?.type === "message" && leafEntry.message.role === "user") { + const orphanPromptMerge = mergeOrphanedTrailingUserPrompt({ + prompt: effectivePrompt, + trigger: params.trigger, + leafMessage: leafEntry.message, + }); + effectivePrompt = orphanPromptMerge.prompt; if (leafEntry.parentId) { sessionManager.branch(leafEntry.parentId); } else { @@ -1769,7 +1777,8 @@ export async function runEmbeddedAttempt( const sessionContext = sessionManager.buildSessionContext(); activeSession.agent.state.messages = sessionContext.messages; const orphanRepairMessage = - `Removed orphaned user message to prevent consecutive user turns. ` + + `${orphanPromptMerge.merged ? "Merged and removed" : "Removed"} orphaned user message ` + + `to prevent consecutive user turns. ` + `runId=${params.runId} sessionId=${params.sessionId} trigger=${params.trigger}`; if (shouldWarnOnOrphanedUserRepair(params.trigger)) { log.warn(orphanRepairMessage);