mirror of
https://github.com/openclaw/openclaw.git
synced 2026-07-02 00:03:37 +00:00
fix(agents): pin media requester route at task start
This commit is contained in:
committed by
Ayaan Zaidi
parent
1ed8592467
commit
025db6cf9e
@@ -1,8 +1,5 @@
|
||||
// Subagent announce delivery tests cover the last-mile routing used when child
|
||||
// runs report progress or completion back to the requester session.
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { OutboundDeliveryError } from "../infra/outbound/deliver-types.js";
|
||||
import {
|
||||
@@ -4852,183 +4849,3 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("deliverSubagentAnnouncement requester session backfill (issue #86034)", () => {
|
||||
// Regression: image_generate launched from a non-direct-reply turn (heartbeat,
|
||||
// cron, subagent spawn) supplies a completion origin missing `to`. The
|
||||
// already-loaded requester session entry carries `lastTo`/`lastChannel`, so
|
||||
// effectiveDirectOrigin must backfill from it before the deliverability gate,
|
||||
// otherwise generated media is silently dropped.
|
||||
it("backfills to/accountId from the requester session entry when completion origin is incomplete", async () => {
|
||||
const agentId = `backfill-${Date.now()}-${Math.random().toString(16).slice(2)}`;
|
||||
const sessionKey = `agent:${agentId}:telegram:5866004662`;
|
||||
const storeTemplate = path.join(
|
||||
os.tmpdir(),
|
||||
`openclaw-86034-session-${agentId}-{agentId}.json`,
|
||||
);
|
||||
const storePath = storeTemplate.replaceAll("{agentId}", agentId);
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
[sessionKey]: {
|
||||
sessionId: "telegram-session-1",
|
||||
updatedAt: Date.now(),
|
||||
channel: "telegram",
|
||||
lastChannel: "telegram",
|
||||
lastTo: "5866004662",
|
||||
lastAccountId: "bot-1",
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
try {
|
||||
const dispatchGatewayMethodInProcess = createInProcessGatewayMock({
|
||||
result: { payloads: [{ text: "requester voice completion" }] },
|
||||
});
|
||||
testing.setDepsForTest({
|
||||
dispatchGatewayMethodInProcess,
|
||||
getRequesterSessionActivity: () => ({
|
||||
sessionId: "telegram-session-1",
|
||||
isActive: false,
|
||||
}),
|
||||
getRuntimeConfig: () => ({ session: { store: storeTemplate } }) as never,
|
||||
});
|
||||
|
||||
const result = await deliverSubagentAnnouncement({
|
||||
requesterSessionKey: sessionKey,
|
||||
targetRequesterSessionKey: sessionKey,
|
||||
triggerMessage: "image done",
|
||||
steerMessage: "image done",
|
||||
// Origin carries channel but NOT `to` or `accountId` — simulates an
|
||||
// image_generate task created off the direct-reply path.
|
||||
requesterOrigin: { channel: "telegram" },
|
||||
requesterSessionOrigin: { channel: "telegram" },
|
||||
completionDirectOrigin: { channel: "telegram" },
|
||||
directOrigin: { channel: "telegram" },
|
||||
requesterIsSubagent: false,
|
||||
expectsCompletionMessage: true,
|
||||
bestEffortDeliver: true,
|
||||
directIdempotencyKey: "announce-86034-backfill",
|
||||
sourceTool: "image_generate",
|
||||
});
|
||||
|
||||
expectRecordFields(result, {
|
||||
delivered: true,
|
||||
path: "direct",
|
||||
});
|
||||
// The deliverability decision must see the backfilled `to`.
|
||||
expectInProcessAgentParams(dispatchGatewayMethodInProcess, {
|
||||
deliver: true,
|
||||
channel: "telegram",
|
||||
accountId: "bot-1",
|
||||
to: "5866004662",
|
||||
});
|
||||
} finally {
|
||||
await fs.rm(storePath, { force: true });
|
||||
}
|
||||
});
|
||||
|
||||
// Cross-channel safety regression: a stale lastChannel that differs from
|
||||
// the completion origin's channel must not leak its lastTo into the
|
||||
// telegram delivery. mergeDeliveryContext.channelsConflict (delivery-context.shared.ts:233-260)
|
||||
// is the structural guard; this test locks it.
|
||||
it("does not import a cross-channel lastTo when the completion origin's channel differs from lastChannel (cross-channel guard regression)", async () => {
|
||||
const agentId = `xchan-${Date.now()}-${Math.random().toString(16).slice(2)}`;
|
||||
const sessionKey = `agent:${agentId}:telegram:5866004662`;
|
||||
const storeTemplate = path.join(
|
||||
os.tmpdir(),
|
||||
`openclaw-86034-xchan-session-${agentId}-{agentId}.json`,
|
||||
);
|
||||
const storePath = storeTemplate.replaceAll("{agentId}", agentId);
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
[sessionKey]: {
|
||||
sessionId: "telegram-session-xchan",
|
||||
updatedAt: Date.now(),
|
||||
channel: "telegram",
|
||||
lastChannel: "signal",
|
||||
lastTo: "signal-stale-target",
|
||||
lastAccountId: "signal-bot-1",
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
try {
|
||||
const dispatchGatewayMethodInProcess = createInProcessGatewayMock({
|
||||
result: { payloads: [{ text: "requester voice completion" }] },
|
||||
});
|
||||
testing.setDepsForTest({
|
||||
dispatchGatewayMethodInProcess,
|
||||
getRequesterSessionActivity: () => ({
|
||||
sessionId: "telegram-session-xchan",
|
||||
isActive: false,
|
||||
}),
|
||||
getRuntimeConfig: () => ({ session: { store: storeTemplate } }) as never,
|
||||
});
|
||||
|
||||
const result = await deliverSubagentAnnouncement({
|
||||
requesterSessionKey: sessionKey,
|
||||
targetRequesterSessionKey: sessionKey,
|
||||
triggerMessage: "image done",
|
||||
steerMessage: "image done",
|
||||
requesterOrigin: { channel: "telegram", accountId: "telegram-bot-1" },
|
||||
requesterSessionOrigin: { channel: "telegram", accountId: "telegram-bot-1" },
|
||||
completionDirectOrigin: { channel: "telegram", accountId: "telegram-bot-1" },
|
||||
directOrigin: { channel: "telegram", accountId: "telegram-bot-1" },
|
||||
requesterIsSubagent: false,
|
||||
expectsCompletionMessage: true,
|
||||
bestEffortDeliver: true,
|
||||
directIdempotencyKey: "announce-86034-cross-channel",
|
||||
sourceTool: "image_generate",
|
||||
});
|
||||
|
||||
// Structural assertion: the stale signal `to` must not have leaked into
|
||||
// any in-process gateway dispatch, regardless of whether the call ended
|
||||
// up routed (path: "direct") or short-circuited (delivered: false /
|
||||
// path: "none"). The guard is "no cross-channel `to` ever reaches the
|
||||
// gateway", not a specific terminal path.
|
||||
expect(dispatchGatewayMethodInProcess).not.toHaveBeenCalledWith(
|
||||
"agent",
|
||||
expect.objectContaining({ to: "signal-stale-target" }),
|
||||
expect.anything(),
|
||||
);
|
||||
expect(dispatchGatewayMethodInProcess).not.toHaveBeenCalledWith(
|
||||
"agent",
|
||||
expect.objectContaining({ channel: "signal" }),
|
||||
expect.anything(),
|
||||
);
|
||||
for (const call of asMock(dispatchGatewayMethodInProcess).mock.calls) {
|
||||
const params = call[1] as Record<string, unknown> | undefined;
|
||||
if (!params) {
|
||||
continue;
|
||||
}
|
||||
expect(params.to).not.toBe("signal-stale-target");
|
||||
expect(params.channel).not.toBe("signal");
|
||||
}
|
||||
|
||||
// Result-shape assertion: if the dispatcher was invoked at all on the
|
||||
// direct path, it must not have been with the stale signal target.
|
||||
if (
|
||||
(result as { path?: string }).path === "direct" &&
|
||||
(result as { delivered?: boolean }).delivered === true
|
||||
) {
|
||||
const params = mockCallArg(dispatchGatewayMethodInProcess, 0, 1) as Record<string, unknown>;
|
||||
expect(params.to).not.toBe("signal-stale-target");
|
||||
expect(params.channel).not.toBe("signal");
|
||||
}
|
||||
} finally {
|
||||
await fs.rm(storePath, { force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -28,11 +28,7 @@ import {
|
||||
import { deriveSessionChatTypeFromKey } from "../sessions/session-chat-type-shared.js";
|
||||
import { isCronRunSessionKey, isCronSessionKey } from "../sessions/session-key-utils.js";
|
||||
import { isNonTerminalAgentRunStatus } from "../shared/agent-run-status.js";
|
||||
import {
|
||||
deliveryContextFromSession,
|
||||
mergeDeliveryContext,
|
||||
normalizeDeliveryContext,
|
||||
} from "../utils/delivery-context.js";
|
||||
import { mergeDeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
import {
|
||||
INTERNAL_MESSAGE_CHANNEL,
|
||||
isDeliverableMessageChannel,
|
||||
@@ -1228,19 +1224,15 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
const completionDirectOrigin = normalizeDeliveryContext(params.completionDirectOrigin);
|
||||
const directOrigin = normalizeDeliveryContext(params.directOrigin);
|
||||
const requesterSessionOrigin = normalizeDeliveryContext(params.requesterSessionOrigin);
|
||||
const requesterEntry = loadRequesterSessionEntry(params.targetRequesterSessionKey).entry;
|
||||
// Backfill missing fields (channel, to, accountId) from the requester
|
||||
// session entry's lastChannel/lastTo so a completion origin that carries
|
||||
// a channel but not a `to` (e.g. heartbeat, cron, subagent spawn paths
|
||||
// where `agentTo` is undefined) still resolves an external delivery
|
||||
// target. Without this, every Boolean(channel && to) gate downstream
|
||||
// short-circuits and generated media is silently dropped.
|
||||
const requesterSessionDeliveryFallback = deliveryContextFromSession(requesterEntry);
|
||||
// Merge completionDirectOrigin with directOrigin so that missing fields
|
||||
// (channel, to, accountId) fall back to the originating session's
|
||||
// lastChannel / lastTo. Without this, a completion origin that carries a
|
||||
// channel but not a `to` would prevent external delivery.
|
||||
const externalCompletionDirectOrigin =
|
||||
stripNonDeliverableChannelForCompletionOrigin(completionDirectOrigin);
|
||||
const completionExternalFallbackOrigin = mergeDeliveryContext(
|
||||
directOrigin,
|
||||
mergeDeliveryContext(requesterSessionOrigin, requesterSessionDeliveryFallback),
|
||||
requesterSessionOrigin,
|
||||
);
|
||||
const effectiveDirectOrigin = params.expectsCompletionMessage
|
||||
? mergeDeliveryContext(externalCompletionDirectOrigin, completionExternalFallbackOrigin)
|
||||
@@ -1248,6 +1240,7 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
const sessionOnlyOrigin = effectiveDirectOrigin?.channel
|
||||
? effectiveDirectOrigin
|
||||
: requesterSessionOrigin;
|
||||
const requesterEntry = loadRequesterSessionEntry(params.targetRequesterSessionKey).entry;
|
||||
const deliveryTarget = !params.requesterIsSubagent
|
||||
? resolveExternalBestEffortDeliveryTarget({
|
||||
channel: effectiveDirectOrigin?.channel,
|
||||
|
||||
@@ -4,12 +4,20 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const subagentAnnounceDeliveryMocks = vi.hoisted(() => ({
|
||||
deliverSubagentAnnouncement: vi.fn(),
|
||||
loadRequesterSessionEntry: vi.fn(() => ({ entry: undefined })),
|
||||
}));
|
||||
const detachedTaskRuntimeMocks = vi.hoisted(() => ({
|
||||
completeTaskRunByRunId: vi.fn(),
|
||||
createRunningTaskRun: vi.fn(() => ({ taskId: "task-pinned-route" })),
|
||||
failTaskRunByRunId: vi.fn(),
|
||||
recordTaskRunProgressByRunId: vi.fn(),
|
||||
}));
|
||||
const taskRegistryDeliveryRuntimeMocks = vi.hoisted(() => ({
|
||||
sendMessage: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../subagent-announce-delivery.js", () => subagentAnnounceDeliveryMocks);
|
||||
vi.mock("../../tasks/detached-task-runtime.js", () => detachedTaskRuntimeMocks);
|
||||
vi.mock("../../tasks/task-registry-delivery-runtime.js", () => taskRegistryDeliveryRuntimeMocks);
|
||||
|
||||
import {
|
||||
@@ -20,6 +28,9 @@ import {
|
||||
|
||||
beforeEach(() => {
|
||||
subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement.mockReset();
|
||||
subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReset();
|
||||
subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReturnValue({ entry: undefined });
|
||||
detachedTaskRuntimeMocks.createRunningTaskRun.mockClear();
|
||||
taskRegistryDeliveryRuntimeMocks.sendMessage.mockReset();
|
||||
});
|
||||
|
||||
@@ -378,6 +389,65 @@ describe("scheduleMediaGenerationTaskCompletion", () => {
|
||||
});
|
||||
|
||||
describe("createMediaGenerationTaskLifecycle", () => {
|
||||
it("pins a missing requester target from session state when the task starts", async () => {
|
||||
subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReturnValue({
|
||||
entry: {
|
||||
lastChannel: "telegram",
|
||||
lastTo: "5866004662",
|
||||
lastAccountId: "bot-1",
|
||||
},
|
||||
});
|
||||
subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement.mockResolvedValueOnce({
|
||||
delivered: true,
|
||||
});
|
||||
const lifecycle = createMediaGenerationTaskLifecycle({
|
||||
toolName: "image_generate",
|
||||
taskKind: "image_generation",
|
||||
label: "Image generation",
|
||||
queuedProgressSummary: "Queued image generation",
|
||||
generatedLabel: "image",
|
||||
failureProgressSummary: "Image generation failed",
|
||||
eventSource: "image_generation",
|
||||
announceType: "image generation task",
|
||||
completionLabel: "image",
|
||||
});
|
||||
|
||||
const handle = lifecycle.createTaskRun({
|
||||
sessionKey: "agent:main:telegram:5866004662",
|
||||
requesterOrigin: { channel: "telegram" },
|
||||
prompt: "proof image",
|
||||
});
|
||||
expect(handle?.requesterOrigin).toEqual({
|
||||
channel: "telegram",
|
||||
to: "5866004662",
|
||||
accountId: "bot-1",
|
||||
});
|
||||
|
||||
// Later session drift cannot change the route stored on the task handle.
|
||||
subagentAnnounceDeliveryMocks.loadRequesterSessionEntry.mockReturnValue({
|
||||
entry: {
|
||||
lastChannel: "telegram",
|
||||
lastTo: "other-peer",
|
||||
lastAccountId: "bot-1",
|
||||
},
|
||||
});
|
||||
await lifecycle.wakeTaskCompletion({
|
||||
handle,
|
||||
status: "ok",
|
||||
statusLabel: "completed successfully",
|
||||
result: "generated",
|
||||
});
|
||||
expect(subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
completionDirectOrigin: {
|
||||
channel: "telegram",
|
||||
to: "5866004662",
|
||||
accountId: "bot-1",
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("returns the completion wake delivery result", async () => {
|
||||
subagentAnnounceDeliveryMocks.deliverSubagentAnnouncement.mockResolvedValueOnce({
|
||||
delivered: true,
|
||||
|
||||
@@ -19,7 +19,12 @@ import {
|
||||
resolveRequiredCompletionDeliveryFailureTerminalResult,
|
||||
type RequiredCompletionTerminalResult,
|
||||
} from "../../tasks/task-completion-contract.js";
|
||||
import { normalizeDeliveryContext, type DeliveryContext } from "../../utils/delivery-context.js";
|
||||
import {
|
||||
deliveryContextFromSession,
|
||||
mergeDeliveryContext,
|
||||
normalizeDeliveryContext,
|
||||
type DeliveryContext,
|
||||
} from "../../utils/delivery-context.js";
|
||||
import {
|
||||
INTERNAL_MESSAGE_CHANNEL,
|
||||
isDeliverableMessageChannel,
|
||||
@@ -29,7 +34,10 @@ import {
|
||||
type AgentGeneratedAttachment,
|
||||
} from "../generated-attachments.js";
|
||||
import { formatAgentInternalEventsForPrompt, type AgentInternalEvent } from "../internal-events.js";
|
||||
import { deliverSubagentAnnouncement } from "../subagent-announce-delivery.js";
|
||||
import {
|
||||
deliverSubagentAnnouncement,
|
||||
loadRequesterSessionEntry,
|
||||
} from "../subagent-announce-delivery.js";
|
||||
import type { SubagentAnnounceDeliveryFailureReason } from "../subagent-announce-dispatch.js";
|
||||
|
||||
const log = createSubsystemLogger("agents/tools/media-generate-background-shared");
|
||||
@@ -141,6 +149,12 @@ function createMediaGenerationTaskRun(params: {
|
||||
}
|
||||
const runId = `tool:${params.toolName}:${crypto.randomUUID()}`;
|
||||
try {
|
||||
// Pin the complete requester route when detached work starts. Completion-time
|
||||
// session state can move to another peer while generation is still running.
|
||||
const requesterOrigin = mergeDeliveryContext(
|
||||
normalizeDeliveryContext(params.requesterOrigin),
|
||||
deliveryContextFromSession(loadRequesterSessionEntry(sessionKey).entry),
|
||||
);
|
||||
const task = createRunningTaskRun({
|
||||
runtime: "cli",
|
||||
taskKind: params.taskKind,
|
||||
@@ -148,7 +162,7 @@ function createMediaGenerationTaskRun(params: {
|
||||
requesterSessionKey: sessionKey,
|
||||
ownerKey: sessionKey,
|
||||
scopeKind: "session",
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
requesterOrigin,
|
||||
childSessionKey: sessionKey,
|
||||
runId,
|
||||
label: params.label,
|
||||
@@ -166,7 +180,7 @@ function createMediaGenerationTaskRun(params: {
|
||||
taskId: task.taskId,
|
||||
runId,
|
||||
requesterSessionKey: sessionKey,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
requesterOrigin,
|
||||
taskLabel: params.prompt,
|
||||
};
|
||||
touchMediaGenerationTaskRunContext(handle);
|
||||
|
||||
@@ -243,20 +243,17 @@ export function mergeDeliveryContext(
|
||||
normalizedPrimary?.channel &&
|
||||
normalizedFallback?.channel &&
|
||||
normalizedPrimary.channel !== normalizedFallback.channel;
|
||||
const accountsConflict =
|
||||
normalizedPrimary?.accountId &&
|
||||
normalizedFallback?.accountId &&
|
||||
normalizedPrimary.accountId !== normalizedFallback.accountId;
|
||||
const routesConflict = channelsConflict || accountsConflict;
|
||||
return normalizeDeliveryContext({
|
||||
channel: normalizedPrimary?.channel ?? normalizedFallback?.channel,
|
||||
// Keep route fields paired to their channel; avoid crossing fields between
|
||||
// unrelated channels during session context merges.
|
||||
to: routesConflict ? normalizedPrimary?.to : (normalizedPrimary?.to ?? normalizedFallback?.to),
|
||||
accountId: routesConflict
|
||||
to: channelsConflict
|
||||
? normalizedPrimary?.to
|
||||
: (normalizedPrimary?.to ?? normalizedFallback?.to),
|
||||
accountId: channelsConflict
|
||||
? normalizedPrimary?.accountId
|
||||
: (normalizedPrimary?.accountId ?? normalizedFallback?.accountId),
|
||||
threadId: routesConflict
|
||||
threadId: channelsConflict
|
||||
? normalizedPrimary?.threadId
|
||||
: (normalizedPrimary?.threadId ?? normalizedFallback?.threadId),
|
||||
});
|
||||
|
||||
@@ -53,20 +53,6 @@ describe("delivery context helpers", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("does not inherit route fields from a different account on the same channel", () => {
|
||||
const merged = mergeDeliveryContext(
|
||||
{ channel: "telegram", accountId: "bot-a" },
|
||||
{ channel: "telegram", to: "123", accountId: "bot-b", threadId: "99" },
|
||||
);
|
||||
|
||||
expect(merged).toEqual({
|
||||
channel: "telegram",
|
||||
to: undefined,
|
||||
accountId: "bot-a",
|
||||
});
|
||||
expect(merged?.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("uses fallback route fields when fallback has no channel", () => {
|
||||
const merged = mergeDeliveryContext(
|
||||
{ channel: "demo-channel" },
|
||||
|
||||
Reference in New Issue
Block a user