diff --git a/CHANGELOG.md b/CHANGELOG.md
index 360a1549707..2ddcafb6fba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ Docs: https://docs.openclaw.ai
### Changes
- Dependencies: refresh workspace runtime, plugin, and tooling packages, including ACP, Pi, AWS SDK, TypeBox, pnpm, oxlint, oxfmt, jsdom, pdfjs, ciao, and tokenjuice, while keeping patched ACP behavior and lint gates current. Thanks @mariozechner.
+- Messages/queue: make `steer` drain all pending Pi steering messages at the next model boundary, keep legacy one-at-a-time steering as `queue`, and add a dedicated steering queue docs page. Thanks @vincentkoc.
- Messages/queue: default active-run queueing to `steer` with a 500ms followup fallback debounce, and document the queue modes, precedence, and drop policies on the command queue page. Thanks @vincentkoc.
- Providers/NVIDIA: add the NVIDIA provider with API-key onboarding, setup docs, static catalog metadata, and literal model-ref picker support so NVIDIA hosted models can be selected with their provider prefix intact. (#71204) Thanks @eleqtrizit.
- Memory/wiki: add agent-facing people wiki metadata, canonical aliases, person cards, relationship graphs, privacy/provenance reports, evidence-kind drilldown, and search modes for person lookup, question routing, source evidence, and raw claims. Thanks @vincentkoc.
diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256
index 21e0608e96d..32b2e75197e 100644
--- a/docs/.generated/config-baseline.sha256
+++ b/docs/.generated/config-baseline.sha256
@@ -1,4 +1,4 @@
-2af6bef21f530dc64e0379f7631bed410aee1d5c86604ef9fb149f546cfcb0e8 config-baseline.json
-8d75df355b7f6e44b9c2f195d9df86130beb697e26061469df7d60b7e8a2f204 config-baseline.core.json
+dbb3d39ddeb8a9ce03459d69774db7e457d33f47c585e0f8c7130b14b92cfcff config-baseline.json
+2197788a6a1e677fb34a4971ac4f254116c243753452397824e08431f8740aab config-baseline.core.json
fab66aa304db5697e87259165ad261006719eb6e6cdbd25f957fcba2b7b324e9 config-baseline.channel.json
c4231c2194206547af8ad94342dc00aadb734f43cb49cc79d4c46bdbb80c3f95 config-baseline.plugin.json
diff --git a/docs/.i18n/glossary.zh-CN.json b/docs/.i18n/glossary.zh-CN.json
index 466000477f1..ff1616612c9 100644
--- a/docs/.i18n/glossary.zh-CN.json
+++ b/docs/.i18n/glossary.zh-CN.json
@@ -51,6 +51,10 @@
"source": "Agent loop",
"target": "Agent loop"
},
+ {
+ "source": "Steering queue",
+ "target": "Steering queue"
+ },
{
"source": "Models",
"target": "Models"
diff --git a/docs/concepts/agent.md b/docs/concepts/agent.md
index 3277b7d4ff5..d7280dd0e60 100644
--- a/docs/concepts/agent.md
+++ b/docs/concepts/agent.md
@@ -86,13 +86,15 @@ Legacy session folders from other tools are not read.
When queue mode is `steer`, inbound messages are injected into the current run.
Queued steering is delivered **after the current assistant turn finishes
-executing its tool calls**, before the next LLM call. Steering no longer skips
-remaining tool calls from the current assistant message; it injects the queued
-message at the next model boundary instead.
+executing its tool calls**, before the next LLM call. Pi drains all pending
+steering messages together for `steer`; legacy `queue` drains one message per
+model boundary. Steering no longer skips remaining tool calls from the current
+assistant message.
When queue mode is `followup` or `collect`, inbound messages are held until the
current turn ends, then a new agent turn starts with the queued payloads. See
-[Queue](/concepts/queue) for mode + debounce/cap behavior.
+[Queue](/concepts/queue) and [Steering queue](/concepts/queue-steering) for mode
+and boundary behavior.
Block streaming sends completed assistant blocks as soon as they finish; it is
**off by default** (`agents.defaults.blockStreamingDefault: "off"`).
diff --git a/docs/concepts/messages.md b/docs/concepts/messages.md
index c25a6a7ba6b..adbf5d243f4 100644
--- a/docs/concepts/messages.md
+++ b/docs/concepts/messages.md
@@ -127,9 +127,9 @@ current run, or collected for a followup turn.
- Default mode is `steer`, with a 500ms followup debounce when steering falls
back to queued followup delivery.
- Modes: `steer`, `followup`, `collect`, `steer-backlog`, `interrupt`, and the
- legacy `queue` alias.
+ legacy one-at-a-time `queue` mode.
-Details: [Command queue](/concepts/queue).
+Details: [Command queue](/concepts/queue) and [Steering queue](/concepts/queue-steering).
## Channel run ownership
diff --git a/docs/concepts/queue-steering.md b/docs/concepts/queue-steering.md
new file mode 100644
index 00000000000..74419e0e4c5
--- /dev/null
+++ b/docs/concepts/queue-steering.md
@@ -0,0 +1,90 @@
+---
+summary: "How active-run steering queues messages at runtime boundaries"
+read_when:
+ - Explaining how steer behaves while an agent is using tools
+ - Changing active-run queue behavior or runtime steering integration
+ - Comparing steer, queue, collect, and followup modes
+title: "Steering queue"
+---
+
+When a message arrives while a session run is already streaming, OpenClaw can
+send that message into the active runtime instead of starting another run for
+the same session. The public modes are runtime-neutral; Pi and the native Codex
+app-server harness implement the delivery details differently.
+
+## Runtime boundary
+
+Steering does not interrupt a tool call that is already running. Pi checks for
+queued steering messages at model boundaries:
+
+1. The assistant asks for tool calls.
+2. Pi executes the current assistant message's tool-call batch.
+3. Pi emits the turn end event.
+4. Pi drains queued steering messages.
+5. Pi appends those messages as user messages before the next LLM call.
+
+This keeps tool results paired with the assistant message that requested them,
+then lets the next model call see the latest user input.
+
+The native Codex app-server harness exposes `turn/steer` instead of Pi's
+internal steering queue. OpenClaw adapts the same modes there:
+
+- `steer` batches queued messages for the configured quiet window, then sends a
+ single `turn/steer` request with all collected user input in arrival order.
+- `queue` keeps the legacy serialized shape by sending separate `turn/steer`
+ requests.
+- `followup`, `collect`, `steer-backlog`, and `interrupt` stay OpenClaw-owned
+ queue behavior around the active Codex turn.
+
+Codex review and manual compaction turns reject same-turn steering. When a
+runtime cannot accept steering, OpenClaw falls back to the followup queue where
+that mode allows it.
+
+## Modes
+
+| Mode | Active-run behavior | Later followup behavior |
+| --------------- | ---------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------- |
+| `steer` | Injects all queued steering messages together at the next runtime boundary. This is the default. | Falls back to followup only when steering is unavailable. |
+| `queue` | Legacy one-at-a-time steering. Pi injects one queued message per model boundary; Codex sends separate `turn/steer` requests. | Falls back to followup only when steering is unavailable. |
+| `steer-backlog` | Same active-run steering behavior as `steer`. | Also keeps the same message for a later followup turn. |
+| `followup` | Does not steer the current run. | Runs queued messages later. |
+| `collect` | Does not steer the current run. | Coalesces compatible queued messages into one later turn after the debounce window. |
+| `interrupt` | Aborts the active run, then starts the newest message. | None. |
+
+## Burst example
+
+If four users send messages while the agent is executing a tool call:
+
+- `steer`: the active runtime receives all four messages in arrival order before
+ its next model decision. Pi drains them at the next model boundary; Codex
+ receives them as one batched `turn/steer`.
+- `queue`: legacy serialized steering. Pi injects one queued message at a time;
+ Codex receives separate `turn/steer` requests.
+- `collect`: OpenClaw waits until the active run ends, then creates a followup
+ turn with compatible queued messages after the debounce window.
+
+## Scope
+
+Steering always targets the current active session run. It does not create a new
+session, change the active run's tool policy, or split messages by sender. In
+multi-user channels, inbound prompts already include sender and route context, so
+the next model call can see who sent each message.
+
+Use `collect` when you want OpenClaw to build a later followup turn that can
+coalesce compatible messages and preserve followup queue drop policy. Use
+`queue` only when you need the older one-at-a-time steering behavior.
+
+## Debounce
+
+`messages.queue.debounceMs` applies to followup delivery, including `collect`,
+`followup`, `steer-backlog`, and `steer` fallback when active-run steering is not
+available. For Pi, active `steer` itself does not use the debounce timer because
+Pi naturally batches messages until the next model boundary. For the native
+Codex harness, OpenClaw uses the same debounce value as the quiet window before
+sending the batched `turn/steer`.
+
+## Related
+
+- [Command queue](/concepts/queue)
+- [Messages](/concepts/messages)
+- [Agent loop](/concepts/agent-loop)
diff --git a/docs/concepts/queue.md b/docs/concepts/queue.md
index a3ff4ec6ee2..50e7d7ae542 100644
--- a/docs/concepts/queue.md
+++ b/docs/concepts/queue.md
@@ -31,24 +31,28 @@ When unset, all inbound channel surfaces use:
- `drop: "summarize"`
`steer` is the default because it keeps the active model turn responsive without
-starting a second session run. If the current run cannot accept steering,
+starting a second session run. It drains all steering messages that arrived
+before the next model boundary. If the current run cannot accept steering,
OpenClaw falls back to a followup queue entry.
## Queue modes
Inbound messages can steer the current run, wait for a followup turn, or do both:
-- `steer`: queue a steering message into the active Pi run. Pi delivers it **after the current assistant turn finishes executing its tool calls**, before the next LLM call. If the run is not actively streaming or steering is unavailable, OpenClaw falls back to a followup queue entry.
+- `steer`: queue steering messages into the active runtime. Pi delivers all pending steering messages **after the current assistant turn finishes executing its tool calls**, before the next LLM call; Codex app-server receives one batched `turn/steer`. If the run is not actively streaming or steering is unavailable, OpenClaw falls back to a followup queue entry.
+- `queue` (legacy): old one-at-a-time steering. Pi delivers one queued steering message at each model boundary; Codex app-server receives separate `turn/steer` requests. Prefer `steer` unless you need the previous serialized behavior.
- `followup`: enqueue each message for a later agent turn after the current run ends.
- `collect`: coalesce queued messages into a **single** followup turn after the quiet window. If messages target different channels/threads, they drain individually to preserve routing.
- `steer-backlog` (aka `steer+backlog`): steer now **and** preserve the same message for a followup turn.
- `interrupt` (legacy): abort the active run for that session, then run the newest message.
-- `queue` (legacy alias): same as `steer`.
Steer-backlog means you can get a followup response after the steered run, so
streaming surfaces can look like duplicates. Prefer `collect`/`steer` if you want
one response per inbound message.
+For runtime-specific timing and delivery behavior, see
+[Steering queue](/concepts/queue-steering).
+
Configure globally or per channel via `messages.queue`:
```json5
@@ -67,7 +71,7 @@ Configure globally or per channel via `messages.queue`:
## Queue options
-Options apply to `followup`, `collect`, and `steer-backlog` (and to `steer` when it falls back to followup):
+Options apply to `followup`, `collect`, and `steer-backlog` (and to `steer` or legacy `queue` when steering falls back to followup):
- `debounceMs`: quiet window before draining queued followups. Bare numbers are milliseconds; units `ms`, `s`, `m`, `h`, and `d` are accepted by `/queue` options.
- `cap`: max queued messages per session. Values below `1` are ignored.
@@ -115,4 +119,5 @@ keys.
## Related
- [Session management](/concepts/session)
+- [Steering queue](/concepts/queue-steering)
- [Retry policy](/concepts/retry)
diff --git a/docs/docs.json b/docs/docs.json
index d977961ab50..57a9b05bead 100644
--- a/docs/docs.json
+++ b/docs/docs.json
@@ -1164,7 +1164,8 @@
"concepts/messages",
"concepts/streaming",
"concepts/retry",
- "concepts/queue"
+ "concepts/queue",
+ "concepts/queue-steering"
]
}
]
diff --git a/docs/gateway/config-agents.md b/docs/gateway/config-agents.md
index a7baf7a80e2..b1861d7c24b 100644
--- a/docs/gateway/config-agents.md
+++ b/docs/gateway/config-agents.md
@@ -1226,7 +1226,7 @@ See [Multi-Agent Sandbox & Tools](/tools/multi-agent-sandbox-tools) for preceden
ackReactionScope: "group-mentions", // group-mentions | group-all | direct | all
removeAckAfterReply: false,
queue: {
- mode: "steer", // steer | followup | collect | steer-backlog | steer+backlog | queue | interrupt
+ mode: "steer", // steer | queue (legacy one-at-a-time) | followup | collect | steer-backlog | steer+backlog | interrupt
debounceMs: 500,
cap: 20,
drop: "summarize", // old | new | summarize
diff --git a/docs/help/faq.md b/docs/help/faq.md
index a9c228787b2..c9db380d743 100644
--- a/docs/help/faq.md
+++ b/docs/help/faq.md
@@ -1943,13 +1943,14 @@ lives on the [Models FAQ](/help/faq-models).
Queue mode controls how new messages interact with an in-flight run. Use `/queue` to change modes:
- - `steer` - queue steering for the next model boundary in the current run
+ - `steer` - queue all pending steering for the next model boundary in the current run
+ - `queue` - legacy one-at-a-time steering
- `followup` - run messages one at a time
- `collect` - batch messages and reply once
- `steer-backlog` - steer now, then process backlog
- `interrupt` - abort current run and start fresh
- Default mode is `steer`. You can add options like `debounce:0.5s cap:25 drop:summarize` for followup modes. See [Command queue](/concepts/queue).
+ Default mode is `steer`. You can add options like `debounce:0.5s cap:25 drop:summarize` for followup modes. See [Command queue](/concepts/queue) and [Steering queue](/concepts/queue-steering).
diff --git a/docs/plugins/codex-harness.md b/docs/plugins/codex-harness.md
index dd9a835bfe7..d6223cf8186 100644
--- a/docs/plugins/codex-harness.md
+++ b/docs/plugins/codex-harness.md
@@ -946,6 +946,14 @@ originating chat, and the next queued follow-up message answers that native
server request instead of being steered as extra context. Other MCP elicitation
requests still fail closed.
+Active-run queue steering maps onto Codex app-server `turn/steer`. With the
+default `messages.queue.mode: "steer"`, OpenClaw batches queued chat messages
+for the configured quiet window and sends them as one `turn/steer` request in
+arrival order. Legacy `queue` mode sends separate `turn/steer` requests. Codex
+review and manual compaction turns can reject same-turn steering, in which case
+OpenClaw uses the followup queue when the selected mode allows fallback. See
+[Steering queue](/concepts/queue-steering).
+
When the selected model uses the Codex harness, native thread compaction is
delegated to Codex app-server. OpenClaw keeps a transcript mirror for channel
history, search, `/new`, `/reset`, and future model or harness switching. The
diff --git a/docs/tools/slash-commands.md b/docs/tools/slash-commands.md
index 05543cd6ab4..3d5c880630c 100644
--- a/docs/tools/slash-commands.md
+++ b/docs/tools/slash-commands.md
@@ -142,7 +142,7 @@ Current source-of-truth:
- `/exec host= security= ask= node=` shows or sets exec defaults.
- `/model [name|#|status]` shows or sets the model.
- `/models [provider] [page] [limit=|size=|all]` lists configured/auth-available providers or models for a provider; add `all` to browse that provider's full catalog.
- `/queue <mode>` manages queue behavior (`steer`, `followup`, `collect`, `steer-backlog`, `interrupt`) plus options like `debounce:0.5s cap:25 drop:summarize`; `/queue default` or `/queue reset` clears the session override. See [Command queue](/concepts/queue).
+ - `/queue <mode>` manages queue behavior (`steer`, legacy `queue`, `followup`, `collect`, `steer-backlog`, `interrupt`) plus options like `debounce:0.5s cap:25 drop:summarize`; `/queue default` or `/queue reset` clears the session override. See [Command queue](/concepts/queue) and [Steering queue](/concepts/queue-steering).
diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts
index 3c09a6dac73..c8955c8c847 100644
--- a/extensions/codex/src/app-server/run-attempt.test.ts
+++ b/extensions/codex/src/app-server/run-attempt.test.ts
@@ -1056,6 +1056,81 @@ describe("runCodexAppServerAttempt", () => {
);
});
+ it("batches default queued steering before sending turn/steer", async () => {
+ const { requests, waitForMethod, completeTurn } = createStartedThreadHarness();
+
+ const run = runCodexAppServerAttempt(
+ createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")),
+ );
+ await waitForMethod("turn/start");
+
+ expect(queueAgentHarnessMessage("session-1", "first", { debounceMs: 5 })).toBe(true);
+ expect(queueAgentHarnessMessage("session-1", "second", { debounceMs: 5 })).toBe(true);
+
+ await vi.waitFor(
+ () =>
+ expect(requests.filter((entry) => entry.method === "turn/steer")).toEqual([
+ {
+ method: "turn/steer",
+ params: {
+ threadId: "thread-1",
+ expectedTurnId: "turn-1",
+ input: [
+ { type: "text", text: "first", text_elements: [] },
+ { type: "text", text: "second", text_elements: [] },
+ ],
+ },
+ },
+ ]),
+ { interval: 1 },
+ );
+
+ await completeTurn({ threadId: "thread-1", turnId: "turn-1" });
+ await run;
+ });
+
+ it("keeps legacy queue steering as separate turn/steer requests", async () => {
+ const { requests, waitForMethod, completeTurn } = createStartedThreadHarness();
+
+ const run = runCodexAppServerAttempt(
+ createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")),
+ );
+ await waitForMethod("turn/start");
+
+ expect(queueAgentHarnessMessage("session-1", "first", { steeringMode: "one-at-a-time" })).toBe(
+ true,
+ );
+ expect(queueAgentHarnessMessage("session-1", "second", { steeringMode: "one-at-a-time" })).toBe(
+ true,
+ );
+
+ await vi.waitFor(
+ () =>
+ expect(requests.filter((entry) => entry.method === "turn/steer")).toEqual([
+ {
+ method: "turn/steer",
+ params: {
+ threadId: "thread-1",
+ expectedTurnId: "turn-1",
+ input: [{ type: "text", text: "first", text_elements: [] }],
+ },
+ },
+ {
+ method: "turn/steer",
+ params: {
+ threadId: "thread-1",
+ expectedTurnId: "turn-1",
+ input: [{ type: "text", text: "second", text_elements: [] }],
+ },
+ },
+ ]),
+ { interval: 1 },
+ );
+
+ await completeTurn({ threadId: "thread-1", turnId: "turn-1" });
+ await run;
+ });
+
it("routes MCP approval elicitations through the native bridge", async () => {
let notify: (notification: CodexServerNotification) => Promise<void> = async () => undefined;
let handleRequest:
diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts
index 08274fce69a..ee763dfd190 100644
--- a/extensions/codex/src/app-server/run-attempt.ts
+++ b/extensions/codex/src/app-server/run-attempt.ts
@@ -59,6 +59,7 @@ import {
readCodexDynamicToolCallParams,
} from "./protocol-validators.js";
import {
+ type CodexUserInput,
isJsonObject,
type CodexServerNotification,
type CodexDynamicToolCallParams,
@@ -86,6 +87,7 @@ import { filterToolsForVisionInputs } from "./vision-tools.js";
const CODEX_DYNAMIC_TOOL_TIMEOUT_MS = 30_000;
const CODEX_TURN_COMPLETION_IDLE_TIMEOUT_MS = 60_000;
+const CODEX_STEER_ALL_DEBOUNCE_MS = 500;
type OpenClawCodingToolsOptions = NonNullable<
Parameters<(typeof import("openclaw/plugin-sdk/agent-harness"))["createOpenClawCodingTools"]>[0]
@@ -123,6 +125,91 @@ function collectTerminalAssistantText(result: EmbeddedRunAttemptResult): string
return result.assistantTexts.join("\n\n").trim();
}
+type CodexSteeringQueueOptions = {
+ steeringMode?: "all" | "one-at-a-time";
+ debounceMs?: number;
+};
+
+function createCodexSteeringQueue(params: {
+ client: CodexAppServerClient;
+ threadId: string;
+ turnId: string;
+ answerPendingUserInput: (text: string) => boolean;
+ signal: AbortSignal;
+}) {
+ let batchedTexts: string[] = [];
+ let batchTimer: NodeJS.Timeout | undefined;
+ let sendChain: Promise<void> = Promise.resolve();
+
+ const clearBatchTimer = () => {
+ if (batchTimer) {
+ clearTimeout(batchTimer);
+ batchTimer = undefined;
+ }
+ };
+
+ const sendTexts = async (texts: string[]) => {
+ if (texts.length === 0 || params.signal.aborted) {
+ return;
+ }
+ await params.client.request("turn/steer", {
+ threadId: params.threadId,
+ expectedTurnId: params.turnId,
+ input: texts.map(toCodexTextInput),
+ });
+ };
+
+ const enqueueSend = (texts: string[]) => {
+ sendChain = sendChain
+ .then(() => sendTexts(texts))
+ .catch((error: unknown) => {
+ embeddedAgentLog.debug("codex app-server queued steer failed", { error });
+ });
+ return sendChain;
+ };
+
+ const flushBatch = () => {
+ clearBatchTimer();
+ const texts = batchedTexts;
+ batchedTexts = [];
+ return enqueueSend(texts);
+ };
+
+ return {
+ async queue(text: string, options?: CodexSteeringQueueOptions) {
+ if (params.answerPendingUserInput(text)) {
+ return;
+ }
+ if (options?.steeringMode === "one-at-a-time") {
+ await flushBatch();
+ await enqueueSend([text]);
+ return;
+ }
+ batchedTexts.push(text);
+ clearBatchTimer();
+ const debounceMs = normalizeCodexSteerDebounceMs(options?.debounceMs);
+ batchTimer = setTimeout(() => {
+ batchTimer = undefined;
+ void flushBatch();
+ }, debounceMs);
+ },
+ cancel() {
+ clearBatchTimer();
+ batchedTexts = [];
+ },
+ };
+}
+
+function normalizeCodexSteerDebounceMs(value: number | undefined): number {
+ return typeof value === "number" && Number.isFinite(value) && value >= 0
+ ? Math.floor(value)
+ : CODEX_STEER_ALL_DEBOUNCE_MS;
+}
+
+function toCodexTextInput(text: string): CodexUserInput {
+ return { type: "text", text, text_elements: [] };
+}
+
export async function runCodexAppServerAttempt(
params: EmbeddedRunAttemptParams,
options: {
@@ -727,18 +814,17 @@ export async function runCodexAppServerAttempt(
});
}
+ const steeringQueue = createCodexSteeringQueue({
+ client,
+ threadId: thread.threadId,
+ turnId: activeTurnId,
+ answerPendingUserInput: (text) => userInputBridge?.handleQueuedMessage(text) ?? false,
+ signal: runAbortController.signal,
+ });
const handle = {
kind: "embedded" as const,
- queueMessage: async (text: string) => {
- if (userInputBridge?.handleQueuedMessage(text)) {
- return;
- }
- await client.request("turn/steer", {
- threadId: thread.threadId,
- expectedTurnId: activeTurnId,
- input: [{ type: "text", text, text_elements: [] }],
- });
- },
+ queueMessage: async (text: string, options?: CodexSteeringQueueOptions) =>
+ steeringQueue.queue(text, options),
isStreaming: () => !completed,
isCompacting: () => projector?.isCompacting() ?? false,
cancel: () => runAbortController.abort("cancelled"),
@@ -913,6 +999,7 @@ export async function runCodexAppServerAttempt(
nativeHookRelay?.unregister();
runAbortController.signal.removeEventListener("abort", abortListener);
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
+ steeringQueue.cancel();
clearActiveEmbeddedRun(params.sessionId, handle, params.sessionKey);
}
}
diff --git a/src/agents/pi-embedded-runner/run-state.ts b/src/agents/pi-embedded-runner/run-state.ts
index 9858b88b651..373cf3ddaec 100644
--- a/src/agents/pi-embedded-runner/run-state.ts
+++ b/src/agents/pi-embedded-runner/run-state.ts
@@ -6,13 +6,18 @@ import { resolveGlobalSingleton } from "../../shared/global-singleton.js";
export type EmbeddedPiQueueHandle = {
kind?: "embedded";
- queueMessage: (text: string) => Promise<void>;
+ queueMessage: (text: string, options?: EmbeddedPiQueueMessageOptions) => Promise<void>;
isStreaming: () => boolean;
isCompacting: () => boolean;
cancel?: (reason?: "user_abort" | "restart" | "superseded") => void;
abort: () => void;
};
+export type EmbeddedPiQueueMessageOptions = {
+ steeringMode?: "all" | "one-at-a-time";
+ debounceMs?: number;
+};
+
export type ActiveEmbeddedRunSnapshot = {
transcriptLeafId: string | null;
messages?: unknown[];
diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts
index 05c6fce9f4b..70d6030d07e 100644
--- a/src/agents/pi-embedded-runner/run/attempt.ts
+++ b/src/agents/pi-embedded-runner/run/attempt.ts
@@ -2156,7 +2156,10 @@ export async function runEmbeddedAttempt(
cancel: (reason?: "user_abort" | "restart" | "superseded") => void;
} = {
kind: "embedded",
- queueMessage: async (text: string) => {
+ queueMessage: async (text: string, options) => {
+ if (options?.steeringMode) {
+ activeSession.agent.steeringMode = options.steeringMode;
+ }
await activeSession.steer(text);
},
isStreaming: () => activeSession.isStreaming,
diff --git a/src/agents/pi-embedded-runner/runs.test.ts b/src/agents/pi-embedded-runner/runs.test.ts
index c0d7a4f28f2..3b8e61f2344 100644
--- a/src/agents/pi-embedded-runner/runs.test.ts
+++ b/src/agents/pi-embedded-runner/runs.test.ts
@@ -8,6 +8,7 @@ import {
consumeEmbeddedRunModelSwitch,
getActiveEmbeddedRunSnapshot,
isEmbeddedPiRunHandleActive,
+ queueEmbeddedPiMessage,
requestEmbeddedRunModelSwitch,
resolveActiveEmbeddedRunHandleSessionId,
setActiveEmbeddedRun,
@@ -66,6 +67,32 @@ describe("pi-embedded runner run registry", () => {
expect(abortB).toHaveBeenCalledTimes(1);
});
+ it("passes steering options to active embedded runs", () => {
+ const queueMessage = vi.fn(async () => {});
+ setActiveEmbeddedRun("session-steer", {
+ ...createRunHandle(),
+ queueMessage,
+ });
+
+ expect(
+ queueEmbeddedPiMessage("session-steer", "continue", { steeringMode: "one-at-a-time" }),
+ ).toBe(true);
+
+ expect(queueMessage).toHaveBeenCalledWith("continue", { steeringMode: "one-at-a-time" });
+ });
+
+ it("defaults active embedded steering to all pending messages", () => {
+ const queueMessage = vi.fn(async () => {});
+ setActiveEmbeddedRun("session-default-steer", {
+ ...createRunHandle(),
+ queueMessage,
+ });
+
+ expect(queueEmbeddedPiMessage("session-default-steer", "continue")).toBe(true);
+
+ expect(queueMessage).toHaveBeenCalledWith("continue", { steeringMode: "all" });
+ });
+
it("force-clears an aborted run that does not drain", async () => {
vi.useFakeTimers();
try {
diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts
index ca96f6c5354..a808336513d 100644
--- a/src/agents/pi-embedded-runner/runs.ts
+++ b/src/agents/pi-embedded-runner/runs.ts
@@ -23,6 +23,7 @@ import {
getActiveEmbeddedRunCount,
type ActiveEmbeddedRunSnapshot,
type EmbeddedPiQueueHandle,
+ type EmbeddedPiQueueMessageOptions,
type EmbeddedRunModelSwitchRequest,
type EmbeddedRunWaiter,
} from "./run-state.js";
@@ -31,6 +32,7 @@ export {
getActiveEmbeddedRunCount,
type ActiveEmbeddedRunSnapshot,
type EmbeddedPiQueueHandle,
+ type EmbeddedPiQueueMessageOptions,
type EmbeddedRunModelSwitchRequest,
} from "./run-state.js";
@@ -57,7 +59,11 @@ function clearActiveRunSessionKeys(sessionId: string, sessionKey?: string): void
}
}
-export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean {
+export function queueEmbeddedPiMessage(
+ sessionId: string,
+ text: string,
+ options?: EmbeddedPiQueueMessageOptions,
+): boolean {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) {
const queuedReplyRunMessage = queueReplyRunMessage(sessionId, text);
@@ -77,7 +83,7 @@ export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean
return false;
}
logMessageQueued({ sessionId, source: "pi-embedded-runner" });
- void handle.queueMessage(text);
+ void handle.queueMessage(text, options ?? { steeringMode: "all" });
return true;
}
diff --git a/src/agents/subagent-announce-delivery.runtime.ts b/src/agents/subagent-announce-delivery.runtime.ts
index 11b610638cf..34f09f979fd 100644
--- a/src/agents/subagent-announce-delivery.runtime.ts
+++ b/src/agents/subagent-announce-delivery.runtime.ts
@@ -6,7 +6,11 @@ export {
resolveStorePath,
} from "../config/sessions.js";
export { callGateway } from "../gateway/call.js";
-export { resolveQueueSettings } from "../auto-reply/reply/queue.js";
+export {
+ isSteeringQueueMode,
+ resolvePiSteeringModeForQueueMode,
+ resolveQueueSettings,
+} from "../auto-reply/reply/queue.js";
export { resolveExternalBestEffortDeliveryTarget } from "../infra/outbound/best-effort-delivery.js";
export { sendMessage } from "../infra/outbound/message.js";
export { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js";
diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts
index 53ab083581d..6c60f6d73bc 100644
--- a/src/agents/subagent-announce-delivery.test.ts
+++ b/src/agents/subagent-announce-delivery.test.ts
@@ -498,7 +498,10 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
path: "steered",
}),
);
- expect(queueEmbeddedPiMessage).toHaveBeenCalledWith("requester-session-1", "child done");
+ expect(queueEmbeddedPiMessage).toHaveBeenCalledWith("requester-session-1", "child done", {
+ steeringMode: "all",
+ debounceMs: 500,
+ });
expect(callGateway).not.toHaveBeenCalled();
});
@@ -704,7 +707,14 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
path: "direct-fallback",
}),
);
- expect(queueEmbeddedPiMessage).toHaveBeenCalledWith("requester-session-telegram", "child done");
+ expect(queueEmbeddedPiMessage).toHaveBeenCalledWith(
+ "requester-session-telegram",
+ "child done",
+ {
+ steeringMode: "all",
+ debounceMs: 500,
+ },
+ );
expect(callGateway).not.toHaveBeenCalled();
expect(sendMessage).toHaveBeenCalledWith(
expect.objectContaining({
diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts
index b272bb44143..d4fcde6d789 100644
--- a/src/agents/subagent-announce-delivery.ts
+++ b/src/agents/subagent-announce-delivery.ts
@@ -25,8 +25,10 @@ import {
getGlobalHookRunner,
isEmbeddedPiRunActive,
getRuntimeConfig,
+ isSteeringQueueMode,
loadSessionStore,
queueEmbeddedPiMessage,
+ resolvePiSteeringModeForQueueMode,
resolveActiveEmbeddedRunSessionId,
resolveAgentIdFromSessionKey,
resolveConversationIdFromTargets,
@@ -477,11 +479,15 @@ async function maybeQueueSubagentAnnounce(params: {
sessionEntry: entry,
});
- const shouldSteer = queueSettings.mode === "steer" || queueSettings.mode === "steer-backlog";
+ const shouldSteer = isSteeringQueueMode(queueSettings.mode);
if (shouldSteer) {
const steered = subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage(
sessionId,
params.steerMessage,
+ {
+ steeringMode: resolvePiSteeringModeForQueueMode(queueSettings.mode),
+ ...(queueSettings.debounceMs !== undefined ? { debounceMs: queueSettings.debounceMs } : {}),
+ },
);
if (steered) {
return "steered";
@@ -493,7 +499,10 @@ async function maybeQueueSubagentAnnounce(params: {
queueSettings.mode === "collect" ||
queueSettings.mode === "steer-backlog" ||
queueSettings.mode === "interrupt";
- if (isActive && (shouldFollowup || queueSettings.mode === "steer")) {
+ if (
+ isActive &&
+ (shouldFollowup || queueSettings.mode === "steer" || queueSettings.mode === "queue")
+ ) {
const origin = resolveAnnounceOrigin(entry, params.requesterOrigin);
const didQueue = enqueueAnnounce({
key: buildAnnounceQueueKey(canonicalKey, origin),
@@ -714,11 +723,28 @@ async function sendSubagentAnnounceDirectly(params: {
? extractThreadCompletionFallbackText(params.internalEvents)
: "";
const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey);
+ const requesterEntry = loadRequesterSessionEntry(params.targetRequesterSessionKey).entry;
+ const requesterQueueSettings = resolveQueueSettings({
+ cfg,
+ channel:
+ requesterEntry?.channel ??
+ requesterEntry?.lastChannel ??
+ requesterEntry?.origin?.provider ??
+ requesterSessionOrigin?.channel ??
+ directOrigin?.channel,
+ sessionEntry: requesterEntry,
+ });
if (params.expectsCompletionMessage && requesterActivity.sessionId) {
const woke = requesterActivity.sessionId
? subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage(
requesterActivity.sessionId,
params.triggerMessage,
+ {
+ steeringMode: "all",
+ ...(requesterQueueSettings.debounceMs !== undefined
+ ? { debounceMs: requesterQueueSettings.debounceMs }
+ : {}),
+ },
)
: false;
if (woke) {
diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts
index f09db36272d..4aa3850b80d 100644
--- a/src/agents/subagent-announce.format.e2e.test.ts
+++ b/src/agents/subagent-announce.format.e2e.test.ts
@@ -93,7 +93,11 @@ const embeddedPiRunStreamingMock = vi.fn false,
);
const queueEmbeddedPiMessageMock = vi.fn(
- (_sessionId: string, _text: string) => false,
+ (
+ _sessionId: string,
+ _text: string,
+    _options?: Parameters<typeof queueEmbeddedPiMessage>[2],
+ ) => false,
);
const waitForEmbeddedPiRunEndMock = vi.fn(
async (_sessionId: string, _timeoutMs?: number) => true,
@@ -327,8 +331,11 @@ describe("subagent announce formatting", () => {
isActive: Boolean(sessionId && embeddedRunMock.isEmbeddedPiRunActive(sessionId)),
};
},
- queueEmbeddedPiMessage: (sessionId: string, text: string) =>
- embeddedRunMock.queueEmbeddedPiMessage(sessionId, text),
+ queueEmbeddedPiMessage: (
+ sessionId: string,
+ text: string,
+      options?: Parameters<typeof queueEmbeddedPiMessage>[2],
+ ) => embeddedRunMock.queueEmbeddedPiMessage(sessionId, text, options),
});
subagentAnnounceTesting.setDepsForTest({
callGateway: async >(
@@ -356,8 +363,8 @@ describe("subagent announce formatting", () => {
.mockImplementation((sessionId) => embeddedRunMock.isEmbeddedPiRunStreaming(sessionId));
queueEmbeddedPiMessageSpy
.mockReset()
- .mockImplementation((sessionId, text) =>
- embeddedRunMock.queueEmbeddedPiMessage(sessionId, text),
+ .mockImplementation((sessionId, text, options) =>
+ embeddedRunMock.queueEmbeddedPiMessage(sessionId, text, options),
);
waitForEmbeddedPiRunEndSpy
.mockReset()
diff --git a/src/agents/subagent-announce.test-support.ts b/src/agents/subagent-announce.test-support.ts
index e6c6686d97f..de9a7858f9d 100644
--- a/src/agents/subagent-announce.test-support.ts
+++ b/src/agents/subagent-announce.test-support.ts
@@ -1,5 +1,6 @@
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { callGateway } from "../gateway/call.js";
+import type { EmbeddedPiQueueMessageOptions } from "./pi-embedded-runner/run-state.js";
type DeliveryRuntimeMockOptions = {
  callGateway: (request: unknown) => Promise<unknown>;
@@ -9,7 +10,11 @@ type DeliveryRuntimeMockOptions = {
resolveMainSessionKey: (cfg: unknown) => string;
resolveStorePath: (store: unknown, options: unknown) => string;
isEmbeddedPiRunActive: (sessionId: string) => boolean;
- queueEmbeddedPiMessage: (sessionId: string, text: string) => boolean;
+ queueEmbeddedPiMessage: (
+ sessionId: string,
+ text: string,
+ options?: EmbeddedPiQueueMessageOptions,
+ ) => boolean;
hasHooks?: () => boolean;
};
@@ -54,6 +59,10 @@ export function createSubagentAnnounceDeliveryRuntimeMock(options: DeliveryRunti
resolveStorePath: options.resolveStorePath,
isEmbeddedPiRunActive: options.isEmbeddedPiRunActive,
queueEmbeddedPiMessage: options.queueEmbeddedPiMessage,
+ isSteeringQueueMode: (mode: string) =>
+ mode === "steer" || mode === "queue" || mode === "steer-backlog",
+ resolvePiSteeringModeForQueueMode: (mode: string) =>
+ mode === "queue" ? "one-at-a-time" : "all",
getGlobalHookRunner: () => ({ hasHooks: () => options.hasHooks?.() ?? false }),
createBoundDeliveryRouter: () => ({
resolveDestination: () => ({ mode: "none" }),
diff --git a/src/agents/subagent-announce.test.ts b/src/agents/subagent-announce.test.ts
index 2d4d08c3e26..7a85e72fd16 100644
--- a/src/agents/subagent-announce.test.ts
+++ b/src/agents/subagent-announce.test.ts
@@ -14,7 +14,9 @@ const resolveStorePathMock = vi.fn((_store: unknown, _options: unknown) => "/tmp
const resolveMainSessionKeyMock = vi.fn((_cfg: unknown) => "agent:main:main");
const readLatestAssistantReplyMock = vi.fn(async (_params?: unknown) => "raw subagent reply");
const isEmbeddedPiRunActiveMock = vi.fn((_sessionId: string) => false);
-const queueEmbeddedPiMessageMock = vi.fn((_sessionId: string, _text: string) => false);
+const queueEmbeddedPiMessageMock = vi.fn(
+ (_sessionId: string, _text: string, _options?: unknown) => false,
+);
const waitForEmbeddedPiRunEndMock = vi.fn(async (_sessionId: string, _timeoutMs?: number) => true);
let mockConfig: ReturnType<(typeof import("../config/config.js"))["getRuntimeConfig"]> = {
session: {
@@ -41,8 +43,8 @@ vi.mock("./subagent-announce.runtime.js", () => ({
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
getRuntimeConfig: () => mockConfig,
loadSessionStore: (storePath: string) => loadSessionStoreMock(storePath),
- queueEmbeddedPiMessage: (sessionId: string, text: string) =>
- queueEmbeddedPiMessageMock(sessionId, text),
+ queueEmbeddedPiMessage: (sessionId: string, text: string, options?: unknown) =>
+ queueEmbeddedPiMessageMock(sessionId, text, options),
resolveAgentIdFromSessionKey: (sessionKey: string) =>
resolveAgentIdFromSessionKeyMock(sessionKey),
resolveMainSessionKey: (cfg: unknown) => resolveMainSessionKeyMock(cfg),
@@ -65,8 +67,8 @@ vi.mock("./subagent-announce-delivery.runtime.js", () =>
resolveMainSessionKey: (cfg: unknown) => resolveMainSessionKeyMock(cfg),
resolveStorePath: (store: unknown, options: unknown) => resolveStorePathMock(store, options),
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
- queueEmbeddedPiMessage: (sessionId: string, text: string) =>
- queueEmbeddedPiMessageMock(sessionId, text),
+ queueEmbeddedPiMessage: (sessionId: string, text: string, options?: unknown) =>
+ queueEmbeddedPiMessageMock(sessionId, text, options),
}),
);
@@ -101,6 +103,7 @@ vi.mock("./subagent-announce-delivery.js", () => ({
queueEmbeddedPiMessageMock(
sessionId,
`[Internal task completion event]\n${params.triggerMessage}`,
+ { steeringMode: "all" },
);
return { delivered: true, path: "queue" };
}
@@ -355,6 +358,7 @@ describe("subagent announce seam flow", () => {
expect(queueEmbeddedPiMessageMock).toHaveBeenCalledWith(
"session-origin-provider-steer",
expect.stringContaining("[Internal task completion event]"),
+ { steeringMode: "all" },
);
expect(agentSpy).not.toHaveBeenCalled();
});
diff --git a/src/auto-reply/commands-registry.shared.ts b/src/auto-reply/commands-registry.shared.ts
index 4b0254e58a4..fe20ade6ba0 100644
--- a/src/auto-reply/commands-registry.shared.ts
+++ b/src/auto-reply/commands-registry.shared.ts
@@ -892,7 +892,7 @@ export function buildBuiltinChatCommands(): ChatCommandDefinition[] {
name: "mode",
description: "queue mode",
type: "string",
- choices: ["steer", "interrupt", "followup", "collect", "steer-backlog"],
+ choices: ["steer", "queue", "interrupt", "followup", "collect", "steer-backlog"],
},
{
name: "debounce",
diff --git a/src/auto-reply/reply.directive.parse.test.ts b/src/auto-reply/reply.directive.parse.test.ts
index 6745d55a325..e76ebebb02e 100644
--- a/src/auto-reply/reply.directive.parse.test.ts
+++ b/src/auto-reply/reply.directive.parse.test.ts
@@ -178,6 +178,13 @@ describe("directive parsing", () => {
expect(res.cleaned).toBe("please now");
});
+ it("keeps legacy queue directive as queue mode", () => {
+ const res = extractQueueDirective("please /queue queue now");
+ expect(res.hasDirective).toBe(true);
+ expect(res.queueMode).toBe("queue");
+ expect(res.cleaned).toBe("please now");
+ });
+
it("strips inline /model and /think directives while keeping user text", () => {
expect(parseInlineDirectives("please sync /model openai/gpt-4.1-mini now")).toMatchObject({
cleaned: "please sync now",
diff --git a/src/auto-reply/reply/agent-runner.media-paths.test.ts b/src/auto-reply/reply/agent-runner.media-paths.test.ts
index b937c7cea0a..acfde87d4bc 100644
--- a/src/auto-reply/reply/agent-runner.media-paths.test.ts
+++ b/src/auto-reply/reply/agent-runner.media-paths.test.ts
@@ -42,9 +42,14 @@ vi.mock("../../agents/pi-embedded.js", () => ({
waitForEmbeddedPiRunEnd: waitForEmbeddedPiRunEndMock,
}));
+vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({
+ queueEmbeddedPiMessage: queueEmbeddedPiMessageMock,
+}));
+
vi.mock("./queue.js", () => ({
enqueueFollowupRun: enqueueFollowupRunMock,
refreshQueuedFollowupSession: refreshQueuedFollowupSessionMock,
+ resolvePiSteeringModeForQueueMode: (mode: string) => (mode === "queue" ? "one-at-a-time" : "all"),
scheduleFollowupDrain: scheduleFollowupDrainMock,
}));
@@ -198,6 +203,34 @@ describe("runReplyAgent media path normalization", () => {
);
});
+ it("maps steer queue modes to Pi steering drain modes", async () => {
+ queueEmbeddedPiMessageMock.mockReturnValue(true);
+
+ await runReplyAgent(
+ makeRunReplyAgentParams({
+ resolvedQueue: { mode: "steer" } as QueueSettings,
+ shouldSteer: true,
+ isStreaming: true,
+ }),
+ );
+
+ expect(queueEmbeddedPiMessageMock).toHaveBeenLastCalledWith("session", "generate chart", {
+ steeringMode: "all",
+ });
+
+ await runReplyAgent(
+ makeRunReplyAgentParams({
+ resolvedQueue: { mode: "queue" } as QueueSettings,
+ shouldSteer: true,
+ isStreaming: true,
+ }),
+ );
+
+ expect(queueEmbeddedPiMessageMock).toHaveBeenLastCalledWith("session", "generate chart", {
+ steeringMode: "one-at-a-time",
+ });
+ });
+
it("shares one media cache between block accumulation and final payload delivery", async () => {
let stagedIndex = 0;
resolveOutboundAttachmentFromUrlMock.mockImplementation(async (mediaUrl: string) => {
diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts
index 379e538ab55..15cb1499b1a 100644
--- a/src/auto-reply/reply/agent-runner.ts
+++ b/src/auto-reply/reply/agent-runner.ts
@@ -69,6 +69,7 @@ import { resolveActiveRunQueueAction } from "./queue-policy.js";
import {
enqueueFollowupRun,
refreshQueuedFollowupSession,
+ resolvePiSteeringModeForQueueMode,
type FollowupRun,
type QueueSettings,
} from "./queue.js";
@@ -998,7 +999,10 @@ export async function runReplyAgent(params: {
const steerSessionId =
(sessionKey ? replyRunRegistry.resolveSessionId(sessionKey) : undefined) ??
followupRun.run.sessionId;
- const steered = queueEmbeddedPiMessage(steerSessionId, followupRun.prompt);
+ const steered = queueEmbeddedPiMessage(steerSessionId, followupRun.prompt, {
+ steeringMode: resolvePiSteeringModeForQueueMode(resolvedQueue.mode),
+ ...(resolvedQueue.debounceMs !== undefined ? { debounceMs: resolvedQueue.debounceMs } : {}),
+ });
if (steered && !shouldFollowup) {
await touchActiveSessionEntry();
typing.cleanup();
diff --git a/src/auto-reply/reply/directive-handling.queue-validation.test.ts b/src/auto-reply/reply/directive-handling.queue-validation.test.ts
index a1a4cf6266f..c0640f2c0d1 100644
--- a/src/auto-reply/reply/directive-handling.queue-validation.test.ts
+++ b/src/auto-reply/reply/directive-handling.queue-validation.test.ts
@@ -32,7 +32,7 @@ describe("maybeHandleQueueDirective", () => {
"Current queue settings: mode=collect, debounce=1500ms, cap=9, drop=summarize.",
);
expect(current?.text).toContain(
- "Options: modes steer, followup, collect, steer+backlog, interrupt; debounce:, cap:, drop:old|new|summarize.",
+ "Options: modes steer, queue, followup, collect, steer+backlog, interrupt; debounce:, cap:, drop:old|new|summarize.",
);
});
});
diff --git a/src/auto-reply/reply/directive-handling.queue-validation.ts b/src/auto-reply/reply/directive-handling.queue-validation.ts
index 4f3e11a416a..9f7221662aa 100644
--- a/src/auto-reply/reply/directive-handling.queue-validation.ts
+++ b/src/auto-reply/reply/directive-handling.queue-validation.ts
@@ -37,7 +37,7 @@ export function maybeHandleQueueDirective(params: {
return {
text: withOptions(
`Current queue settings: mode=${settings.mode}, debounce=${debounceLabel}, cap=${capLabel}, drop=${dropLabel}.`,
- "modes steer, followup, collect, steer+backlog, interrupt; debounce:, cap:, drop:old|new|summarize",
+ "modes steer, queue, followup, collect, steer+backlog, interrupt; debounce:, cap:, drop:old|new|summarize",
),
};
}
@@ -53,7 +53,7 @@ export function maybeHandleQueueDirective(params: {
const errors: string[] = [];
if (queueModeInvalid) {
errors.push(
- `Unrecognized queue mode "${directives.rawQueueMode ?? ""}". Valid modes: steer, followup, collect, steer+backlog, interrupt.`,
+ `Unrecognized queue mode "${directives.rawQueueMode ?? ""}". Valid modes: steer, queue, followup, collect, steer+backlog, interrupt.`,
);
}
if (queueDebounceInvalid) {
diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts
index 092d75d8385..9515b7431c0 100644
--- a/src/auto-reply/reply/get-reply-run.ts
+++ b/src/auto-reply/reply/get-reply-run.ts
@@ -61,6 +61,7 @@ import { resolveOriginMessageProvider } from "./origin-routing.js";
import { buildReplyPromptBodies } from "./prompt-prelude.js";
import { resolveActiveRunQueueAction } from "./queue-policy.js";
import { resolveQueueSettings } from "./queue/settings-runtime.js";
+import { isSteeringQueueMode } from "./queue/steering.js";
import { resolveRuntimePolicySessionKey } from "./runtime-policy-session-key.js";
import { resolveBareSessionResetPromptState } from "./session-reset-prompt.js";
import { resolveBareResetBootstrapFileAccess } from "./session-reset-prompt.js";
@@ -855,7 +856,7 @@ export async function runPreparedReply(
};
};
let { activeSessionId, isActive, isStreaming } = resolveQueueBusyState();
- const shouldSteer = resolvedQueue.mode === "steer" || resolvedQueue.mode === "steer-backlog";
+ const shouldSteer = isSteeringQueueMode(resolvedQueue.mode);
const shouldFollowup =
resolvedQueue.mode === "followup" ||
resolvedQueue.mode === "collect" ||
diff --git a/src/auto-reply/reply/queue-policy.test.ts b/src/auto-reply/reply/queue-policy.test.ts
index 265cc19ff9b..c8adde53ea6 100644
--- a/src/auto-reply/reply/queue-policy.test.ts
+++ b/src/auto-reply/reply/queue-policy.test.ts
@@ -36,13 +36,15 @@ describe("resolveActiveRunQueueAction", () => {
});
it("enqueues steer mode runs while active", () => {
- expect(
- resolveActiveRunQueueAction({
- isActive: true,
- isHeartbeat: false,
- shouldFollowup: false,
- queueMode: "steer",
- }),
- ).toBe("enqueue-followup");
+ for (const queueMode of ["steer", "queue"] as const) {
+ expect(
+ resolveActiveRunQueueAction({
+ isActive: true,
+ isHeartbeat: false,
+ shouldFollowup: false,
+ queueMode,
+ }),
+ ).toBe("enqueue-followup");
+ }
});
});
diff --git a/src/auto-reply/reply/queue-policy.ts b/src/auto-reply/reply/queue-policy.ts
index 73fc48bdcc6..d7c85107491 100644
--- a/src/auto-reply/reply/queue-policy.ts
+++ b/src/auto-reply/reply/queue-policy.ts
@@ -14,7 +14,7 @@ export function resolveActiveRunQueueAction(params: {
if (params.isHeartbeat) {
return "drop";
}
- if (params.shouldFollowup || params.queueMode === "steer") {
+ if (params.shouldFollowup || params.queueMode === "steer" || params.queueMode === "queue") {
return "enqueue-followup";
}
return "run-now";
diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts
index b293b56262a..d3c7a5b2e64 100644
--- a/src/auto-reply/reply/queue.ts
+++ b/src/auto-reply/reply/queue.ts
@@ -9,6 +9,11 @@ export {
} from "./queue/enqueue.js";
export { resolveQueueSettings } from "./queue/settings-runtime.js";
export { clearFollowupQueue, refreshQueuedFollowupSession } from "./queue/state.js";
+export {
+ isSteeringQueueMode,
+ resolvePiSteeringModeForQueueMode,
+ type PiSteeringMode,
+} from "./queue/steering.js";
export type {
FollowupRun,
QueueDedupeMode,
diff --git a/src/auto-reply/reply/queue/normalize.ts b/src/auto-reply/reply/queue/normalize.ts
index fbb7c7d8522..4e3a7339b82 100644
--- a/src/auto-reply/reply/queue/normalize.ts
+++ b/src/auto-reply/reply/queue/normalize.ts
@@ -7,7 +7,7 @@ export function normalizeQueueMode(raw?: string): QueueMode | undefined {
return undefined;
}
if (cleaned === "queue" || cleaned === "queued") {
- return "steer";
+ return "queue";
}
if (cleaned === "interrupt" || cleaned === "interrupts" || cleaned === "abort") {
return "interrupt";
diff --git a/src/auto-reply/reply/queue/settings.test.ts b/src/auto-reply/reply/queue/settings.test.ts
index ee9b406bc11..1ac8313268e 100644
--- a/src/auto-reply/reply/queue/settings.test.ts
+++ b/src/auto-reply/reply/queue/settings.test.ts
@@ -54,4 +54,20 @@ describe("resolveQueueSettings", () => {
dropPolicy: "summarize",
});
});
+
+ it("keeps legacy queue mode distinct from steer", () => {
+ expect(
+ resolveQueueSettings({
+ cfg: {
+ messages: {
+ queue: {
+ mode: "queue",
+ },
+ },
+ } as OpenClawConfig,
+ }),
+ ).toMatchObject({
+ mode: "queue",
+ });
+ });
});
diff --git a/src/auto-reply/reply/queue/steering.ts b/src/auto-reply/reply/queue/steering.ts
new file mode 100644
index 00000000000..d0c388eb82a
--- /dev/null
+++ b/src/auto-reply/reply/queue/steering.ts
@@ -0,0 +1,11 @@
+import type { QueueMode } from "./types.js";
+
+export type PiSteeringMode = "all" | "one-at-a-time";
+
+export function isSteeringQueueMode(mode: QueueMode): boolean {
+ return mode === "steer" || mode === "queue" || mode === "steer-backlog";
+}
+
+export function resolvePiSteeringModeForQueueMode(mode: QueueMode): PiSteeringMode {
+ return mode === "queue" ? "one-at-a-time" : "all";
+}
diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts
index fb2553c87d5..ac976270054 100644
--- a/src/config/schema.base.generated.ts
+++ b/src/config/schema.base.generated.ts
@@ -18977,7 +18977,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
],
title: "Queue Mode",
description:
- 'Queue behavior mode. "steer" injects at the next model boundary; "followup" runs later; "collect" batches later; "steer-backlog" and "steer+backlog" steer now and preserve backlog; "queue" aliases steer; "interrupt" aborts the active run. Use conservative modes unless interruption is intentional.',
+ 'Queue behavior mode. "steer" injects all queued steering messages at the next model boundary; "queue" is legacy one-at-a-time steering; "followup" runs later; "collect" batches later; "steer-backlog" does both; "interrupt" aborts the active run.',
},
byChannel: {
type: "object",
@@ -28307,7 +28307,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
},
"messages.queue.mode": {
label: "Queue Mode",
- help: 'Queue behavior mode. "steer" injects at the next model boundary; "followup" runs later; "collect" batches later; "steer-backlog" and "steer+backlog" steer now and preserve backlog; "queue" aliases steer; "interrupt" aborts the active run. Use conservative modes unless interruption is intentional.',
+ help: 'Queue behavior mode. "steer" injects all queued steering messages at the next model boundary; "queue" is legacy one-at-a-time steering; "followup" runs later; "collect" batches later; "steer-backlog" does both; "interrupt" aborts the active run.',
tags: ["advanced"],
},
"messages.queue.byChannel": {
diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts
index 7fb6d752da8..f823b4fe4e8 100644
--- a/src/config/schema.help.ts
+++ b/src/config/schema.help.ts
@@ -1637,7 +1637,7 @@ export const FIELD_HELP: Record<string, string> = {
"messages.queue":
"Inbound message queue strategy for messages that arrive while a session run is active. Default mode is steer, with followup fallback when steering is unavailable.",
"messages.queue.mode":
- 'Queue behavior mode. "steer" injects at the next model boundary; "followup" runs later; "collect" batches later; "steer-backlog" and "steer+backlog" steer now and preserve backlog; "queue" aliases steer; "interrupt" aborts the active run. Use conservative modes unless interruption is intentional.',
+ 'Queue behavior mode. "steer" injects all queued steering messages at the next model boundary; "queue" is legacy one-at-a-time steering; "followup" runs later; "collect" batches later; "steer-backlog" does both; "interrupt" aborts the active run.',
"messages.queue.byChannel":
"Per-channel queue mode overrides keyed by provider id (for example telegram, discord, slack). Use this when one channel’s traffic pattern needs different queue behavior than global defaults.",
"messages.queue.debounceMs":