From a2b2c4a76c79d0305f747e8155d2ea1cba01e17e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 30 May 2026 21:08:39 +0100 Subject: [PATCH] refactor(msteams): persist conversation and poll stores in sqlite Move MSTeams conversation and poll plugin-local stores to plugin-state SQLite. Legacy JSON stores import once without overwriting existing SQLite state; conversation and poll IDs are hashed for plugin-state keys; poll votes are sharded with bounded row-cap headroom and prune cleanup; MSTeams docs now describe SQLite storage. SSO and delegated token stores are unchanged. Verified with focused MSTeams tests, docs sanity, autoreview, Testbox check:changed, and green PR CI. --- docs/channels/msteams.md | 5 +- .../msteams/src/conversation-store-fs.test.ts | 81 ----- .../msteams/src/conversation-store-fs.ts | 149 --------- .../src/conversation-store-state.test.ts | 266 +++++++++++++++ .../msteams/src/conversation-store-state.ts | 296 +++++++++++++++++ .../src/conversation-store.shared.test.ts | 10 +- .../src/graph-group-management.test.ts | 4 +- .../src/graph-messages.test-helpers.ts | 4 +- extensions/msteams/src/graph-messages.ts | 4 +- extensions/msteams/src/monitor.ts | 8 +- extensions/msteams/src/outbound.test.ts | 2 +- extensions/msteams/src/outbound.ts | 4 +- .../msteams/src/pending-uploads-fs.test.ts | 2 +- extensions/msteams/src/polls.test.ts | 314 +++++++++++++++++- extensions/msteams/src/polls.ts | 305 ++++++++++++++--- extensions/msteams/src/send-context.test.ts | 4 +- extensions/msteams/src/send-context.ts | 4 +- extensions/msteams/src/sqlite-state.ts | 89 +++++ .../runtime.ts} | 6 +- 19 files changed, 1257 insertions(+), 300 deletions(-) delete mode 100644 extensions/msteams/src/conversation-store-fs.test.ts delete mode 100644 extensions/msteams/src/conversation-store-fs.ts create mode 100644 extensions/msteams/src/conversation-store-state.test.ts create mode 100644 extensions/msteams/src/conversation-store-state.ts create mode 100644 extensions/msteams/src/sqlite-state.ts rename extensions/msteams/src/{test-runtime.ts => test-support/runtime.ts} (57%) diff --git a/docs/channels/msteams.md b/docs/channels/msteams.md index 0166280dcf6..4c4b03833c2 100644 --- a/docs/channels/msteams.md +++ b/docs/channels/msteams.md @@ -915,9 +915,10 @@ Uploaded files are stored in a `/OpenClawShared/` folder in the configured Share OpenClaw sends Teams polls as Adaptive Cards (there is no native Teams poll API). - CLI: `openclaw message poll --channel msteams --target conversation: ...` -- Votes are recorded by the gateway in `~/.openclaw/msteams-polls.json`. +- Votes are recorded by the gateway in OpenClaw plugin-state SQLite under `state/openclaw.sqlite`. +- Existing `msteams-polls.json` files are imported once when the MSTeams plugin starts. - The gateway must stay online to record votes. -- Polls do not auto-post result summaries yet (inspect the store file if needed). +- Polls do not auto-post result summaries yet, and there is no supported poll-results CLI yet. ## Presentation cards diff --git a/extensions/msteams/src/conversation-store-fs.test.ts b/extensions/msteams/src/conversation-store-fs.test.ts deleted file mode 100644 index e45d31e4d48..00000000000 --- a/extensions/msteams/src/conversation-store-fs.test.ts +++ /dev/null @@ -1,81 +0,0 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; -import { beforeEach, describe, expect, it } from "vitest"; -import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; -import type { StoredConversationReference } from "./conversation-store.js"; -import { setMSTeamsRuntime } from "./runtime.js"; -import { msteamsRuntimeStub } from "./test-runtime.js"; - -describe("msteams conversation store (fs-only)", () => { - beforeEach(() => { - setMSTeamsRuntime(msteamsRuntimeStub); - }); - - it("filters and prunes expired entries while preserving legacy entries without lastSeenAt", async () => { - const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); - - const env: NodeJS.ProcessEnv = { - ...process.env, - OPENCLAW_STATE_DIR: stateDir, - }; - - const store = createMSTeamsConversationStoreFs({ env, ttlMs: 1_000 }); - - const ref: StoredConversationReference = { - conversation: { id: "19:active@thread.tacv2" }, - channelId: "msteams", - serviceUrl: "https://service.example.com", - user: { id: "u1", aadObjectId: "aad1" }, - }; - - await store.upsert("19:active@thread.tacv2", ref); - - const filePath = path.join(stateDir, "msteams-conversations.json"); - const raw = await fs.promises.readFile(filePath, "utf-8"); - const json = JSON.parse(raw) as { - version: number; - conversations: Record; - }; - - json.conversations["19:old@thread.tacv2"] = { - ...ref, - conversation: { id: "19:old@thread.tacv2" }, - lastSeenAt: new Date(Date.now() - 60_000).toISOString(), - }; - - json.conversations["19:legacy@thread.tacv2"] = { - ...ref, - conversation: { id: "19:legacy@thread.tacv2" }, - }; - - await fs.promises.writeFile(filePath, `${JSON.stringify(json, null, 2)}\n`); - - const list = await store.list(); - const ids = list.map((entry) => entry.conversationId).toSorted(); - expect(ids).toEqual(["19:active@thread.tacv2", "19:legacy@thread.tacv2"]); - - expect(await store.get("19:old@thread.tacv2")).toBeNull(); - const legacyConversation = await store.get("19:legacy@thread.tacv2"); - if (!legacyConversation) { - throw new Error("expected migrated legacy Teams conversation"); - } - if (!legacyConversation.conversation) { - throw new Error("expected migrated legacy Teams conversation payload"); - } - expect(legacyConversation.conversation.id).toBe("19:legacy@thread.tacv2"); - - await store.upsert("19:new@thread.tacv2", { - ...ref, - conversation: { id: "19:new@thread.tacv2" }, - }); - - const rawAfter = await fs.promises.readFile(filePath, "utf-8"); - const jsonAfter = JSON.parse(rawAfter) as typeof json; - expect(Object.keys(jsonAfter.conversations).toSorted()).toEqual([ - "19:active@thread.tacv2", - "19:legacy@thread.tacv2", - "19:new@thread.tacv2", - ]); - }); -}); diff --git a/extensions/msteams/src/conversation-store-fs.ts b/extensions/msteams/src/conversation-store-fs.ts deleted file mode 100644 index fc21ae38bed..00000000000 --- a/extensions/msteams/src/conversation-store-fs.ts +++ /dev/null @@ -1,149 +0,0 @@ -import { - findPreferredDmConversationByUserId, - mergeStoredConversationReference, - normalizeStoredConversationId, - parseStoredConversationTimestamp, - toConversationStoreEntries, -} from "./conversation-store-helpers.js"; -import type { - MSTeamsConversationStore, - MSTeamsConversationStoreEntry, - StoredConversationReference, -} from "./conversation-store.js"; -import { resolveMSTeamsStorePath } from "./storage.js"; -import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js"; - -type ConversationStoreData = { - version: 1; - conversations: Record; -}; - -const STORE_FILENAME = "msteams-conversations.json"; -const MAX_CONVERSATIONS = 1000; -const CONVERSATION_TTL_MS = 365 * 24 * 60 * 60 * 1000; - -function pruneToLimit(conversations: Record) { - const entries = Object.entries(conversations); - if (entries.length <= MAX_CONVERSATIONS) { - return conversations; - } - - entries.sort((a, b) => { - const aTs = parseStoredConversationTimestamp(a[1].lastSeenAt) ?? 0; - const bTs = parseStoredConversationTimestamp(b[1].lastSeenAt) ?? 0; - return aTs - bTs; - }); - - const keep = entries.slice(entries.length - MAX_CONVERSATIONS); - return Object.fromEntries(keep); -} - -function pruneExpired( - conversations: Record, - nowMs: number, - ttlMs: number, -) { - let removed = false; - const kept: typeof conversations = {}; - for (const [conversationId, reference] of Object.entries(conversations)) { - const lastSeenAt = parseStoredConversationTimestamp(reference.lastSeenAt); - // Preserve legacy entries that have no lastSeenAt until they're seen again. - if (lastSeenAt != null && nowMs - lastSeenAt > ttlMs) { - removed = true; - continue; - } - kept[conversationId] = reference; - } - return { conversations: kept, removed }; -} - -export function createMSTeamsConversationStoreFs(params?: { - env?: NodeJS.ProcessEnv; - homedir?: () => string; - ttlMs?: number; - stateDir?: string; - storePath?: string; -}): MSTeamsConversationStore { - const ttlMs = params?.ttlMs ?? CONVERSATION_TTL_MS; - const filePath = resolveMSTeamsStorePath({ - filename: STORE_FILENAME, - env: params?.env, - homedir: params?.homedir, - stateDir: params?.stateDir, - storePath: params?.storePath, - }); - - const empty: ConversationStoreData = { version: 1, conversations: {} }; - - const readStore = async (): Promise => { - const { value } = await readJsonFile(filePath, empty); - if ( - value.version !== 1 || - !value.conversations || - typeof value.conversations !== "object" || - Array.isArray(value.conversations) - ) { - return empty; - } - const nowMs = Date.now(); - const pruned = pruneExpired(value.conversations, nowMs, ttlMs).conversations; - return { version: 1, conversations: pruneToLimit(pruned) }; - }; - - const list = async (): Promise => { - const store = await readStore(); - return toConversationStoreEntries(Object.entries(store.conversations)); - }; - - const get = async (conversationId: string): Promise => { - const store = await readStore(); - return store.conversations[normalizeStoredConversationId(conversationId)] ?? null; - }; - - const findPreferredDmByUserId = async ( - id: string, - ): Promise => { - return findPreferredDmConversationByUserId(await list(), id); - }; - - const upsert = async ( - conversationId: string, - reference: StoredConversationReference, - ): Promise => { - const normalizedId = normalizeStoredConversationId(conversationId); - await withFileLock(filePath, empty, async () => { - const store = await readStore(); - store.conversations[normalizedId] = mergeStoredConversationReference( - store.conversations[normalizedId], - reference, - new Date().toISOString(), - ); - const nowMs = Date.now(); - store.conversations = pruneExpired(store.conversations, nowMs, ttlMs).conversations; - store.conversations = pruneToLimit(store.conversations); - await writeJsonFile(filePath, store); - }); - }; - - const remove = async (conversationId: string): Promise => { - const normalizedId = normalizeStoredConversationId(conversationId); - return await withFileLock(filePath, empty, async () => { - const store = await readStore(); - if (!(normalizedId in store.conversations)) { - return false; - } - delete store.conversations[normalizedId]; - await writeJsonFile(filePath, store); - return true; - }); - }; - - return { - upsert, - get, - list, - remove, - findPreferredDmByUserId, - findByUserId: findPreferredDmByUserId, - }; -} diff --git a/extensions/msteams/src/conversation-store-state.test.ts b/extensions/msteams/src/conversation-store-state.test.ts new file mode 100644 index 00000000000..932b8b17db3 --- /dev/null +++ b/extensions/msteams/src/conversation-store-state.test.ts @@ -0,0 +1,266 @@ +import crypto from "node:crypto"; +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { + createPluginStateKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import { beforeEach, describe, expect, it } from "vitest"; +import { createMSTeamsConversationStoreState } from "./conversation-store-state.js"; +import type { StoredConversationReference } from "./conversation-store.js"; +import { setMSTeamsRuntime } from "./runtime.js"; +import { msteamsRuntimeStub } from "./test-support/runtime.js"; + +function conversationStateKey(conversationId: string): string { + return crypto.createHash("sha256").update(conversationId).digest("hex"); +} + +describe("msteams conversation store (plugin state)", () => { + beforeEach(() => { + resetPluginStateStoreForTests(); + setMSTeamsRuntime(msteamsRuntimeStub); + }); + + it("filters and prunes expired entries while preserving legacy entries without lastSeenAt", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); + const env: NodeJS.ProcessEnv = { + ...process.env, + OPENCLAW_STATE_DIR: stateDir, + }; + + const ref: StoredConversationReference = { + conversation: { id: "19:active@thread.tacv2" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + user: { id: "u1", aadObjectId: "aad1" }, + }; + const filePath = path.join(stateDir, "msteams-conversations.json"); + await fs.promises.mkdir(path.dirname(filePath), { recursive: true }); + await fs.promises.writeFile( + filePath, + `${JSON.stringify( + { + version: 1, + conversations: { + "19:active@thread.tacv2": ref, + "19:old@thread.tacv2": { + ...ref, + conversation: { id: "19:old@thread.tacv2" }, + lastSeenAt: new Date(Date.now() - 60_000).toISOString(), + }, + "19:legacy@thread.tacv2": { + ...ref, + conversation: { id: "19:legacy@thread.tacv2" }, + }, + }, + }, + null, + 2, + )}\n`, + ); + + const store = createMSTeamsConversationStoreState({ env, ttlMs: 1_000 }); + const ids = (await store.list()).map((entry) => entry.conversationId).toSorted(); + expect(ids).toEqual(["19:active@thread.tacv2", "19:legacy@thread.tacv2"]); + await expect(fs.promises.access(filePath)).rejects.toThrow(); + + expect(await store.get("19:old@thread.tacv2")).toBeNull(); + const legacyConversation = await store.get("19:legacy@thread.tacv2"); + if (!legacyConversation?.conversation) { + throw new Error("expected migrated legacy Teams conversation payload"); + } + expect(legacyConversation.conversation.id).toBe("19:legacy@thread.tacv2"); + + await store.upsert("19:new@thread.tacv2", { + ...ref, + conversation: { id: "19:new@thread.tacv2" }, + }); + const idsAfter = (await store.list()).map((entry) => entry.conversationId).toSorted(); + expect(idsAfter).toEqual([ + "19:active@thread.tacv2", + "19:legacy@thread.tacv2", + "19:new@thread.tacv2", + ]); + await expect( + fs.promises.access(path.join(stateDir, "state", "openclaw.sqlite")), + ).resolves.toBeUndefined(); + }); + + it("does not let a stale legacy JSON file overwrite existing SQLite rows", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); + const env: NodeJS.ProcessEnv = { + ...process.env, + OPENCLAW_STATE_DIR: stateDir, + }; + const ref: StoredConversationReference = { + conversation: { id: "conv-current" }, + channelId: "msteams", + serviceUrl: "https://service.example.com/current", + user: { id: "current-user" }, + }; + const filePath = path.join(stateDir, "msteams-conversations.json"); + await fs.promises.writeFile( + filePath, + `${JSON.stringify({ + version: 1, + conversations: { + "conv-current": { + ...ref, + serviceUrl: "https://service.example.com/stale", + user: { id: "stale-user" }, + }, + }, + })}\n`, + ); + const sqliteStore = createPluginStateKeyedStoreForTests( + "msteams", + { + namespace: "conversations", + maxEntries: 2000, + env, + }, + ); + await sqliteStore.register(conversationStateKey("conv-current"), ref); + + const store = createMSTeamsConversationStoreState({ env }); + await expect(store.get("conv-current")).resolves.toEqual(ref); + }); + + it("hashes external conversation ids before using plugin-state keys", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); + const longConversationId = `a:${"x".repeat(900)}`; + const filePath = path.join(stateDir, "msteams-conversations.json"); + await fs.promises.writeFile( + filePath, + `${JSON.stringify({ + version: 1, + conversations: { + [longConversationId]: { + channelId: "msteams", + serviceUrl: "https://service.example.com", + user: { id: "long-user" }, + } satisfies StoredConversationReference, + }, + })}\n`, + ); + + const store = createMSTeamsConversationStoreState({ stateDir }); + + await expect(store.get(longConversationId)).resolves.toMatchObject({ + conversation: { id: longConversationId }, + user: { id: "long-user" }, + }); + await store.upsert(`${longConversationId}-new`, { + conversation: { conversationType: "personal" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + user: { id: "long-user-new" }, + }); + await expect(store.get(`${longConversationId}-new`)).resolves.toMatchObject({ + conversation: { id: `${longConversationId}-new` }, + user: { id: "long-user-new" }, + }); + }); + + it("serializes concurrent upserts so sparse activities do not drop preserved fields", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); + const store = createMSTeamsConversationStoreState({ stateDir }); + + await store.upsert("conv-race", { + conversation: { id: "conv-race", conversationType: "personal" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + user: { id: "u1" }, + graphChatId: "19:resolved@unq.gbl.spaces", + }); + + await Promise.all([ + store.upsert("conv-race", { + conversation: { id: "conv-race", conversationType: "personal" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + user: { id: "u1" }, + timezone: "Europe/London", + }), + store.upsert("conv-race", { + conversation: { id: "conv-race", conversationType: "personal" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + user: { id: "u1" }, + tenantId: "tenant-1", + }), + ]); + + await expect(store.get("conv-race")).resolves.toMatchObject({ + graphChatId: "19:resolved@unq.gbl.spaces", + timezone: "Europe/London", + tenantId: "tenant-1", + }); + }); + + it("keeps newest legacy conversations by lastSeenAt at the row cap", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); + const filePath = path.join(stateDir, "msteams-conversations.json"); + const conversations: Record = { + "conv-recent": { + conversation: { id: "conv-recent" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + lastSeenAt: "2026-03-25T20:00:00.000Z", + }, + }; + for (let index = 0; index < 1000; index += 1) { + const id = `conv-${String(index).padStart(4, "0")}`; + conversations[id] = { + conversation: { id }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + lastSeenAt: new Date(Date.UTC(2026, 1, 1, 0, 0, index)).toISOString(), + }; + } + await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, conversations })}\n`); + + const store = createMSTeamsConversationStoreState({ stateDir }); + const ids = (await store.list()).map((entry) => entry.conversationId); + + expect(ids).toHaveLength(1000); + expect(ids).toContain("conv-recent"); + expect(ids).not.toContain("conv-0000"); + }); + + it("treats timestamp-less legacy conversations as oldest during later cap pruning", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); + const filePath = path.join(stateDir, "msteams-conversations.json"); + const conversations: Record = { + "conv-legacy": { + conversation: { id: "conv-legacy" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + }, + }; + for (let index = 0; index < 999; index += 1) { + const id = `conv-seen-${String(index).padStart(4, "0")}`; + conversations[id] = { + conversation: { id }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + lastSeenAt: new Date(Date.UTC(2026, 1, 1, 0, 0, index)).toISOString(), + }; + } + await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, conversations })}\n`); + + const store = createMSTeamsConversationStoreState({ stateDir }); + await store.list(); + await store.upsert("conv-new", { + conversation: { id: "conv-new" }, + channelId: "msteams", + serviceUrl: "https://service.example.com", + }); + const ids = (await store.list()).map((entry) => entry.conversationId); + + expect(ids).toHaveLength(1000); + expect(ids).toContain("conv-new"); + expect(ids).not.toContain("conv-legacy"); + }); +}); diff --git a/extensions/msteams/src/conversation-store-state.ts b/extensions/msteams/src/conversation-store-state.ts new file mode 100644 index 00000000000..4022b03972c --- /dev/null +++ b/extensions/msteams/src/conversation-store-state.ts @@ -0,0 +1,296 @@ +import crypto from "node:crypto"; +import fs from "node:fs/promises"; +import { + findPreferredDmConversationByUserId, + mergeStoredConversationReference, + normalizeStoredConversationId, + parseStoredConversationTimestamp, + toConversationStoreEntries, +} from "./conversation-store-helpers.js"; +import type { + MSTeamsConversationStore, + MSTeamsConversationStoreEntry, + StoredConversationReference, +} from "./conversation-store.js"; +import { getMSTeamsRuntime } from "./runtime.js"; +import { + resolveMSTeamsSqliteStateEnv, + toPluginJsonValue, + withMSTeamsSqliteMutationLock, +} from "./sqlite-state.js"; +import { resolveMSTeamsStorePath } from "./storage.js"; +import { readJsonFile } from "./store-fs.js"; + +type ConversationStoreData = { + version: 1; + conversations: Record; +}; + +type ConversationMigrationMarker = { + importedAt: string; +}; + +const STORE_FILENAME = "msteams-conversations.json"; +const CONVERSATIONS_NAMESPACE = "conversations"; +const CONVERSATION_MIGRATIONS_NAMESPACE = "conversation-migrations"; +const LEGACY_JSON_MIGRATION_KEY = "msteams-conversations-json-v1"; +const MAX_CONVERSATIONS = 1000; +const SQLITE_MAX_CONVERSATION_ROWS = MAX_CONVERSATIONS + 1000; +const CONVERSATION_TTL_MS = 365 * 24 * 60 * 60 * 1000; +const CONVERSATION_LOCK_FILENAME = "msteams-conversations.sqlite.lock"; + +type MSTeamsConversationStoreStateOptions = { + env?: NodeJS.ProcessEnv; + homedir?: () => string; + ttlMs?: number; + stateDir?: string; + storePath?: string; +}; + +function createConversationStateStore(params?: MSTeamsConversationStoreStateOptions) { + return getMSTeamsRuntime().state.openKeyedStore({ + namespace: CONVERSATIONS_NAMESPACE, + maxEntries: SQLITE_MAX_CONVERSATION_ROWS, + env: resolveMSTeamsSqliteStateEnv(params), + }); +} + +function createConversationMigrationStore(params?: MSTeamsConversationStoreStateOptions) { + return getMSTeamsRuntime().state.openKeyedStore({ + namespace: CONVERSATION_MIGRATIONS_NAMESPACE, + maxEntries: 100, + env: resolveMSTeamsSqliteStateEnv(params), + }); +} + +function resolveLegacyStorePath(params?: MSTeamsConversationStoreStateOptions): string { + return resolveMSTeamsStorePath({ + filename: STORE_FILENAME, + env: params?.env, + homedir: params?.homedir, + stateDir: params?.stateDir, + storePath: params?.storePath, + }); +} + +function normalizeLegacyStore(value: ConversationStoreData): ConversationStoreData { + if ( + value.version !== 1 || + !value.conversations || + typeof value.conversations !== "object" || + Array.isArray(value.conversations) + ) { + return { version: 1, conversations: {} }; + } + return value; +} + +function buildConversationStateKey(conversationId: string): string { + return crypto.createHash("sha256").update(conversationId).digest("hex"); +} + +function prepareConversationReferenceForStorage( + conversationId: string, + reference: StoredConversationReference, +): StoredConversationReference { + return { + ...reference, + conversation: { + ...reference.conversation, + id: conversationId, + }, + }; +} + +function getStoredConversationId(reference: StoredConversationReference): string | null { + const rawId = reference.conversation?.id; + return rawId ? normalizeStoredConversationId(rawId) : null; +} + +export function createMSTeamsConversationStoreState( + params?: MSTeamsConversationStoreStateOptions, +): MSTeamsConversationStore { + const ttlMs = params?.ttlMs ?? CONVERSATION_TTL_MS; + const conversationStore = createConversationStateStore(params); + const migrationStore = createConversationMigrationStore(params); + const legacyStorePath = resolveLegacyStorePath(params); + let legacyImportPromise: Promise | null = null; + + const isExpired = (reference: StoredConversationReference): boolean => { + const lastSeenAt = parseStoredConversationTimestamp(reference.lastSeenAt); + // Preserve migrated legacy entries that have no lastSeenAt until they're seen again. + return lastSeenAt != null && Date.now() - lastSeenAt > ttlMs; + }; + + const selectRetainedConversations = ( + conversations: Record, + ): Array<[string, StoredConversationReference]> => { + const retained = Object.entries(conversations).filter(([, reference]) => !isExpired(reference)); + if (retained.length <= MAX_CONVERSATIONS) { + return retained; + } + retained.sort((a, b) => { + const aTs = parseStoredConversationTimestamp(a[1].lastSeenAt) ?? 0; + const bTs = parseStoredConversationTimestamp(b[1].lastSeenAt) ?? 0; + return aTs - bTs || a[0].localeCompare(b[0]); + }); + return retained.slice(retained.length - MAX_CONVERSATIONS); + }; + + const importLegacyStore = async (): Promise => { + if (await migrationStore.lookup(LEGACY_JSON_MIGRATION_KEY)) { + return; + } + const empty: ConversationStoreData = { version: 1, conversations: {} }; + const { value, exists } = await readJsonFile(legacyStorePath, empty); + if (!exists) { + await migrationStore.register(LEGACY_JSON_MIGRATION_KEY, { + importedAt: new Date().toISOString(), + }); + return; + } + const legacy = normalizeLegacyStore(value); + for (const [rawConversationId, reference] of selectRetainedConversations( + legacy.conversations, + )) { + const conversationId = normalizeStoredConversationId(rawConversationId); + if (!conversationId) { + continue; + } + await conversationStore.registerIfAbsent( + buildConversationStateKey(conversationId), + toPluginJsonValue(prepareConversationReferenceForStorage(conversationId, reference)), + ); + } + await migrationStore.register(LEGACY_JSON_MIGRATION_KEY, { + importedAt: new Date().toISOString(), + }); + await fs.rm(legacyStorePath, { force: true }).catch(() => {}); + }; + + const ensureLegacyImported = async (): Promise => { + legacyImportPromise ??= withMSTeamsSqliteMutationLock( + params, + CONVERSATION_LOCK_FILENAME, + importLegacyStore, + ); + await legacyImportPromise; + }; + + const lookupStored = async ( + conversationId: string, + ): Promise => { + const normalizedId = normalizeStoredConversationId(conversationId); + const value = await conversationStore.lookup(buildConversationStateKey(normalizedId)); + if (!value) { + return null; + } + if (isExpired(value)) { + return null; + } + return value; + }; + + const entries = async (): Promise> => { + await ensureLegacyImported(); + const rows = await conversationStore.entries(); + const kept: Array<[string, StoredConversationReference]> = []; + for (const row of rows) { + if (isExpired(row.value)) { + continue; + } + const conversationId = getStoredConversationId(row.value); + if (conversationId) { + kept.push([conversationId, row.value]); + } + } + return kept; + }; + + const lookup = async (conversationId: string): Promise => { + await ensureLegacyImported(); + return await lookupStored(conversationId); + }; + + const register = async ( + conversationId: string, + reference: StoredConversationReference, + ): Promise => { + const normalizedId = normalizeStoredConversationId(conversationId); + await conversationStore.register( + buildConversationStateKey(normalizedId), + toPluginJsonValue(prepareConversationReferenceForStorage(normalizedId, reference)), + ); + const rows = []; + for (const row of await conversationStore.entries()) { + if (isExpired(row.value)) { + await conversationStore.delete(row.key); + continue; + } + rows.push(row); + } + if (rows.length <= MAX_CONVERSATIONS) { + return; + } + const sorted = rows.toSorted((a, b) => { + const aTs = parseStoredConversationTimestamp(a.value.lastSeenAt) ?? 0; + const bTs = parseStoredConversationTimestamp(b.value.lastSeenAt) ?? 0; + const aId = getStoredConversationId(a.value) ?? a.key; + const bId = getStoredConversationId(b.value) ?? b.key; + return aTs - bTs || aId.localeCompare(bId); + }); + for (const row of sorted.slice(0, rows.length - MAX_CONVERSATIONS)) { + await conversationStore.delete(row.key); + } + }; + + const list = async (): Promise => { + return toConversationStoreEntries(await entries()); + }; + + const get = async (conversationId: string): Promise => { + return await lookup(conversationId); + }; + + const findPreferredDmByUserId = async ( + id: string, + ): Promise => { + return findPreferredDmConversationByUserId(await list(), id); + }; + + const upsert = async ( + conversationId: string, + reference: StoredConversationReference, + ): Promise => { + const normalizedId = normalizeStoredConversationId(conversationId); + await withMSTeamsSqliteMutationLock(params, CONVERSATION_LOCK_FILENAME, async () => { + await importLegacyStore(); + const existing = await lookupStored(normalizedId); + await register( + normalizedId, + mergeStoredConversationReference( + existing ?? undefined, + reference, + new Date().toISOString(), + ), + ); + }); + }; + + const remove = async (conversationId: string): Promise => { + const normalizedId = normalizeStoredConversationId(conversationId); + return await withMSTeamsSqliteMutationLock(params, CONVERSATION_LOCK_FILENAME, async () => { + await importLegacyStore(); + return await conversationStore.delete(buildConversationStateKey(normalizedId)); + }); + }; + + return { + upsert, + get, + list, + remove, + findPreferredDmByUserId, + findByUserId: findPreferredDmByUserId, + }; +} diff --git a/extensions/msteams/src/conversation-store.shared.test.ts b/extensions/msteams/src/conversation-store.shared.test.ts index b4fcd9058db..f0b46617ab5 100644 --- a/extensions/msteams/src/conversation-store.shared.test.ts +++ b/extensions/msteams/src/conversation-store.shared.test.ts @@ -1,12 +1,13 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime"; import { beforeEach, describe, expect, it, vi } from "vitest"; -import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; import { createMSTeamsConversationStoreMemory } from "./conversation-store-memory.js"; +import { createMSTeamsConversationStoreState } from "./conversation-store-state.js"; import type { MSTeamsConversationStore } from "./conversation-store.js"; import { setMSTeamsRuntime } from "./runtime.js"; -import { msteamsRuntimeStub } from "./test-runtime.js"; +import { msteamsRuntimeStub } from "./test-support/runtime.js"; type StoreFactory = { name: string; @@ -15,10 +16,10 @@ type StoreFactory = { const storeFactories: StoreFactory[] = [ { - name: "fs", + name: "state", createStore: async () => { const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-")); - return createMSTeamsConversationStoreFs({ + return createMSTeamsConversationStoreState({ env: { ...process.env, OPENCLAW_STATE_DIR: stateDir }, ttlMs: 60_000, }); @@ -32,6 +33,7 @@ const storeFactories: StoreFactory[] = [ describe.each(storeFactories)("msteams conversation store ($name)", ({ createStore }) => { beforeEach(() => { + resetPluginStateStoreForTests(); setMSTeamsRuntime(msteamsRuntimeStub); }); diff --git a/extensions/msteams/src/graph-group-management.test.ts b/extensions/msteams/src/graph-group-management.test.ts index d59d50077f6..4039c2cc94f 100644 --- a/extensions/msteams/src/graph-group-management.test.ts +++ b/extensions/msteams/src/graph-group-management.test.ts @@ -27,8 +27,8 @@ vi.mock("./graph.js", async (importOriginal) => { }; }); -vi.mock("./conversation-store-fs.js", () => ({ - createMSTeamsConversationStoreFs: () => ({ +vi.mock("./conversation-store-state.js", () => ({ + createMSTeamsConversationStoreState: () => ({ findPreferredDmByUserId: mockState.findPreferredDmByUserId, }), })); diff --git a/extensions/msteams/src/graph-messages.test-helpers.ts b/extensions/msteams/src/graph-messages.test-helpers.ts index eeed7a63a6d..ba7be1515fe 100644 --- a/extensions/msteams/src/graph-messages.test-helpers.ts +++ b/extensions/msteams/src/graph-messages.test-helpers.ts @@ -22,8 +22,8 @@ vi.mock("./graph.js", () => { }; }); -vi.mock("./conversation-store-fs.js", () => ({ - createMSTeamsConversationStoreFs: () => ({ +vi.mock("./conversation-store-state.js", () => ({ + createMSTeamsConversationStoreState: () => ({ findPreferredDmByUserId: graphMessagesMockState.findPreferredDmByUserId, }), })); diff --git a/extensions/msteams/src/graph-messages.ts b/extensions/msteams/src/graph-messages.ts index 9b8967031ce..ad445def6f3 100644 --- a/extensions/msteams/src/graph-messages.ts +++ b/extensions/msteams/src/graph-messages.ts @@ -1,5 +1,5 @@ import type { OpenClawConfig } from "../runtime-api.js"; -import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; +import { createMSTeamsConversationStoreState } from "./conversation-store-state.js"; import { type GraphResponse, deleteGraphRequest, @@ -75,7 +75,7 @@ export async function resolveGraphConversationId(to: string): Promise { } // user: — look up the conversation store for the real chat ID - const store = createMSTeamsConversationStoreFs(); + const store = createMSTeamsConversationStoreState(); const found = await store.findPreferredDmByUserId(cleaned); if (!found) { throw new Error( diff --git a/extensions/msteams/src/monitor.ts b/extensions/msteams/src/monitor.ts index ead5b584835..8194a06651e 100644 --- a/extensions/msteams/src/monitor.ts +++ b/extensions/msteams/src/monitor.ts @@ -9,7 +9,7 @@ import { type RuntimeEnv, } from "../runtime-api.js"; import { resolveMSTeamsSdkCloudOptions } from "./cloud.js"; -import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; +import { createMSTeamsConversationStoreState } from "./conversation-store-state.js"; import type { MSTeamsConversationStore } from "./conversation-store.js"; import { formatUnknownError } from "./errors.js"; import { runMSTeamsFeedbackInvokeHandler } from "./feedback-invoke.js"; @@ -23,7 +23,7 @@ import { } from "./monitor-handler.js"; import type { MSTeamsMessageHandlerDeps } from "./monitor-handler.types.js"; import { - createMSTeamsPollStoreFs, + createMSTeamsPollStoreState, extractMSTeamsPollVote, type MSTeamsPollStore, } from "./polls.js"; @@ -252,8 +252,8 @@ export async function monitorMSTeamsProvider( typeof agentDefaults?.mediaMaxMb === "number" && agentDefaults.mediaMaxMb > 0 ? Math.floor(agentDefaults.mediaMaxMb * MB) : 8 * MB; - const conversationStore = opts.conversationStore ?? createMSTeamsConversationStoreFs(); - const pollStore = opts.pollStore ?? createMSTeamsPollStoreFs(); + const conversationStore = opts.conversationStore ?? createMSTeamsConversationStoreState(); + const pollStore = opts.pollStore ?? createMSTeamsPollStoreState(); log.info(`starting provider (port ${port})`); diff --git a/extensions/msteams/src/outbound.test.ts b/extensions/msteams/src/outbound.test.ts index 3ab92924e93..76d32df8403 100644 --- a/extensions/msteams/src/outbound.test.ts +++ b/extensions/msteams/src/outbound.test.ts @@ -15,7 +15,7 @@ vi.mock("./send.js", () => ({ })); vi.mock("./polls.js", () => ({ - createMSTeamsPollStoreFs: () => ({ + createMSTeamsPollStoreState: () => ({ createPoll: mocks.createPoll, }), })); diff --git a/extensions/msteams/src/outbound.ts b/extensions/msteams/src/outbound.ts index 8099d9f688e..ae3d7b81b3d 100644 --- a/extensions/msteams/src/outbound.ts +++ b/extensions/msteams/src/outbound.ts @@ -16,7 +16,7 @@ import { normalizeStringEntries, type ChannelOutboundAdapter, } from "../runtime-api.js"; -import { createMSTeamsPollStoreFs } from "./polls.js"; +import { createMSTeamsPollStoreState } from "./polls.js"; import { buildMSTeamsPresentationCard, MSTEAMS_PRESENTATION_CAPABILITIES } from "./presentation.js"; import { sendAdaptiveCardMSTeams, sendMessageMSTeams, sendPollMSTeams } from "./send.js"; @@ -180,7 +180,7 @@ export const msteamsOutbound: ChannelOutboundAdapter = { options: poll.options, maxSelections, }); - const pollStore = createMSTeamsPollStoreFs(); + const pollStore = createMSTeamsPollStoreState(); await pollStore.createPoll({ id: result.pollId, question: poll.question, diff --git a/extensions/msteams/src/pending-uploads-fs.test.ts b/extensions/msteams/src/pending-uploads-fs.test.ts index 2ffd755c1f6..74bdb0a97c5 100644 --- a/extensions/msteams/src/pending-uploads-fs.test.ts +++ b/extensions/msteams/src/pending-uploads-fs.test.ts @@ -11,7 +11,7 @@ import { } from "./pending-uploads-fs.js"; import { clearPendingUploads } from "./pending-uploads.js"; import { setMSTeamsRuntime } from "./runtime.js"; -import { msteamsRuntimeStub } from "./test-runtime.js"; +import { msteamsRuntimeStub } from "./test-support/runtime.js"; // Track temp dirs created by each test so afterEach can clean them up. const createdTempDirs: string[] = []; diff --git a/extensions/msteams/src/polls.test.ts b/extensions/msteams/src/polls.test.ts index caa2dc060e7..58c8e57cf1d 100644 --- a/extensions/msteams/src/polls.test.ts +++ b/extensions/msteams/src/polls.test.ts @@ -1,19 +1,26 @@ +import crypto from "node:crypto"; import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { + createPluginStateKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; import { beforeEach, describe, expect, it } from "vitest"; import { createMSTeamsPollStoreMemory } from "./polls-store-memory.js"; import { buildMSTeamsPollCard, - createMSTeamsPollStoreFs, + createMSTeamsPollStoreState, extractMSTeamsPollVote, normalizeMSTeamsPollSelections, + type MSTeamsPoll, } from "./polls.js"; import { setMSTeamsRuntime } from "./runtime.js"; -import { msteamsRuntimeStub } from "./test-runtime.js"; +import { msteamsRuntimeStub } from "./test-support/runtime.js"; describe("msteams polls", () => { beforeEach(() => { + resetPluginStateStoreForTests(); setMSTeamsRuntime(msteamsRuntimeStub); }); @@ -45,7 +52,7 @@ describe("msteams polls", () => { it("stores and records poll votes", async () => { const home = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); - const store = createMSTeamsPollStoreFs({ homedir: () => home }); + const store = createMSTeamsPollStoreState({ homedir: () => home }); await store.createPoll({ id: "poll-2", question: "Pick one", @@ -99,17 +106,22 @@ describe("msteams polls", () => { }); }); -const createFsStore = async () => { +const createStateStore = async () => { const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); - return createMSTeamsPollStoreFs({ stateDir }); + return createMSTeamsPollStoreState({ stateDir }); }; const createMemoryStore = () => createMSTeamsPollStoreMemory(); describe.each([ { name: "memory", createStore: createMemoryStore }, - { name: "fs", createStore: createFsStore }, + { name: "state", createStore: createStateStore }, ])("$name poll store", ({ createStore }) => { + beforeEach(() => { + resetPluginStateStoreForTests(); + setMSTeamsRuntime(msteamsRuntimeStub); + }); + it("stores polls and records normalized votes", async () => { const store = await createStore(); await store.createPoll({ @@ -134,6 +146,296 @@ describe.each([ }); }); +describe("state poll store", () => { + beforeEach(() => { + resetPluginStateStoreForTests(); + setMSTeamsRuntime(msteamsRuntimeStub); + }); + + it("imports legacy JSON polls once and removes the old file", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); + const filePath = path.join(stateDir, "msteams-polls.json"); + await fs.promises.writeFile( + filePath, + `${JSON.stringify({ + version: 1, + polls: { + "poll-legacy": { + id: "poll-legacy", + question: "Legacy?", + options: ["A", "B"], + maxSelections: 1, + createdAt: new Date().toISOString(), + votes: {}, + }, + }, + })}\n`, + ); + + const store = createMSTeamsPollStoreState({ stateDir }); + await expect(store.getPoll("poll-legacy")).resolves.toMatchObject({ + id: "poll-legacy", + question: "Legacy?", + }); + await expect(fs.promises.access(filePath)).rejects.toThrow(); + + const updated = await store.recordVote({ + pollId: "poll-legacy", + voterId: "user-1", + selections: ["1"], + }); + expect(updated?.votes["user-1"]).toEqual(["1"]); + await expect( + fs.promises.access(path.join(stateDir, "state", "openclaw.sqlite")), + ).resolves.toBeUndefined(); + }); + + it("hashes external poll ids before using plugin-state keys", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); + const store = createMSTeamsPollStoreState({ stateDir }); + const longPollId = `poll-${"x".repeat(900)}`; + + await store.createPoll({ + id: longPollId, + question: "Long id?", + options: ["A", "B"], + maxSelections: 1, + createdAt: new Date().toISOString(), + votes: {}, + }); + + await expect(store.getPoll(longPollId)).resolves.toMatchObject({ id: longPollId }); + await expect( + store.recordVote({ + pollId: `missing-${"y".repeat(900)}`, + voterId: "user-1", + selections: ["0"], + }), + ).resolves.toBeNull(); + }); + + it("serializes concurrent votes for the same poll", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); + const store = createMSTeamsPollStoreState({ stateDir }); + await store.createPoll({ + id: "poll-race", + question: "Pick", + options: ["A", "B"], + maxSelections: 1, + createdAt: new Date().toISOString(), + votes: {}, + }); + + await Promise.all([ + store.recordVote({ pollId: "poll-race", voterId: "user-a", selections: ["0"] }), + store.recordVote({ pollId: "poll-race", voterId: "user-b", selections: ["1"] }), + ]); + + await expect(store.getPoll("poll-race")).resolves.toMatchObject({ + votes: { + "user-a": ["0"], + "user-b": ["1"], + }, + }); + }); + + it("keeps large vote maps split across bounded rows", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); + const store = createMSTeamsPollStoreState({ stateDir }); + const votes = Object.fromEntries( + Array.from({ length: 500 }, (_, index) => [ + `user-${String(index).padStart(4, "0")}-${"x".repeat(160)}`, + ["0"], + ]), + ); + + await store.createPoll({ + id: "poll-large", + question: "Pick", + options: ["A", "B"], + maxSelections: 1, + createdAt: new Date().toISOString(), + votes, + }); + await store.recordVote({ pollId: "poll-large", voterId: "user-new", selections: ["1"] }); + + const stored = await store.getPoll("poll-large"); + expect(Object.keys(stored?.votes ?? {})).toHaveLength(501); + expect(stored?.votes["user-new"]).toEqual(["1"]); + }); + + it("fills missing legacy vote buckets after a partial metadata import", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); + const env = { ...process.env, OPENCLAW_STATE_DIR: stateDir }; + const filePath = path.join(stateDir, "msteams-polls.json"); + const metadata = { + id: "poll-partial", + question: "Partial?", + options: ["A", "B"], + maxSelections: 1, + createdAt: new Date().toISOString(), + }; + const metadataStore = createPluginStateKeyedStoreForTests("msteams", { + namespace: "polls", + maxEntries: 2000, + env, + }); + await metadataStore.register("poll-partial", metadata); + const voterHash = crypto + .createHash("sha256") + .update("poll-partial") + .update("\0") + .update("user-legacy") + .digest("hex"); + const bucket = String(Number.parseInt(voterHash.slice(0, 8), 16) % 32).padStart(4, "0"); + const pollHash = crypto.createHash("sha256").update("poll-partial").digest("hex"); + const voteBucketStore = createPluginStateKeyedStoreForTests<{ + pollId: string; + bucket: string; + votes: Record; + updatedAt: string; + }>("msteams", { + namespace: "poll-vote-buckets", + maxEntries: 32_032, + env, + }); + await voteBucketStore.register(`${pollHash}:${bucket}`, { + pollId: "poll-partial", + bucket, + votes: { "user-legacy": ["0"] }, + updatedAt: metadata.createdAt, + }); + await fs.promises.writeFile( + filePath, + `${JSON.stringify({ + version: 1, + polls: { + "poll-partial": { + ...metadata, + votes: { + "user-legacy": ["1"], + "user-missing": ["1"], + }, + }, + }, + })}\n`, + ); + + const store = createMSTeamsPollStoreState({ env }); + + await expect(store.getPoll("poll-partial")).resolves.toMatchObject({ + votes: { + "user-legacy": ["0"], + "user-missing": ["1"], + }, + }); + }); + + it("keeps newest legacy polls by update timestamp at the row cap", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); + const filePath = path.join(stateDir, "msteams-polls.json"); + const pollRows: Record = {}; + const baseMs = Date.now() - 60_000; + pollRows["poll-recent"] = { + id: "poll-recent", + question: "Recent?", + options: ["A", "B"], + maxSelections: 1, + createdAt: new Date(baseMs + 2_000_000).toISOString(), + updatedAt: new Date(baseMs + 2_000_000).toISOString(), + votes: {}, + }; + for (let index = 0; index < 1000; index += 1) { + const id = `poll-${String(index).padStart(4, "0")}`; + pollRows[id] = { + id, + question: "Old?", + options: ["A", "B"], + maxSelections: 1, + createdAt: new Date(baseMs + index).toISOString(), + votes: {}, + }; + } + await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, polls: pollRows })}\n`); + + const store = createMSTeamsPollStoreState({ stateDir }); + + await expect(store.getPoll("poll-recent")).resolves.toMatchObject({ id: "poll-recent" }); + await expect(store.getPoll("poll-0000")).resolves.toBeNull(); + }); + + it("deletes vote buckets when pruning over the poll cap", async () => { + const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-")); + const env = { ...process.env, OPENCLAW_STATE_DIR: stateDir }; + const metadataStore = createPluginStateKeyedStoreForTests>( + "msteams", + { + namespace: "polls", + maxEntries: 2000, + env, + }, + ); + const voteBucketStore = createPluginStateKeyedStoreForTests<{ + pollId: string; + bucket: string; + votes: Record; + updatedAt: string; + }>("msteams", { + namespace: "poll-vote-buckets", + maxEntries: 32_032, + env, + }); + const pollStateKey = (pollId: string) => + crypto.createHash("sha256").update(pollId).digest("hex"); + const voteBucket = (pollId: string, voterId: string) => { + const hash = crypto + .createHash("sha256") + .update(pollId) + .update("\0") + .update(voterId) + .digest("hex"); + return String(Number.parseInt(hash.slice(0, 8), 16) % 32).padStart(4, "0"); + }; + const baseMs = Date.now() - 60_000; + const oldPollId = "poll-old"; + + for (const [index, id] of [ + oldPollId, + ...Array.from({ length: 999 }, (_, entryIndex) => `poll-existing-${entryIndex}`), + ].entries()) { + await metadataStore.register(pollStateKey(id), { + id, + question: "Pick", + options: ["A", "B"], + maxSelections: 1, + createdAt: new Date(baseMs + index).toISOString(), + }); + } + const oldBucket = voteBucket(oldPollId, "user-old"); + await voteBucketStore.register(`${pollStateKey(oldPollId)}:${oldBucket}`, { + pollId: oldPollId, + bucket: oldBucket, + votes: { "user-old": ["0"] }, + updatedAt: new Date(baseMs).toISOString(), + }); + + const store = createMSTeamsPollStoreState({ env }); + await store.createPoll({ + id: "poll-new", + question: "New?", + options: ["A", "B"], + maxSelections: 1, + createdAt: new Date(baseMs + 2_000_000).toISOString(), + votes: { "user-new": ["1"] }, + }); + + await expect(store.getPoll(oldPollId)).resolves.toBeNull(); + const buckets = await voteBucketStore.entries(); + expect(buckets.some((row) => row.value.pollId === oldPollId)).toBe(false); + expect(buckets.some((row) => row.value.pollId === "poll-new")).toBe(true); + }); +}); + describe("memory poll store", () => { it("reads seeded polls back, updates timestamps, and returns null for missing polls", async () => { const store = createMSTeamsPollStoreMemory([ diff --git a/extensions/msteams/src/polls.ts b/extensions/msteams/src/polls.ts index 459a06e6886..f0faa64c19f 100644 --- a/extensions/msteams/src/polls.ts +++ b/extensions/msteams/src/polls.ts @@ -1,4 +1,5 @@ import crypto from "node:crypto"; +import fs from "node:fs/promises"; import { parseStrictNonNegativeInteger } from "openclaw/plugin-sdk/number-runtime"; import { isRecord, @@ -6,8 +7,14 @@ import { normalizeStringEntries, uniqueStrings, } from "openclaw/plugin-sdk/string-coerce-runtime"; +import { getMSTeamsRuntime } from "./runtime.js"; +import { + resolveMSTeamsSqliteStateEnv, + toPluginJsonValue, + withMSTeamsSqliteMutationLock, +} from "./sqlite-state.js"; import { resolveMSTeamsStorePath } from "./storage.js"; -import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js"; +import { readJsonFile } from "./store-fs.js"; type MSTeamsPollVote = { pollId: string; @@ -50,9 +57,27 @@ type PollStoreData = { polls: Record; }; +type StoredMSTeamsPoll = Omit; + +type StoredMSTeamsPollVoteBucket = { + pollId: string; + bucket: string; + votes: Record; + updatedAt: string; +}; + const STORE_FILENAME = "msteams-polls.json"; +const POLLS_NAMESPACE = "polls"; +const POLL_VOTE_BUCKETS_NAMESPACE = "poll-vote-buckets"; +const POLL_MIGRATIONS_NAMESPACE = "poll-migrations"; +const LEGACY_POLLS_MIGRATION_KEY = "msteams-polls-json-v1"; const MAX_POLLS = 1000; +const SQLITE_MAX_POLL_ROWS = MAX_POLLS + 1000; +// Keep worst-case retained vote buckets below plugin-state's per-plugin live row cap. +const POLL_VOTE_BUCKET_COUNT = 32; +const MAX_POLL_VOTE_BUCKET_ROWS = (MAX_POLLS + 1) * POLL_VOTE_BUCKET_COUNT; const POLL_TTL_MS = 30 * 24 * 60 * 60 * 1000; +const POLL_LOCK_FILENAME = "msteams-polls.sqlite.lock"; function normalizeChoiceValue(value: unknown): string | null { if (typeof value === "string") { @@ -216,13 +241,41 @@ export function buildMSTeamsPollCard(params: { }; } -type MSTeamsPollStoreFsOptions = { +type MSTeamsPollStoreStateOptions = { env?: NodeJS.ProcessEnv; homedir?: () => string; stateDir?: string; storePath?: string; }; +type PollMigrationMarker = { + importedAt: string; +}; + +function createPollStateStore(params?: MSTeamsPollStoreStateOptions) { + return getMSTeamsRuntime().state.openKeyedStore({ + namespace: POLLS_NAMESPACE, + maxEntries: SQLITE_MAX_POLL_ROWS, + env: resolveMSTeamsSqliteStateEnv(params), + }); +} + +function createPollVoteBucketStateStore(params?: MSTeamsPollStoreStateOptions) { + return getMSTeamsRuntime().state.openKeyedStore({ + namespace: POLL_VOTE_BUCKETS_NAMESPACE, + maxEntries: MAX_POLL_VOTE_BUCKET_ROWS, + env: resolveMSTeamsSqliteStateEnv(params), + }); +} + +function createPollMigrationStore(params?: MSTeamsPollStoreStateOptions) { + return getMSTeamsRuntime().state.openKeyedStore({ + namespace: POLL_MIGRATIONS_NAMESPACE, + maxEntries: 100, + env: resolveMSTeamsSqliteStateEnv(params), + }); +} + function parseTimestamp(value?: string): number | null { if (!value) { return null; @@ -231,7 +284,9 @@ function parseTimestamp(value?: string): number | null { return Number.isFinite(parsed) ? parsed : null; } -function pruneExpired(polls: Record) { +function pruneExpired( + polls: Record, +) { const cutoff = Date.now() - POLL_TTL_MS; const entries = Object.entries(polls).filter(([, poll]) => { const ts = parseTimestamp(poll.updatedAt ?? poll.createdAt) ?? 0; @@ -240,18 +295,17 @@ function pruneExpired(polls: Record) { return Object.fromEntries(entries); } -function pruneToLimit(polls: Record) { - const entries = Object.entries(polls); - if (entries.length <= MAX_POLLS) { - return polls; +function selectRetainedPolls(polls: Record): Array<[string, MSTeamsPoll]> { + const retained = Object.entries(pruneExpired(polls)); + if (retained.length <= MAX_POLLS) { + return retained; } - entries.sort((a, b) => { + retained.sort((a, b) => { const aTs = parseTimestamp(a[1].updatedAt ?? a[1].createdAt) ?? 0; const bTs = parseTimestamp(b[1].updatedAt ?? b[1].createdAt) ?? 0; - return aTs - bTs; + return aTs - bTs || a[0].localeCompare(b[0]); }); - const keep = entries.slice(entries.length - MAX_POLLS); - return Object.fromEntries(keep); + return retained.slice(retained.length - MAX_POLLS); } export function normalizeMSTeamsPollSelections(poll: MSTeamsPoll, selections: string[]) { @@ -265,54 +319,227 @@ export function normalizeMSTeamsPollSelections(poll: MSTeamsPoll, selections: st return uniqueStrings(limited); } -export function createMSTeamsPollStoreFs(params?: MSTeamsPollStoreFsOptions): MSTeamsPollStore { - const filePath = resolveMSTeamsStorePath({ +export function createMSTeamsPollStoreState( + params?: MSTeamsPollStoreStateOptions, +): MSTeamsPollStore { + const pollStore = createPollStateStore(params); + const voteBucketStore = createPollVoteBucketStateStore(params); + const migrationStore = createPollMigrationStore(params); + const legacyStorePath = resolveMSTeamsStorePath({ filename: STORE_FILENAME, env: params?.env, homedir: params?.homedir, stateDir: params?.stateDir, storePath: params?.storePath, }); - const empty: PollStoreData = { version: 1, polls: {} }; + let legacyImportPromise: Promise | null = null; - const readStore = async (): Promise => { - const { value } = await readJsonFile(filePath, empty); - const pruned = pruneToLimit(pruneExpired(value.polls ?? {})); - return { version: 1, polls: pruned }; + const splitPoll = ( + poll: MSTeamsPoll, + ): { metadata: StoredMSTeamsPoll; votes: MSTeamsPoll["votes"] } => { + const { votes, ...metadata } = poll; + return { metadata, votes }; }; - const writeStore = async (data: PollStoreData) => { - await writeJsonFile(filePath, data); + const hashVote = (pollId: string, voterId: string): string => { + return crypto.createHash("sha256").update(pollId).update("\0").update(voterId).digest("hex"); + }; + + const buildPollStateKey = (pollId: string): string => { + return crypto.createHash("sha256").update(pollId).digest("hex"); + }; + + const selectVoteBucket = (pollId: string, voterId: string): string => { + const bucket = Number.parseInt(hashVote(pollId, voterId).slice(0, 8), 16); + return String(bucket % POLL_VOTE_BUCKET_COUNT).padStart(4, "0"); + }; + + const buildVoteBucketKey = (pollId: string, bucket: string): string => { + const pollDigest = crypto.createHash("sha256").update(pollId).digest("hex"); + return `${pollDigest}:${bucket}`; + }; + + const readPollVotes = async (pollId: string): Promise> => { + const votes: Record = {}; + for (const row of await voteBucketStore.entries()) { + if (row.value.pollId === pollId) { + Object.assign(votes, row.value.votes); + } + } + return votes; + }; + + const deletePollVotes = async (pollId: string): Promise => { + for (const row of await voteBucketStore.entries()) { + if (row.value.pollId === pollId) { + await voteBucketStore.delete(row.key); + } + } + }; + + const registerPollVotes = async ( + pollId: string, + votes: Record, + updatedAt: string, + ): Promise => { + const buckets = new Map>(); + for (const [voterId, selections] of Object.entries(votes)) { + const bucket = selectVoteBucket(pollId, voterId); + const bucketVotes = buckets.get(bucket) ?? {}; + bucketVotes[voterId] = selections; + buckets.set(bucket, bucketVotes); + } + for (const [bucket, bucketVotes] of buckets) { + const key = buildVoteBucketKey(pollId, bucket); + const existing = await voteBucketStore.lookup(key); + await voteBucketStore.register( + key, + toPluginJsonValue({ + pollId, + bucket, + votes: { ...bucketVotes, ...existing?.votes }, + updatedAt, + }), + ); + } + }; + + const registerPollVote = async ( + pollId: string, + voterId: string, + selections: string[], + updatedAt: string, + ): Promise => { + const bucket = selectVoteBucket(pollId, voterId); + const key = buildVoteBucketKey(pollId, bucket); + const existing = await voteBucketStore.lookup(key); + await voteBucketStore.register( + key, + toPluginJsonValue({ + pollId, + bucket, + votes: { ...existing?.votes, [voterId]: selections }, + updatedAt, + }), + ); + }; + + const reconstructPoll = async (metadata: StoredMSTeamsPoll): Promise => { + return { ...metadata, votes: await readPollVotes(metadata.id) }; + }; + + const importLegacyStore = async (): Promise => { + if (await migrationStore.lookup(LEGACY_POLLS_MIGRATION_KEY)) { + return; + } + const empty: PollStoreData = { version: 1, polls: {} }; + const { value, exists } = await readJsonFile(legacyStorePath, empty); + if (!exists) { + await migrationStore.register(LEGACY_POLLS_MIGRATION_KEY, { + importedAt: new Date().toISOString(), + }); + return; + } + const legacyPolls = + value.version === 1 && + value.polls && + typeof value.polls === "object" && + !Array.isArray(value.polls) + ? value.polls + : {}; + for (const [pollId, poll] of selectRetainedPolls(legacyPolls)) { + if (!pollId) { + continue; + } + const { metadata, votes } = splitPoll(poll); + await pollStore.registerIfAbsent(buildPollStateKey(pollId), toPluginJsonValue(metadata)); + await registerPollVotes(pollId, votes, poll.updatedAt ?? poll.createdAt); + } + await migrationStore.register(LEGACY_POLLS_MIGRATION_KEY, { + importedAt: new Date().toISOString(), + }); + await fs.rm(legacyStorePath, { force: true }).catch(() => {}); + }; + + const ensureLegacyImported = async (): Promise => { + legacyImportPromise ??= withMSTeamsSqliteMutationLock( + params, + POLL_LOCK_FILENAME, + importLegacyStore, + ); + await legacyImportPromise; + }; + + const prunePollStoreToLimit = async (): Promise => { + const rows = []; + for (const row of await pollStore.entries()) { + if (!pruneExpired({ [row.key]: row.value })[row.key]) { + await pollStore.delete(row.key); + await deletePollVotes(row.value.id); + continue; + } + rows.push(row); + } + if (rows.length <= MAX_POLLS) { + return; + } + const sorted = rows.toSorted((a, b) => { + const aTs = parseTimestamp(a.value.updatedAt ?? a.value.createdAt) ?? 0; + const bTs = parseTimestamp(b.value.updatedAt ?? b.value.createdAt) ?? 0; + return aTs - bTs || a.key.localeCompare(b.key); + }); + for (const row of sorted.slice(0, rows.length - MAX_POLLS)) { + await pollStore.delete(row.key); + await deletePollVotes(row.value.id); + } }; const createPoll = async (poll: MSTeamsPoll) => { - await withFileLock(filePath, empty, async () => { - const data = await readStore(); - data.polls[poll.id] = poll; - await writeStore({ version: 1, polls: pruneToLimit(data.polls) }); + await withMSTeamsSqliteMutationLock(params, POLL_LOCK_FILENAME, async () => { + await importLegacyStore(); + const { metadata, votes } = splitPoll(poll); + await pollStore.register(buildPollStateKey(poll.id), toPluginJsonValue(metadata)); + await deletePollVotes(poll.id); + await registerPollVotes(poll.id, votes, poll.updatedAt ?? poll.createdAt); + await prunePollStoreToLimit(); }); }; - const getPoll = async (pollId: string) => - await withFileLock(filePath, empty, async () => { - const data = await readStore(); - return data.polls[pollId] ?? null; - }); + const getPoll = async (pollId: string) => { + await ensureLegacyImported(); + const poll = await pollStore.lookup(buildPollStateKey(pollId)); + if (!poll) { + return null; + } + if (!pruneExpired({ [pollId]: poll })[pollId]) { + return null; + } + return await reconstructPoll(poll); + }; - const recordVote = async (params: { pollId: string; voterId: string; selections: string[] }) => - await withFileLock(filePath, empty, async () => { - const data = await readStore(); - const poll = data.polls[params.pollId]; + const recordVote = async (vote: { pollId: string; voterId: string; selections: string[] }) => { + return await withMSTeamsSqliteMutationLock(params, POLL_LOCK_FILENAME, async () => { + await importLegacyStore(); + const pollKey = buildPollStateKey(vote.pollId); + const poll = await pollStore.lookup(pollKey); if (!poll) { return null; } - const normalized = normalizeMSTeamsPollSelections(poll, params.selections); - poll.votes[params.voterId] = normalized; - poll.updatedAt = new Date().toISOString(); - data.polls[poll.id] = poll; - await writeStore({ version: 1, polls: pruneToLimit(data.polls) }); - return poll; + if (!pruneExpired({ [vote.pollId]: poll })[vote.pollId]) { + await pollStore.delete(pollKey); + await deletePollVotes(vote.pollId); + return null; + } + const currentPoll = await reconstructPoll(poll); + const normalized = normalizeMSTeamsPollSelections(currentPoll, vote.selections); + const updatedAt = new Date().toISOString(); + poll.updatedAt = updatedAt; + await pollStore.register(pollKey, toPluginJsonValue(poll)); + await registerPollVote(vote.pollId, vote.voterId, normalized, updatedAt); + await prunePollStoreToLimit(); + return await reconstructPoll(poll); }); + }; return { createPoll, getPoll, recordVote }; } diff --git a/extensions/msteams/src/send-context.test.ts b/extensions/msteams/src/send-context.test.ts index 012a2f2cc7a..533b43ea6d5 100644 --- a/extensions/msteams/src/send-context.test.ts +++ b/extensions/msteams/src/send-context.test.ts @@ -20,8 +20,8 @@ const sendContextMockState = vi.hoisted(() => { }; }); -vi.mock("./conversation-store-fs.js", () => ({ - createMSTeamsConversationStoreFs: () => sendContextMockState.store, +vi.mock("./conversation-store-state.js", () => ({ + createMSTeamsConversationStoreState: () => sendContextMockState.store, })); vi.mock("./runtime.js", () => ({ diff --git a/extensions/msteams/src/send-context.ts b/extensions/msteams/src/send-context.ts index 1e77f429490..494f61437af 100644 --- a/extensions/msteams/src/send-context.ts +++ b/extensions/msteams/src/send-context.ts @@ -17,7 +17,7 @@ import { validateMSTeamsProactiveServiceUrlBoundary, type MSTeamsSdkCloudOptions, } from "./cloud.js"; -import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; +import { createMSTeamsConversationStoreState } from "./conversation-store-state.js"; import type { MSTeamsConversationStore, StoredConversationReference, @@ -159,7 +159,7 @@ export async function resolveMSTeamsSendContext(params: { throw new Error("msteams credentials not configured"); } - const store = createMSTeamsConversationStoreFs(); + const store = createMSTeamsConversationStoreState(); // Parse recipient and find conversation reference const recipient = parseRecipient(params.to); diff --git a/extensions/msteams/src/sqlite-state.ts b/extensions/msteams/src/sqlite-state.ts new file mode 100644 index 00000000000..3d0bc19ec2d --- /dev/null +++ b/extensions/msteams/src/sqlite-state.ts @@ -0,0 +1,89 @@ +import path from "node:path"; +import { getMSTeamsRuntime } from "./runtime.js"; +import { withFileLock } from "./store-fs.js"; + +export type MSTeamsSqliteStateOptions = { + env?: NodeJS.ProcessEnv; + homedir?: () => string; + stateDir?: string; + storePath?: string; +}; + +function resolveStateDirOverride( + options: MSTeamsSqliteStateOptions | undefined, +): string | undefined { + if (!options) { + return undefined; + } + if (options.stateDir) { + return options.stateDir; + } + if (options.storePath) { + return path.dirname(options.storePath); + } + if (options.homedir) { + return getMSTeamsRuntime().state.resolveStateDir(options.env ?? process.env, options.homedir); + } + return options.env?.OPENCLAW_STATE_DIR?.trim() || undefined; +} + +export function resolveMSTeamsSqliteStateEnv( + options: MSTeamsSqliteStateOptions | undefined, +): NodeJS.ProcessEnv | undefined { + const stateDir = resolveStateDirOverride(options); + if (!stateDir) { + return options?.env; + } + return { + ...(options?.env ?? process.env), + OPENCLAW_STATE_DIR: stateDir, + }; +} + +export function toPluginJsonValue(value: T): T { + return JSON.parse(JSON.stringify(value)) as T; +} + +export function resolveMSTeamsSqliteStateDir( + options: MSTeamsSqliteStateOptions | undefined, +): string { + return ( + resolveStateDirOverride(options) ?? + getMSTeamsRuntime().state.resolveStateDir(options?.env ?? process.env, options?.homedir) + ); +} + +const sqliteMutationLocks = new Map>(); + +async function withProcessMutationLock(lockPath: string, fn: () => Promise): Promise { + const previous = sqliteMutationLocks.get(lockPath) ?? Promise.resolve(); + let release: () => void = () => {}; + const next = new Promise((resolve) => { + release = resolve; + }); + const chained = previous.then( + () => next, + () => next, + ); + sqliteMutationLocks.set(lockPath, chained); + await previous.catch(() => undefined); + try { + return await fn(); + } finally { + release(); + if (sqliteMutationLocks.get(lockPath) === chained) { + sqliteMutationLocks.delete(lockPath); + } + } +} + +export async function withMSTeamsSqliteMutationLock( + options: MSTeamsSqliteStateOptions | undefined, + lockFilename: string, + fn: () => Promise, +): Promise { + const lockPath = path.join(resolveMSTeamsSqliteStateDir(options), lockFilename); + return await withProcessMutationLock(lockPath, async () => { + return await withFileLock(lockPath, { version: 1 }, fn); + }); +} diff --git a/extensions/msteams/src/test-runtime.ts b/extensions/msteams/src/test-support/runtime.ts similarity index 57% rename from extensions/msteams/src/test-runtime.ts rename to extensions/msteams/src/test-support/runtime.ts index 3d884fcf2ac..8b475e5fd69 100644 --- a/extensions/msteams/src/test-runtime.ts +++ b/extensions/msteams/src/test-support/runtime.ts @@ -1,9 +1,13 @@ import os from "node:os"; import path from "node:path"; -import type { PluginRuntime } from "../runtime-api.js"; +import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { createPluginStateKeyedStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import type { PluginRuntime } from "../../runtime-api.js"; export const msteamsRuntimeStub = { state: { + openKeyedStore: (options: OpenKeyedStoreOptions) => + createPluginStateKeyedStoreForTests("msteams", options), resolveStateDir: (env: NodeJS.ProcessEnv = process.env, homedir?: () => string) => { const override = env.OPENCLAW_STATE_DIR?.trim() || env.OPENCLAW_STATE_DIR?.trim(); if (override) {