refactor: harden reset notice + cron delivery target flow

This commit is contained in:
Peter Steinberger
2026-02-23 19:00:30 +00:00
parent d266d12be1
commit bf373eeb43
6 changed files with 366 additions and 210 deletions

View File

@@ -382,8 +382,9 @@ describe("trigger handling", () => {
);
const text = Array.isArray(res) ? res[0]?.text : res?.text;
expect(text).toContain("api-key");
expect(text).toContain("****");
expect(text).toContain("sk-t");
expect(text).toMatch(/\u2026|\.{3}/);
expect(text).toContain("sk-tes");
expect(text).toContain("abcdef");
expect(text).not.toContain("1234567890abcdef");
expect(text).toContain("(anthropic:work)");
expect(text).not.toContain("mixed");

View File

@@ -51,6 +51,76 @@ import { appendUntrustedContext } from "./untrusted-context.js";
type AgentDefaults = NonNullable<OpenClawConfig["agents"]>["defaults"];
type ExecOverrides = Pick<ExecToolDefaults, "host" | "security" | "ask" | "node">;
function buildResetSessionNoticeText(params: {
provider: string;
model: string;
defaultProvider: string;
defaultModel: string;
}): string {
const modelLabel = `${params.provider}/${params.model}`;
const defaultLabel = `${params.defaultProvider}/${params.defaultModel}`;
return modelLabel === defaultLabel
? `✅ New session started · model: ${modelLabel}`
: `✅ New session started · model: ${modelLabel} (default: ${defaultLabel})`;
}
function resolveResetSessionNoticeRoute(params: {
ctx: MsgContext;
command: ReturnType<typeof buildCommandContext>;
}): {
channel: Parameters<typeof routeReply>[0]["channel"];
to: string;
} | null {
const commandChannel = params.command.channel?.trim().toLowerCase();
const fallbackChannel =
commandChannel && commandChannel !== "webchat"
? (commandChannel as Parameters<typeof routeReply>[0]["channel"])
: undefined;
const channel = params.ctx.OriginatingChannel ?? fallbackChannel;
const to = params.ctx.OriginatingTo ?? params.command.from ?? params.command.to;
if (!channel || channel === "webchat" || !to) {
return null;
}
return { channel, to };
}
async function sendResetSessionNotice(params: {
ctx: MsgContext;
command: ReturnType<typeof buildCommandContext>;
sessionKey: string;
cfg: OpenClawConfig;
accountId: string | undefined;
threadId: string | number | undefined;
provider: string;
model: string;
defaultProvider: string;
defaultModel: string;
}): Promise<void> {
const route = resolveResetSessionNoticeRoute({
ctx: params.ctx,
command: params.command,
});
if (!route) {
return;
}
await routeReply({
payload: {
text: buildResetSessionNoticeText({
provider: params.provider,
model: params.model,
defaultProvider: params.defaultProvider,
defaultModel: params.defaultModel,
}),
},
channel: route.channel,
to: route.to,
sessionKey: params.sessionKey,
accountId: params.accountId,
threadId: params.threadId,
cfg: params.cfg,
});
}
type RunPreparedReplyParams = {
ctx: MsgContext;
sessionCtx: TemplateContext;
@@ -318,26 +388,18 @@ export async function runPreparedReply(
}
}
if (resetTriggered && command.isAuthorizedSender) {
// oxlint-disable-next-line typescript/no-explicit-any
const channel = ctx.OriginatingChannel || (command.channel as any);
const to = ctx.OriginatingTo || command.from || command.to;
if (channel && to) {
const modelLabel = `${provider}/${model}`;
const defaultLabel = `${defaultProvider}/${defaultModel}`;
const text =
modelLabel === defaultLabel
? `✅ New session started · model: ${modelLabel}`
: `✅ New session started · model: ${modelLabel} (default: ${defaultLabel})`;
await routeReply({
payload: { text },
channel,
to,
sessionKey,
accountId: ctx.AccountId,
threadId: ctx.MessageThreadId,
cfg,
});
}
await sendResetSessionNotice({
ctx,
command,
sessionKey,
cfg,
accountId: ctx.AccountId,
threadId: ctx.MessageThreadId,
provider,
model,
defaultProvider,
defaultModel,
});
}
const sessionIdFinal = sessionId ?? crypto.randomUUID();
const sessionFile = resolveSessionFilePath(

View File

@@ -74,6 +74,48 @@ describe("runCronIsolatedAgentTurn", () => {
setupIsolatedAgentTurnMocks({ fast: true });
});
it("does not fan out telegram cron delivery across allowFrom entries", async () => {
await withTempCronHome(async (home) => {
const { storePath, deps } = await createTelegramDeliveryFixture(home);
mockEmbeddedAgentPayloads([
{ text: "HEARTBEAT_OK", mediaUrl: "https://example.com/img.png" },
]);
const cfg = makeCfg(home, storePath, {
channels: {
telegram: {
botToken: "tok",
allowFrom: ["111", "222", "333"],
},
},
});
const res = await runCronIsolatedAgentTurn({
cfg,
deps,
job: {
...makeJob({
kind: "agentTurn",
message: "deliver once",
}),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
message: "deliver once",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
expect(deps.sendMessageTelegram).toHaveBeenCalledWith(
"123",
"HEARTBEAT_OK",
expect.objectContaining({ accountId: undefined }),
);
});
});
it("handles media heartbeat delivery and announce cleanup modes", async () => {
await withTempCronHome(async (home) => {
const { storePath, deps } = await createTelegramDeliveryFixture(home);

View File

@@ -230,7 +230,11 @@ describe("resolveDeliveryTarget", () => {
target: { channel: "last", to: undefined },
});
expect(result.channel).toBe("telegram");
expect(result.error).toBeUndefined();
expect(result.ok).toBe(false);
if (result.ok) {
throw new Error("expected unresolved delivery target");
}
expect(result.error.message).toContain('No delivery target resolved for channel "telegram"');
});
it("returns an error when channel selection is ambiguous", async () => {
@@ -245,7 +249,11 @@ describe("resolveDeliveryTarget", () => {
});
expect(result.channel).toBeUndefined();
expect(result.to).toBeUndefined();
expect(result.error?.message).toContain("Channel is required");
expect(result.ok).toBe(false);
if (result.ok) {
throw new Error("expected ambiguous channel selection error");
}
expect(result.error.message).toContain("Channel is required");
});
it("uses sessionKey thread entry before main session entry", async () => {
@@ -289,6 +297,6 @@ describe("resolveDeliveryTarget", () => {
expect(result.channel).toBe("telegram");
expect(result.to).toBe("987654");
expect(result.error).toBeUndefined();
expect(result.ok).toBe(true);
});
});

View File

@@ -17,6 +17,25 @@ import { normalizeAgentId } from "../../routing/session-key.js";
import { resolveWhatsAppAccount } from "../../web/accounts.js";
import { normalizeWhatsAppTarget } from "../../whatsapp/normalize.js";
export type DeliveryTargetResolution =
| {
ok: true;
channel: Exclude<OutboundChannel, "none">;
to: string;
accountId?: string;
threadId?: string | number;
mode: "explicit" | "implicit";
}
| {
ok: false;
channel?: Exclude<OutboundChannel, "none">;
to?: string;
accountId?: string;
threadId?: string | number;
mode: "explicit" | "implicit";
error: Error;
};
export async function resolveDeliveryTarget(
cfg: OpenClawConfig,
agentId: string,
@@ -25,14 +44,7 @@ export async function resolveDeliveryTarget(
to?: string;
sessionKey?: string;
},
): Promise<{
channel?: Exclude<OutboundChannel, "none">;
to?: string;
accountId?: string;
threadId?: string | number;
mode: "explicit" | "implicit";
error?: Error;
}> {
): Promise<DeliveryTargetResolution> {
const requestedChannel = typeof jobPayload.channel === "string" ? jobPayload.channel : "last";
const explicitTo = typeof jobPayload.to === "string" ? jobPayload.to : undefined;
const allowMismatchedLastTo = requestedChannel === "last";
@@ -114,23 +126,29 @@ export async function resolveDeliveryTarget(
if (!channel) {
return {
ok: false,
channel: undefined,
to: undefined,
accountId,
threadId,
mode,
error: channelResolutionError,
error:
channelResolutionError ??
new Error("Channel is required when delivery.channel=last has no previous channel."),
};
}
if (!toCandidate) {
return {
ok: false,
channel,
to: undefined,
accountId,
threadId,
mode,
error: channelResolutionError,
error:
channelResolutionError ??
new Error(`No delivery target resolved for channel "${channel}". Set delivery.to.`),
};
}
@@ -163,12 +181,23 @@ export async function resolveDeliveryTarget(
mode,
allowFrom: allowFromOverride,
});
if (!docked.ok) {
return {
ok: false,
channel,
to: undefined,
accountId,
threadId,
mode,
error: docked.error,
};
}
return {
ok: true,
channel,
to: docked.ok ? docked.to : undefined,
to: docked.to,
accountId,
threadId,
mode,
error: docked.ok ? channelResolutionError : docked.error,
};
}

View File

@@ -635,6 +635,10 @@ export async function runCronIsolatedAgentTurn(params: {
// `true` means we confirmed at least one outbound send reached the target.
// Keep this strict so timer fallback can safely decide whether to wake main.
let delivered = skipMessagingToolDelivery;
type SuccessfulDeliveryTarget = Extract<
Awaited<ReturnType<typeof resolveDeliveryTarget>>,
{ ok: true }
>;
const failDeliveryTarget = (error: string) =>
withRunSession({
status: "error",
@@ -644,28 +648,189 @@ export async function runCronIsolatedAgentTurn(params: {
outputText,
...telemetry,
});
const deliverViaDirect = async (
delivery: SuccessfulDeliveryTarget,
): Promise<RunCronAgentTurnResult | null> => {
const identity = resolveAgentOutboundIdentity(cfgWithAgentDefaults, agentId);
try {
const payloadsForDelivery =
deliveryPayloads.length > 0
? deliveryPayloads
: synthesizedText
? [{ text: synthesizedText }]
: [];
if (payloadsForDelivery.length === 0) {
return null;
}
if (isAborted()) {
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
}
const deliveryResults = await deliverOutboundPayloads({
cfg: cfgWithAgentDefaults,
channel: delivery.channel,
to: delivery.to,
accountId: delivery.accountId,
threadId: delivery.threadId,
payloads: payloadsForDelivery,
agentId,
identity,
bestEffort: deliveryBestEffort,
deps: createOutboundSendDeps(params.deps),
abortSignal,
});
delivered = deliveryResults.length > 0;
return null;
} catch (err) {
if (!deliveryBestEffort) {
return withRunSession({
status: "error",
summary,
outputText,
error: String(err),
...telemetry,
});
}
return null;
}
};
const deliverViaAnnounce = async (
delivery: SuccessfulDeliveryTarget,
): Promise<RunCronAgentTurnResult | null> => {
if (!synthesizedText) {
return null;
}
const announceMainSessionKey = resolveAgentMainSessionKey({
cfg: params.cfg,
agentId,
});
const announceSessionKey = await resolveCronAnnounceSessionKey({
cfg: cfgWithAgentDefaults,
agentId,
fallbackSessionKey: announceMainSessionKey,
delivery: {
channel: delivery.channel,
to: delivery.to,
accountId: delivery.accountId,
threadId: delivery.threadId,
},
});
const taskLabel =
typeof params.job.name === "string" && params.job.name.trim()
? params.job.name.trim()
: `cron:${params.job.id}`;
const initialSynthesizedText = synthesizedText.trim();
let activeSubagentRuns = countActiveDescendantRuns(agentSessionKey);
const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText);
const hadActiveDescendants = activeSubagentRuns > 0;
if (activeSubagentRuns > 0 || expectedSubagentFollowup) {
let finalReply = await waitForDescendantSubagentSummary({
sessionKey: agentSessionKey,
initialReply: initialSynthesizedText,
timeoutMs,
observedActiveDescendants: activeSubagentRuns > 0 || expectedSubagentFollowup,
});
activeSubagentRuns = countActiveDescendantRuns(agentSessionKey);
if (
!finalReply &&
activeSubagentRuns === 0 &&
(hadActiveDescendants || expectedSubagentFollowup)
) {
finalReply = await readDescendantSubagentFallbackReply({
sessionKey: agentSessionKey,
runStartedAt,
});
}
if (finalReply && activeSubagentRuns === 0) {
outputText = finalReply;
summary = pickSummaryFromOutput(finalReply) ?? summary;
synthesizedText = finalReply;
deliveryPayloads = [{ text: finalReply }];
}
}
if (activeSubagentRuns > 0) {
// Parent orchestration is still in progress; avoid announcing a partial
// update to the main requester.
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
}
if (
(hadActiveDescendants || expectedSubagentFollowup) &&
synthesizedText.trim() === initialSynthesizedText &&
isLikelyInterimCronMessage(initialSynthesizedText) &&
initialSynthesizedText.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase()
) {
// Descendants existed but no post-orchestration synthesis arrived, so
// suppress stale parent text like "on it, pulling everything together".
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
}
if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) {
return withRunSession({ status: "ok", summary, outputText, delivered: true, ...telemetry });
}
try {
if (isAborted()) {
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
}
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: agentSessionKey,
childRunId: `${params.job.id}:${runSessionId}:${runStartedAt}`,
requesterSessionKey: announceSessionKey,
requesterOrigin: {
channel: delivery.channel,
to: delivery.to,
accountId: delivery.accountId,
threadId: delivery.threadId,
},
requesterDisplayKey: announceSessionKey,
task: taskLabel,
timeoutMs,
cleanup: params.job.deleteAfterRun ? "delete" : "keep",
roundOneReply: synthesizedText,
// Keep delivery outcome truthful for cron state: if outbound send fails,
// announce flow must report false so caller can apply best-effort policy.
bestEffortDeliver: false,
waitForCompletion: false,
startedAt: runStartedAt,
endedAt: runEndedAt,
outcome: { status: "ok" },
announceType: "cron job",
signal: abortSignal,
});
if (didAnnounce) {
delivered = true;
} else {
const message = "cron announce delivery failed";
if (!deliveryBestEffort) {
return withRunSession({
status: "error",
summary,
outputText,
error: message,
...telemetry,
});
}
logWarn(`[cron:${params.job.id}] ${message}`);
}
} catch (err) {
if (!deliveryBestEffort) {
return withRunSession({
status: "error",
summary,
outputText,
error: String(err),
...telemetry,
});
}
logWarn(`[cron:${params.job.id}] ${String(err)}`);
}
return null;
};
if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) {
if (resolvedDelivery.error) {
if (!resolvedDelivery.ok) {
if (!deliveryBestEffort) {
return failDeliveryTarget(resolvedDelivery.error.message);
}
logWarn(`[cron:${params.job.id}] ${resolvedDelivery.error.message}`);
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
}
const failOrWarnMissingDeliveryField = (message: string) => {
if (!deliveryBestEffort) {
return failDeliveryTarget(message);
}
logWarn(`[cron:${params.job.id}] ${message}`);
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
};
if (!resolvedDelivery.channel) {
return failOrWarnMissingDeliveryField("cron delivery channel is missing");
}
if (!resolvedDelivery.to) {
return failOrWarnMissingDeliveryField("cron delivery target is missing");
}
const identity = resolveAgentOutboundIdentity(cfgWithAgentDefaults, agentId);
// Route text-only cron announce output back through the main session so it
// follows the same system-message injection path as subagent completions.
@@ -678,165 +843,14 @@ export async function runCronIsolatedAgentTurn(params: {
const useDirectDelivery =
deliveryPayloadHasStructuredContent || resolvedDelivery.threadId != null;
if (useDirectDelivery) {
try {
const payloadsForDelivery =
deliveryPayloads.length > 0
? deliveryPayloads
: synthesizedText
? [{ text: synthesizedText }]
: [];
if (payloadsForDelivery.length > 0) {
if (isAborted()) {
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
}
const deliveryResults = await deliverOutboundPayloads({
cfg: cfgWithAgentDefaults,
channel: resolvedDelivery.channel,
to: resolvedDelivery.to,
accountId: resolvedDelivery.accountId,
threadId: resolvedDelivery.threadId,
payloads: payloadsForDelivery,
agentId,
identity,
bestEffort: deliveryBestEffort,
deps: createOutboundSendDeps(params.deps),
abortSignal,
});
delivered = deliveryResults.length > 0;
}
} catch (err) {
if (!deliveryBestEffort) {
return withRunSession({
status: "error",
summary,
outputText,
error: String(err),
...telemetry,
});
}
const directResult = await deliverViaDirect(resolvedDelivery);
if (directResult) {
return directResult;
}
} else if (synthesizedText) {
const announceMainSessionKey = resolveAgentMainSessionKey({
cfg: params.cfg,
agentId,
});
const announceSessionKey = await resolveCronAnnounceSessionKey({
cfg: cfgWithAgentDefaults,
agentId,
fallbackSessionKey: announceMainSessionKey,
delivery: {
channel: resolvedDelivery.channel,
to: resolvedDelivery.to,
accountId: resolvedDelivery.accountId,
threadId: resolvedDelivery.threadId,
},
});
const taskLabel =
typeof params.job.name === "string" && params.job.name.trim()
? params.job.name.trim()
: `cron:${params.job.id}`;
const initialSynthesizedText = synthesizedText.trim();
let activeSubagentRuns = countActiveDescendantRuns(agentSessionKey);
const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText);
const hadActiveDescendants = activeSubagentRuns > 0;
if (activeSubagentRuns > 0 || expectedSubagentFollowup) {
let finalReply = await waitForDescendantSubagentSummary({
sessionKey: agentSessionKey,
initialReply: initialSynthesizedText,
timeoutMs,
observedActiveDescendants: activeSubagentRuns > 0 || expectedSubagentFollowup,
});
activeSubagentRuns = countActiveDescendantRuns(agentSessionKey);
if (
!finalReply &&
activeSubagentRuns === 0 &&
(hadActiveDescendants || expectedSubagentFollowup)
) {
finalReply = await readDescendantSubagentFallbackReply({
sessionKey: agentSessionKey,
runStartedAt,
});
}
if (finalReply && activeSubagentRuns === 0) {
outputText = finalReply;
summary = pickSummaryFromOutput(finalReply) ?? summary;
synthesizedText = finalReply;
deliveryPayloads = [{ text: finalReply }];
}
}
if (activeSubagentRuns > 0) {
// Parent orchestration is still in progress; avoid announcing a partial
// update to the main requester.
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
}
if (
(hadActiveDescendants || expectedSubagentFollowup) &&
synthesizedText.trim() === initialSynthesizedText &&
isLikelyInterimCronMessage(initialSynthesizedText) &&
initialSynthesizedText.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase()
) {
// Descendants existed but no post-orchestration synthesis arrived, so
// suppress stale parent text like "on it, pulling everything together".
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
}
if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) {
return withRunSession({ status: "ok", summary, outputText, delivered: true, ...telemetry });
}
try {
if (isAborted()) {
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
}
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: agentSessionKey,
childRunId: `${params.job.id}:${runSessionId}:${runStartedAt}`,
requesterSessionKey: announceSessionKey,
requesterOrigin: {
channel: resolvedDelivery.channel,
to: resolvedDelivery.to,
accountId: resolvedDelivery.accountId,
threadId: resolvedDelivery.threadId,
},
requesterDisplayKey: announceSessionKey,
task: taskLabel,
timeoutMs,
cleanup: params.job.deleteAfterRun ? "delete" : "keep",
roundOneReply: synthesizedText,
// Keep delivery outcome truthful for cron state: if outbound send fails,
// announce flow must report false so caller can apply best-effort policy.
bestEffortDeliver: false,
waitForCompletion: false,
startedAt: runStartedAt,
endedAt: runEndedAt,
outcome: { status: "ok" },
announceType: "cron job",
signal: abortSignal,
});
if (didAnnounce) {
delivered = true;
} else {
const message = "cron announce delivery failed";
if (!deliveryBestEffort) {
return withRunSession({
status: "error",
summary,
outputText,
error: message,
...telemetry,
});
}
logWarn(`[cron:${params.job.id}] ${message}`);
}
} catch (err) {
if (!deliveryBestEffort) {
return withRunSession({
status: "error",
summary,
outputText,
error: String(err),
...telemetry,
});
}
logWarn(`[cron:${params.job.id}] ${String(err)}`);
} else {
const announceResult = await deliverViaAnnounce(resolvedDelivery);
if (announceResult) {
return announceResult;
}
}
}