fix(agents): pin media requester route at task start

This commit is contained in:
Peter Steinberger
2026-06-22 09:15:57 -07:00
committed by Ayaan Zaidi
parent 1ed8592467
commit 025db6cf9e
6 changed files with 100 additions and 223 deletions

View File

@@ -1,8 +1,5 @@
// Subagent announce delivery tests cover the last-mile routing used when child
// runs report progress or completion back to the requester session.
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { OutboundDeliveryError } from "../infra/outbound/deliver-types.js";
import {
@@ -4852,183 +4849,3 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
});
});
});
describe("deliverSubagentAnnouncement requester session backfill (issue #86034)", () => {
// Regression: image_generate launched from a non-direct-reply turn (heartbeat,
// cron, subagent spawn) supplies a completion origin missing `to`. The
// already-loaded requester session entry carries `lastTo`/`lastChannel`, so
// effectiveDirectOrigin must backfill from it before the deliverability gate,
// otherwise generated media is silently dropped.
it("backfills to/accountId from the requester session entry when completion origin is incomplete", async () => {
const agentId = `backfill-${Date.now()}-${Math.random().toString(16).slice(2)}`;
const sessionKey = `agent:${agentId}:telegram:5866004662`;
const storeTemplate = path.join(
os.tmpdir(),
`openclaw-86034-session-${agentId}-{agentId}.json`,
);
const storePath = storeTemplate.replaceAll("{agentId}", agentId);
await fs.writeFile(
storePath,
JSON.stringify(
{
[sessionKey]: {
sessionId: "telegram-session-1",
updatedAt: Date.now(),
channel: "telegram",
lastChannel: "telegram",
lastTo: "5866004662",
lastAccountId: "bot-1",
},
},
null,
2,
),
"utf-8",
);
try {
const dispatchGatewayMethodInProcess = createInProcessGatewayMock({
result: { payloads: [{ text: "requester voice completion" }] },
});
testing.setDepsForTest({
dispatchGatewayMethodInProcess,
getRequesterSessionActivity: () => ({
sessionId: "telegram-session-1",
isActive: false,
}),
getRuntimeConfig: () => ({ session: { store: storeTemplate } }) as never,
});
const result = await deliverSubagentAnnouncement({
requesterSessionKey: sessionKey,
targetRequesterSessionKey: sessionKey,
triggerMessage: "image done",
steerMessage: "image done",
// Origin carries channel but NOT `to` or `accountId` — simulates an
// image_generate task created off the direct-reply path.
requesterOrigin: { channel: "telegram" },
requesterSessionOrigin: { channel: "telegram" },
completionDirectOrigin: { channel: "telegram" },
directOrigin: { channel: "telegram" },
requesterIsSubagent: false,
expectsCompletionMessage: true,
bestEffortDeliver: true,
directIdempotencyKey: "announce-86034-backfill",
sourceTool: "image_generate",
});
expectRecordFields(result, {
delivered: true,
path: "direct",
});
// The deliverability decision must see the backfilled `to`.
expectInProcessAgentParams(dispatchGatewayMethodInProcess, {
deliver: true,
channel: "telegram",
accountId: "bot-1",
to: "5866004662",
});
} finally {
await fs.rm(storePath, { force: true });
}
});
// Cross-channel safety regression: a stale lastChannel that differs from
// the completion origin's channel must not leak its lastTo into the
// telegram delivery. mergeDeliveryContext.channelsConflict (delivery-context.shared.ts:233-260)
// is the structural guard; this test locks it.
it("does not import a cross-channel lastTo when the completion origin's channel differs from lastChannel (cross-channel guard regression)", async () => {
const agentId = `xchan-${Date.now()}-${Math.random().toString(16).slice(2)}`;
const sessionKey = `agent:${agentId}:telegram:5866004662`;
const storeTemplate = path.join(
os.tmpdir(),
`openclaw-86034-xchan-session-${agentId}-{agentId}.json`,
);
const storePath = storeTemplate.replaceAll("{agentId}", agentId);
await fs.writeFile(
storePath,
JSON.stringify(
{
[sessionKey]: {
sessionId: "telegram-session-xchan",
updatedAt: Date.now(),
channel: "telegram",
lastChannel: "signal",
lastTo: "signal-stale-target",
lastAccountId: "signal-bot-1",
},
},
null,
2,
),
"utf-8",
);
try {
const dispatchGatewayMethodInProcess = createInProcessGatewayMock({
result: { payloads: [{ text: "requester voice completion" }] },
});
testing.setDepsForTest({
dispatchGatewayMethodInProcess,
getRequesterSessionActivity: () => ({
sessionId: "telegram-session-xchan",
isActive: false,
}),
getRuntimeConfig: () => ({ session: { store: storeTemplate } }) as never,
});
const result = await deliverSubagentAnnouncement({
requesterSessionKey: sessionKey,
targetRequesterSessionKey: sessionKey,
triggerMessage: "image done",
steerMessage: "image done",
requesterOrigin: { channel: "telegram", accountId: "telegram-bot-1" },
requesterSessionOrigin: { channel: "telegram", accountId: "telegram-bot-1" },
completionDirectOrigin: { channel: "telegram", accountId: "telegram-bot-1" },
directOrigin: { channel: "telegram", accountId: "telegram-bot-1" },
requesterIsSubagent: false,
expectsCompletionMessage: true,
bestEffortDeliver: true,
directIdempotencyKey: "announce-86034-cross-channel",
sourceTool: "image_generate",
});
// Structural assertion: the stale signal `to` must not have leaked into
// any in-process gateway dispatch, regardless of whether the call ended
// up routed (path: "direct") or short-circuited (delivered: false /
// path: "none"). The guard is "no cross-channel `to` ever reaches the
// gateway", not a specific terminal path.
expect(dispatchGatewayMethodInProcess).not.toHaveBeenCalledWith(
"agent",
expect.objectContaining({ to: "signal-stale-target" }),
expect.anything(),
);
expect(dispatchGatewayMethodInProcess).not.toHaveBeenCalledWith(
"agent",
expect.objectContaining({ channel: "signal" }),
expect.anything(),
);
for (const call of asMock(dispatchGatewayMethodInProcess).mock.calls) {
const params = call[1] as Record<string, unknown> | undefined;
if (!params) {
continue;
}
expect(params.to).not.toBe("signal-stale-target");
expect(params.channel).not.toBe("signal");
}
// Result-shape assertion: if the dispatcher was invoked at all on the
// direct path, it must not have been with the stale signal target.
if (
(result as { path?: string }).path === "direct" &&
(result as { delivered?: boolean }).delivered === true
) {
const params = mockCallArg(dispatchGatewayMethodInProcess, 0, 1) as Record<string, unknown>;
expect(params.to).not.toBe("signal-stale-target");
expect(params.channel).not.toBe("signal");
}
} finally {
await fs.rm(storePath, { force: true });
}
});
});

