diff --git a/CHANGELOG.md b/CHANGELOG.md index b611b6749ca..5c493cdf831 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -146,6 +146,7 @@ Docs: https://docs.openclaw.ai - Sandbox/security: auto-derive CDP source-range from Docker network gateway and refuse to start the socat relay without one, so peer containers cannot reach CDP unauthenticated. (#61404) Thanks @dims. - Daemon/launchd: keep `openclaw gateway stop` persistent without uninstalling the macOS LaunchAgent, re-enable it on explicit restart or repair, and harden launchd label handling. (#64447) Thanks @ngutman. +- Agents/Slack: preserve threaded announce delivery when `sessions.list` rows lack stored thread metadata by falling back to the thread id encoded in the session key. (#63143) Thanks @mariosousa-finn. ## 2026.4.9 diff --git a/src/agents/tools/sessions-announce-target.ts b/src/agents/tools/sessions-announce-target.ts index 39cd9e6656e..4d2ef68fe42 100644 --- a/src/agents/tools/sessions-announce-target.ts +++ b/src/agents/tools/sessions-announce-target.ts @@ -1,5 +1,6 @@ import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js"; import { callGateway } from "../../gateway/call.js"; +import { parseThreadSessionSuffix } from "../../sessions/session-key-utils.js"; import { normalizeOptionalStringifiedId } from "../../shared/string-coerce.js"; import { SessionListRow } from "./sessions-helpers.js"; import type { AnnounceTarget } from "./sessions-send-helpers.js"; @@ -12,6 +13,10 @@ export async function resolveAnnounceTarget(params: { const parsed = resolveAnnounceTargetFromKey(params.sessionKey); const parsedDisplay = resolveAnnounceTargetFromKey(params.displayKey); const fallback = parsed ?? parsedDisplay ?? null; + const fallbackThreadId = + fallback?.threadId ?? + parseThreadSessionSuffix(params.sessionKey).threadId ?? + parseThreadSessionSuffix(params.displayKey).threadId; if (fallback) { const normalized = normalizeChannelId(fallback.channel); @@ -55,7 +60,10 @@ export async function resolveAnnounceTarget(params: { (typeof match?.lastAccountId === "string" ? match.lastAccountId : undefined) ?? (typeof origin?.accountId === "string" ? origin.accountId : undefined); const threadId = normalizeOptionalStringifiedId( - deliveryContext?.threadId ?? match?.lastThreadId, + deliveryContext?.threadId ?? + match?.lastThreadId ?? + origin?.threadId ?? + fallbackThreadId, ); if (channel && to) { return { channel, to, accountId, threadId }; diff --git a/src/agents/tools/sessions-send-tool.a2a.test.ts b/src/agents/tools/sessions-send-tool.a2a.test.ts index a118cd1297e..9d9a9d21048 100644 --- a/src/agents/tools/sessions-send-tool.a2a.test.ts +++ b/src/agents/tools/sessions-send-tool.a2a.test.ts @@ -42,7 +42,6 @@ describe("runSessionsSendA2AFlow announce delivery", () => { roundOneReply: "Worker completed successfully", }); - // Find the gateway send call (not the waitForAgentRun call) const sendCall = gatewayCalls.find((call) => call.method === "send"); expect(sendCall).toBeDefined(); const sendParams = sendCall?.params as Record; diff --git a/src/agents/tools/sessions.test.ts b/src/agents/tools/sessions.test.ts index 460001e1077..b916eac1a18 100644 --- a/src/agents/tools/sessions.test.ts +++ b/src/agents/tools/sessions.test.ts @@ -80,7 +80,6 @@ const installRegistry = async () => { }, capabilities: { chatTypes: ["direct", "channel", "thread"] }, messaging: { - resolveSessionConversation: resolveSessionConversationStub, resolveSessionTarget: resolveSessionTargetStub, }, config: { @@ -113,6 +112,30 @@ const installRegistry = async () => { }, }, }, + { + pluginId: "slack", + source: "test", + plugin: { + id: "slack", + meta: { + id: "slack", + label: "Slack", + selectionLabel: "Slack", + docsPath: "/channels/slack", + blurb: "Slack test stub.", + preferSessionLookupForAnnounceTarget: true, + }, + capabilities: { chatTypes: ["direct", "channel", "thread"] }, + messaging: { + resolveSessionConversation: resolveSessionConversationStub, + resolveSessionTarget: resolveSessionTargetStub, + }, + config: { + listAccountIds: () => ["default"], + resolveAccount: () => ({}), + }, + }, + }, ]), ); }; @@ -145,7 +168,7 @@ function expectWorkerTranscriptPath( ) { const session = getFirstListedSession(result); expect(session).toMatchObject({ key: "agent:worker:main" }); - const transcriptPath = String(session?.transcriptPath ?? ""); + const transcriptPath = session?.transcriptPath ?? ""; expect(path.normalize(transcriptPath)).toContain(path.normalize(params.containsPath)); expect(transcriptPath).toMatch(new RegExp(`${params.sessionId}\\.jsonl$`)); } @@ -336,6 +359,59 @@ describe("resolveAnnounceTarget", () => { threadId: "271", }); }); + + it("keeps threadId from sessions.list delivery context for announce delivery", async () => { + callGatewayMock.mockResolvedValueOnce({ + sessions: [ + { + key: "agent:main:whatsapp:group:123@g.us", + deliveryContext: { + channel: "whatsapp", + to: "123@g.us", + accountId: "work", + threadId: "thread-77", + }, + }, + ], + }); + + const target = await resolveAnnounceTarget({ + sessionKey: "agent:main:whatsapp:group:123@g.us", + displayKey: "agent:main:whatsapp:group:123@g.us", + }); + expect(target).toEqual({ + channel: "whatsapp", + to: "123@g.us", + accountId: "work", + threadId: "thread-77", + }); + }); + + it("preserves threaded Slack session keys when sessions.list lacks stored thread metadata", async () => { + callGatewayMock.mockResolvedValueOnce({ + sessions: [ + { + key: "agent:main:slack:channel:C123:thread:1710000000.000100", + deliveryContext: { + channel: "slack", + to: "channel:C123", + accountId: "workspace", + }, + }, + ], + }); + + const target = await resolveAnnounceTarget({ + sessionKey: "agent:main:slack:channel:C123:thread:1710000000.000100", + displayKey: "agent:main:slack:channel:C123:thread:1710000000.000100", + }); + expect(target).toEqual({ + channel: "slack", + to: "channel:C123", + accountId: "workspace", + threadId: "1710000000.000100", + }); + }); }); describe("sessions_list gating", () => { diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 5a44d3f076d..806a2ec3aaf 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -34,6 +34,7 @@ import { } from "../../shared/string-coerce.js"; import { createRunningTaskRun } from "../../tasks/task-executor.js"; import { + mergeDeliveryContext, normalizeDeliveryContext, normalizeSessionDeliveryFields, } from "../../utils/delivery-context.js"; @@ -563,6 +564,26 @@ export const agentHandlers: GatewayRequestHandlers = { resolvedGroupChannel = resolvedGroupChannel || inheritedGroup?.groupChannel; resolvedGroupSpace = resolvedGroupSpace || inheritedGroup?.groupSpace; const deliveryFields = normalizeSessionDeliveryFields(entry); + // When the session has no delivery context yet (e.g. a freshly-spawned subagent + // with deliver: false), seed it from the request's channel/to/threadId params. + // Without this, subagent sessions end up with deliveryContext: {channel: "slack"} + // and no `to`/`threadId`, which causes announce delivery to either target the + // wrong channel (when the parent's lastTo drifts) or fail entirely. + const requestDeliveryHint = normalizeDeliveryContext({ + channel: request.channel?.trim(), + to: request.to?.trim(), + accountId: request.accountId?.trim(), + // Pass threadId directly — normalizeDeliveryContext handles both + // string and numeric threadIds (e.g., Matrix uses integers). + threadId: request.threadId, + }); + const effectiveDelivery = mergeDeliveryContext( + deliveryFields.deliveryContext, + requestDeliveryHint, + ); + const effectiveDeliveryFields = normalizeSessionDeliveryFields({ + deliveryContext: effectiveDelivery, + }); const nextEntryPatch: SessionEntry = { sessionId, updatedAt: now, @@ -573,11 +594,11 @@ export const agentHandlers: GatewayRequestHandlers = { systemSent: entry?.systemSent, sendPolicy: entry?.sendPolicy, skillsSnapshot: entry?.skillsSnapshot, - deliveryContext: deliveryFields.deliveryContext, - lastChannel: deliveryFields.lastChannel ?? entry?.lastChannel, - lastTo: deliveryFields.lastTo ?? entry?.lastTo, - lastAccountId: deliveryFields.lastAccountId ?? entry?.lastAccountId, - lastThreadId: deliveryFields.lastThreadId ?? entry?.lastThreadId, + deliveryContext: effectiveDeliveryFields.deliveryContext, + lastChannel: effectiveDeliveryFields.lastChannel ?? entry?.lastChannel, + lastTo: effectiveDeliveryFields.lastTo ?? entry?.lastTo, + lastAccountId: effectiveDeliveryFields.lastAccountId ?? entry?.lastAccountId, + lastThreadId: effectiveDeliveryFields.lastThreadId ?? entry?.lastThreadId, modelOverride: entry?.modelOverride, providerOverride: entry?.providerOverride, label: labelValue, diff --git a/src/gateway/server.agent.subagent-delivery-context.test.ts b/src/gateway/server.agent.subagent-delivery-context.test.ts new file mode 100644 index 00000000000..38a80e02731 --- /dev/null +++ b/src/gateway/server.agent.subagent-delivery-context.test.ts @@ -0,0 +1,222 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterAll, beforeAll, describe, expect, test } from "vitest"; +import type { ChannelPlugin } from "../channels/plugins/types.js"; +import { createChannelTestPluginBase } from "../test-utils/channel-plugins.js"; +import { setRegistry } from "./server.agent.gateway-server-agent.mocks.js"; +import { createRegistry } from "./server.e2e-registry-helpers.js"; +import { + connectOk, + installGatewayTestHooks, + rpcReq, + startServerWithClient, + testState, + writeSessionStore, +} from "./test-helpers.js"; + +installGatewayTestHooks({ scope: "suite" }); + +let server: Awaited>["server"]; +let ws: Awaited>["ws"]; +let sessionStoreDir: string; +let sessionStorePath: string; + +const createStubChannelPlugin = (params: { + id: ChannelPlugin["id"]; + label: string; +}): ChannelPlugin => ({ + ...createChannelTestPluginBase({ + id: params.id, + label: params.label, + }), + outbound: { + deliveryMode: "direct", + resolveTarget: ({ to }) => { + const trimmed = to?.trim() ?? ""; + if (trimmed) { + return { ok: true, to: trimmed }; + } + return { ok: false, error: new Error(`missing target for ${params.id}`) }; + }, + sendText: async () => ({ channel: params.id, messageId: "msg-test" }), + sendMedia: async () => ({ channel: params.id, messageId: "msg-test" }), + }, +}); + +const defaultRegistry = createRegistry([ + { + pluginId: "slack", + source: "test", + plugin: createStubChannelPlugin({ id: "slack", label: "Slack" }), + }, +]); + +beforeAll(async () => { + const started = await startServerWithClient(); + server = started.server; + ws = started.ws; + await connectOk(ws); + sessionStoreDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-subagent-delivery-ctx-")); + sessionStorePath = path.join(sessionStoreDir, "sessions.json"); +}); + +afterAll(async () => { + ws.close(); + await server.close(); + await fs.rm(sessionStoreDir, { recursive: true, force: true }); +}); + +type StoredEntry = { + deliveryContext?: { channel?: string; to?: string; threadId?: string; accountId?: string }; + lastChannel?: string; + lastTo?: string; + lastThreadId?: string | number; + lastAccountId?: string; +}; + +describe("subagent session deliveryContext from spawn request params", () => { + test("new subagent session inherits deliveryContext from request channel/to/threadId", async () => { + setRegistry(defaultRegistry); + testState.sessionStorePath = sessionStorePath; + await writeSessionStore({ entries: {} }); + + const res = await rpcReq(ws, "agent", { + message: "[Subagent Task]: analyze data", + sessionKey: "agent:main:subagent:test-delivery-ctx", + channel: "slack", + to: "channel:C0AF8TW48UQ", + accountId: "default", + threadId: "1774374945.091819", + deliver: false, + idempotencyKey: "idem-subagent-delivery-ctx-1", + }); + expect(res.ok).toBe(true); + + const stored = JSON.parse(await fs.readFile(sessionStorePath, "utf-8")) as Record< + string, + StoredEntry + >; + const entry = stored["agent:main:subagent:test-delivery-ctx"]; + expect(entry).toBeDefined(); + expect(entry?.deliveryContext?.channel).toBe("slack"); + expect(entry?.deliveryContext?.to).toBe("channel:C0AF8TW48UQ"); + expect(entry?.deliveryContext?.threadId).toBe("1774374945.091819"); + expect(entry?.deliveryContext?.accountId).toBe("default"); + expect(entry?.lastChannel).toBe("slack"); + expect(entry?.lastTo).toBe("channel:C0AF8TW48UQ"); + }); + + test("existing session deliveryContext is NOT overwritten by request params", async () => { + setRegistry(defaultRegistry); + testState.sessionStorePath = sessionStorePath; + await writeSessionStore({ + entries: { + "agent:main:subagent:existing-ctx": { + sessionId: "sess-existing", + updatedAt: Date.now(), + deliveryContext: { + channel: "slack", + to: "user:U09U1LV7JDN", + accountId: "default", + threadId: "1771242986.529939", + }, + lastChannel: "slack", + lastTo: "user:U09U1LV7JDN", + lastAccountId: "default", + lastThreadId: "1771242986.529939", + }, + }, + }); + + const res = await rpcReq(ws, "agent", { + message: "follow-up", + sessionKey: "agent:main:subagent:existing-ctx", + channel: "slack", + to: "channel:C0AF8TW48UQ", + threadId: "9999999999.000000", + deliver: false, + idempotencyKey: "idem-subagent-delivery-ctx-2", + }); + expect(res.ok).toBe(true); + + const stored = JSON.parse(await fs.readFile(sessionStorePath, "utf-8")) as Record< + string, + StoredEntry + >; + const entry = stored["agent:main:subagent:existing-ctx"]; + expect(entry).toBeDefined(); + // The ORIGINAL deliveryContext should be preserved (primary wins in merge). + expect(entry?.deliveryContext?.to).toBe("user:U09U1LV7JDN"); + expect(entry?.deliveryContext?.threadId).toBe("1771242986.529939"); + expect(entry?.lastTo).toBe("user:U09U1LV7JDN"); + }); + + test("pre-patched subagent session (via sessions.patch) inherits deliveryContext from agent request", async () => { + // Simulates the real subagent spawn flow: spawnSubagentDirect calls sessions.patch + // first (to set spawnDepth, spawnedBy, etc.), then calls callSubagentGateway({method: "agent"}). + // The sessions.patch creates a partial entry without deliveryContext. + // The agent handler must seed deliveryContext from the request params. + setRegistry(defaultRegistry); + testState.sessionStorePath = sessionStorePath; + await writeSessionStore({ + entries: { + "agent:main:subagent:pre-patched": { + sessionId: "sess-pre-patched", + updatedAt: Date.now(), + spawnDepth: 1, + spawnedBy: "agent:main:slack:direct:u07fdr83w6n:thread:1775577152.364109", + }, + }, + }); + + const res = await rpcReq(ws, "agent", { + message: "[Subagent Task]: investigate data", + sessionKey: "agent:main:subagent:pre-patched", + channel: "slack", + to: "user:U07FDR83W6N", + accountId: "default", + threadId: "1775577152.364109", + deliver: false, + idempotencyKey: "idem-subagent-delivery-ctx-prepatched", + }); + expect(res.ok).toBe(true); + + const stored = JSON.parse(await fs.readFile(sessionStorePath, "utf-8")) as Record< + string, + StoredEntry + >; + const entry = stored["agent:main:subagent:pre-patched"]; + expect(entry).toBeDefined(); + expect(entry?.deliveryContext?.channel).toBe("slack"); + expect(entry?.deliveryContext?.to).toBe("user:U07FDR83W6N"); + expect(entry?.deliveryContext?.threadId).toBe("1775577152.364109"); + expect(entry?.deliveryContext?.accountId).toBe("default"); + expect(entry?.lastThreadId).toBe("1775577152.364109"); + }); + + test("request without to/threadId does not inject empty values", async () => { + setRegistry(defaultRegistry); + testState.sessionStorePath = sessionStorePath; + await writeSessionStore({ entries: {} }); + + const res = await rpcReq(ws, "agent", { + message: "internal task", + sessionKey: "agent:main:subagent:no-routing", + channel: "slack", + deliver: false, + idempotencyKey: "idem-subagent-delivery-ctx-3", + }); + expect(res.ok).toBe(true); + + const stored = JSON.parse(await fs.readFile(sessionStorePath, "utf-8")) as Record< + string, + StoredEntry + >; + const entry = stored["agent:main:subagent:no-routing"]; + expect(entry).toBeDefined(); + expect(entry?.deliveryContext?.channel).toBe("slack"); + expect(entry?.deliveryContext?.to).toBeUndefined(); + expect(entry?.deliveryContext?.threadId).toBeUndefined(); + }); +});