fix(cron): keep message tool for chat delivery

This commit is contained in:
Ayaan Zaidi
2026-04-21 09:38:54 +05:30
parent 4a846dd129
commit 4c1f187da0
12 changed files with 126 additions and 72 deletions

View File

@@ -79,11 +79,11 @@ import {
resolveBootstrapTotalMaxChars,
} from "../../pi-embedded-helpers.js";
import { subscribeEmbeddedPiSession } from "../../pi-embedded-subscribe.js";
import { createPreparedEmbeddedPiSettingsManager } from "../../pi-project-settings.js";
import {
applyPiAutoCompactionGuard,
applyPiCompactionSettingsFromConfig,
} from "../../pi-settings.js";
import { createPreparedEmbeddedPiSettingsManager } from "../../pi-project-settings.js";
import {
createClientToolNameConflictError,
findClientToolNameConflicts,
@@ -527,6 +527,7 @@ export async function runEmbeddedAttempt(
requireExplicitMessageTarget:
params.requireExplicitMessageTarget ?? isSubagentSessionKey(params.sessionKey),
disableMessageTool: params.disableMessageTool,
forceMessageTool: params.forceMessageTool,
onYield: (message) => {
yieldDetected = true;
yieldMessage = message;
@@ -537,6 +538,9 @@ export async function runEmbeddedAttempt(
});
if (params.toolsAllow && params.toolsAllow.length > 0) {
const allowSet = new Set(params.toolsAllow);
if (params.forceMessageTool) {
allowSet.add("message");
}
return allTools.filter((tool) => allowSet.has(tool.name));
}
return allTools;

View File

@@ -66,6 +66,8 @@ export type RunEmbeddedPiAgentParams = {
requireExplicitMessageTarget?: boolean;
/** If true, omit the message tool from the tool list. */
disableMessageTool?: boolean;
/** Keep the message tool available even when a narrow profile would omit it. */
forceMessageTool?: boolean;
/** Allow runtime plugins for this run to late-bind the gateway subagent. */
allowGatewaySubagentBinding?: boolean;
sessionFile: string;

View File

@@ -266,6 +266,19 @@ describe("createOpenClawCodingTools", () => {
expect(names.has("browser")).toBe(false);
});
it("can keep message available when a cron route needs it under the coding profile", () => {
const codingTools = createOpenClawCodingTools({
config: { tools: { profile: "coding" } },
});
expect(codingTools.some((tool) => tool.name === "message")).toBe(false);
const cronTools = createOpenClawCodingTools({
config: { tools: { profile: "coding" } },
forceMessageTool: true,
});
expect(cronTools.some((tool) => tool.name === "message")).toBe(true);
});
it("expands group shorthands in global tool policy", () => {
const tools = createOpenClawCodingTools({
config: { tools: { allow: ["group:fs"] } },

View File

@@ -330,6 +330,8 @@ export function createOpenClawCodingTools(options?: {
requireExplicitMessageTarget?: boolean;
/** If true, omit the message tool from the tool list. */
disableMessageTool?: boolean;
/** Keep the message tool available even when the selected profile omits it. */
forceMessageTool?: boolean;
/** Whether the sender is an owner (required for owner-only tools). */
senderIsOwner?: boolean;
/** Callback invoked when sessions_yield tool is called. */
@@ -380,7 +382,11 @@ export function createOpenClawCodingTools(options?: {
const profilePolicy = resolveToolProfilePolicy(profile);
const providerProfilePolicy = resolveToolProfilePolicy(providerProfile);
const profilePolicyWithAlsoAllow = mergeAlsoAllowPolicy(profilePolicy, profileAlsoAllow);
const runtimeProfileAlsoAllow = options?.forceMessageTool ? ["message"] : [];
const profilePolicyWithAlsoAllow = mergeAlsoAllowPolicy(profilePolicy, [
...(profileAlsoAllow ?? []),
...runtimeProfileAlsoAllow,
]);
const providerProfilePolicyWithAlsoAllow = mergeAlsoAllowPolicy(
providerProfilePolicy,
providerProfileAlsoAllow,

View File

@@ -56,7 +56,6 @@ export async function runTelegramAnnounceTurn(params: {
to?: string;
bestEffort?: boolean;
};
deliveryContract?: "cron-owned" | "shared";
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
return runCronIsolatedAgentTurn({
cfg: makeCfg(params.home, params.storePath, {
@@ -70,6 +69,5 @@ export async function runTelegramAnnounceTurn(params: {
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
deliveryContract: params.deliveryContract,
});
}

View File

@@ -87,6 +87,7 @@ export function createCronPromptExecutor(params: {
toolPolicy: {
requireExplicitMessageTarget: boolean;
disableMessageTool: boolean;
forceMessageTool: boolean;
};
skillsSnapshot: SkillSnapshot;
agentPayload: AgentTurnPayload;
@@ -204,6 +205,7 @@ export function createCronPromptExecutor(params: {
runId: params.cronSession.sessionEntry.sessionId,
requireExplicitMessageTarget: params.toolPolicy.requireExplicitMessageTarget,
disableMessageTool: params.toolPolicy.disableMessageTool,
forceMessageTool: params.toolPolicy.forceMessageTool,
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
abortSignal: params.abortSignal,
bootstrapPromptWarningSignaturesSeen,
@@ -253,6 +255,7 @@ export async function executeCronRun(params: {
toolPolicy: {
requireExplicitMessageTarget: boolean;
disableMessageTool: boolean;
forceMessageTool: boolean;
};
skillsSnapshot: SkillSnapshot;
agentPayload: AgentTurnPayload;

View File

@@ -54,6 +54,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
await runCronIsolatedAgentTurn(makeParams());
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.forceMessageTool).toBe(false);
}
async function expectMessageToolEnabledForPlan(plan: {
@@ -67,6 +68,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
await runCronIsolatedAgentTurn(makeParams());
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.forceMessageTool).toBe(true);
}
async function runModeNoneDeliveryCase(params: {
@@ -86,15 +88,14 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
job: makeMessageToolPolicyJob(params.delivery),
});
expect(resolveDeliveryTargetMock).not.toHaveBeenCalled();
expect(resolveDeliveryTargetMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]).toMatchObject({
disableMessageTool: false,
messageChannel: undefined,
messageTo: undefined,
messageThreadId: undefined,
currentChannelId: undefined,
agentAccountId: undefined,
forceMessageTool: true,
messageChannel: "telegram",
messageTo: "123",
currentChannelId: "123",
});
}
@@ -137,6 +138,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
toolPolicy: {
requireExplicitMessageTarget: false,
disableMessageTool: false,
forceMessageTool: true,
},
skillsSnapshot: emptySkillsSnapshot,
agentPayload: null,
@@ -202,7 +204,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
});
});
it("does not resolve implicit last-target context for bare delivery.mode none", async () => {
it("resolves implicit last-target context for bare delivery.mode none", async () => {
mockRunCronFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: false,
@@ -222,25 +224,29 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
} as never,
});
expect(resolveDeliveryTargetMock).not.toHaveBeenCalled();
expect(resolveDeliveryTargetMock).toHaveBeenCalledTimes(1);
expect(resolveDeliveryTargetMock.mock.calls[0]?.[2]).toMatchObject({
channel: "last",
sessionKey: undefined,
});
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]).toMatchObject({
disableMessageTool: false,
messageChannel: undefined,
messageTo: undefined,
messageThreadId: undefined,
currentChannelId: undefined,
forceMessageTool: true,
messageChannel: "telegram",
messageTo: "123",
currentChannelId: "123",
});
});
it("does not resolve implicit last-target context for delivery.mode none with only accountId", async () => {
it("resolves implicit last-target context for delivery.mode none with only accountId", async () => {
await runModeNoneDeliveryCase({
delivery: { mode: "none", accountId: "ops" },
plan: { accountId: "ops" },
});
});
it("does not resolve implicit last-target context for delivery.mode none with only threadId", async () => {
it("resolves implicit last-target context for delivery.mode none with only threadId", async () => {
await runModeNoneDeliveryCase({
delivery: { mode: "none", threadId: 42 },
plan: { threadId: 42 },
@@ -291,8 +297,8 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
});
});
it("disables the message tool when cron delivery is active", async () => {
await expectMessageToolDisabledForPlan({
it("keeps the message tool enabled when announce delivery is active", async () => {
await expectMessageToolEnabledForPlan({
requested: true,
mode: "announce",
channel: "telegram",
@@ -308,17 +314,14 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
});
});
it("keeps the message tool enabled for shared callers when delivery is not requested", async () => {
it("keeps the message tool enabled when delivery is not requested", async () => {
mockRunCronFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: false,
mode: "none",
});
await runCronIsolatedAgentTurn({
...makeParams(),
deliveryContract: "shared",
});
await runCronIsolatedAgentTurn(makeParams());
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
@@ -355,7 +358,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
);
});
it("skips cron delivery when a shared caller already sent to the same target", async () => {
it("skips cron fallback delivery when the message tool already sent to the same target", async () => {
mockRunCronFallbackPassthrough();
const params = makeParams();
const job = {
@@ -381,7 +384,6 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
await runCronIsolatedAgentTurn({
...params,
deliveryContract: "shared",
job: job as never,
});
@@ -393,6 +395,33 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
}),
);
});
it("marks no-deliver runs delivered when the message tool sends to the current target", async () => {
mockRunCronFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: false,
mode: "none",
channel: "last",
});
runEmbeddedPiAgentMock.mockResolvedValue({
payloads: [{ text: "sent" }],
didSendViaMessagingTool: true,
messagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }],
meta: { agentMeta: { usage: { input: 10, output: 20 } } },
});
const result = await runCronIsolatedAgentTurn(makeParams());
expect(dispatchCronDeliveryMock).toHaveBeenCalledTimes(1);
expect(dispatchCronDeliveryMock.mock.calls[0]?.[0]).toEqual(
expect.objectContaining({
deliveryRequested: false,
skipMessagingToolDelivery: true,
}),
);
expect(result.delivered).toBe(true);
expect(result.deliveryAttempted).toBe(true);
});
});
describe("runCronIsolatedAgentTurn delivery instruction", () => {
@@ -414,7 +443,7 @@ describe("runCronIsolatedAgentTurn delivery instruction", () => {
restoreFastTestEnv(previousFastTestEnv);
});
it("appends a plain-text delivery instruction to the prompt when delivery is requested", async () => {
it("appends shared delivery guidance to the prompt when announce delivery is requested", async () => {
mockRunCronFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: true,
@@ -427,8 +456,9 @@ describe("runCronIsolatedAgentTurn delivery instruction", () => {
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
const prompt: string = runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.prompt ?? "";
expect(prompt).toContain("Return your response as plain text");
expect(prompt).toContain("it will be delivered automatically");
expect(prompt).toContain("Use the message tool");
expect(prompt).toContain("will be delivered automatically");
expect(prompt).not.toContain("note who/where");
});
it("does not append a delivery instruction when delivery is not requested", async () => {

View File

@@ -388,9 +388,13 @@ function resetRunOutcomeMocks(): void {
skipMessagingToolDelivery,
}) => ({
result: undefined,
delivered: Boolean(deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery),
delivered: Boolean(
skipMessagingToolDelivery ||
(deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery),
),
deliveryAttempted: Boolean(
deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery,
skipMessagingToolDelivery ||
(deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery),
),
summary,
outputText,

View File

@@ -120,19 +120,12 @@ type CronModelCatalogRuntime = typeof import("./run-model-catalog.runtime.js");
type CronDeliveryRuntime = typeof import("./run-delivery.runtime.js");
type ResolvedCronDeliveryTarget = Awaited<ReturnType<CronDeliveryRuntime["resolveDeliveryTarget"]>>;
type IsolatedDeliveryContract = "cron-owned" | "shared";
function resolveCronToolPolicy(params: {
deliveryRequested: boolean;
resolvedDelivery: ResolvedCronDeliveryTarget;
deliveryMode: "announce" | "webhook" | "none";
}) {
function resolveCronToolPolicy(params: { deliveryMode: "announce" | "webhook" | "none" }) {
const enableMessageTool = params.deliveryMode !== "webhook";
return {
// Only enforce an explicit message target when the cron delivery target
// was successfully resolved. When resolution fails the agent should not
// be blocked by a target it cannot satisfy (#27898).
requireExplicitMessageTarget: params.deliveryRequested && params.resolvedDelivery.ok,
disableMessageTool: params.deliveryMode !== "none",
requireExplicitMessageTarget: false,
disableMessageTool: !enableMessageTool,
forceMessageTool: enableMessageTool,
};
}
@@ -140,11 +133,9 @@ async function resolveCronDeliveryContext(params: {
cfg: OpenClawConfig;
job: CronJob;
agentId: string;
deliveryContract: IsolatedDeliveryContract;
}) {
const deliveryPlan = resolveCronDeliveryPlan(params.job);
const hasMessageTargetContext = deliveryPlan.mode !== "webhook" && deliveryPlan.to !== undefined;
if (!deliveryPlan.requested && !hasMessageTargetContext) {
if (deliveryPlan.mode === "webhook") {
const resolvedDelivery = {
ok: false as const,
channel: undefined,
@@ -152,15 +143,13 @@ async function resolveCronDeliveryContext(params: {
accountId: undefined,
threadId: undefined,
mode: "implicit" as const,
error: new Error("cron delivery not requested"),
error: new Error("webhook delivery has no chat target"),
};
return {
deliveryPlan,
deliveryRequested: false,
deliveryRequested: deliveryPlan.requested,
resolvedDelivery,
toolPolicy: resolveCronToolPolicy({
deliveryRequested: false,
resolvedDelivery,
deliveryMode: deliveryPlan.mode,
}),
};
@@ -178,8 +167,6 @@ async function resolveCronDeliveryContext(params: {
deliveryRequested: deliveryPlan.requested,
resolvedDelivery,
toolPolicy: resolveCronToolPolicy({
deliveryRequested: deliveryPlan.requested,
resolvedDelivery,
deliveryMode: deliveryPlan.mode,
}),
};
@@ -188,10 +175,18 @@ async function resolveCronDeliveryContext(params: {
function appendCronDeliveryInstruction(params: {
commandBody: string;
deliveryRequested: boolean;
messageToolEnabled: boolean;
resolvedDeliveryOk: boolean;
}) {
if (!params.deliveryRequested) {
return params.commandBody;
}
if (params.messageToolEnabled) {
const targetHint = params.resolvedDeliveryOk
? "for the current chat"
: "with an explicit target";
return `${params.commandBody}\n\nUse the message tool if you need to notify the user directly ${targetHint}. If you do not send directly, your final plain-text reply will be delivered automatically.`.trim();
}
return `${params.commandBody}\n\nReturn your response as plain text; it will be delivered automatically. If the task explicitly calls for messaging a specific external recipient, note who/where it should go instead of sending it yourself.`.trim();
}
@@ -217,7 +212,6 @@ type RunCronAgentTurnParams = {
sessionKey: string;
agentId?: string;
lane?: string;
deliveryContract?: IsolatedDeliveryContract;
};
type WithRunSession = (
@@ -413,7 +407,6 @@ async function prepareCronRunContext(params: {
cfg: cfgWithAgentDefaults,
job: input.job,
agentId,
deliveryContract: input.deliveryContract ?? "cron-owned",
});
const { formattedTime, timeLine } = resolveCronStyleNow(input.cfg, now);
@@ -451,7 +444,12 @@ async function prepareCronRunContext(params: {
} else {
commandBody = `${base}\n${timeLine}`.trim();
}
commandBody = appendCronDeliveryInstruction({ commandBody, deliveryRequested });
commandBody = appendCronDeliveryInstruction({
commandBody,
deliveryRequested,
messageToolEnabled: !toolPolicy.disableMessageTool,
resolvedDeliveryOk: resolvedDelivery.ok,
});
const skillsSnapshot = await resolveCronSkillsSnapshot({
workspaceDir,
@@ -665,16 +663,16 @@ async function finalizeCronRun(params: {
resolveCronDeliveryBestEffort,
} = await loadCronDeliveryRuntime();
const skipMessagingToolDelivery =
(prepared.input.deliveryContract ?? "cron-owned") === "shared" &&
prepared.deliveryRequested &&
finalRunResult.didSendViaMessagingTool === true &&
(finalRunResult.messagingToolSentTargets ?? []).some((target) =>
matchesMessagingToolDeliveryTarget(target, {
channel: prepared.resolvedDelivery.channel,
to: prepared.resolvedDelivery.to,
accountId: prepared.resolvedDelivery.accountId,
}),
);
(prepared.resolvedDelivery.ok
? (finalRunResult.messagingToolSentTargets ?? []).some((target) =>
matchesMessagingToolDeliveryTarget(target, {
channel: prepared.resolvedDelivery.channel,
to: prepared.resolvedDelivery.to,
accountId: prepared.resolvedDelivery.accountId,
}),
)
: (finalRunResult.messagingToolSentTargets ?? []).length > 0);
const deliveryResult = await dispatchCronDelivery({
cfg: prepared.input.cfg,
cfgWithAgentDefaults: prepared.cfgWithAgentDefaults,
@@ -733,7 +731,6 @@ export async function runCronIsolatedAgentTurn(params: {
sessionKey: string;
agentId?: string;
lane?: string;
deliveryContract?: IsolatedDeliveryContract;
}): Promise<RunCronAgentTurnResult> {
const abortSignal = params.abortSignal ?? params.signal;
const isAborted = () => abortSignal?.aborted === true;

View File

@@ -5,9 +5,9 @@ export type RunCronAgentTurnResult = {
outputText?: string;
/**
* `true` when the isolated runner already handled the run's user-visible
* delivery outcome. Cron-owned callers use this for cron delivery or
* explicit suppression; shared callers may also use it for a matching
* message-tool send that already reached the target.
* delivery outcome, either through runner fallback delivery, explicit
* suppression, or a matching message-tool send that already reached the
* target.
*/
delivered?: boolean;
/**

View File

@@ -155,10 +155,8 @@ describe("gateway server hooks", () => {
const agentEvents = await waitForSystemEvent();
expect(agentEvents.some((e) => e.includes("Hook Email: done"))).toBe(true);
const firstCall = (cronIsolatedRun.mock.calls[0] as unknown[] | undefined)?.[0] as {
deliveryContract?: string;
job?: { payload?: { externalContentSource?: string } };
};
expect(firstCall?.deliveryContract).toBe("shared");
expect(firstCall?.job?.payload?.externalContentSource).toBe("webhook");
drainSystemEvents(resolveMainKey());

View File

@@ -87,7 +87,6 @@ export function createGatewayHooksRequestHandler(params: {
message: value.message,
sessionKey,
lane: "cron",
deliveryContract: "shared",
});
const summary =
normalizeOptionalString(result.summary) ||