View File

@@ -28,11 +28,7 @@ import {
import { deriveSessionChatTypeFromKey } from "../sessions/session-chat-type-shared.js";
import { isCronRunSessionKey, isCronSessionKey } from "../sessions/session-key-utils.js";
import { isNonTerminalAgentRunStatus } from "../shared/agent-run-status.js";
import {
deliveryContextFromSession,
mergeDeliveryContext,
normalizeDeliveryContext,
} from "../utils/delivery-context.js";
import { mergeDeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
import {
INTERNAL_MESSAGE_CHANNEL,
isDeliverableMessageChannel,
@@ -1228,19 +1224,15 @@ async function sendSubagentAnnounceDirectly(params: {
const completionDirectOrigin = normalizeDeliveryContext(params.completionDirectOrigin);
const directOrigin = normalizeDeliveryContext(params.directOrigin);
const requesterSessionOrigin = normalizeDeliveryContext(params.requesterSessionOrigin);
const requesterEntry = loadRequesterSessionEntry(params.targetRequesterSessionKey).entry;
// Backfill missing fields (channel, to, accountId) from the requester
// session entry's lastChannel/lastTo so a completion origin that carries
// a channel but not a `to` (e.g. heartbeat, cron, subagent spawn paths
// where `agentTo` is undefined) still resolves an external delivery
// target. Without this, every Boolean(channel && to) gate downstream
// short-circuits and generated media is silently dropped.
const requesterSessionDeliveryFallback = deliveryContextFromSession(requesterEntry);
// Merge completionDirectOrigin with directOrigin so that missing fields
// (channel, to, accountId) fall back to the originating session's
// lastChannel / lastTo. Without this, a completion origin that carries a
// channel but not a `to` would prevent external delivery.
const externalCompletionDirectOrigin =
stripNonDeliverableChannelForCompletionOrigin(completionDirectOrigin);
const completionExternalFallbackOrigin = mergeDeliveryContext(
directOrigin,
mergeDeliveryContext(requesterSessionOrigin, requesterSessionDeliveryFallback),
requesterSessionOrigin,
);
const effectiveDirectOrigin = params.expectsCompletionMessage
? mergeDeliveryContext(externalCompletionDirectOrigin, completionExternalFallbackOrigin)
@@ -1248,6 +1240,7 @@ async function sendSubagentAnnounceDirectly(params: {
const sessionOnlyOrigin = effectiveDirectOrigin?.channel
? effectiveDirectOrigin
: requesterSessionOrigin;
const requesterEntry = loadRequesterSessionEntry(params.targetRequesterSessionKey).entry;
const deliveryTarget = !params.requesterIsSubagent
? resolveExternalBestEffortDeliveryTarget({
channel: effectiveDirectOrigin?.channel,

View File

@@ -4,12 +4,20 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
const subagentAnnounceDeliveryMocks = vi.hoisted(() => ({
deliverSubagentAnnouncement: vi.fn(),
loadRequesterSessionEntry: vi.fn(() => ({ entry: undefined })),
}));
const detachedTaskRuntimeMocks = vi.hoisted(() => ({
completeTaskRunByRunId: vi.fn(),
createRunningTaskRun: vi.fn(() => ({ taskId: "task-pinned-route" })),
failTaskRunByRunId: vi.fn(),
recordTaskRunProgressByRunId: vi.fn(),
}));
const taskRegistryDeliveryRuntimeMocks = vi.hoisted(() => ({
sendMessage: vi.fn(),
}));
vi.mock("../subagent-announce-delivery.js", () => subagentAnnounceDeliveryMocks);
vi.mock("../../tasks/detached-task-runtime.js", () => detachedTaskRuntimeMocks);
vi.mock("../../tasks/task-registry-delivery-runtime.js", () => taskRegistryDeliveryRuntimeMocks);
import {
@@ -20,6 +28,9 @@ import {
beforeEach(() => {
subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement.mockReset();
subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReset();
subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReturnValue({ entry: undefined });
detachedTaskRuntimeMocks.createRunningTaskRun.mockClear();
taskRegistryDeliveryRuntimeMocks.sendMessage.mockReset();
});
@@ -378,6 +389,65 @@ describe("scheduleMediaGenerationTaskCompletion", () => {
});
describe("createMediaGenerationTaskLifecycle", () => {
it("pins a missing requester target from session state when the task starts", async () => {
subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReturnValue({
entry: {
lastChannel: "telegram",
lastTo: "5866004662",
lastAccountId: "bot-1",
},
});
subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement.mockResolvedValueOnce({
delivered: true,
});
const lifecycle = createMediaGenerationTaskLifecycle({
toolName: "image_generate",
taskKind: "image_generation",
label: "Image generation",
queuedProgressSummary: "Queued image generation",
generatedLabel: "image",
failureProgressSummary: "Image generation failed",
eventSource: "image_generation",
announceType: "image generation task",
completionLabel: "image",
});
const handle = lifecycle.createTaskRun({
sessionKey: "agent:main:telegram:5866004662",
requesterOrigin: { channel: "telegram" },
prompt: "proof image",
});
expect(handle?.requesterOrigin).toEqual({
channel: "telegram",
to: "5866004662",
accountId: "bot-1",
});
// Later session drift cannot change the route stored on the task handle.
subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReturnValue({
entry: {
lastChannel: "telegram",
lastTo: "other-peer",
lastAccountId: "bot-1",
},
});
await lifecycle.wakeTaskCompletion({
handle,
status: "ok",
statusLabel: "completed successfully",
result: "generated",
});
expect(subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement).toHaveBeenCalledWith(
expect.objectContaining({
completionDirectOrigin: {
channel: "telegram",
to: "5866004662",
accountId: "bot-1",
},
}),
);
});
it("returns the completion wake delivery result", async () => {
subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement.mockResolvedValueOnce({
delivered: true,

View File

@@ -19,7 +19,12 @@ import {
resolveRequiredCompletionDeliveryFailureTerminalResult,
type RequiredCompletionTerminalResult,
} from "../../tasks/task-completion-contract.js";
import { normalizeDeliveryContext, type DeliveryContext } from "../../utils/delivery-context.js";
import {
deliveryContextFromSession,
mergeDeliveryContext,
normalizeDeliveryContext,
type DeliveryContext,
} from "../../utils/delivery-context.js";
import {
INTERNAL_MESSAGE_CHANNEL,
isDeliverableMessageChannel,
@@ -29,7 +34,10 @@ import {
type AgentGeneratedAttachment,
} from "../generated-attachments.js";
import { formatAgentInternalEventsForPrompt, type AgentInternalEvent } from "../internal-events.js";
import { deliverSubagentAnnouncement } from "../subagent-announce-delivery.js";
import {
deliverSubagentAnnouncement,
loadRequesterSessionEntry,
} from "../subagent-announce-delivery.js";
import type { SubagentAnnounceDeliveryFailureReason } from "../subagent-announce-dispatch.js";
const log = createSubsystemLogger("agents/tools/media-generate-background-shared");
@@ -141,6 +149,12 @@ function createMediaGenerationTaskRun(params: {
}
const runId = `tool:${params.toolName}:${crypto.randomUUID()}`;
try {
// Pin the complete requester route when detached work starts. Completion-time
// session state can move to another peer while generation is still running.
const requesterOrigin = mergeDeliveryContext(
normalizeDeliveryContext(params.requesterOrigin),
deliveryContextFromSession(loadRequesterSessionEntry(sessionKey).entry),
);
const task = createRunningTaskRun({
runtime: "cli",
taskKind: params.taskKind,
@@ -148,7 +162,7 @@ function createMediaGenerationTaskRun(params: {
requesterSessionKey: sessionKey,
ownerKey: sessionKey,
scopeKind: "session",
requesterOrigin: params.requesterOrigin,
requesterOrigin,
childSessionKey: sessionKey,
runId,
label: params.label,
@@ -166,7 +180,7 @@ function createMediaGenerationTaskRun(params: {
taskId: task.taskId,
runId,
requesterSessionKey: sessionKey,
requesterOrigin: params.requesterOrigin,
requesterOrigin,
taskLabel: params.prompt,
};
touchMediaGenerationTaskRunContext(handle);

View File

@@ -243,20 +243,17 @@ export function mergeDeliveryContext(
normalizedPrimary?.channel &&
normalizedFallback?.channel &&
normalizedPrimary.channel !== normalizedFallback.channel;
const accountsConflict =
normalizedPrimary?.accountId &&
normalizedFallback?.accountId &&
normalizedPrimary.accountId !== normalizedFallback.accountId;
const routesConflict = channelsConflict || accountsConflict;
return normalizeDeliveryContext({
channel: normalizedPrimary?.channel ?? normalizedFallback?.channel,
// Keep route fields paired to their channel; avoid crossing fields between
// unrelated channels during session context merges.
to: routesConflict ? normalizedPrimary?.to : (normalizedPrimary?.to ?? normalizedFallback?.to),
accountId: routesConflict
to: channelsConflict
? normalizedPrimary?.to
: (normalizedPrimary?.to ?? normalizedFallback?.to),
accountId: channelsConflict
? normalizedPrimary?.accountId
: (normalizedPrimary?.accountId ?? normalizedFallback?.accountId),
threadId: routesConflict
threadId: channelsConflict
? normalizedPrimary?.threadId
: (normalizedPrimary?.threadId ?? normalizedFallback?.threadId),
});

View File

@@ -53,20 +53,6 @@ describe("delivery context helpers", () => {
});
});
it("does not inherit route fields from a different account on the same channel", () => {
const merged = mergeDeliveryContext(
{ channel: "telegram", accountId: "bot-a" },
{ channel: "telegram", to: "123", accountId: "bot-b", threadId: "99" },
);
expect(merged).toEqual({
channel: "telegram",
to: undefined,
accountId: "bot-a",
});
expect(merged?.threadId).toBeUndefined();
});
it("uses fallback route fields when fallback has no channel", () => {
const merged = mergeDeliveryContext(
{ channel: "demo-channel" },