mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-13 02:01:16 +00:00
fix(agents,gateway): keep subagent announces in the original thread (#63143)
Merged via squash.
Prepared head SHA: 9aa5303b48
Co-authored-by: mariosousa-finn <244526439+mariosousa-finn@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
@@ -146,6 +146,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
- Sandbox/security: auto-derive CDP source-range from Docker network gateway and refuse to start the socat relay without one, so peer containers cannot reach CDP unauthenticated. (#61404) Thanks @dims.
|
||||
- Daemon/launchd: keep `openclaw gateway stop` persistent without uninstalling the macOS LaunchAgent, re-enable it on explicit restart or repair, and harden launchd label handling. (#64447) Thanks @ngutman.
|
||||
- Agents/Slack: preserve threaded announce delivery when `sessions.list` rows lack stored thread metadata by falling back to the thread id encoded in the session key. (#63143) Thanks @mariosousa-finn.
|
||||
|
||||
## 2026.4.9
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js";
|
||||
import { callGateway } from "../../gateway/call.js";
|
||||
import { parseThreadSessionSuffix } from "../../sessions/session-key-utils.js";
|
||||
import { normalizeOptionalStringifiedId } from "../../shared/string-coerce.js";
|
||||
import { SessionListRow } from "./sessions-helpers.js";
|
||||
import type { AnnounceTarget } from "./sessions-send-helpers.js";
|
||||
@@ -12,6 +13,10 @@ export async function resolveAnnounceTarget(params: {
|
||||
const parsed = resolveAnnounceTargetFromKey(params.sessionKey);
|
||||
const parsedDisplay = resolveAnnounceTargetFromKey(params.displayKey);
|
||||
const fallback = parsed ?? parsedDisplay ?? null;
|
||||
const fallbackThreadId =
|
||||
fallback?.threadId ??
|
||||
parseThreadSessionSuffix(params.sessionKey).threadId ??
|
||||
parseThreadSessionSuffix(params.displayKey).threadId;
|
||||
|
||||
if (fallback) {
|
||||
const normalized = normalizeChannelId(fallback.channel);
|
||||
@@ -55,7 +60,10 @@ export async function resolveAnnounceTarget(params: {
|
||||
(typeof match?.lastAccountId === "string" ? match.lastAccountId : undefined) ??
|
||||
(typeof origin?.accountId === "string" ? origin.accountId : undefined);
|
||||
const threadId = normalizeOptionalStringifiedId(
|
||||
deliveryContext?.threadId ?? match?.lastThreadId,
|
||||
deliveryContext?.threadId ??
|
||||
match?.lastThreadId ??
|
||||
origin?.threadId ??
|
||||
fallbackThreadId,
|
||||
);
|
||||
if (channel && to) {
|
||||
return { channel, to, accountId, threadId };
|
||||
|
||||
@@ -42,7 +42,6 @@ describe("runSessionsSendA2AFlow announce delivery", () => {
|
||||
roundOneReply: "Worker completed successfully",
|
||||
});
|
||||
|
||||
// Find the gateway send call (not the waitForAgentRun call)
|
||||
const sendCall = gatewayCalls.find((call) => call.method === "send");
|
||||
expect(sendCall).toBeDefined();
|
||||
const sendParams = sendCall?.params as Record<string, unknown>;
|
||||
|
||||
@@ -80,7 +80,6 @@ const installRegistry = async () => {
|
||||
},
|
||||
capabilities: { chatTypes: ["direct", "channel", "thread"] },
|
||||
messaging: {
|
||||
resolveSessionConversation: resolveSessionConversationStub,
|
||||
resolveSessionTarget: resolveSessionTargetStub,
|
||||
},
|
||||
config: {
|
||||
@@ -113,6 +112,30 @@ const installRegistry = async () => {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
pluginId: "slack",
|
||||
source: "test",
|
||||
plugin: {
|
||||
id: "slack",
|
||||
meta: {
|
||||
id: "slack",
|
||||
label: "Slack",
|
||||
selectionLabel: "Slack",
|
||||
docsPath: "/channels/slack",
|
||||
blurb: "Slack test stub.",
|
||||
preferSessionLookupForAnnounceTarget: true,
|
||||
},
|
||||
capabilities: { chatTypes: ["direct", "channel", "thread"] },
|
||||
messaging: {
|
||||
resolveSessionConversation: resolveSessionConversationStub,
|
||||
resolveSessionTarget: resolveSessionTargetStub,
|
||||
},
|
||||
config: {
|
||||
listAccountIds: () => ["default"],
|
||||
resolveAccount: () => ({}),
|
||||
},
|
||||
},
|
||||
},
|
||||
]),
|
||||
);
|
||||
};
|
||||
@@ -145,7 +168,7 @@ function expectWorkerTranscriptPath(
|
||||
) {
|
||||
const session = getFirstListedSession(result);
|
||||
expect(session).toMatchObject({ key: "agent:worker:main" });
|
||||
const transcriptPath = String(session?.transcriptPath ?? "");
|
||||
const transcriptPath = session?.transcriptPath ?? "";
|
||||
expect(path.normalize(transcriptPath)).toContain(path.normalize(params.containsPath));
|
||||
expect(transcriptPath).toMatch(new RegExp(`${params.sessionId}\\.jsonl$`));
|
||||
}
|
||||
@@ -336,6 +359,59 @@ describe("resolveAnnounceTarget", () => {
|
||||
threadId: "271",
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps threadId from sessions.list delivery context for announce delivery", async () => {
|
||||
callGatewayMock.mockResolvedValueOnce({
|
||||
sessions: [
|
||||
{
|
||||
key: "agent:main:whatsapp:group:123@g.us",
|
||||
deliveryContext: {
|
||||
channel: "whatsapp",
|
||||
to: "123@g.us",
|
||||
accountId: "work",
|
||||
threadId: "thread-77",
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const target = await resolveAnnounceTarget({
|
||||
sessionKey: "agent:main:whatsapp:group:123@g.us",
|
||||
displayKey: "agent:main:whatsapp:group:123@g.us",
|
||||
});
|
||||
expect(target).toEqual({
|
||||
channel: "whatsapp",
|
||||
to: "123@g.us",
|
||||
accountId: "work",
|
||||
threadId: "thread-77",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves threaded Slack session keys when sessions.list lacks stored thread metadata", async () => {
|
||||
callGatewayMock.mockResolvedValueOnce({
|
||||
sessions: [
|
||||
{
|
||||
key: "agent:main:slack:channel:C123:thread:1710000000.000100",
|
||||
deliveryContext: {
|
||||
channel: "slack",
|
||||
to: "channel:C123",
|
||||
accountId: "workspace",
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const target = await resolveAnnounceTarget({
|
||||
sessionKey: "agent:main:slack:channel:C123:thread:1710000000.000100",
|
||||
displayKey: "agent:main:slack:channel:C123:thread:1710000000.000100",
|
||||
});
|
||||
expect(target).toEqual({
|
||||
channel: "slack",
|
||||
to: "channel:C123",
|
||||
accountId: "workspace",
|
||||
threadId: "1710000000.000100",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("sessions_list gating", () => {
|
||||
|
||||
@@ -34,6 +34,7 @@ import {
|
||||
} from "../../shared/string-coerce.js";
|
||||
import { createRunningTaskRun } from "../../tasks/task-executor.js";
|
||||
import {
|
||||
mergeDeliveryContext,
|
||||
normalizeDeliveryContext,
|
||||
normalizeSessionDeliveryFields,
|
||||
} from "../../utils/delivery-context.js";
|
||||
@@ -563,6 +564,26 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
resolvedGroupChannel = resolvedGroupChannel || inheritedGroup?.groupChannel;
|
||||
resolvedGroupSpace = resolvedGroupSpace || inheritedGroup?.groupSpace;
|
||||
const deliveryFields = normalizeSessionDeliveryFields(entry);
|
||||
// When the session has no delivery context yet (e.g. a freshly-spawned subagent
|
||||
// with deliver: false), seed it from the request's channel/to/threadId params.
|
||||
// Without this, subagent sessions end up with deliveryContext: {channel: "slack"}
|
||||
// and no `to`/`threadId`, which causes announce delivery to either target the
|
||||
// wrong channel (when the parent's lastTo drifts) or fail entirely.
|
||||
const requestDeliveryHint = normalizeDeliveryContext({
|
||||
channel: request.channel?.trim(),
|
||||
to: request.to?.trim(),
|
||||
accountId: request.accountId?.trim(),
|
||||
// Pass threadId directly — normalizeDeliveryContext handles both
|
||||
// string and numeric threadIds (e.g., Matrix uses integers).
|
||||
threadId: request.threadId,
|
||||
});
|
||||
const effectiveDelivery = mergeDeliveryContext(
|
||||
deliveryFields.deliveryContext,
|
||||
requestDeliveryHint,
|
||||
);
|
||||
const effectiveDeliveryFields = normalizeSessionDeliveryFields({
|
||||
deliveryContext: effectiveDelivery,
|
||||
});
|
||||
const nextEntryPatch: SessionEntry = {
|
||||
sessionId,
|
||||
updatedAt: now,
|
||||
@@ -573,11 +594,11 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
systemSent: entry?.systemSent,
|
||||
sendPolicy: entry?.sendPolicy,
|
||||
skillsSnapshot: entry?.skillsSnapshot,
|
||||
deliveryContext: deliveryFields.deliveryContext,
|
||||
lastChannel: deliveryFields.lastChannel ?? entry?.lastChannel,
|
||||
lastTo: deliveryFields.lastTo ?? entry?.lastTo,
|
||||
lastAccountId: deliveryFields.lastAccountId ?? entry?.lastAccountId,
|
||||
lastThreadId: deliveryFields.lastThreadId ?? entry?.lastThreadId,
|
||||
deliveryContext: effectiveDeliveryFields.deliveryContext,
|
||||
lastChannel: effectiveDeliveryFields.lastChannel ?? entry?.lastChannel,
|
||||
lastTo: effectiveDeliveryFields.lastTo ?? entry?.lastTo,
|
||||
lastAccountId: effectiveDeliveryFields.lastAccountId ?? entry?.lastAccountId,
|
||||
lastThreadId: effectiveDeliveryFields.lastThreadId ?? entry?.lastThreadId,
|
||||
modelOverride: entry?.modelOverride,
|
||||
providerOverride: entry?.providerOverride,
|
||||
label: labelValue,
|
||||
|
||||
222
src/gateway/server.agent.subagent-delivery-context.test.ts
Normal file
222
src/gateway/server.agent.subagent-delivery-context.test.ts
Normal file
@@ -0,0 +1,222 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterAll, beforeAll, describe, expect, test } from "vitest";
|
||||
import type { ChannelPlugin } from "../channels/plugins/types.js";
|
||||
import { createChannelTestPluginBase } from "../test-utils/channel-plugins.js";
|
||||
import { setRegistry } from "./server.agent.gateway-server-agent.mocks.js";
|
||||
import { createRegistry } from "./server.e2e-registry-helpers.js";
|
||||
import {
|
||||
connectOk,
|
||||
installGatewayTestHooks,
|
||||
rpcReq,
|
||||
startServerWithClient,
|
||||
testState,
|
||||
writeSessionStore,
|
||||
} from "./test-helpers.js";
|
||||
|
||||
installGatewayTestHooks({ scope: "suite" });
|
||||
|
||||
let server: Awaited<ReturnType<typeof startServerWithClient>>["server"];
|
||||
let ws: Awaited<ReturnType<typeof startServerWithClient>>["ws"];
|
||||
let sessionStoreDir: string;
|
||||
let sessionStorePath: string;
|
||||
|
||||
const createStubChannelPlugin = (params: {
|
||||
id: ChannelPlugin["id"];
|
||||
label: string;
|
||||
}): ChannelPlugin => ({
|
||||
...createChannelTestPluginBase({
|
||||
id: params.id,
|
||||
label: params.label,
|
||||
}),
|
||||
outbound: {
|
||||
deliveryMode: "direct",
|
||||
resolveTarget: ({ to }) => {
|
||||
const trimmed = to?.trim() ?? "";
|
||||
if (trimmed) {
|
||||
return { ok: true, to: trimmed };
|
||||
}
|
||||
return { ok: false, error: new Error(`missing target for ${params.id}`) };
|
||||
},
|
||||
sendText: async () => ({ channel: params.id, messageId: "msg-test" }),
|
||||
sendMedia: async () => ({ channel: params.id, messageId: "msg-test" }),
|
||||
},
|
||||
});
|
||||
|
||||
const defaultRegistry = createRegistry([
|
||||
{
|
||||
pluginId: "slack",
|
||||
source: "test",
|
||||
plugin: createStubChannelPlugin({ id: "slack", label: "Slack" }),
|
||||
},
|
||||
]);
|
||||
|
||||
beforeAll(async () => {
|
||||
const started = await startServerWithClient();
|
||||
server = started.server;
|
||||
ws = started.ws;
|
||||
await connectOk(ws);
|
||||
sessionStoreDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-subagent-delivery-ctx-"));
|
||||
sessionStorePath = path.join(sessionStoreDir, "sessions.json");
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
ws.close();
|
||||
await server.close();
|
||||
await fs.rm(sessionStoreDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
type StoredEntry = {
|
||||
deliveryContext?: { channel?: string; to?: string; threadId?: string; accountId?: string };
|
||||
lastChannel?: string;
|
||||
lastTo?: string;
|
||||
lastThreadId?: string | number;
|
||||
lastAccountId?: string;
|
||||
};
|
||||
|
||||
describe("subagent session deliveryContext from spawn request params", () => {
|
||||
test("new subagent session inherits deliveryContext from request channel/to/threadId", async () => {
|
||||
setRegistry(defaultRegistry);
|
||||
testState.sessionStorePath = sessionStorePath;
|
||||
await writeSessionStore({ entries: {} });
|
||||
|
||||
const res = await rpcReq(ws, "agent", {
|
||||
message: "[Subagent Task]: analyze data",
|
||||
sessionKey: "agent:main:subagent:test-delivery-ctx",
|
||||
channel: "slack",
|
||||
to: "channel:C0AF8TW48UQ",
|
||||
accountId: "default",
|
||||
threadId: "1774374945.091819",
|
||||
deliver: false,
|
||||
idempotencyKey: "idem-subagent-delivery-ctx-1",
|
||||
});
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
const stored = JSON.parse(await fs.readFile(sessionStorePath, "utf-8")) as Record<
|
||||
string,
|
||||
StoredEntry
|
||||
>;
|
||||
const entry = stored["agent:main:subagent:test-delivery-ctx"];
|
||||
expect(entry).toBeDefined();
|
||||
expect(entry?.deliveryContext?.channel).toBe("slack");
|
||||
expect(entry?.deliveryContext?.to).toBe("channel:C0AF8TW48UQ");
|
||||
expect(entry?.deliveryContext?.threadId).toBe("1774374945.091819");
|
||||
expect(entry?.deliveryContext?.accountId).toBe("default");
|
||||
expect(entry?.lastChannel).toBe("slack");
|
||||
expect(entry?.lastTo).toBe("channel:C0AF8TW48UQ");
|
||||
});
|
||||
|
||||
test("existing session deliveryContext is NOT overwritten by request params", async () => {
|
||||
setRegistry(defaultRegistry);
|
||||
testState.sessionStorePath = sessionStorePath;
|
||||
await writeSessionStore({
|
||||
entries: {
|
||||
"agent:main:subagent:existing-ctx": {
|
||||
sessionId: "sess-existing",
|
||||
updatedAt: Date.now(),
|
||||
deliveryContext: {
|
||||
channel: "slack",
|
||||
to: "user:U09U1LV7JDN",
|
||||
accountId: "default",
|
||||
threadId: "1771242986.529939",
|
||||
},
|
||||
lastChannel: "slack",
|
||||
lastTo: "user:U09U1LV7JDN",
|
||||
lastAccountId: "default",
|
||||
lastThreadId: "1771242986.529939",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const res = await rpcReq(ws, "agent", {
|
||||
message: "follow-up",
|
||||
sessionKey: "agent:main:subagent:existing-ctx",
|
||||
channel: "slack",
|
||||
to: "channel:C0AF8TW48UQ",
|
||||
threadId: "9999999999.000000",
|
||||
deliver: false,
|
||||
idempotencyKey: "idem-subagent-delivery-ctx-2",
|
||||
});
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
const stored = JSON.parse(await fs.readFile(sessionStorePath, "utf-8")) as Record<
|
||||
string,
|
||||
StoredEntry
|
||||
>;
|
||||
const entry = stored["agent:main:subagent:existing-ctx"];
|
||||
expect(entry).toBeDefined();
|
||||
// The ORIGINAL deliveryContext should be preserved (primary wins in merge).
|
||||
expect(entry?.deliveryContext?.to).toBe("user:U09U1LV7JDN");
|
||||
expect(entry?.deliveryContext?.threadId).toBe("1771242986.529939");
|
||||
expect(entry?.lastTo).toBe("user:U09U1LV7JDN");
|
||||
});
|
||||
|
||||
test("pre-patched subagent session (via sessions.patch) inherits deliveryContext from agent request", async () => {
|
||||
// Simulates the real subagent spawn flow: spawnSubagentDirect calls sessions.patch
|
||||
// first (to set spawnDepth, spawnedBy, etc.), then calls callSubagentGateway({method: "agent"}).
|
||||
// The sessions.patch creates a partial entry without deliveryContext.
|
||||
// The agent handler must seed deliveryContext from the request params.
|
||||
setRegistry(defaultRegistry);
|
||||
testState.sessionStorePath = sessionStorePath;
|
||||
await writeSessionStore({
|
||||
entries: {
|
||||
"agent:main:subagent:pre-patched": {
|
||||
sessionId: "sess-pre-patched",
|
||||
updatedAt: Date.now(),
|
||||
spawnDepth: 1,
|
||||
spawnedBy: "agent:main:slack:direct:u07fdr83w6n:thread:1775577152.364109",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const res = await rpcReq(ws, "agent", {
|
||||
message: "[Subagent Task]: investigate data",
|
||||
sessionKey: "agent:main:subagent:pre-patched",
|
||||
channel: "slack",
|
||||
to: "user:U07FDR83W6N",
|
||||
accountId: "default",
|
||||
threadId: "1775577152.364109",
|
||||
deliver: false,
|
||||
idempotencyKey: "idem-subagent-delivery-ctx-prepatched",
|
||||
});
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
const stored = JSON.parse(await fs.readFile(sessionStorePath, "utf-8")) as Record<
|
||||
string,
|
||||
StoredEntry
|
||||
>;
|
||||
const entry = stored["agent:main:subagent:pre-patched"];
|
||||
expect(entry).toBeDefined();
|
||||
expect(entry?.deliveryContext?.channel).toBe("slack");
|
||||
expect(entry?.deliveryContext?.to).toBe("user:U07FDR83W6N");
|
||||
expect(entry?.deliveryContext?.threadId).toBe("1775577152.364109");
|
||||
expect(entry?.deliveryContext?.accountId).toBe("default");
|
||||
expect(entry?.lastThreadId).toBe("1775577152.364109");
|
||||
});
|
||||
|
||||
test("request without to/threadId does not inject empty values", async () => {
|
||||
setRegistry(defaultRegistry);
|
||||
testState.sessionStorePath = sessionStorePath;
|
||||
await writeSessionStore({ entries: {} });
|
||||
|
||||
const res = await rpcReq(ws, "agent", {
|
||||
message: "internal task",
|
||||
sessionKey: "agent:main:subagent:no-routing",
|
||||
channel: "slack",
|
||||
deliver: false,
|
||||
idempotencyKey: "idem-subagent-delivery-ctx-3",
|
||||
});
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
const stored = JSON.parse(await fs.readFile(sessionStorePath, "utf-8")) as Record<
|
||||
string,
|
||||
StoredEntry
|
||||
>;
|
||||
const entry = stored["agent:main:subagent:no-routing"];
|
||||
expect(entry).toBeDefined();
|
||||
expect(entry?.deliveryContext?.channel).toBe("slack");
|
||||
expect(entry?.deliveryContext?.to).toBeUndefined();
|
||||
expect(entry?.deliveryContext?.threadId).toBeUndefined();
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user