slack: persist thread participation best-effort (#75583)

This commit is contained in:
Alex Knight
2026-05-01 22:10:09 +10:00
committed by GitHub
parent c07f29bcf7
commit d0ec3d1f09
5 changed files with 196 additions and 4 deletions

View File

@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
- macOS app: move recent session context rows into a Context submenu while keeping usage and cost details root-level, so the menu bar companion stays compact with many active sessions. Thanks @guti.
- Gateway/SDK: add SDK-facing tools.invoke RPC with shared HTTP policy, typed approval/refusal results, and SDK helper support. Refs #74705. Thanks @BunsDev and @ai-hpc.
- Messages/docs: clarify that `BodyForAgent` is the primary inbound model text while `Body` is the legacy envelope fallback, and add Signal coverage so channel hardening patches target the real prompt path. Refs #66198. Thanks @defonota3box.
- Slack: keep track of bot-participated threads across restarts, so ongoing threaded conversations can continue auto-replying after the Gateway is restarted. Thanks @amknight.
- Control UI/Usage: add UTC quarter-hour token buckets for the Usage Mosaic and reuse them for hour filtering, keeping the legacy session-span fallback for older summaries. (#74337) Thanks @konanok.
- BlueBubbles: add opt-in `channels.bluebubbles.replyContextApiFallback` that fetches the original message from the BlueBubbles HTTP API when the in-memory reply-context cache misses (multi-instance deployments sharing one BB account, post-restart, after long-lived TTL/LRU eviction). Off by default; channel-level setting propagates to accounts that omit the flag through `mergeAccountConfig`; routed through the typed `BlueBubblesClient` so every fetch is SSRF-guarded by the same three-mode policy as every other BB client request; reply-id shape is validated and part-index prefixes (`p:0/<guid>`) are stripped before the request; concurrent webhooks for the same `replyToId` coalesce into one fetch and successful responses populate the reply cache for subsequent hits. Also promotes BlueBubbles attachment download failures from verbose to runtime error so silently-dropped inbound images are visible at default log level, and extends `sanitizeForLog` to redact `?password=…`/`?token=…` query params and `Authorization:` headers before they reach the log sink (CWE-532). (#71820) Thanks @coletebou and @zqchris.
- CLI/proxy: add `openclaw proxy validate` so operators can verify effective proxy configuration, proxy reachability, and expected allow/deny destination behavior before deploying proxy-routed OpenClaw commands. (#73438) Thanks @jesse-merhi.

View File

@@ -1170,7 +1170,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
// or draft stream). Falls back to statusThreadTs for edge cases.
const participationThreadTs = usedReplyThreadTs ?? statusThreadTs;
if (anyReplyDelivered && participationThreadTs) {
recordSlackThreadParticipation(account.accountId, message.channel, participationThreadTs);
recordSlackThreadParticipation(account.accountId, message.channel, participationThreadTs, {
agentId: route.agentId,
});
}
if (!anyReplyDelivered) {

View File

@@ -33,7 +33,7 @@ import {
import type { ResolvedSlackAccount } from "../../accounts.js";
import { reactSlackMessage } from "../../actions.js";
import { formatSlackFileReference } from "../../file-reference.js";
import { hasSlackThreadParticipation } from "../../sent-thread-cache.js";
import { hasSlackThreadParticipationWithPersistence } from "../../sent-thread-cache.js";
import type { SlackMessageEvent } from "../../types.js";
import {
normalizeAllowListLower,
@@ -361,7 +361,11 @@ export async function prepareSlackMessage(params: {
...implicitMentionKindWhen("reply_to_bot", message.parent_user_id === ctx.botUserId),
...implicitMentionKindWhen(
"bot_thread_participant",
hasSlackThreadParticipation(account.accountId, message.channel, message.thread_ts),
await hasSlackThreadParticipationWithPersistence({
accountId: account.accountId,
channelId: message.channel,
threadTs: message.thread_ts,
}),
),
];

View File

@@ -1,14 +1,17 @@
import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures";
import { afterEach, describe, expect, it, vi } from "vitest";
import { clearSlackRuntime, setSlackRuntime } from "./runtime.js";
import {
clearSlackThreadParticipationCache,
hasSlackThreadParticipation,
hasSlackThreadParticipationWithPersistence,
recordSlackThreadParticipation,
} from "./sent-thread-cache.js";
describe("slack sent-thread-cache", () => {
afterEach(() => {
clearSlackThreadParticipationCache();
clearSlackRuntime();
vi.restoreAllMocks();
});
@@ -88,4 +91,75 @@ describe("slack sent-thread-cache", () => {
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000000")).toBe(false);
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.005000")).toBe(true);
});
it("writes and reads persistent thread participation when runtime state is available", async () => {
const register = vi.fn().mockResolvedValue(undefined);
const lookup = vi.fn().mockResolvedValue({ repliedAt: 123 });
const openKeyedStore = vi.fn(() => ({
register,
lookup,
consume: vi.fn(),
delete: vi.fn(),
entries: vi.fn(),
clear: vi.fn(),
}));
setSlackRuntime({
state: { openKeyedStore },
logging: { getChildLogger: () => ({ warn: vi.fn() }) },
} as never);
recordSlackThreadParticipation("A1", "C123", "1700000000.000002");
await vi.waitFor(() => expect(register).toHaveBeenCalledTimes(1));
expect(register).toHaveBeenCalledWith(
"A1:C123:1700000000.000002",
expect.objectContaining({ repliedAt: expect.any(Number) }),
);
clearSlackThreadParticipationCache();
await expect(
hasSlackThreadParticipationWithPersistence({
accountId: "A1",
channelId: "C123",
threadTs: "1700000000.000002",
}),
).resolves.toBe(true);
expect(openKeyedStore).toHaveBeenCalledTimes(2);
expect(lookup).toHaveBeenCalledWith("A1:C123:1700000000.000002");
lookup.mockClear();
await expect(
hasSlackThreadParticipationWithPersistence({
accountId: "A1",
channelId: "C123",
threadTs: "1700000000.000002",
}),
).resolves.toBe(true);
expect(lookup).not.toHaveBeenCalled();
});
it("falls back to in-memory thread participation when persistent state cannot open", async () => {
const warn = vi.fn();
setSlackRuntime({
state: {
openKeyedStore: vi.fn(() => {
throw new Error("sqlite unavailable");
}),
},
logging: { getChildLogger: () => ({ warn }) },
} as never);
recordSlackThreadParticipation("A1", "C123", "1700000000.000003");
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000003")).toBe(true);
clearSlackThreadParticipationCache();
await expect(
hasSlackThreadParticipationWithPersistence({
accountId: "A1",
channelId: "C123",
threadTs: "1700000000.000003",
}),
).resolves.toBe(false);
expect(warn).toHaveBeenCalled();
});
});

View File

@@ -1,4 +1,5 @@
import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
import { getOptionalSlackRuntime } from "./runtime.js";
/**
* In-memory cache of Slack threads the bot has participated in.
@@ -8,6 +9,22 @@ import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours
const MAX_ENTRIES = 5000;
const PERSISTENT_MAX_ENTRIES = 1000;
const PERSISTENT_NAMESPACE = "slack.thread-participation";
type SlackThreadParticipationRecord = {
agentId?: string;
repliedAt: number;
};
type SlackThreadParticipationStore = {
register(
key: string,
value: SlackThreadParticipationRecord,
opts?: { ttlMs?: number },
): Promise<void>;
lookup(key: string): Promise<SlackThreadParticipationRecord | undefined>;
};
/**
* Keep Slack thread participation shared across bundled chunks so thread
@@ -19,19 +36,92 @@ const threadParticipation = resolveGlobalDedupeCache(SLACK_THREAD_PARTICIPATION_
maxSize: MAX_ENTRIES,
});
let persistentStore: SlackThreadParticipationStore | undefined;
let persistentStoreDisabled = false;
function makeKey(accountId: string, channelId: string, threadTs: string): string {
return `${accountId}:${channelId}:${threadTs}`;
}
function reportPersistentThreadParticipationError(error: unknown): void {
try {
getOptionalSlackRuntime()
?.logging.getChildLogger({ plugin: "slack", feature: "thread-participation-state" })
.warn("Slack persistent thread participation state failed", { error: String(error) });
} catch {
// Best effort only: persistent state must never break Slack message handling.
}
}
function disablePersistentThreadParticipation(error: unknown): void {
persistentStoreDisabled = true;
persistentStore = undefined;
reportPersistentThreadParticipationError(error);
}
function getPersistentThreadParticipationStore(): SlackThreadParticipationStore | undefined {
if (persistentStoreDisabled) {
return undefined;
}
if (persistentStore) {
return persistentStore;
}
const runtime = getOptionalSlackRuntime();
if (!runtime) {
return undefined;
}
try {
persistentStore = runtime.state.openKeyedStore<SlackThreadParticipationRecord>({
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
defaultTtlMs: TTL_MS,
});
return persistentStore;
} catch (error) {
disablePersistentThreadParticipation(error);
return undefined;
}
}
function rememberPersistentThreadParticipation(params: { key: string; agentId?: string }): void {
const store = getPersistentThreadParticipationStore();
if (!store) {
return;
}
void store
.register(params.key, {
// Stored for future per-agent thread routing; current reads only need presence.
...(params.agentId ? { agentId: params.agentId } : {}),
repliedAt: Date.now(),
})
.catch(disablePersistentThreadParticipation);
}
async function lookupPersistentThreadParticipation(key: string): Promise<boolean> {
const store = getPersistentThreadParticipationStore();
if (!store) {
return false;
}
try {
return Boolean(await store.lookup(key));
} catch (error) {
disablePersistentThreadParticipation(error);
return false;
}
}
export function recordSlackThreadParticipation(
accountId: string,
channelId: string,
threadTs: string,
opts?: { agentId?: string },
): void {
if (!accountId || !channelId || !threadTs) {
return;
}
threadParticipation.check(makeKey(accountId, channelId, threadTs));
const key = makeKey(accountId, channelId, threadTs);
threadParticipation.check(key);
rememberPersistentThreadParticipation({ key, agentId: opts?.agentId });
}
export function hasSlackThreadParticipation(
@@ -45,6 +135,27 @@ export function hasSlackThreadParticipation(
return threadParticipation.peek(makeKey(accountId, channelId, threadTs));
}
export async function hasSlackThreadParticipationWithPersistence(params: {
accountId: string;
channelId: string;
threadTs: string;
}): Promise<boolean> {
if (!params.accountId || !params.channelId || !params.threadTs) {
return false;
}
const key = makeKey(params.accountId, params.channelId, params.threadTs);
if (threadParticipation.peek(key)) {
return true;
}
const found = await lookupPersistentThreadParticipation(key);
if (found) {
threadParticipation.check(key);
}
return found;
}
export function clearSlackThreadParticipationCache(): void {
threadParticipation.clear();
persistentStore = undefined;
persistentStoreDisabled = false;
}