mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 16:54:46 +00:00
refactor: resolve cron delivery from context (#82241)
This commit is contained in:
committed by
GitHub
parent
daef8e73fc
commit
fc9798a788
@@ -1,7 +1,8 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const { callGatewayMock } = vi.hoisted(() => ({
|
||||
const { callGatewayMock, extractDeliveryInfoMock } = vi.hoisted(() => ({
|
||||
callGatewayMock: vi.fn(),
|
||||
extractDeliveryInfoMock: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../agent-scope.js", async () => {
|
||||
@@ -12,6 +13,10 @@ vi.mock("../agent-scope.js", async () => {
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../../config/sessions/delivery-info.js", () => ({
|
||||
extractDeliveryInfo: extractDeliveryInfoMock,
|
||||
}));
|
||||
|
||||
import { buildAgentPeerSessionKey } from "../../routing/session-key.js";
|
||||
import { createCronTool } from "./cron-tool.js";
|
||||
|
||||
@@ -164,6 +169,8 @@ describe("cron tool", () => {
|
||||
beforeEach(() => {
|
||||
callGatewayMock.mockClear();
|
||||
callGatewayMock.mockResolvedValue({ ok: true });
|
||||
extractDeliveryInfoMock.mockReset();
|
||||
extractDeliveryInfoMock.mockReturnValue({ deliveryContext: undefined, threadId: undefined });
|
||||
});
|
||||
|
||||
it("marks cron as owner-only", () => {
|
||||
@@ -721,89 +728,56 @@ describe("cron tool", () => {
|
||||
expect(call.params?.agentId).toBeNull();
|
||||
});
|
||||
|
||||
it("infers delivery from threaded session keys", async () => {
|
||||
expect(
|
||||
await executeAddAndReadDelivery({
|
||||
callId: "call-thread",
|
||||
agentSessionKey: "agent:main:slack:channel:general:thread:1699999999.0001",
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "slack",
|
||||
to: "general",
|
||||
it("does not infer delivery from raw session-key fragments without delivery context", async () => {
|
||||
const slackDelivery = await executeAddAndReadDelivery({
|
||||
callId: "call-thread",
|
||||
agentSessionKey: "agent:main:slack:channel:general:thread:1699999999.0001",
|
||||
});
|
||||
const telegramDelivery = await executeAddAndReadDelivery({
|
||||
callId: "call-telegram-topic",
|
||||
agentSessionKey: "agent:main:telegram:group:-1001234567890:topic:99",
|
||||
});
|
||||
|
||||
expect(slackDelivery?.channel).toBeUndefined();
|
||||
expect(slackDelivery?.to).toBeUndefined();
|
||||
expect(telegramDelivery?.channel).toBeUndefined();
|
||||
expect(telegramDelivery?.to).toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves telegram forum topics when inferring delivery", async () => {
|
||||
expect(
|
||||
await executeAddAndReadDelivery({
|
||||
callId: "call-telegram-topic",
|
||||
agentSessionKey: "agent:main:telegram:group:-1001234567890:topic:99",
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "-1001234567890:topic:99",
|
||||
it("uses stored delivery context when current context is unavailable", async () => {
|
||||
extractDeliveryInfoMock.mockReturnValueOnce({
|
||||
deliveryContext: {
|
||||
channel: "matrix",
|
||||
to: "room:!AbCdEf1234567890:example.org",
|
||||
accountId: "bot-a",
|
||||
threadId: "$RootEvent:Example.Org",
|
||||
},
|
||||
threadId: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves telegram direct-chat thread ids when inferring delivery", async () => {
|
||||
expect(
|
||||
await executeAddAndReadDelivery({
|
||||
callId: "call-telegram-direct-thread",
|
||||
agentSessionKey: "agent:main:telegram:direct:123456789:thread:123456789:99",
|
||||
callId: "call-stored-context",
|
||||
agentSessionKey: "agent:main:matrix:channel:!abcdef1234567890:example.org",
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "123456789",
|
||||
threadId: "99",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves telegram account ids with direct-chat thread inference", async () => {
|
||||
expect(
|
||||
await executeAddAndReadDelivery({
|
||||
callId: "call-telegram-account-direct-thread",
|
||||
agentSessionKey: "agent:main:telegram:bot-a:direct:123456789:thread:123456789:99",
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "123456789",
|
||||
channel: "matrix",
|
||||
to: "room:!AbCdEf1234567890:example.org",
|
||||
accountId: "bot-a",
|
||||
threadId: "99",
|
||||
threadId: "$RootEvent:Example.Org",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves legacy telegram dm thread ids when inferring delivery", async () => {
|
||||
expect(
|
||||
await executeAddAndReadDelivery({
|
||||
callId: "call-telegram-dm-thread",
|
||||
agentSessionKey: "agent:main:telegram:dm:123456789:thread:123456789:99",
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "123456789",
|
||||
threadId: "99",
|
||||
it("prefers current delivery context over stored session context", async () => {
|
||||
extractDeliveryInfoMock.mockReturnValueOnce({
|
||||
deliveryContext: {
|
||||
channel: "matrix",
|
||||
to: "!stored:example.org",
|
||||
},
|
||||
threadId: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it("drops mismatched telegram direct-chat thread ids when inferring delivery", async () => {
|
||||
expect(
|
||||
await executeAddAndReadDelivery({
|
||||
callId: "call-telegram-mismatched-direct-thread",
|
||||
agentSessionKey: "agent:main:telegram:direct:123456789:thread:987654321:99",
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "123456789",
|
||||
});
|
||||
});
|
||||
|
||||
it("prefers current delivery context over lowercased session-key targets", async () => {
|
||||
expect(
|
||||
await executeAddAndReadDelivery({
|
||||
callId: "call-current-context",
|
||||
@@ -825,12 +799,8 @@ describe("cron tool", () => {
|
||||
});
|
||||
|
||||
it("does not surface lowercased LINE recipients when current delivery context is unavailable (#81628)", async () => {
|
||||
// Reproduces openclaw/openclaw#81628. LINE chat IDs are case-sensitive — push
|
||||
// requires capital C/U/R; lowercased recipients return HTTP 400. The runtime
|
||||
// already lowercases LINE peer IDs when canonicalizing the session key, and
|
||||
// when the delivery-recovery / post-reply-token-expiry push path is missing
|
||||
// currentDeliveryContext, inferDeliveryFromSessionKey lifts the lowercased
|
||||
// fragment straight into delivery.to.
|
||||
// LINE chat IDs are case-sensitive; without current/persisted deliveryContext,
|
||||
// cron must not rebuild delivery.to from the lowercased session-key fragment.
|
||||
const sessionKey = buildAgentPeerSessionKey({
|
||||
agentId: "main",
|
||||
channel: "line",
|
||||
@@ -842,9 +812,7 @@ describe("cron tool", () => {
|
||||
const delivery = await executeAddAndReadDelivery({
|
||||
callId: "call-line-group-no-context-81628",
|
||||
agentSessionKey: sessionKey,
|
||||
// Intentionally no currentDeliveryContext — emulates the delivery-recovery
|
||||
// boundary that reloads queued entries from disk after the reply token has
|
||||
// expired.
|
||||
// Intentionally no currentDeliveryContext.
|
||||
});
|
||||
|
||||
expect(delivery?.to).toBeUndefined();
|
||||
@@ -990,7 +958,15 @@ describe("cron tool", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to session-key inference when current context has no target", async () => {
|
||||
it("falls back to stored delivery context when current context has no target", async () => {
|
||||
extractDeliveryInfoMock.mockReturnValueOnce({
|
||||
deliveryContext: {
|
||||
channel: "telegram",
|
||||
to: "-1001234567890",
|
||||
},
|
||||
threadId: "99",
|
||||
});
|
||||
|
||||
expect(
|
||||
await executeAddAndReadDelivery({
|
||||
callId: "call-empty-current-context",
|
||||
@@ -1003,7 +979,8 @@ describe("cron tool", () => {
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "-1001234567890:topic:99",
|
||||
to: "-1001234567890",
|
||||
threadId: "99",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1022,6 +999,13 @@ describe("cron tool", () => {
|
||||
});
|
||||
|
||||
it("infers delivery when delivery is null", async () => {
|
||||
extractDeliveryInfoMock.mockReturnValueOnce({
|
||||
deliveryContext: {
|
||||
to: "alice",
|
||||
},
|
||||
threadId: undefined,
|
||||
});
|
||||
|
||||
expect(
|
||||
await executeAddAndReadDelivery({
|
||||
callId: "call-null-delivery",
|
||||
|
||||
@@ -1,23 +1,13 @@
|
||||
import { Type, type TSchema } from "typebox";
|
||||
import { getRuntimeConfig } from "../../config/config.js";
|
||||
import { resolveCronCreationDelivery } from "../../cron/delivery-context.js";
|
||||
import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js";
|
||||
import type { CronDelivery, CronMessageChannel } from "../../cron/types.js";
|
||||
import type { CronDelivery } from "../../cron/types.js";
|
||||
import { normalizeHttpWebhookUrl } from "../../cron/webhook-url.js";
|
||||
import {
|
||||
parseAgentSessionKey,
|
||||
parseThreadSessionSuffix,
|
||||
} from "../../sessions/session-key-utils.js";
|
||||
import { extractTextFromChatContent } from "../../shared/chat-content.js";
|
||||
import {
|
||||
normalizeLowercaseStringOrEmpty,
|
||||
normalizeOptionalLowercaseString,
|
||||
normalizeOptionalString,
|
||||
} from "../../shared/string-coerce.js";
|
||||
import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js";
|
||||
import { isRecord, truncateUtf16Safe } from "../../utils.js";
|
||||
import {
|
||||
normalizeDeliveryContext,
|
||||
type DeliveryContext,
|
||||
} from "../../utils/delivery-context.shared.js";
|
||||
import type { DeliveryContext } from "../../utils/delivery-context.shared.js";
|
||||
import { resolveSessionAgentId } from "../agent-scope.js";
|
||||
import { optionalStringEnum, stringEnum } from "../schema/typebox.js";
|
||||
import { CRON_TOOL_DISPLAY_SUMMARY } from "../tool-description-presets.js";
|
||||
@@ -500,143 +490,6 @@ async function buildReminderContextLines(params: {
|
||||
}
|
||||
}
|
||||
|
||||
function stripThreadSuffixFromSessionKey(sessionKey: string): string {
|
||||
const normalized = normalizeLowercaseStringOrEmpty(sessionKey);
|
||||
const idx = normalized.lastIndexOf(":thread:");
|
||||
if (idx <= 0) {
|
||||
return sessionKey;
|
||||
}
|
||||
const parent = sessionKey.slice(0, idx).trim();
|
||||
return parent ? parent : sessionKey;
|
||||
}
|
||||
|
||||
function resolveTelegramDirectThreadId(params: {
|
||||
peerId: string;
|
||||
threadId?: string;
|
||||
}): string | undefined {
|
||||
const threadId = normalizeOptionalString(params.threadId);
|
||||
if (!threadId) {
|
||||
return undefined;
|
||||
}
|
||||
const peerId = normalizeOptionalString(params.peerId);
|
||||
if (!peerId) {
|
||||
return undefined;
|
||||
}
|
||||
const [threadChatId, ...threadIdParts] = threadId.split(":");
|
||||
if (threadIdParts.length === 0) {
|
||||
return threadId;
|
||||
}
|
||||
if (normalizeOptionalLowercaseString(threadChatId) !== peerId) {
|
||||
return undefined;
|
||||
}
|
||||
return normalizeOptionalString(threadIdParts.join(":"));
|
||||
}
|
||||
|
||||
function inferDeliveryFromSessionKey(agentSessionKey?: string): CronDelivery | null {
|
||||
const rawSessionKey = agentSessionKey?.trim();
|
||||
if (!rawSessionKey) {
|
||||
return null;
|
||||
}
|
||||
const threadSuffix = parseThreadSessionSuffix(rawSessionKey);
|
||||
const parsed = parseAgentSessionKey(
|
||||
threadSuffix.baseSessionKey ?? stripThreadSuffixFromSessionKey(rawSessionKey),
|
||||
);
|
||||
if (!parsed || !parsed.rest) {
|
||||
return null;
|
||||
}
|
||||
const parts = parsed.rest.split(":").filter(Boolean);
|
||||
if (parts.length === 0) {
|
||||
return null;
|
||||
}
|
||||
const head = normalizeOptionalLowercaseString(parts[0]);
|
||||
if (!head || head === "main" || head === "subagent" || head === "acp") {
|
||||
return null;
|
||||
}
|
||||
|
||||
// buildAgentPeerSessionKey encodes peers as:
|
||||
// - direct:<peerId>
|
||||
// - <channel>:direct:<peerId>
|
||||
// - <channel>:<accountId>:direct:<peerId>
|
||||
// - <channel>:group:<peerId>
|
||||
// - <channel>:channel:<peerId>
|
||||
// Note: legacy keys may use "dm" instead of "direct".
|
||||
// Threaded sessions append :thread:<id>, which we strip so delivery targets the parent peer.
|
||||
// NOTE: Telegram forum topics encode as <chatId>:topic:<topicId> and should be preserved.
|
||||
const markerIndex = parts.findIndex(
|
||||
(part) => part === "direct" || part === "dm" || part === "group" || part === "channel",
|
||||
);
|
||||
if (markerIndex === -1) {
|
||||
return null;
|
||||
}
|
||||
const peerId = parts
|
||||
.slice(markerIndex + 1)
|
||||
.join(":")
|
||||
.trim();
|
||||
if (!peerId) {
|
||||
return null;
|
||||
}
|
||||
const marker = parts[markerIndex];
|
||||
|
||||
let channel: CronMessageChannel | undefined;
|
||||
if (markerIndex >= 1) {
|
||||
channel = normalizeOptionalLowercaseString(parts[0]) as CronMessageChannel | undefined;
|
||||
}
|
||||
|
||||
// LINE chat ids are case-sensitive (push requires capital C/U/R) but the
|
||||
// session key holds the peer id lowercased for canonical routing. Rebuilding
|
||||
// `to` from the session-key fragment would yield a value LINE rejects with
|
||||
// HTTP 400, so refuse the fallback for LINE and let the caller surface the
|
||||
// missing target instead of silently scheduling an undeliverable job.
|
||||
// openclaw/openclaw#81628
|
||||
const isChannellessLineDirectId =
|
||||
!channel && (marker === "direct" || marker === "dm") && /^[ucr][a-f0-9]{32}$/.test(peerId);
|
||||
if (channel === "line" || isChannellessLineDirectId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const delivery: CronDelivery = { mode: "announce", to: peerId };
|
||||
if (channel) {
|
||||
delivery.channel = channel;
|
||||
}
|
||||
if (channel === "telegram" && markerIndex === 2) {
|
||||
const accountId = normalizeOptionalString(parts[1]);
|
||||
if (accountId) {
|
||||
delivery.accountId = accountId;
|
||||
}
|
||||
}
|
||||
if (channel === "telegram" && (marker === "direct" || marker === "dm")) {
|
||||
const threadId = resolveTelegramDirectThreadId({
|
||||
peerId,
|
||||
threadId: threadSuffix.threadId,
|
||||
});
|
||||
if (threadId) {
|
||||
delivery.threadId = threadId;
|
||||
}
|
||||
}
|
||||
return delivery;
|
||||
}
|
||||
|
||||
function inferDeliveryFromContext(context?: DeliveryContext): CronDelivery | null {
|
||||
const normalized = normalizeDeliveryContext(context);
|
||||
if (!normalized?.to) {
|
||||
return null;
|
||||
}
|
||||
const delivery: CronDelivery = {
|
||||
mode: "announce",
|
||||
to: normalized.to,
|
||||
};
|
||||
if (normalized.channel) {
|
||||
delivery.channel = normalized.channel as CronMessageChannel;
|
||||
}
|
||||
if (normalized.accountId) {
|
||||
delivery.accountId = normalized.accountId;
|
||||
}
|
||||
if (normalized.threadId != null) {
|
||||
delivery.threadId = normalized.threadId;
|
||||
}
|
||||
return delivery;
|
||||
}
|
||||
|
||||
export function createCronTool(opts?: CronToolOptions, deps?: CronToolDeps): AnyAgentTool {
|
||||
const callGateway = deps?.callGatewayTool ?? callGatewayTool;
|
||||
return {
|
||||
@@ -808,8 +661,8 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con
|
||||
normalizeCronJobCreate(params.job, {
|
||||
sessionContext: { sessionKey: opts?.agentSessionKey },
|
||||
}) ?? params.job;
|
||||
const cfg = getRuntimeConfig();
|
||||
if (job && typeof job === "object") {
|
||||
const cfg = getRuntimeConfig();
|
||||
const { mainKey, alias } = resolveMainSessionAlias(cfg);
|
||||
const resolvedSessionKey = opts?.agentSessionKey
|
||||
? resolveInternalSessionKey({ key: opts.agentSessionKey, alias, mainKey })
|
||||
@@ -858,9 +711,11 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con
|
||||
(mode === "" || mode === "announce") &&
|
||||
!hasTarget;
|
||||
if (shouldInfer) {
|
||||
const inferred =
|
||||
inferDeliveryFromContext(opts.currentDeliveryContext) ??
|
||||
inferDeliveryFromSessionKey(opts.agentSessionKey);
|
||||
const inferred = resolveCronCreationDelivery({
|
||||
cfg,
|
||||
currentDeliveryContext: opts.currentDeliveryContext,
|
||||
agentSessionKey: opts.agentSessionKey,
|
||||
});
|
||||
if (inferred) {
|
||||
(job as { delivery?: unknown }).delivery = {
|
||||
...inferred,
|
||||
|
||||
117
src/cron/delivery-context.test.ts
Normal file
117
src/cron/delivery-context.test.ts
Normal file
@@ -0,0 +1,117 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
|
||||
const { extractDeliveryInfoMock } = vi.hoisted(() => ({
|
||||
extractDeliveryInfoMock: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../config/sessions/delivery-info.js", () => ({
|
||||
extractDeliveryInfo: extractDeliveryInfoMock,
|
||||
}));
|
||||
|
||||
import { cronDeliveryFromContext, resolveCronCreationDelivery } from "./delivery-context.js";
|
||||
|
||||
describe("cron delivery context", () => {
|
||||
const cfg = {} as OpenClawConfig;
|
||||
|
||||
beforeEach(() => {
|
||||
extractDeliveryInfoMock.mockReset();
|
||||
extractDeliveryInfoMock.mockReturnValue({ deliveryContext: undefined, threadId: undefined });
|
||||
});
|
||||
|
||||
it("builds announce delivery from deliveryContext without changing target casing", () => {
|
||||
expect(
|
||||
cronDeliveryFromContext({
|
||||
channel: " Matrix ",
|
||||
to: " !AbCdEf1234567890:Example.Org ",
|
||||
accountId: " Bot-A ",
|
||||
threadId: " $RootEvent:Example.Org ",
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "matrix",
|
||||
to: "!AbCdEf1234567890:Example.Org",
|
||||
accountId: "bot-a",
|
||||
threadId: "$RootEvent:Example.Org",
|
||||
});
|
||||
});
|
||||
|
||||
it("prefers current deliveryContext over stored session deliveryContext", () => {
|
||||
extractDeliveryInfoMock.mockReturnValueOnce({
|
||||
deliveryContext: { channel: "matrix", to: "!stored:example.org" },
|
||||
threadId: undefined,
|
||||
});
|
||||
|
||||
expect(
|
||||
resolveCronCreationDelivery({
|
||||
cfg,
|
||||
agentSessionKey: "agent:main:matrix:channel:!stored:example.org",
|
||||
currentDeliveryContext: {
|
||||
channel: "matrix",
|
||||
to: "!Current:Example.Org",
|
||||
},
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "matrix",
|
||||
to: "!Current:Example.Org",
|
||||
});
|
||||
expect(extractDeliveryInfoMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("uses stored session deliveryContext when current context is absent", () => {
|
||||
extractDeliveryInfoMock.mockReturnValueOnce({
|
||||
deliveryContext: {
|
||||
channel: "line",
|
||||
to: "Cabcdef0123456789abcdef0123456789",
|
||||
accountId: "primary",
|
||||
},
|
||||
threadId: undefined,
|
||||
});
|
||||
|
||||
expect(
|
||||
resolveCronCreationDelivery({
|
||||
cfg,
|
||||
agentSessionKey: "agent:main:line:group:cabcdef0123456789abcdef0123456789",
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "line",
|
||||
to: "Cabcdef0123456789abcdef0123456789",
|
||||
accountId: "primary",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves parsed thread ids from stored session lookups", () => {
|
||||
extractDeliveryInfoMock.mockReturnValueOnce({
|
||||
deliveryContext: {
|
||||
channel: "telegram",
|
||||
to: "-1001234567890",
|
||||
threadId: "stale-topic",
|
||||
},
|
||||
threadId: "99",
|
||||
});
|
||||
|
||||
expect(
|
||||
resolveCronCreationDelivery({
|
||||
cfg,
|
||||
agentSessionKey: "agent:main:telegram:group:-1001234567890:topic:99",
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "-1001234567890",
|
||||
threadId: "99",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not create delivery without a concrete target", () => {
|
||||
expect(cronDeliveryFromContext({ channel: "matrix", to: " " })).toBeNull();
|
||||
expect(
|
||||
resolveCronCreationDelivery({
|
||||
cfg,
|
||||
agentSessionKey: "agent:main:matrix:channel:!abcdef:example.org",
|
||||
}),
|
||||
).toBeNull();
|
||||
});
|
||||
});
|
||||
59
src/cron/delivery-context.ts
Normal file
59
src/cron/delivery-context.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import { extractDeliveryInfo } from "../config/sessions/delivery-info.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import {
|
||||
normalizeDeliveryContext,
|
||||
type DeliveryContext,
|
||||
} from "../utils/delivery-context.shared.js";
|
||||
import type { CronDelivery, CronMessageChannel } from "./types.js";
|
||||
|
||||
export function cronDeliveryFromContext(context?: DeliveryContext): CronDelivery | null {
|
||||
const normalized = normalizeDeliveryContext(context);
|
||||
if (!normalized?.to) {
|
||||
return null;
|
||||
}
|
||||
const delivery: CronDelivery = {
|
||||
mode: "announce",
|
||||
to: normalized.to,
|
||||
};
|
||||
if (normalized.channel) {
|
||||
delivery.channel = normalized.channel as CronMessageChannel;
|
||||
}
|
||||
if (normalized.accountId) {
|
||||
delivery.accountId = normalized.accountId;
|
||||
}
|
||||
if (normalized.threadId != null) {
|
||||
delivery.threadId = normalized.threadId;
|
||||
}
|
||||
return delivery;
|
||||
}
|
||||
|
||||
export function resolveCronStoredDeliveryContext(params: {
|
||||
cfg: OpenClawConfig;
|
||||
sessionKey?: string;
|
||||
}): DeliveryContext | undefined {
|
||||
const sessionKey = params.sessionKey?.trim();
|
||||
if (!sessionKey) {
|
||||
return undefined;
|
||||
}
|
||||
const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey, { cfg: params.cfg });
|
||||
if (deliveryContext && threadId) {
|
||||
return { ...deliveryContext, threadId };
|
||||
}
|
||||
return deliveryContext;
|
||||
}
|
||||
|
||||
export function resolveCronCreationDelivery(params: {
|
||||
cfg: OpenClawConfig;
|
||||
currentDeliveryContext?: DeliveryContext;
|
||||
agentSessionKey?: string;
|
||||
}): CronDelivery | null {
|
||||
return (
|
||||
cronDeliveryFromContext(params.currentDeliveryContext) ??
|
||||
cronDeliveryFromContext(
|
||||
resolveCronStoredDeliveryContext({
|
||||
cfg: params.cfg,
|
||||
sessionKey: params.agentSessionKey,
|
||||
}),
|
||||
)
|
||||
);
|
||||
}
|
||||
@@ -8,10 +8,19 @@ import {
|
||||
import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../../plugins/runtime.js";
|
||||
import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js";
|
||||
|
||||
const { extractDeliveryInfoMock } = vi.hoisted(() => ({
|
||||
extractDeliveryInfoMock: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/sessions/main-session.js", () => ({
|
||||
canonicalizeMainSessionAlias: vi.fn(({ sessionKey }) => sessionKey),
|
||||
resolveAgentMainSessionKey: vi.fn().mockReturnValue("agent:test:main"),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/sessions/delivery-info.js", () => ({
|
||||
extractDeliveryInfo: extractDeliveryInfoMock,
|
||||
}));
|
||||
|
||||
vi.mock("../../config/sessions/paths.js", () => ({
|
||||
resolveStorePath: vi.fn().mockReturnValue("/tmp/test-store.json"),
|
||||
}));
|
||||
@@ -39,6 +48,7 @@ vi.mock("../../infra/outbound/targets.runtime.js", () => ({
|
||||
}));
|
||||
const mockedModuleIds = [
|
||||
"../../config/sessions/main-session.js",
|
||||
"../../config/sessions/delivery-info.js",
|
||||
"../../config/sessions/paths.js",
|
||||
"../../config/sessions/store-load.js",
|
||||
"../../infra/outbound/channel-selection.runtime.js",
|
||||
@@ -102,6 +112,8 @@ const normalizeTelegramTargetForDeliveryTest = vi.fn((raw: string): string | und
|
||||
|
||||
beforeEach(() => {
|
||||
resetPluginRuntimeStateForTest();
|
||||
extractDeliveryInfoMock.mockReset();
|
||||
extractDeliveryInfoMock.mockReturnValue({ deliveryContext: undefined, threadId: undefined });
|
||||
normalizeTelegramTargetForDeliveryTest.mockClear();
|
||||
vi.mocked(readChannelAllowFromStoreEntriesSync).mockReset();
|
||||
vi.mocked(readChannelAllowFromStoreEntriesSync).mockReturnValue([]);
|
||||
@@ -723,6 +735,76 @@ describe("resolveDeliveryTarget", () => {
|
||||
expect(result.threadId).toBe(42);
|
||||
});
|
||||
|
||||
it("prefers stored deliveryContext lookup over exact session-store entries", async () => {
|
||||
extractDeliveryInfoMock.mockReturnValueOnce({
|
||||
deliveryContext: {
|
||||
channel: "alpha",
|
||||
to: "RoomMixedCase",
|
||||
accountId: "primary",
|
||||
threadId: "thread-old-stored",
|
||||
},
|
||||
threadId: "thread-stored",
|
||||
});
|
||||
setSessionStore({
|
||||
"agent:test:thread:42": {
|
||||
sessionId: "thread-session",
|
||||
updatedAt: 2000,
|
||||
lastChannel: "alpha",
|
||||
lastTo: "room-lowercase",
|
||||
lastThreadId: "thread-old",
|
||||
},
|
||||
} as SessionStore);
|
||||
|
||||
const result = await resolveDeliveryTarget(makeCfg({ bindings: [] }), AGENT_ID, {
|
||||
channel: "last",
|
||||
sessionKey: "agent:test:thread:42",
|
||||
to: undefined,
|
||||
});
|
||||
|
||||
expect(result).toMatchObject({
|
||||
ok: true,
|
||||
channel: "alpha",
|
||||
to: "RoomMixedCase",
|
||||
accountId: "primary",
|
||||
threadId: "thread-stored",
|
||||
});
|
||||
});
|
||||
|
||||
it("scopes unqualified stored delivery lookups to the job agent", async () => {
|
||||
extractDeliveryInfoMock.mockImplementation((sessionKey: string) =>
|
||||
sessionKey === "agent:agent-b:main"
|
||||
? {
|
||||
deliveryContext: {
|
||||
channel: "alpha",
|
||||
to: "ops-room",
|
||||
},
|
||||
threadId: undefined,
|
||||
}
|
||||
: {
|
||||
deliveryContext: {
|
||||
channel: "alpha",
|
||||
to: "default-room",
|
||||
},
|
||||
threadId: undefined,
|
||||
},
|
||||
);
|
||||
|
||||
const result = await resolveDeliveryTarget(makeCfg({ bindings: [] }), AGENT_ID, {
|
||||
channel: "last",
|
||||
sessionKey: "main",
|
||||
to: undefined,
|
||||
});
|
||||
|
||||
expect(extractDeliveryInfoMock).toHaveBeenCalledWith("agent:agent-b:main", {
|
||||
cfg: expect.any(Object),
|
||||
});
|
||||
expect(result).toMatchObject({
|
||||
ok: true,
|
||||
channel: "alpha",
|
||||
to: "ops-room",
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to the main session entry when the requested sessionKey is missing", async () => {
|
||||
setSessionStore({
|
||||
"agent:test:main": {
|
||||
|
||||
@@ -3,6 +3,7 @@ import type { ChannelId } from "../../channels/plugins/types.public.js";
|
||||
import { resolveAgentMainSessionKey } from "../../config/sessions/main-session.js";
|
||||
import { resolveStorePath } from "../../config/sessions/paths.js";
|
||||
import { loadSessionStore } from "../../config/sessions/store-load.js";
|
||||
import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-id-resolution.js";
|
||||
@@ -12,6 +13,8 @@ import { resolveSessionDeliveryTarget } from "../../infra/outbound/targets-sessi
|
||||
import type { OutboundChannel } from "../../infra/outbound/targets.js";
|
||||
import { normalizeAccountId } from "../../routing/session-key.js";
|
||||
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
|
||||
import { resolveCronStoredDeliveryContext } from "../delivery-context.js";
|
||||
import { resolveCronAgentSessionKey } from "./session-key.js";
|
||||
|
||||
export type DeliveryTargetResolution =
|
||||
| {
|
||||
@@ -133,9 +136,28 @@ export async function resolveDeliveryTarget(
|
||||
|
||||
// Look up thread-specific session first (e.g. agent:main:main:thread:1234),
|
||||
// then fall back to the main session entry.
|
||||
const threadSessionKey = jobPayload.sessionKey?.trim();
|
||||
const rawSessionKey = jobPayload.sessionKey?.trim();
|
||||
const threadSessionKey = rawSessionKey
|
||||
? resolveCronAgentSessionKey({
|
||||
sessionKey: rawSessionKey,
|
||||
agentId,
|
||||
mainKey: cfg.session?.mainKey,
|
||||
cfg,
|
||||
})
|
||||
: undefined;
|
||||
const storedDeliveryContext = resolveCronStoredDeliveryContext({
|
||||
cfg,
|
||||
sessionKey: threadSessionKey,
|
||||
});
|
||||
const storedDeliveryEntry = storedDeliveryContext
|
||||
? ({
|
||||
sessionId: threadSessionKey ?? mainSessionKey,
|
||||
updatedAt: 0,
|
||||
deliveryContext: storedDeliveryContext,
|
||||
} satisfies SessionEntry)
|
||||
: undefined;
|
||||
const threadEntry = threadSessionKey ? store[threadSessionKey] : undefined;
|
||||
const main = threadEntry ?? store[mainSessionKey];
|
||||
const main = storedDeliveryEntry ?? threadEntry ?? store[mainSessionKey];
|
||||
|
||||
const preliminary = resolveSessionDeliveryTarget({
|
||||
entry: main,
|
||||
|
||||
Reference in New Issue
Block a user