fix(cron): direct-deliver thread and topic announce targets

Co-authored-by: Andrei Aratmonov <247877121+AndrewArto@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-02-22 21:41:36 +01:00
parent 320cf8eb3e
commit ffb12397a8
4 changed files with 120 additions and 9 deletions

View File

@@ -0,0 +1,101 @@
import "./isolated-agent.mocks.js";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js";
import type { CliDeps } from "../cli/deps.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
import {
makeCfg,
makeJob,
withTempCronHome,
writeSessionStore,
} from "./isolated-agent.test-harness.js";
import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js";
function createCliDeps(overrides: Partial<CliDeps> = {}): CliDeps {
return {
sendMessageSlack: vi.fn(),
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
...overrides,
};
}
function mockAgentPayloads(payloads: Array<Record<string, unknown>>) {
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads,
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
}
describe("runCronIsolatedAgentTurn forum topic delivery", () => {
beforeEach(() => {
setupIsolatedAgentTurnMocks();
});
it("uses direct delivery for text-only forum topic targets", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps = createCliDeps();
mockAgentPayloads([{ text: "forum message" }]);
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "telegram", to: "123:topic:42" },
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
expect(deps.sendMessageTelegram).toHaveBeenCalledWith(
"123",
"forum message",
expect.objectContaining({
messageThreadId: 42,
}),
);
});
});
it("keeps text-only non-threaded targets on announce flow", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps = createCliDeps();
mockAgentPayloads([{ text: "plain message" }]);
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
});
});
});

View File

@@ -184,7 +184,7 @@ describe("runCronIsolatedAgentTurn", () => {
});
});
it("passes resolved threadId into shared subagent announce flow", async () => {
it("routes threaded announce targets through direct delivery", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
await fs.writeFile(
@@ -214,13 +214,16 @@ describe("runCronIsolatedAgentTurn", () => {
});
expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
| { requesterOrigin?: { threadId?: string | number; channel?: string; to?: string } }
| undefined;
expect(announceArgs?.requesterOrigin?.channel).toBe("telegram");
expect(announceArgs?.requesterOrigin?.to).toBe("123");
expect(announceArgs?.requesterOrigin?.threadId).toBe(42);
expect(res.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
expect(deps.sendMessageTelegram).toHaveBeenCalledWith(
"123",
"Final weather summary",
expect.objectContaining({
messageThreadId: 42,
}),
);
});
});

View File

@@ -657,7 +657,13 @@ export async function runCronIsolatedAgentTurn(params: {
// follows the same system-message injection path as subagent completions.
// Keep direct outbound delivery only for structured payloads (media/channel
// data), which cannot be represented by the shared announce flow.
if (deliveryPayloadHasStructuredContent) {
//
// Forum/topic targets should also use direct delivery. Announce flow can
// be swallowed by ANNOUNCE_SKIP/NO_REPLY in the target agent turn, which
// silently drops cron output for topic-bound sessions.
const useDirectDelivery =
deliveryPayloadHasStructuredContent || resolvedDelivery.threadId != null;
if (useDirectDelivery) {
try {
const payloadsForDelivery =
deliveryPayloads.length > 0