fix(queue): split collect batches by auth context (#66024)

* fix(queue): split collect batches by auth context

Co-authored-by: zsx <git@zsxsoft.com>

* fix(queue): keep overflow summary on splits

* fix(queue): preserve grouped collect retry semantics

* fix(queue): drop USER.md from pr

* fix(queue): keep overflow summary in first auth group

* fix(queue): clear overflow summary state after first auth group

* fix(queue): narrow auth split key

* fix(queue): flush collect summary-only drains

* changelog: note collect-mode auth-context batch split (#66024)

---------

Co-authored-by: zsx <git@zsxsoft.com>
Co-authored-by: Devin Robison <drobison@nvidia.com>
This commit is contained in:
Agustin Rivera
2026-04-13 12:35:39 -07:00
committed by GitHub
parent 48aae82bbc
commit 43d4be9027
3 changed files with 853 additions and 19 deletions

View File

@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
- Cron/scheduler: stop inventing short retries when cron next-run calculation returns no valid future slot, and keep a maintenance wake armed so enabled unscheduled jobs recover without entering a refire loop. (#66019, #66083) Thanks @mbelinky.
- Cron/scheduler: preserve the active error-backoff floor when maintenance repair recomputes a missing cron next-run, so recurring errored jobs do not resume early after a transient next-run resolution failure. (#66019, #66083, #66113) Thanks @mbelinky.
- Outbound/delivery-queue: persist the originating outbound `session` context on queued delivery entries and replay it during recovery, so write-ahead-queued sends keep their original outbound media policy context after restart instead of evaluating against a missing session. (#66025) Thanks @eleqtrizit.
- Auto-reply/queue: split collect-mode followup drains into contiguous groups by per-message authorization context (sender id, owner status, exec/bash-elevated overrides), so queued items from different senders or exec configs no longer execute under the last queued run's owner-only and exec-approval context. (#66024) Thanks @eleqtrizit.
## 2026.4.12

View File

@@ -6,6 +6,7 @@ import {
createQueueTestRun as createRun,
installQueueRuntimeErrorSilencer,
} from "./queue.test-helpers.js";
import { resolveFollowupAuthorizationKey } from "./queue/drain.js";
installQueueRuntimeErrorSilencer();
@@ -94,6 +95,440 @@ describe("followup queue collect routing", () => {
expect(calls[0]?.originatingTo).toBe("channel:A");
});
it("splits collect batches when sender authorization changes", async () => {
const key = `test-collect-auth-split-${Date.now()}`;
const calls: FollowupRun[] = [];
const done = createDeferred<void>();
const expectedCalls = 2;
const runFollowup = async (run: FollowupRun) => {
calls.push(run);
if (calls.length >= expectedCalls) {
done.resolve();
}
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 50,
dropPolicy: "summarize",
};
const nonOwner = createRun({
prompt: "use the gateway tool",
originatingChannel: "slack",
originatingTo: "channel:A",
});
enqueueFollowupRun(
key,
{
...nonOwner,
run: {
...nonOwner.run,
senderId: "user-1",
senderName: "Guest",
senderIsOwner: false,
},
},
settings,
);
const owner = createRun({
prompt: "what's the weather?",
originatingChannel: "slack",
originatingTo: "channel:A",
});
enqueueFollowupRun(
key,
{
...owner,
run: {
...owner.run,
senderId: "owner-1",
senderName: "Owner",
senderIsOwner: true,
},
},
settings,
);
scheduleFollowupDrain(key, runFollowup);
await done.promise;
expect(calls.map((call) => call.run.senderIsOwner)).toEqual([false, true]);
expect(calls[0]?.prompt).toContain("use the gateway tool");
expect(calls[0]?.prompt).not.toContain("what's the weather?");
expect(calls[1]?.prompt).toContain("what's the weather?");
expect(calls[1]?.prompt).toContain("(from Owner)");
});
it("keeps one collect batch when authorization context matches", async () => {
const key = `test-collect-auth-match-${Date.now()}`;
const calls: FollowupRun[] = [];
const done = createDeferred<void>();
const runFollowup = async (run: FollowupRun) => {
calls.push(run);
done.resolve();
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 50,
dropPolicy: "summarize",
};
const first = createRun({
prompt: "first",
originatingChannel: "slack",
originatingTo: "channel:A",
});
const second = createRun({
prompt: "second",
originatingChannel: "slack",
originatingTo: "channel:A",
});
enqueueFollowupRun(
key,
{
...first,
run: {
...first.run,
senderId: "user-1",
senderName: "Guest",
senderUsername: "guest",
senderIsOwner: false,
},
},
settings,
);
enqueueFollowupRun(
key,
{
...second,
run: {
...second.run,
senderId: "user-1",
senderName: "Guest",
senderUsername: "guest",
senderIsOwner: false,
},
},
settings,
);
scheduleFollowupDrain(key, runFollowup);
await done.promise;
expect(calls).toHaveLength(1);
expect(calls[0]?.prompt).toContain("first");
expect(calls[0]?.prompt).toContain("second");
expect(calls[0]?.prompt).toContain("(from Guest)");
});
it("keeps one collect batch when only sender display fields drift", async () => {
const key = `test-collect-auth-display-drift-${Date.now()}`;
const calls: FollowupRun[] = [];
const done = createDeferred<void>();
const runFollowup = async (run: FollowupRun) => {
calls.push(run);
done.resolve();
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 50,
dropPolicy: "summarize",
};
const first = createRun({
prompt: "first",
originatingChannel: "slack",
originatingTo: "channel:A",
});
const second = createRun({
prompt: "second",
originatingChannel: "slack",
originatingTo: "channel:A",
});
enqueueFollowupRun(
key,
{
...first,
run: {
...first.run,
senderId: "user-1",
senderName: "Guest",
senderUsername: "guest",
senderIsOwner: false,
},
},
settings,
);
enqueueFollowupRun(
key,
{
...second,
run: {
...second.run,
senderId: "user-1",
senderName: "Guest User",
senderUsername: "guest-renamed",
senderIsOwner: false,
},
},
settings,
);
scheduleFollowupDrain(key, runFollowup);
await done.promise;
expect(calls).toHaveLength(1);
expect(calls[0]?.prompt).toContain("first");
expect(calls[0]?.prompt).toContain("second");
expect(calls[0]?.prompt).toContain("(from Guest)");
expect(calls[0]?.prompt).toContain("(from Guest User)");
});
it("splits collect batches when exec context changes", async () => {
const key = `test-collect-exec-split-${Date.now()}`;
const calls: FollowupRun[] = [];
const done = createDeferred<void>();
const expectedCalls = 2;
const runFollowup = async (run: FollowupRun) => {
calls.push(run);
if (calls.length >= expectedCalls) {
done.resolve();
}
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 50,
dropPolicy: "summarize",
};
const base = createRun({
prompt: "first",
originatingChannel: "slack",
originatingTo: "channel:A",
});
enqueueFollowupRun(
key,
{
...base,
run: {
...base.run,
senderId: "owner-1",
senderIsOwner: true,
bashElevated: { enabled: false, allowed: true, defaultLevel: "off" },
},
},
settings,
);
enqueueFollowupRun(
key,
{
...createRun({
prompt: "second",
originatingChannel: "slack",
originatingTo: "channel:A",
}),
run: {
...base.run,
senderId: "owner-1",
senderIsOwner: true,
bashElevated: { enabled: true, allowed: true, defaultLevel: "on" },
execOverrides: { ask: "always" },
},
},
settings,
);
scheduleFollowupDrain(key, runFollowup);
await done.promise;
expect(calls[0]?.prompt).toContain("first");
expect(calls[0]?.prompt).not.toContain("second");
expect(calls[1]?.prompt).toContain("second");
expect(calls[1]?.run.bashElevated?.enabled).toBe(true);
expect(calls[1]?.run.execOverrides?.ask).toBe("always");
});
it("uses the newest run within a matching authorization batch", async () => {
const key = `test-collect-latest-run-${Date.now()}`;
const calls: FollowupRun[] = [];
const done = createDeferred<void>();
const runFollowup = async (run: FollowupRun) => {
calls.push(run);
done.resolve();
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 50,
dropPolicy: "summarize",
};
const first = createRun({ prompt: "first", originatingChannel: "slack", originatingTo: "A" });
const second = createRun({
prompt: "second",
originatingChannel: "slack",
originatingTo: "A",
});
enqueueFollowupRun(
key,
{
...first,
run: {
...first.run,
provider: "openai",
model: "gpt-5.4",
senderId: "user-1",
senderName: "Guest",
senderIsOwner: false,
},
},
settings,
);
enqueueFollowupRun(
key,
{
...second,
run: {
...second.run,
provider: "anthropic",
model: "sonnet-4.6",
senderId: "user-1",
senderName: "Guest",
senderIsOwner: false,
},
},
settings,
);
scheduleFollowupDrain(key, runFollowup);
await done.promise;
expect(calls).toHaveLength(1);
expect(calls[0]?.run.provider).toBe("anthropic");
expect(calls[0]?.run.model).toBe("sonnet-4.6");
});
it("delivers and clears summary-only collect drains after cross-channel items", async () => {
const key = `test-collect-summary-only-${Date.now()}`;
const calls: FollowupRun[] = [];
const done = createDeferred<void>();
const expectedCalls = 3;
const runFollowup = async (run: FollowupRun) => {
calls.push(run);
if (calls.length >= expectedCalls) {
done.resolve();
}
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 2,
dropPolicy: "summarize",
};
enqueueFollowupRun(
key,
createRun({
prompt: "first",
originatingChannel: "slack",
originatingTo: "channel:A",
}),
settings,
);
enqueueFollowupRun(
key,
createRun({
prompt: "second",
originatingChannel: "slack",
originatingTo: "channel:B",
}),
settings,
);
enqueueFollowupRun(
key,
createRun({
prompt: "third",
originatingChannel: "slack",
originatingTo: "channel:C",
}),
settings,
);
scheduleFollowupDrain(key, runFollowup);
await done.promise;
expect(calls).toHaveLength(3);
expect(calls[0]?.prompt).toBe("second");
expect(calls[1]?.prompt).toBe("third");
expect(calls[2]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap.");
expect(calls[2]?.prompt).toContain("- first");
});
it("preserves collect order when authorization changes more than once", async () => {
const key = `test-collect-auth-order-${Date.now()}`;
const calls: FollowupRun[] = [];
const done = createDeferred<void>();
const expectedCalls = 3;
const runFollowup = async (run: FollowupRun) => {
calls.push(run);
if (calls.length >= expectedCalls) {
done.resolve();
}
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 50,
dropPolicy: "summarize",
};
const first = createRun({ prompt: "first", originatingChannel: "slack", originatingTo: "A" });
const second = createRun({ prompt: "second", originatingChannel: "slack", originatingTo: "A" });
const third = createRun({ prompt: "third", originatingChannel: "slack", originatingTo: "A" });
enqueueFollowupRun(
key,
{
...first,
run: { ...first.run, senderId: "user-a", senderName: "A", senderIsOwner: false },
},
settings,
);
enqueueFollowupRun(
key,
{
...second,
run: { ...second.run, senderId: "owner-1", senderName: "Owner", senderIsOwner: true },
},
settings,
);
enqueueFollowupRun(
key,
{
...third,
run: { ...third.run, senderId: "user-a", senderName: "A", senderIsOwner: false },
},
settings,
);
scheduleFollowupDrain(key, runFollowup);
await done.promise;
expect(calls.map((call) => call.prompt)).toEqual([
expect.stringContaining("first"),
expect.stringContaining("second"),
expect.stringContaining("third"),
]);
});
it("collects Slack messages in same thread and preserves string thread id", async () => {
const key = `test-collect-slack-thread-same-${Date.now()}`;
const calls: FollowupRun[] = [];
@@ -212,6 +647,83 @@ describe("followup queue collect routing", () => {
expect(calls[0]?.prompt).toContain("Queued #2\ntwo");
});
it("retries only the remaining collect auth groups after a partial failure", async () => {
const key = `test-collect-partial-retry-${Date.now()}`;
const attempts: FollowupRun[] = [];
const successfulCalls: FollowupRun[] = [];
const done = createDeferred<void>();
let attempt = 0;
const runFollowup = async (run: FollowupRun) => {
attempt += 1;
attempts.push(run);
if (attempt === 2) {
throw new Error("transient failure");
}
successfulCalls.push(run);
if (attempt >= 3) {
done.resolve();
}
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 50,
dropPolicy: "summarize",
};
const guest = createRun({
prompt: "guest message",
originatingChannel: "slack",
originatingTo: "channel:A",
});
const owner = createRun({
prompt: "owner message",
originatingChannel: "slack",
originatingTo: "channel:A",
});
enqueueFollowupRun(
key,
{
...guest,
run: {
...guest.run,
senderId: "user-1",
senderName: "Guest",
senderIsOwner: false,
},
},
settings,
);
enqueueFollowupRun(
key,
{
...owner,
run: {
...owner.run,
senderId: "owner-1",
senderName: "Owner",
senderIsOwner: true,
},
},
settings,
);
scheduleFollowupDrain(key, runFollowup);
await done.promise;
const guestAttempts = attempts.filter((call) => call.prompt.includes("guest message"));
const ownerAttempts = attempts.filter((call) => call.prompt.includes("owner message"));
expect(attempts).toHaveLength(3);
expect(guestAttempts).toHaveLength(1);
expect(ownerAttempts).toHaveLength(2);
expect(successfulCalls.map((call) => call.prompt)).toEqual([
expect.stringContaining("guest message"),
expect.stringContaining("owner message"),
]);
});
it("retries overflow summary delivery without losing dropped previews", async () => {
const key = `test-overflow-summary-retry-${Date.now()}`;
const calls: FollowupRun[] = [];
@@ -241,6 +753,183 @@ describe("followup queue collect routing", () => {
expect(calls[0]?.prompt).toContain("- first");
});
it("includes the overflow summary only in the first split auth group", async () => {
const key = `test-collect-overflow-summary-once-${Date.now()}`;
const calls: FollowupRun[] = [];
const done = createDeferred<void>();
const expectedCalls = 2;
const runFollowup = async (run: FollowupRun) => {
calls.push(run);
if (calls.length >= expectedCalls) {
done.resolve();
}
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 2,
dropPolicy: "summarize",
};
const droppedGuest = createRun({
prompt: "dropped guest message",
originatingChannel: "slack",
originatingTo: "channel:A",
});
const guest = createRun({
prompt: "guest message",
originatingChannel: "slack",
originatingTo: "channel:A",
});
const owner = createRun({
prompt: "owner message",
originatingChannel: "slack",
originatingTo: "channel:A",
});
enqueueFollowupRun(
key,
{
...droppedGuest,
run: {
...droppedGuest.run,
senderId: "user-1",
senderName: "Guest",
senderIsOwner: false,
},
},
settings,
);
enqueueFollowupRun(
key,
{
...guest,
run: {
...guest.run,
senderId: "user-1",
senderName: "Guest",
senderIsOwner: false,
},
},
settings,
);
enqueueFollowupRun(
key,
{
...owner,
run: {
...owner.run,
senderId: "owner-1",
senderName: "Owner",
senderIsOwner: true,
},
},
settings,
);
scheduleFollowupDrain(key, runFollowup);
await done.promise;
expect(calls).toHaveLength(2);
expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap.");
expect(calls[0]?.prompt).toContain("- dropped guest message");
expect(calls[1]?.prompt).not.toContain("[Queue overflow]");
expect(calls[1]?.prompt).not.toContain("dropped guest message");
});
it("does not re-deliver overflow summary on partial auth group failure retry", async () => {
const key = `test-collect-overflow-partial-retry-${Date.now()}`;
const calls: FollowupRun[] = [];
const done = createDeferred<void>();
let attempt = 0;
const runFollowup = async (run: FollowupRun) => {
attempt += 1;
// First group succeeds (attempt 1), second group fails (attempt 2),
// then second group succeeds on retry (attempt 3).
if (attempt === 2) {
throw new Error("transient failure");
}
calls.push(run);
if (calls.length >= 2) {
done.resolve();
}
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 2,
dropPolicy: "summarize",
};
const droppedGuest = createRun({
prompt: "dropped guest message",
originatingChannel: "slack",
originatingTo: "channel:A",
});
const guest = createRun({
prompt: "guest message",
originatingChannel: "slack",
originatingTo: "channel:A",
});
const owner = createRun({
prompt: "owner message",
originatingChannel: "slack",
originatingTo: "channel:A",
});
enqueueFollowupRun(
key,
{
...droppedGuest,
run: {
...droppedGuest.run,
senderId: "user-1",
senderName: "Guest",
senderIsOwner: false,
},
},
settings,
);
enqueueFollowupRun(
key,
{
...guest,
run: {
...guest.run,
senderId: "user-1",
senderName: "Guest",
senderIsOwner: false,
},
},
settings,
);
enqueueFollowupRun(
key,
{
...owner,
run: {
...owner.run,
senderId: "owner-1",
senderName: "Owner",
senderIsOwner: true,
},
},
settings,
);
scheduleFollowupDrain(key, runFollowup);
await done.promise;
expect(calls).toHaveLength(2);
// First group got the overflow summary
expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap.");
expect(calls[0]?.prompt).toContain("- dropped guest message");
// Second group (retried after failure) must NOT get the overflow summary again
expect(calls[1]?.prompt).not.toContain("[Queue overflow]");
expect(calls[1]?.prompt).not.toContain("dropped guest message");
expect(calls[1]?.prompt).toContain("owner message");
});
it("preserves routing metadata on overflow summary followups", async () => {
const key = `test-overflow-summary-routing-${Date.now()}`;
const calls: FollowupRun[] = [];
@@ -289,3 +978,61 @@ describe("followup queue collect routing", () => {
expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap.");
});
});
describe("resolveFollowupAuthorizationKey", () => {
it("changes when sender ownership changes", () => {
const run = createRun({ prompt: "one" }).run;
expect(
resolveFollowupAuthorizationKey({
...run,
senderId: "user-1",
senderIsOwner: false,
}),
).not.toBe(
resolveFollowupAuthorizationKey({
...run,
senderId: "user-1",
senderIsOwner: true,
}),
);
});
it("changes when exec defaults change", () => {
const run = createRun({ prompt: "one" }).run;
expect(
resolveFollowupAuthorizationKey({
...run,
senderId: "user-1",
bashElevated: { enabled: false, allowed: true, defaultLevel: "off" },
}),
).not.toBe(
resolveFollowupAuthorizationKey({
...run,
senderId: "user-1",
bashElevated: { enabled: true, allowed: true, defaultLevel: "on" },
execOverrides: { ask: "always" },
}),
);
});
it("does not change when only sender display fields change", () => {
const run = createRun({ prompt: "one" }).run;
expect(
resolveFollowupAuthorizationKey({
...run,
senderId: "user-1",
senderName: "Guest",
senderUsername: "guest",
senderIsOwner: false,
}),
).toBe(
resolveFollowupAuthorizationKey({
...run,
senderId: "user-1",
senderName: "Guest User",
senderUsername: "guest-renamed",
senderIsOwner: false,
}),
);
});
});

View File

@@ -59,6 +59,63 @@ function resolveOriginRoutingMetadata(items: FollowupRun[]): OriginRoutingMetada
};
}
// Keep this key aligned with the fields that affect per-message authorization or
// exec-context propagation in collect-mode batching. Display-only sender fields
// stay out of the key so profile/name drift does not force conservative splits.
// Fields like authProfileId, elevatedLevel, ownerNumbers, and config are
// intentionally excluded because they are session-level or not consulted in
// per-message authorization checks.
export function resolveFollowupAuthorizationKey(run: FollowupRun["run"]): string {
return JSON.stringify([
run.senderId ?? "",
run.senderE164 ?? "",
run.senderIsOwner === true,
run.execOverrides?.host ?? "",
run.execOverrides?.security ?? "",
run.execOverrides?.ask ?? "",
run.execOverrides?.node ?? "",
run.bashElevated?.enabled === true,
run.bashElevated?.allowed === true,
run.bashElevated?.defaultLevel ?? "",
]);
}
function splitCollectItemsByAuthorization(items: FollowupRun[]): FollowupRun[][] {
if (items.length <= 1) {
return items.length === 0 ? [] : [items];
}
const groups: FollowupRun[][] = [];
let currentGroup: FollowupRun[] = [];
let currentKey: string | undefined;
for (const item of items) {
const itemKey = resolveFollowupAuthorizationKey(item.run);
if (currentGroup.length === 0 || itemKey === currentKey) {
currentGroup.push(item);
currentKey = itemKey;
continue;
}
groups.push(currentGroup);
currentGroup = [item];
currentKey = itemKey;
}
if (currentGroup.length > 0) {
groups.push(currentGroup);
}
return groups;
}
function renderCollectItem(item: FollowupRun, idx: number): string {
const senderLabel =
item.run.senderName ?? item.run.senderUsername ?? item.run.senderId ?? item.run.senderE164;
const senderSuffix = senderLabel ? ` (from ${senderLabel})` : "";
return `---\nQueued #${idx + 1}${senderSuffix}\n${item.prompt}`.trim();
}
function resolveCrossChannelKey(item: FollowupRun): { cross?: true; key?: string } {
const { originatingChannel: channel, originatingTo: to, originatingAccountId: accountId } = item;
const threadId = item.originatingThreadId;
@@ -108,6 +165,17 @@ export function scheduleFollowupDrain(
run: effectiveRunFollowup,
});
if (collectDrainResult === "empty") {
const summaryOnlyPrompt = previewQueueSummaryPrompt({ state: queue, noun: "message" });
const run = queue.lastRun;
if (summaryOnlyPrompt && run) {
await effectiveRunFollowup({
prompt: summaryOnlyPrompt,
run,
enqueuedAt: Date.now(),
});
clearQueueSummaryState(queue);
continue;
}
break;
}
if (collectDrainResult === "drained") {
@@ -116,28 +184,46 @@ export function scheduleFollowupDrain(
const items = queue.items.slice();
const summary = previewQueueSummaryPrompt({ state: queue, noun: "message" });
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) {
break;
const authGroups = splitCollectItemsByAuthorization(items);
if (authGroups.length === 0) {
const run = queue.lastRun;
if (!summary || !run) {
break;
}
await effectiveRunFollowup({
prompt: summary,
run,
enqueuedAt: Date.now(),
});
clearQueueSummaryState(queue);
continue;
}
const routing = resolveOriginRoutingMetadata(items);
let pendingSummary = summary;
for (const groupItems of authGroups) {
const run = groupItems.at(-1)?.run ?? queue.lastRun;
if (!run) {
break;
}
const prompt = buildCollectPrompt({
title: "[Queued messages while agent was busy]",
items,
summary,
renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(),
});
await effectiveRunFollowup({
prompt,
run,
enqueuedAt: Date.now(),
...routing,
});
queue.items.splice(0, items.length);
if (summary) {
clearQueueSummaryState(queue);
const routing = resolveOriginRoutingMetadata(groupItems);
const prompt = buildCollectPrompt({
title: "[Queued messages while agent was busy]",
items: groupItems,
summary: pendingSummary,
renderItem: renderCollectItem,
});
await effectiveRunFollowup({
prompt,
run,
enqueuedAt: Date.now(),
...routing,
});
queue.items.splice(0, groupItems.length);
if (pendingSummary) {
clearQueueSummaryState(queue);
pendingSummary = undefined;
}
}
continue;
}