mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-03 07:54:07 +00:00
[Fix] Prefer external session delivery context (#87476)
* fix(sessions): prefer external delivery context Signed-off-by: samzong <samzong.lu@gmail.com> * fix: route Feishu session announces from delivery context * fix: accept normalized cron schedule inputs --------- Signed-off-by: samzong <samzong.lu@gmail.com> Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
@@ -96,6 +96,12 @@ afterAll(() => {
|
||||
vi.resetModules();
|
||||
});
|
||||
|
||||
describe("feishuPlugin metadata", () => {
|
||||
it("opts announce delivery into persisted session lookup", () => {
|
||||
expect(feishuPlugin.meta.preferSessionLookupForAnnounceTarget).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("feishuPlugin.status.probeAccount", () => {
|
||||
it("uses current account credentials for multi-account config", async () => {
|
||||
const cfg = {
|
||||
|
||||
@@ -139,6 +139,7 @@ const meta: ChannelMeta = {
|
||||
blurb: "飞书/Lark enterprise messaging.",
|
||||
aliases: ["lark"],
|
||||
order: 70,
|
||||
preferSessionLookupForAnnounceTarget: true,
|
||||
};
|
||||
|
||||
const loadFeishuChannelRuntime = createLazyRuntimeNamedExport(
|
||||
|
||||
@@ -111,6 +111,30 @@ const installRegistry = async () => {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
pluginId: "feishu",
|
||||
source: "test",
|
||||
plugin: {
|
||||
id: "feishu",
|
||||
meta: {
|
||||
id: "feishu",
|
||||
label: "Feishu",
|
||||
selectionLabel: "Feishu",
|
||||
docsPath: "/channels/feishu",
|
||||
blurb: "Feishu test stub.",
|
||||
preferSessionLookupForAnnounceTarget: true,
|
||||
},
|
||||
capabilities: { chatTypes: ["direct", "group"] },
|
||||
messaging: {
|
||||
resolveSessionConversation: resolveSessionConversationStub,
|
||||
resolveSessionTarget: resolveSessionTargetStub,
|
||||
},
|
||||
config: {
|
||||
listAccountIds: () => ["default"],
|
||||
resolveAccount: () => ({}),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
pluginId: "whatsapp",
|
||||
source: "test",
|
||||
@@ -408,6 +432,43 @@ describe("resolveAnnounceTarget", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("hydrates announce delivery from explicit external context over stale webchat session fields", async () => {
|
||||
callGatewayMock.mockResolvedValueOnce({
|
||||
sessions: [
|
||||
{
|
||||
key: "agent:main:feishu:direct:ou_user",
|
||||
channel: "webchat",
|
||||
lastChannel: "webchat",
|
||||
lastTo: "session:dashboard",
|
||||
route: {
|
||||
channel: "webchat",
|
||||
target: { to: "session:dashboard" },
|
||||
},
|
||||
deliveryContext: {
|
||||
channel: "feishu",
|
||||
to: "user:ou_user",
|
||||
},
|
||||
origin: {
|
||||
provider: "feishu",
|
||||
accountId: "work",
|
||||
threadId: "thread-77",
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const target = await resolveAnnounceTarget({
|
||||
sessionKey: "agent:main:feishu:direct:ou_user",
|
||||
displayKey: "agent:main:feishu:direct:ou_user",
|
||||
});
|
||||
expect(target).toEqual({
|
||||
channel: "feishu",
|
||||
to: "user:ou_user",
|
||||
accountId: "work",
|
||||
threadId: "thread-77",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves threaded Slack session keys when sessions.list lacks stored thread metadata", async () => {
|
||||
callGatewayMock.mockResolvedValueOnce({
|
||||
sessions: [
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { normalizeOptionalString } from "../shared/string-coerce.js";
|
||||
import { coerceFiniteScheduleNumber } from "./schedule-number.js";
|
||||
import type { CronJob } from "./types.js";
|
||||
|
||||
function readString(record: Record<string, unknown>, key: string): string | undefined {
|
||||
return normalizeOptionalString(record[key]);
|
||||
@@ -76,8 +75,8 @@ export function tryCronScheduleIdentity(
|
||||
}
|
||||
|
||||
export function cronSchedulingInputsEqual(
|
||||
previous: Pick<CronJob, "schedule"> & { enabled?: unknown },
|
||||
next: Pick<CronJob, "schedule"> & { enabled?: unknown },
|
||||
previous: { schedule?: unknown; enabled?: unknown },
|
||||
next: { schedule?: unknown; enabled?: unknown },
|
||||
): boolean {
|
||||
const previousIdentity = tryCronScheduleIdentity(previous as Record<string, unknown>);
|
||||
const nextIdentity = tryCronScheduleIdentity(next as Record<string, unknown>);
|
||||
|
||||
@@ -1,16 +1,20 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, it, type Mock } from "vitest";
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi, type Mock } from "vitest";
|
||||
import { testing as agentStepTesting } from "../agents/tools/agent-step.js";
|
||||
import { runSessionsSendA2AFlow } from "../agents/tools/sessions-send-tool.a2a.js";
|
||||
import { resolveSessionTranscriptPath } from "../config/sessions.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
|
||||
import { captureEnv } from "../test-utils/env.js";
|
||||
import {
|
||||
agentCommand,
|
||||
getFreePort,
|
||||
installGatewayTestHooks,
|
||||
startGatewayServer,
|
||||
setTestPluginRegistry,
|
||||
testState,
|
||||
writeSessionStore,
|
||||
} from "./test-helpers.js";
|
||||
@@ -171,6 +175,114 @@ describe("sessions_send gateway loopback", () => {
|
||||
expect(firstCall?.inputProvenance?.kind).toBe("inter_session");
|
||||
expect(firstCall?.inputProvenance?.sourceTool).toBe("sessions_send");
|
||||
});
|
||||
|
||||
it(
|
||||
"announces through gateway send using external deliveryContext over stale webchat session fields",
|
||||
{ timeout: SESSION_SEND_E2E_TIMEOUT_MS },
|
||||
async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-send-route-"));
|
||||
const sendCalls: Array<{
|
||||
to?: string;
|
||||
text?: string;
|
||||
accountId?: string | null;
|
||||
threadId?: string | number | null;
|
||||
}> = [];
|
||||
setTestPluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "whatsapp",
|
||||
source: "test",
|
||||
plugin: createOutboundTestPlugin({
|
||||
id: "whatsapp",
|
||||
label: "WhatsApp",
|
||||
outbound: {
|
||||
deliveryMode: "direct",
|
||||
resolveTarget: ({ to }) => {
|
||||
const target = to?.trim();
|
||||
return target
|
||||
? { ok: true, to: target }
|
||||
: { ok: false, error: new Error("missing target") };
|
||||
},
|
||||
sendText: async (ctx) => {
|
||||
sendCalls.push({
|
||||
to: ctx.to,
|
||||
text: ctx.text,
|
||||
accountId: ctx.accountId,
|
||||
threadId: ctx.threadId,
|
||||
});
|
||||
return { channel: "whatsapp", messageId: "wa-proof-msg" };
|
||||
},
|
||||
},
|
||||
messaging: {
|
||||
normalizeTarget: (raw) => raw,
|
||||
},
|
||||
}),
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
testState.sessionStorePath = path.join(dir, "sessions.json");
|
||||
try {
|
||||
await writeSessionStore({
|
||||
entries: {
|
||||
"agent:main:whatsapp:direct:peer-1": {
|
||||
sessionId: "sess-whatsapp-peer",
|
||||
updatedAt: Date.now(),
|
||||
channel: "webchat",
|
||||
lastChannel: "webchat",
|
||||
lastTo: "session:dashboard",
|
||||
route: {
|
||||
channel: "webchat",
|
||||
target: { to: "session:dashboard" },
|
||||
},
|
||||
deliveryContext: {
|
||||
channel: "whatsapp",
|
||||
to: "peer-1",
|
||||
},
|
||||
origin: {
|
||||
provider: "whatsapp",
|
||||
accountId: "work",
|
||||
threadId: "thread-77",
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
agentStepTesting.setDepsForTest({
|
||||
agentCommandFromIngress: async () => ({
|
||||
payloads: [{ text: "announce through channel", mediaUrl: null }],
|
||||
meta: { durationMs: 1 },
|
||||
}),
|
||||
});
|
||||
|
||||
await runSessionsSendA2AFlow({
|
||||
targetSessionKey: "agent:main:whatsapp:direct:peer-1",
|
||||
displayKey: "agent:main:whatsapp:direct:peer-1",
|
||||
message: "ping",
|
||||
announceTimeoutMs: 5_000,
|
||||
maxPingPongTurns: 0,
|
||||
roundOneReply: "target response",
|
||||
});
|
||||
|
||||
await vi.waitFor(
|
||||
() =>
|
||||
expect(sendCalls).toEqual([
|
||||
{
|
||||
to: "peer-1",
|
||||
text: "announce through channel",
|
||||
accountId: "work",
|
||||
threadId: "thread-77",
|
||||
},
|
||||
]),
|
||||
{ timeout: 5_000 },
|
||||
);
|
||||
} finally {
|
||||
agentStepTesting.setDepsForTest();
|
||||
testState.sessionStorePath = undefined;
|
||||
await fs.rm(dir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 });
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe("sessions_send label lookup", () => {
|
||||
|
||||
@@ -8,7 +8,12 @@ import {
|
||||
} from "../plugin-sdk/channel-route.js";
|
||||
import { normalizeAccountId } from "./account-id.js";
|
||||
import type { DeliveryContext, DeliveryContextSessionSource } from "./delivery-context.types.js";
|
||||
import {
|
||||
INTERNAL_MESSAGE_CHANNEL,
|
||||
isInternalNonDeliveryChannel,
|
||||
} from "./message-channel-constants.js";
|
||||
import { normalizeMessageChannel } from "./message-channel-core.js";
|
||||
import { isDeliverableMessageChannel } from "./message-channel-normalize.js";
|
||||
export type { DeliveryContext, DeliveryContextSessionSource } from "./delivery-context.types.js";
|
||||
|
||||
export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
|
||||
@@ -93,6 +98,30 @@ function mergeRouteMetadataWithDeliveryContext(
|
||||
});
|
||||
}
|
||||
|
||||
function isInternalRouteContext(context?: DeliveryContext): boolean {
|
||||
const channel = context?.channel;
|
||||
return Boolean(
|
||||
channel && (channel === INTERNAL_MESSAGE_CHANNEL || isInternalNonDeliveryChannel(channel)),
|
||||
);
|
||||
}
|
||||
|
||||
function hasExternalDeliveryTarget(context?: DeliveryContext): boolean {
|
||||
const channel = normalizeMessageChannel(context?.channel);
|
||||
return Boolean(channel && isDeliverableMessageChannel(channel) && context?.to);
|
||||
}
|
||||
|
||||
function mergeExternalDeliveryContextOverInternalRoute(
|
||||
deliveryContext?: DeliveryContext,
|
||||
internalContext?: DeliveryContext,
|
||||
): DeliveryContext | undefined {
|
||||
return normalizeDeliveryContext({
|
||||
channel: deliveryContext?.channel,
|
||||
to: deliveryContext?.to,
|
||||
accountId: deliveryContext?.accountId ?? internalContext?.accountId,
|
||||
threadId: deliveryContext?.threadId ?? internalContext?.threadId,
|
||||
});
|
||||
}
|
||||
|
||||
export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSource): {
|
||||
route?: ChannelRouteRef;
|
||||
deliveryContext?: DeliveryContext;
|
||||
@@ -120,10 +149,17 @@ export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSo
|
||||
accountId: source.lastAccountId,
|
||||
threadId: source.lastThreadId,
|
||||
});
|
||||
const merged = mergeDeliveryContext(
|
||||
routeContext,
|
||||
mergeDeliveryContext(legacyContext, normalizeDeliveryContext(source.deliveryContext)),
|
||||
);
|
||||
const deliveryContext = normalizeDeliveryContext(source.deliveryContext);
|
||||
const sessionContext =
|
||||
isInternalRouteContext(legacyContext) && hasExternalDeliveryTarget(deliveryContext)
|
||||
? mergeExternalDeliveryContextOverInternalRoute(deliveryContext, legacyContext)
|
||||
: mergeDeliveryContext(legacyContext, deliveryContext);
|
||||
const routeInternalContext = mergeDeliveryContext(routeContext, legacyContext);
|
||||
const routeIsInternalFallback =
|
||||
isInternalRouteContext(routeContext) && hasExternalDeliveryTarget(deliveryContext);
|
||||
const merged = routeIsInternalFallback
|
||||
? mergeExternalDeliveryContextOverInternalRoute(deliveryContext, routeInternalContext)
|
||||
: mergeDeliveryContext(routeContext, sessionContext);
|
||||
|
||||
if (!merged) {
|
||||
return {
|
||||
@@ -137,7 +173,10 @@ export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSo
|
||||
}
|
||||
|
||||
return {
|
||||
route: mergeRouteMetadataWithDeliveryContext(normalizedRoute, merged),
|
||||
route: mergeRouteMetadataWithDeliveryContext(
|
||||
routeIsInternalFallback ? undefined : normalizedRoute,
|
||||
merged,
|
||||
),
|
||||
deliveryContext: merged,
|
||||
lastChannel: merged.channel,
|
||||
lastTo: merged.to,
|
||||
|
||||
@@ -261,6 +261,131 @@ describe("delivery context helpers", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("prefers explicit external delivery context over stale webchat legacy fields", () => {
|
||||
expect(
|
||||
deliveryContextFromSession({
|
||||
channel: "webchat",
|
||||
deliveryContext: {
|
||||
channel: "room-chat",
|
||||
to: " peer-1 ",
|
||||
accountId: " acct-1 ",
|
||||
threadId: " thread-1 ",
|
||||
},
|
||||
}),
|
||||
).toEqual({
|
||||
channel: "room-chat",
|
||||
to: "peer-1",
|
||||
accountId: "acct-1",
|
||||
threadId: "thread-1",
|
||||
});
|
||||
|
||||
expect(
|
||||
deliveryContextFromSession({
|
||||
channel: "webchat",
|
||||
lastChannel: "webchat",
|
||||
lastTo: "session:dashboard",
|
||||
lastAccountId: "work",
|
||||
lastThreadId: "thread-2",
|
||||
deliveryContext: {
|
||||
channel: "room-chat",
|
||||
to: "peer-2",
|
||||
},
|
||||
}),
|
||||
).toEqual({
|
||||
channel: "room-chat",
|
||||
to: "peer-2",
|
||||
accountId: "work",
|
||||
threadId: "thread-2",
|
||||
});
|
||||
|
||||
expect(
|
||||
deliveryContextFromSession({
|
||||
lastChannel: "heartbeat",
|
||||
lastTo: "heartbeat",
|
||||
deliveryContext: {
|
||||
channel: "telegram",
|
||||
to: "-100123",
|
||||
},
|
||||
}),
|
||||
).toEqual({
|
||||
channel: "telegram",
|
||||
to: "-100123",
|
||||
accountId: undefined,
|
||||
});
|
||||
|
||||
const routeNormalized = normalizeSessionDeliveryFields({
|
||||
route: {
|
||||
channel: "webchat",
|
||||
accountId: "work",
|
||||
target: { to: "session:dashboard" },
|
||||
thread: { id: "thread-route" },
|
||||
},
|
||||
deliveryContext: {
|
||||
channel: "room-chat",
|
||||
to: "peer-route",
|
||||
},
|
||||
});
|
||||
expect(routeNormalized.deliveryContext).toEqual({
|
||||
channel: "room-chat",
|
||||
to: "peer-route",
|
||||
accountId: "work",
|
||||
threadId: "thread-route",
|
||||
});
|
||||
expect(routeNormalized.route).toEqual({
|
||||
channel: "room-chat",
|
||||
accountId: "work",
|
||||
target: { to: "peer-route" },
|
||||
thread: { id: "thread-route" },
|
||||
});
|
||||
});
|
||||
|
||||
it("does not promote tool-only context over internal session delivery", () => {
|
||||
const normalized = normalizeSessionDeliveryFields({
|
||||
route: {
|
||||
channel: "webchat",
|
||||
accountId: "work",
|
||||
target: { to: "session:dashboard" },
|
||||
},
|
||||
deliveryContext: {
|
||||
channel: "sessions_send",
|
||||
to: "session:handoff",
|
||||
},
|
||||
});
|
||||
|
||||
expect(normalized.deliveryContext).toEqual({
|
||||
channel: "webchat",
|
||||
to: "session:dashboard",
|
||||
accountId: "work",
|
||||
});
|
||||
expect(normalized.route).toEqual({
|
||||
channel: "webchat",
|
||||
accountId: "work",
|
||||
target: { to: "session:dashboard" },
|
||||
});
|
||||
|
||||
const staleLegacyExternal = normalizeSessionDeliveryFields({
|
||||
route: {
|
||||
channel: "webchat",
|
||||
accountId: "work",
|
||||
target: { to: "session:dashboard" },
|
||||
},
|
||||
lastChannel: "room-chat",
|
||||
lastTo: "peer-old",
|
||||
lastAccountId: "old-workspace",
|
||||
});
|
||||
|
||||
expect(staleLegacyExternal.deliveryContext).toEqual({
|
||||
channel: "webchat",
|
||||
to: "session:dashboard",
|
||||
accountId: "work",
|
||||
});
|
||||
expect(staleLegacyExternal.route).toEqual({
|
||||
channel: "webchat",
|
||||
accountId: "work",
|
||||
target: { to: "session:dashboard" },
|
||||
});
|
||||
});
|
||||
|
||||
it("normalizes delivery fields, mirrors session fields, and avoids cross-channel carryover", () => {
|
||||
const normalized = normalizeSessionDeliveryFields({
|
||||
deliveryContext: {
|
||||
|
||||
Reference in New Issue
Block a user