diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts index a562fad21c4..57741517689 100644 --- a/src/agents/subagent-announce-delivery.test.ts +++ b/src/agents/subagent-announce-delivery.test.ts @@ -1,8 +1,5 @@ // Subagent announce delivery tests cover the last-mile routing used when child // runs report progress or completion back to the requester session. -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; import { OutboundDeliveryError } from "../infra/outbound/deliver-types.js"; import { @@ -4852,183 +4849,3 @@ describe("deliverSubagentAnnouncement completion delivery", () => { }); }); }); - -describe("deliverSubagentAnnouncement requester session backfill (issue #86034)", () => { - // Regression: image_generate launched from a non-direct-reply turn (heartbeat, - // cron, subagent spawn) supplies a completion origin missing `to`. The - // already-loaded requester session entry carries `lastTo`/`lastChannel`, so - // effectiveDirectOrigin must backfill from it before the deliverability gate, - // otherwise generated media is silently dropped. - it("backfills to/accountId from the requester session entry when completion origin is incomplete", async () => { - const agentId = `backfill-${Date.now()}-${Math.random().toString(16).slice(2)}`; - const sessionKey = `agent:${agentId}:telegram:5866004662`; - const storeTemplate = path.join( - os.tmpdir(), - `openclaw-86034-session-${agentId}-{agentId}.json`, - ); - const storePath = storeTemplate.replaceAll("{agentId}", agentId); - await fs.writeFile( - storePath, - JSON.stringify( - { - [sessionKey]: { - sessionId: "telegram-session-1", - updatedAt: Date.now(), - channel: "telegram", - lastChannel: "telegram", - lastTo: "5866004662", - lastAccountId: "bot-1", - }, - }, - null, - 2, - ), - "utf-8", - ); - - try { - const dispatchGatewayMethodInProcess = createInProcessGatewayMock({ - result: { payloads: [{ text: "requester voice completion" }] }, - }); - testing.setDepsForTest({ - dispatchGatewayMethodInProcess, - getRequesterSessionActivity: () => ({ - sessionId: "telegram-session-1", - isActive: false, - }), - getRuntimeConfig: () => ({ session: { store: storeTemplate } }) as never, - }); - - const result = await deliverSubagentAnnouncement({ - requesterSessionKey: sessionKey, - targetRequesterSessionKey: sessionKey, - triggerMessage: "image done", - steerMessage: "image done", - // Origin carries channel but NOT `to` or `accountId` — simulates an - // image_generate task created off the direct-reply path. - requesterOrigin: { channel: "telegram" }, - requesterSessionOrigin: { channel: "telegram" }, - completionDirectOrigin: { channel: "telegram" }, - directOrigin: { channel: "telegram" }, - requesterIsSubagent: false, - expectsCompletionMessage: true, - bestEffortDeliver: true, - directIdempotencyKey: "announce-86034-backfill", - sourceTool: "image_generate", - }); - - expectRecordFields(result, { - delivered: true, - path: "direct", - }); - // The deliverability decision must see the backfilled `to`. - expectInProcessAgentParams(dispatchGatewayMethodInProcess, { - deliver: true, - channel: "telegram", - accountId: "bot-1", - to: "5866004662", - }); - } finally { - await fs.rm(storePath, { force: true }); - } - }); - - // Cross-channel safety regression: a stale lastChannel that differs from - // the completion origin's channel must not leak its lastTo into the - // telegram delivery. mergeDeliveryContext.channelsConflict (delivery-context.shared.ts:233-260) - // is the structural guard; this test locks it. - it("does not import a cross-channel lastTo when the completion origin's channel differs from lastChannel (cross-channel guard regression)", async () => { - const agentId = `xchan-${Date.now()}-${Math.random().toString(16).slice(2)}`; - const sessionKey = `agent:${agentId}:telegram:5866004662`; - const storeTemplate = path.join( - os.tmpdir(), - `openclaw-86034-xchan-session-${agentId}-{agentId}.json`, - ); - const storePath = storeTemplate.replaceAll("{agentId}", agentId); - await fs.writeFile( - storePath, - JSON.stringify( - { - [sessionKey]: { - sessionId: "telegram-session-xchan", - updatedAt: Date.now(), - channel: "telegram", - lastChannel: "signal", - lastTo: "signal-stale-target", - lastAccountId: "signal-bot-1", - }, - }, - null, - 2, - ), - "utf-8", - ); - - try { - const dispatchGatewayMethodInProcess = createInProcessGatewayMock({ - result: { payloads: [{ text: "requester voice completion" }] }, - }); - testing.setDepsForTest({ - dispatchGatewayMethodInProcess, - getRequesterSessionActivity: () => ({ - sessionId: "telegram-session-xchan", - isActive: false, - }), - getRuntimeConfig: () => ({ session: { store: storeTemplate } }) as never, - }); - - const result = await deliverSubagentAnnouncement({ - requesterSessionKey: sessionKey, - targetRequesterSessionKey: sessionKey, - triggerMessage: "image done", - steerMessage: "image done", - requesterOrigin: { channel: "telegram", accountId: "telegram-bot-1" }, - requesterSessionOrigin: { channel: "telegram", accountId: "telegram-bot-1" }, - completionDirectOrigin: { channel: "telegram", accountId: "telegram-bot-1" }, - directOrigin: { channel: "telegram", accountId: "telegram-bot-1" }, - requesterIsSubagent: false, - expectsCompletionMessage: true, - bestEffortDeliver: true, - directIdempotencyKey: "announce-86034-cross-channel", - sourceTool: "image_generate", - }); - - // Structural assertion: the stale signal `to` must not have leaked into - // any in-process gateway dispatch, regardless of whether the call ended - // up routed (path: "direct") or short-circuited (delivered: false / - // path: "none"). The guard is "no cross-channel `to` ever reaches the - // gateway", not a specific terminal path. - expect(dispatchGatewayMethodInProcess).not.toHaveBeenCalledWith( - "agent", - expect.objectContaining({ to: "signal-stale-target" }), - expect.anything(), - ); - expect(dispatchGatewayMethodInProcess).not.toHaveBeenCalledWith( - "agent", - expect.objectContaining({ channel: "signal" }), - expect.anything(), - ); - for (const call of asMock(dispatchGatewayMethodInProcess).mock.calls) { - const params = call[1] as Record | undefined; - if (!params) { - continue; - } - expect(params.to).not.toBe("signal-stale-target"); - expect(params.channel).not.toBe("signal"); - } - - // Result-shape assertion: if the dispatcher was invoked at all on the - // direct path, it must not have been with the stale signal target. - if ( - (result as { path?: string }).path === "direct" && - (result as { delivered?: boolean }).delivered === true - ) { - const params = mockCallArg(dispatchGatewayMethodInProcess, 0, 1) as Record; - expect(params.to).not.toBe("signal-stale-target"); - expect(params.channel).not.toBe("signal"); - } - } finally { - await fs.rm(storePath, { force: true }); - } - }); -}); diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 169b327f9ce..7af7957cc22 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -28,11 +28,7 @@ import { import { deriveSessionChatTypeFromKey } from "../sessions/session-chat-type-shared.js"; import { isCronRunSessionKey, isCronSessionKey } from "../sessions/session-key-utils.js"; import { isNonTerminalAgentRunStatus } from "../shared/agent-run-status.js"; -import { - deliveryContextFromSession, - mergeDeliveryContext, - normalizeDeliveryContext, -} from "../utils/delivery-context.js"; +import { mergeDeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js"; import { INTERNAL_MESSAGE_CHANNEL, isDeliverableMessageChannel, @@ -1228,19 +1224,15 @@ async function sendSubagentAnnounceDirectly(params: { const completionDirectOrigin = normalizeDeliveryContext(params.completionDirectOrigin); const directOrigin = normalizeDeliveryContext(params.directOrigin); const requesterSessionOrigin = normalizeDeliveryContext(params.requesterSessionOrigin); - const requesterEntry = loadRequesterSessionEntry(params.targetRequesterSessionKey).entry; - // Backfill missing fields (channel, to, accountId) from the requester - // session entry's lastChannel/lastTo so a completion origin that carries - // a channel but not a `to` (e.g. heartbeat, cron, subagent spawn paths - // where `agentTo` is undefined) still resolves an external delivery - // target. Without this, every Boolean(channel && to) gate downstream - // short-circuits and generated media is silently dropped. - const requesterSessionDeliveryFallback = deliveryContextFromSession(requesterEntry); + // Merge completionDirectOrigin with directOrigin so that missing fields + // (channel, to, accountId) fall back to the originating session's + // lastChannel / lastTo. Without this, a completion origin that carries a + // channel but not a `to` would prevent external delivery. const externalCompletionDirectOrigin = stripNonDeliverableChannelForCompletionOrigin(completionDirectOrigin); const completionExternalFallbackOrigin = mergeDeliveryContext( directOrigin, - mergeDeliveryContext(requesterSessionOrigin, requesterSessionDeliveryFallback), + requesterSessionOrigin, ); const effectiveDirectOrigin = params.expectsCompletionMessage ? mergeDeliveryContext(externalCompletionDirectOrigin, completionExternalFallbackOrigin) @@ -1248,6 +1240,7 @@ async function sendSubagentAnnounceDirectly(params: { const sessionOnlyOrigin = effectiveDirectOrigin?.channel ? effectiveDirectOrigin : requesterSessionOrigin; + const requesterEntry = loadRequesterSessionEntry(params.targetRequesterSessionKey).entry; const deliveryTarget = !params.requesterIsSubagent ? resolveExternalBestEffortDeliveryTarget({ channel: effectiveDirectOrigin?.channel, diff --git a/src/agents/tools/media-generate-background-shared.test.ts b/src/agents/tools/media-generate-background-shared.test.ts index 32d68aeefba..f5058f8a2da 100644 --- a/src/agents/tools/media-generate-background-shared.test.ts +++ b/src/agents/tools/media-generate-background-shared.test.ts @@ -4,12 +4,20 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const subagentAnnounceDeliveryMocks = vi.hoisted(() => ({ deliverSubagentAnnouncement: vi.fn(), + loadRequesterSessionEntry: vi.fn(() => ({ entry: undefined })), +})); +const detachedTaskRuntimeMocks = vi.hoisted(() => ({ + completeTaskRunByRunId: vi.fn(), + createRunningTaskRun: vi.fn(() => ({ taskId: "task-pinned-route" })), + failTaskRunByRunId: vi.fn(), + recordTaskRunProgressByRunId: vi.fn(), })); const taskRegistryDeliveryRuntimeMocks = vi.hoisted(() => ({ sendMessage: vi.fn(), })); vi.mock("../subagent-announce-delivery.js", () => subagentAnnounceDeliveryMocks); +vi.mock("../../tasks/detached-task-runtime.js", () => detachedTaskRuntimeMocks); vi.mock("../../tasks/task-registry-delivery-runtime.js", () => taskRegistryDeliveryRuntimeMocks); import { @@ -20,6 +28,9 @@ import { beforeEach(() => { subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement.mockReset(); + subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReset(); + subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReturnValue({ entry: undefined }); + detachedTaskRuntimeMocks.createRunningTaskRun.mockClear(); taskRegistryDeliveryRuntimeMocks.sendMessage.mockReset(); }); @@ -378,6 +389,65 @@ describe("scheduleMediaGenerationTaskCompletion", () => { }); describe("createMediaGenerationTaskLifecycle", () => { + it("pins a missing requester target from session state when the task starts", async () => { + subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReturnValue({ + entry: { + lastChannel: "telegram", + lastTo: "5866004662", + lastAccountId: "bot-1", + }, + }); + subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement.mockResolvedValueOnce({ + delivered: true, + }); + const lifecycle = createMediaGenerationTaskLifecycle({ + toolName: "image_generate", + taskKind: "image_generation", + label: "Image generation", + queuedProgressSummary: "Queued image generation", + generatedLabel: "image", + failureProgressSummary: "Image generation failed", + eventSource: "image_generation", + announceType: "image generation task", + completionLabel: "image", + }); + + const handle = lifecycle.createTaskRun({ + sessionKey: "agent:main:telegram:5866004662", + requesterOrigin: { channel: "telegram" }, + prompt: "proof image", + }); + expect(handle?.requesterOrigin).toEqual({ + channel: "telegram", + to: "5866004662", + accountId: "bot-1", + }); + + // Later session drift cannot change the route stored on the task handle. + subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReturnValue({ + entry: { + lastChannel: "telegram", + lastTo: "other-peer", + lastAccountId: "bot-1", + }, + }); + await lifecycle.wakeTaskCompletion({ + handle, + status: "ok", + statusLabel: "completed successfully", + result: "generated", + }); + expect(subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement).toHaveBeenCalledWith( + expect.objectContaining({ + completionDirectOrigin: { + channel: "telegram", + to: "5866004662", + accountId: "bot-1", + }, + }), + ); + }); + it("returns the completion wake delivery result", async () => { subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement.mockResolvedValueOnce({ delivered: true, diff --git a/src/agents/tools/media-generate-background-shared.ts b/src/agents/tools/media-generate-background-shared.ts index 99a80cf7050..3f5a9445d2f 100644 --- a/src/agents/tools/media-generate-background-shared.ts +++ b/src/agents/tools/media-generate-background-shared.ts @@ -19,7 +19,12 @@ import { resolveRequiredCompletionDeliveryFailureTerminalResult, type RequiredCompletionTerminalResult, } from "../../tasks/task-completion-contract.js"; -import { normalizeDeliveryContext, type DeliveryContext } from "../../utils/delivery-context.js"; +import { + deliveryContextFromSession, + mergeDeliveryContext, + normalizeDeliveryContext, + type DeliveryContext, +} from "../../utils/delivery-context.js"; import { INTERNAL_MESSAGE_CHANNEL, isDeliverableMessageChannel, @@ -29,7 +34,10 @@ import { type AgentGeneratedAttachment, } from "../generated-attachments.js"; import { formatAgentInternalEventsForPrompt, type AgentInternalEvent } from "../internal-events.js"; -import { deliverSubagentAnnouncement } from "../subagent-announce-delivery.js"; +import { + deliverSubagentAnnouncement, + loadRequesterSessionEntry, +} from "../subagent-announce-delivery.js"; import type { SubagentAnnounceDeliveryFailureReason } from "../subagent-announce-dispatch.js"; const log = createSubsystemLogger("agents/tools/media-generate-background-shared"); @@ -141,6 +149,12 @@ function createMediaGenerationTaskRun(params: { } const runId = `tool:${params.toolName}:${crypto.randomUUID()}`; try { + // Pin the complete requester route when detached work starts. Completion-time + // session state can move to another peer while generation is still running. + const requesterOrigin = mergeDeliveryContext( + normalizeDeliveryContext(params.requesterOrigin), + deliveryContextFromSession(loadRequesterSessionEntry(sessionKey).entry), + ); const task = createRunningTaskRun({ runtime: "cli", taskKind: params.taskKind, @@ -148,7 +162,7 @@ function createMediaGenerationTaskRun(params: { requesterSessionKey: sessionKey, ownerKey: sessionKey, scopeKind: "session", - requesterOrigin: params.requesterOrigin, + requesterOrigin, childSessionKey: sessionKey, runId, label: params.label, @@ -166,7 +180,7 @@ function createMediaGenerationTaskRun(params: { taskId: task.taskId, runId, requesterSessionKey: sessionKey, - requesterOrigin: params.requesterOrigin, + requesterOrigin, taskLabel: params.prompt, }; touchMediaGenerationTaskRunContext(handle); diff --git a/src/utils/delivery-context.shared.ts b/src/utils/delivery-context.shared.ts index 8948d7c8f75..a7dcce48f4a 100644 --- a/src/utils/delivery-context.shared.ts +++ b/src/utils/delivery-context.shared.ts @@ -243,20 +243,17 @@ export function mergeDeliveryContext( normalizedPrimary?.channel && normalizedFallback?.channel && normalizedPrimary.channel !== normalizedFallback.channel; - const accountsConflict = - normalizedPrimary?.accountId && - normalizedFallback?.accountId && - normalizedPrimary.accountId !== normalizedFallback.accountId; - const routesConflict = channelsConflict || accountsConflict; return normalizeDeliveryContext({ channel: normalizedPrimary?.channel ?? normalizedFallback?.channel, // Keep route fields paired to their channel; avoid crossing fields between // unrelated channels during session context merges. - to: routesConflict ? normalizedPrimary?.to : (normalizedPrimary?.to ?? normalizedFallback?.to), - accountId: routesConflict + to: channelsConflict + ? normalizedPrimary?.to + : (normalizedPrimary?.to ?? normalizedFallback?.to), + accountId: channelsConflict ? normalizedPrimary?.accountId : (normalizedPrimary?.accountId ?? normalizedFallback?.accountId), - threadId: routesConflict + threadId: channelsConflict ? normalizedPrimary?.threadId : (normalizedPrimary?.threadId ?? normalizedFallback?.threadId), }); diff --git a/src/utils/delivery-context.test.ts b/src/utils/delivery-context.test.ts index 8b64b97ab58..bae7a0576e8 100644 --- a/src/utils/delivery-context.test.ts +++ b/src/utils/delivery-context.test.ts @@ -53,20 +53,6 @@ describe("delivery context helpers", () => { }); }); - it("does not inherit route fields from a different account on the same channel", () => { - const merged = mergeDeliveryContext( - { channel: "telegram", accountId: "bot-a" }, - { channel: "telegram", to: "123", accountId: "bot-b", threadId: "99" }, - ); - - expect(merged).toEqual({ - channel: "telegram", - to: undefined, - accountId: "bot-a", - }); - expect(merged?.threadId).toBeUndefined(); - }); - it("uses fallback route fields when fallback has no channel", () => { const merged = mergeDeliveryContext( { channel: "demo-channel" },