fix(agents): preserve active-turn queued user prompts (#65478)

* fix(agents): preserve active-turn queued user prompts

* Update src/agents/pi-embedded-runner/run/attempt.prompt-helpers.ts

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Update CHANGELOG.md

* Update CHANGELOG.md

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
This commit is contained in:
Vincent Koc
2026-04-12 19:02:55 +01:00
committed by GitHub
parent 4df9772b6e
commit fcae3bf943
4 changed files with 110 additions and 4 deletions

View File

@@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai
- Doctor/Discord: stop `openclaw doctor --fix` from rewriting legacy Discord preview-streaming config into the nested modern shape, so downgrades can still recover without hand-editing `channels.discord.streaming`. (#65035) Thanks @vincentkoc.
- Gateway/auth: blank the shipped example gateway credential in `.env.example` and fail startup when a copied placeholder token or password is still configured, so operators cannot accidentally launch with a publicly known secret. (#64586) Thanks @navarrotech and @vincentkoc.
- Memory/active-memory+dreaming: keep active-memory recall runs on the strongest resolved channel, consume managed dreaming heartbeat events exactly once, stop dreaming from re-ingesting its own narrative transcripts, and add explicit repair/dedupe recovery flows in CLI, doctor, and the Dreams UI.
- Agents/queueing: carry orphaned active-turn user text into the next prompt before repairing transcript ordering, so follow-up messages that arrive mid-run are no longer silently dropped. (#65388) Thanks @adminfedres and @vincentkoc.
- Gateway/keepalive: stop marking WebSocket tick broadcasts as droppable so slow or backpressured clients do not self-disconnect with `tick timeout` while long-running work is still alive. (#65256) Thanks @100yenadmin and @vincentkoc.
- Matrix/mentions: keep room mention gating strict while accepting visible `@displayName` Matrix URI labels, so `requireMention` works for non-OpenClaw Matrix clients again. (#64796) Thanks @hclsys.
- Doctor: warn when on-disk agent directories still exist under `~/.openclaw/agents/<id>/agent` but the matching `agents.list[]` entries are missing from config. (#65113) Thanks @neeravmakwana.

View File

@@ -121,6 +121,50 @@ export function shouldWarnOnOrphanedUserRepair(
return trigger === "user" || trigger === "manual";
}
function extractUserMessagePlainText(content: unknown): string | undefined {
if (typeof content === "string") {
const trimmed = content.trim();
return trimmed || undefined;
}
if (!Array.isArray(content)) {
return undefined;
}
const text = content
.flatMap((part) =>
part && typeof part === "object" && "type" in part && part.type === "text"
? [typeof part.text === "string" ? part.text : ""]
: [],
)
.join("\n")
.trim();
return text || undefined;
}
export function mergeOrphanedTrailingUserPrompt(params: {
prompt: string;
trigger: EmbeddedRunAttemptParams["trigger"];
leafMessage: { content?: unknown };
}): { prompt: string; merged: boolean } {
if (!shouldWarnOnOrphanedUserRepair(params.trigger)) {
return { prompt: params.prompt, merged: false };
}
const orphanText = extractUserMessagePlainText(params.leafMessage.content);
if (!orphanText || orphanText.length < 4 || params.prompt.includes(orphanText)) {
return { prompt: params.prompt, merged: false };
}
return {
prompt: [
"[Queued user message that arrived while the previous turn was still active]",
orphanText,
"",
params.prompt,
].join("\n"),
merged: true,
};
}
export function resolveAttemptFsWorkspaceOnly(params: {
config?: OpenClawConfig;
sessionAgentId: string;

View File

@@ -8,6 +8,7 @@ import {
buildAfterTurnRuntimeContext,
composeSystemPromptWithHookContext,
decodeHtmlEntitiesInObject,
mergeOrphanedTrailingUserPrompt,
prependSystemPromptAddition,
resetEmbeddedAgentBaseStreamFnCacheForTest,
resolveEmbeddedAgentBaseStreamFn,
@@ -230,6 +231,55 @@ describe("shouldWarnOnOrphanedUserRepair", () => {
});
});
describe("mergeOrphanedTrailingUserPrompt", () => {
it("merges an orphaned user leaf into the next user-triggered prompt when missing", () => {
expect(
mergeOrphanedTrailingUserPrompt({
prompt: "newest inbound message",
trigger: "user",
leafMessage: {
content: [{ type: "text", text: "older active-turn message" }],
} as never,
}),
).toEqual({
merged: true,
prompt:
"[Queued user message that arrived while the previous turn was still active]\n" +
"older active-turn message\n\nnewest inbound message",
});
});
it("does not duplicate orphaned user text already present in the next prompt", () => {
expect(
mergeOrphanedTrailingUserPrompt({
prompt: "summary\nolder active-turn message\nnewest inbound message",
trigger: "user",
leafMessage: {
content: "older active-turn message",
} as never,
}),
).toEqual({
merged: false,
prompt: "summary\nolder active-turn message\nnewest inbound message",
});
});
it("skips orphan prompt merging for non-user triggers", () => {
expect(
mergeOrphanedTrailingUserPrompt({
prompt: "HEARTBEAT_OK",
trigger: "heartbeat",
leafMessage: {
content: "older active-turn message",
} as never,
}),
).toEqual({
merged: false,
prompt: "HEARTBEAT_OK",
});
});
});
describe("resolveEmbeddedAgentStreamFn", () => {
it("reuses the session's original base stream across later wrapper mutations", () => {
resetEmbeddedAgentBaseStreamFnCacheForTest();
@@ -1308,9 +1358,11 @@ describe("wrapStreamFnSanitizeMalformedToolCalls", () => {
);
const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"]));
const stream = wrapped({ api: "google-gemini" } as never, { messages } as never, {} as never) as
| FakeWrappedStream
| Promise<FakeWrappedStream>;
const stream = wrapped(
{ api: "google-gemini" } as never,
{ messages } as never,
{} as never,
) as FakeWrappedStream | Promise<FakeWrappedStream>;
await Promise.resolve(stream);
expect(baseFn).toHaveBeenCalledTimes(1);

View File

@@ -179,6 +179,7 @@ import {
} from "./attempt.context-engine-helpers.js";
import {
buildAfterTurnRuntimeContext,
mergeOrphanedTrailingUserPrompt,
prependSystemPromptAddition,
resolveAttemptFsWorkspaceOnly,
resolveAttemptPrependSystemContext,
@@ -240,6 +241,7 @@ export {
} from "./attempt.thread-helpers.js";
export {
buildAfterTurnRuntimeContext,
mergeOrphanedTrailingUserPrompt,
prependSystemPromptAddition,
resolveAttemptFsWorkspaceOnly,
resolveAttemptPrependSystemContext,
@@ -1761,6 +1763,12 @@ export async function runEmbeddedAttempt(
// Repair orphaned trailing user messages so new prompts don't violate role ordering.
const leafEntry = sessionManager.getLeafEntry();
if (leafEntry?.type === "message" && leafEntry.message.role === "user") {
const orphanPromptMerge = mergeOrphanedTrailingUserPrompt({
prompt: effectivePrompt,
trigger: params.trigger,
leafMessage: leafEntry.message,
});
effectivePrompt = orphanPromptMerge.prompt;
if (leafEntry.parentId) {
sessionManager.branch(leafEntry.parentId);
} else {
@@ -1769,7 +1777,8 @@ export async function runEmbeddedAttempt(
const sessionContext = sessionManager.buildSessionContext();
activeSession.agent.state.messages = sessionContext.messages;
const orphanRepairMessage =
`Removed orphaned user message to prevent consecutive user turns. ` +
`${orphanPromptMerge.merged ? "Merged and removed" : "Removed"} orphaned user message ` +
`to prevent consecutive user turns. ` +
`runId=${params.runId} sessionId=${params.sessionId} trigger=${params.trigger}`;
if (shouldWarnOnOrphanedUserRepair(params.trigger)) {
log.warn(orphanRepairMessage);