diff --git a/CHANGELOG.md b/CHANGELOG.md index 289674db4f9..085a899e2d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai - macOS app: move recent session context rows into a Context submenu while keeping usage and cost details root-level, so the menu bar companion stays compact with many active sessions. Thanks @guti. - Gateway/SDK: add SDK-facing tools.invoke RPC with shared HTTP policy, typed approval/refusal results, and SDK helper support. Refs #74705. Thanks @BunsDev and @ai-hpc. - Messages/docs: clarify that `BodyForAgent` is the primary inbound model text while `Body` is the legacy envelope fallback, and add Signal coverage so channel hardening patches target the real prompt path. Refs #66198. Thanks @defonota3box. +- Slack: keep track of bot-participated threads across restarts, so ongoing threaded conversations can continue auto-replying after the Gateway is restarted. Thanks @amknight. - Control UI/Usage: add UTC quarter-hour token buckets for the Usage Mosaic and reuse them for hour filtering, keeping the legacy session-span fallback for older summaries. (#74337) Thanks @konanok. - BlueBubbles: add opt-in `channels.bluebubbles.replyContextApiFallback` that fetches the original message from the BlueBubbles HTTP API when the in-memory reply-context cache misses (multi-instance deployments sharing one BB account, post-restart, after long-lived TTL/LRU eviction). Off by default; channel-level setting propagates to accounts that omit the flag through `mergeAccountConfig`; routed through the typed `BlueBubblesClient` so every fetch is SSRF-guarded by the same three-mode policy as every other BB client request; reply-id shape is validated and part-index prefixes (`p:0/`) are stripped before the request; concurrent webhooks for the same `replyToId` coalesce into one fetch and successful responses populate the reply cache for subsequent hits. Also promotes BlueBubbles attachment download failures from verbose to runtime error so silently-dropped inbound images are visible at default log level, and extends `sanitizeForLog` to redact `?password=…`/`?token=…` query params and `Authorization:` headers before they reach the log sink (CWE-532). (#71820) Thanks @coletebou and @zqchris. - CLI/proxy: add `openclaw proxy validate` so operators can verify effective proxy configuration, proxy reachability, and expected allow/deny destination behavior before deploying proxy-routed OpenClaw commands. (#73438) Thanks @jesse-merhi. diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index ceab70cd4da..c406c4e44c8 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -1170,7 +1170,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag // or draft stream). Falls back to statusThreadTs for edge cases. const participationThreadTs = usedReplyThreadTs ?? statusThreadTs; if (anyReplyDelivered && participationThreadTs) { - recordSlackThreadParticipation(account.accountId, message.channel, participationThreadTs); + recordSlackThreadParticipation(account.accountId, message.channel, participationThreadTs, { + agentId: route.agentId, + }); } if (!anyReplyDelivered) { diff --git a/extensions/slack/src/monitor/message-handler/prepare.ts b/extensions/slack/src/monitor/message-handler/prepare.ts index 77f0b9615ba..b9844584fa6 100644 --- a/extensions/slack/src/monitor/message-handler/prepare.ts +++ b/extensions/slack/src/monitor/message-handler/prepare.ts @@ -33,7 +33,7 @@ import { import type { ResolvedSlackAccount } from "../../accounts.js"; import { reactSlackMessage } from "../../actions.js"; import { formatSlackFileReference } from "../../file-reference.js"; -import { hasSlackThreadParticipation } from "../../sent-thread-cache.js"; +import { hasSlackThreadParticipationWithPersistence } from "../../sent-thread-cache.js"; import type { SlackMessageEvent } from "../../types.js"; import { normalizeAllowListLower, @@ -361,7 +361,11 @@ export async function prepareSlackMessage(params: { ...implicitMentionKindWhen("reply_to_bot", message.parent_user_id === ctx.botUserId), ...implicitMentionKindWhen( "bot_thread_participant", - hasSlackThreadParticipation(account.accountId, message.channel, message.thread_ts), + await hasSlackThreadParticipationWithPersistence({ + accountId: account.accountId, + channelId: message.channel, + threadTs: message.thread_ts, + }), ), ]; diff --git a/extensions/slack/src/sent-thread-cache.test.ts b/extensions/slack/src/sent-thread-cache.test.ts index a6d45d85e99..f3191ee5f0c 100644 --- a/extensions/slack/src/sent-thread-cache.test.ts +++ b/extensions/slack/src/sent-thread-cache.test.ts @@ -1,14 +1,17 @@ import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { clearSlackRuntime, setSlackRuntime } from "./runtime.js"; import { clearSlackThreadParticipationCache, hasSlackThreadParticipation, + hasSlackThreadParticipationWithPersistence, recordSlackThreadParticipation, } from "./sent-thread-cache.js"; describe("slack sent-thread-cache", () => { afterEach(() => { clearSlackThreadParticipationCache(); + clearSlackRuntime(); vi.restoreAllMocks(); }); @@ -88,4 +91,75 @@ describe("slack sent-thread-cache", () => { expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000000")).toBe(false); expect(hasSlackThreadParticipation("A1", "C123", "1700000000.005000")).toBe(true); }); + + it("writes and reads persistent thread participation when runtime state is available", async () => { + const register = vi.fn().mockResolvedValue(undefined); + const lookup = vi.fn().mockResolvedValue({ repliedAt: 123 }); + const openKeyedStore = vi.fn(() => ({ + register, + lookup, + consume: vi.fn(), + delete: vi.fn(), + entries: vi.fn(), + clear: vi.fn(), + })); + setSlackRuntime({ + state: { openKeyedStore }, + logging: { getChildLogger: () => ({ warn: vi.fn() }) }, + } as never); + + recordSlackThreadParticipation("A1", "C123", "1700000000.000002"); + + await vi.waitFor(() => expect(register).toHaveBeenCalledTimes(1)); + expect(register).toHaveBeenCalledWith( + "A1:C123:1700000000.000002", + expect.objectContaining({ repliedAt: expect.any(Number) }), + ); + + clearSlackThreadParticipationCache(); + await expect( + hasSlackThreadParticipationWithPersistence({ + accountId: "A1", + channelId: "C123", + threadTs: "1700000000.000002", + }), + ).resolves.toBe(true); + expect(openKeyedStore).toHaveBeenCalledTimes(2); + expect(lookup).toHaveBeenCalledWith("A1:C123:1700000000.000002"); + + lookup.mockClear(); + await expect( + hasSlackThreadParticipationWithPersistence({ + accountId: "A1", + channelId: "C123", + threadTs: "1700000000.000002", + }), + ).resolves.toBe(true); + expect(lookup).not.toHaveBeenCalled(); + }); + + it("falls back to in-memory thread participation when persistent state cannot open", async () => { + const warn = vi.fn(); + setSlackRuntime({ + state: { + openKeyedStore: vi.fn(() => { + throw new Error("sqlite unavailable"); + }), + }, + logging: { getChildLogger: () => ({ warn }) }, + } as never); + + recordSlackThreadParticipation("A1", "C123", "1700000000.000003"); + expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000003")).toBe(true); + + clearSlackThreadParticipationCache(); + await expect( + hasSlackThreadParticipationWithPersistence({ + accountId: "A1", + channelId: "C123", + threadTs: "1700000000.000003", + }), + ).resolves.toBe(false); + expect(warn).toHaveBeenCalled(); + }); }); diff --git a/extensions/slack/src/sent-thread-cache.ts b/extensions/slack/src/sent-thread-cache.ts index 0e896283fb4..d7836cf57d9 100644 --- a/extensions/slack/src/sent-thread-cache.ts +++ b/extensions/slack/src/sent-thread-cache.ts @@ -1,4 +1,5 @@ import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime"; +import { getOptionalSlackRuntime } from "./runtime.js"; /** * In-memory cache of Slack threads the bot has participated in. @@ -8,6 +9,22 @@ import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime"; const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours const MAX_ENTRIES = 5000; +const PERSISTENT_MAX_ENTRIES = 1000; +const PERSISTENT_NAMESPACE = "slack.thread-participation"; + +type SlackThreadParticipationRecord = { + agentId?: string; + repliedAt: number; +}; + +type SlackThreadParticipationStore = { + register( + key: string, + value: SlackThreadParticipationRecord, + opts?: { ttlMs?: number }, + ): Promise; + lookup(key: string): Promise; +}; /** * Keep Slack thread participation shared across bundled chunks so thread @@ -19,19 +36,92 @@ const threadParticipation = resolveGlobalDedupeCache(SLACK_THREAD_PARTICIPATION_ maxSize: MAX_ENTRIES, }); +let persistentStore: SlackThreadParticipationStore | undefined; +let persistentStoreDisabled = false; + function makeKey(accountId: string, channelId: string, threadTs: string): string { return `${accountId}:${channelId}:${threadTs}`; } +function reportPersistentThreadParticipationError(error: unknown): void { + try { + getOptionalSlackRuntime() + ?.logging.getChildLogger({ plugin: "slack", feature: "thread-participation-state" }) + .warn("Slack persistent thread participation state failed", { error: String(error) }); + } catch { + // Best effort only: persistent state must never break Slack message handling. + } +} + +function disablePersistentThreadParticipation(error: unknown): void { + persistentStoreDisabled = true; + persistentStore = undefined; + reportPersistentThreadParticipationError(error); +} + +function getPersistentThreadParticipationStore(): SlackThreadParticipationStore | undefined { + if (persistentStoreDisabled) { + return undefined; + } + if (persistentStore) { + return persistentStore; + } + const runtime = getOptionalSlackRuntime(); + if (!runtime) { + return undefined; + } + try { + persistentStore = runtime.state.openKeyedStore({ + namespace: PERSISTENT_NAMESPACE, + maxEntries: PERSISTENT_MAX_ENTRIES, + defaultTtlMs: TTL_MS, + }); + return persistentStore; + } catch (error) { + disablePersistentThreadParticipation(error); + return undefined; + } +} + +function rememberPersistentThreadParticipation(params: { key: string; agentId?: string }): void { + const store = getPersistentThreadParticipationStore(); + if (!store) { + return; + } + void store + .register(params.key, { + // Stored for future per-agent thread routing; current reads only need presence. + ...(params.agentId ? { agentId: params.agentId } : {}), + repliedAt: Date.now(), + }) + .catch(disablePersistentThreadParticipation); +} + +async function lookupPersistentThreadParticipation(key: string): Promise { + const store = getPersistentThreadParticipationStore(); + if (!store) { + return false; + } + try { + return Boolean(await store.lookup(key)); + } catch (error) { + disablePersistentThreadParticipation(error); + return false; + } +} + export function recordSlackThreadParticipation( accountId: string, channelId: string, threadTs: string, + opts?: { agentId?: string }, ): void { if (!accountId || !channelId || !threadTs) { return; } - threadParticipation.check(makeKey(accountId, channelId, threadTs)); + const key = makeKey(accountId, channelId, threadTs); + threadParticipation.check(key); + rememberPersistentThreadParticipation({ key, agentId: opts?.agentId }); } export function hasSlackThreadParticipation( @@ -45,6 +135,27 @@ export function hasSlackThreadParticipation( return threadParticipation.peek(makeKey(accountId, channelId, threadTs)); } +export async function hasSlackThreadParticipationWithPersistence(params: { + accountId: string; + channelId: string; + threadTs: string; +}): Promise { + if (!params.accountId || !params.channelId || !params.threadTs) { + return false; + } + const key = makeKey(params.accountId, params.channelId, params.threadTs); + if (threadParticipation.peek(key)) { + return true; + } + const found = await lookupPersistentThreadParticipation(key); + if (found) { + threadParticipation.check(key); + } + return found; +} + export function clearSlackThreadParticipationCache(): void { threadParticipation.clear(); + persistentStore = undefined; + persistentStoreDisabled = false; }