refactor(msteams): persist conversation and poll stores in sqlite

Move MSTeams conversation and poll plugin-local stores to plugin-state SQLite. Legacy JSON stores import once without overwriting existing SQLite state; conversation and poll IDs are hashed for plugin-state keys; poll votes are sharded with bounded row-cap headroom and prune cleanup; MSTeams docs now describe SQLite storage. SSO and delegated token stores are unchanged. Verified with focused MSTeams tests, docs sanity, autoreview, Testbox check:changed, and green PR CI.
This commit is contained in:
Peter Steinberger
2026-05-30 21:08:39 +01:00
committed by GitHub
parent a9a86f788b
commit a2b2c4a76c
19 changed files with 1257 additions and 300 deletions

View File

@@ -915,9 +915,10 @@ Uploaded files are stored in a `/OpenClawShared/` folder in the configured Share
OpenClaw sends Teams polls as Adaptive Cards (there is no native Teams poll API).
- CLI: `openclaw message poll --channel msteams --target conversation:<id> ...`
- Votes are recorded by the gateway in `~/.openclaw/msteams-polls.json`.
- Votes are recorded by the gateway in OpenClaw plugin-state SQLite under `state/openclaw.sqlite`.
- Existing `msteams-polls.json` files are imported once when the MSTeams plugin starts.
- The gateway must stay online to record votes.
- Polls do not auto-post result summaries yet (inspect the store file if needed).
- Polls do not auto-post result summaries yet, and there is no supported poll-results CLI yet.
## Presentation cards

View File

@@ -1,81 +0,0 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it } from "vitest";
import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js";
import type { StoredConversationReference } from "./conversation-store.js";
import { setMSTeamsRuntime } from "./runtime.js";
import { msteamsRuntimeStub } from "./test-runtime.js";
describe("msteams conversation store (fs-only)", () => {
beforeEach(() => {
setMSTeamsRuntime(msteamsRuntimeStub);
});
it("filters and prunes expired entries while preserving legacy entries without lastSeenAt", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
const env: NodeJS.ProcessEnv = {
...process.env,
OPENCLAW_STATE_DIR: stateDir,
};
const store = createMSTeamsConversationStoreFs({ env, ttlMs: 1_000 });
const ref: StoredConversationReference = {
conversation: { id: "19:active@thread.tacv2" },
channelId: "msteams",
serviceUrl: "https://service.example.com",
user: { id: "u1", aadObjectId: "aad1" },
};
await store.upsert("19:active@thread.tacv2", ref);
const filePath = path.join(stateDir, "msteams-conversations.json");
const raw = await fs.promises.readFile(filePath, "utf-8");
const json = JSON.parse(raw) as {
version: number;
conversations: Record<string, StoredConversationReference>;
};
json.conversations["19:old@thread.tacv2"] = {
...ref,
conversation: { id: "19:old@thread.tacv2" },
lastSeenAt: new Date(Date.now() - 60_000).toISOString(),
};
json.conversations["19:legacy@thread.tacv2"] = {
...ref,
conversation: { id: "19:legacy@thread.tacv2" },
};
await fs.promises.writeFile(filePath, `${JSON.stringify(json, null, 2)}\n`);
const list = await store.list();
const ids = list.map((entry) => entry.conversationId).toSorted();
expect(ids).toEqual(["19:active@thread.tacv2", "19:legacy@thread.tacv2"]);
expect(await store.get("19:old@thread.tacv2")).toBeNull();
const legacyConversation = await store.get("19:legacy@thread.tacv2");
if (!legacyConversation) {
throw new Error("expected migrated legacy Teams conversation");
}
if (!legacyConversation.conversation) {
throw new Error("expected migrated legacy Teams conversation payload");
}
expect(legacyConversation.conversation.id).toBe("19:legacy@thread.tacv2");
await store.upsert("19:new@thread.tacv2", {
...ref,
conversation: { id: "19:new@thread.tacv2" },
});
const rawAfter = await fs.promises.readFile(filePath, "utf-8");
const jsonAfter = JSON.parse(rawAfter) as typeof json;
expect(Object.keys(jsonAfter.conversations).toSorted()).toEqual([
"19:active@thread.tacv2",
"19:legacy@thread.tacv2",
"19:new@thread.tacv2",
]);
});
});

View File

@@ -1,149 +0,0 @@
import {
findPreferredDmConversationByUserId,
mergeStoredConversationReference,
normalizeStoredConversationId,
parseStoredConversationTimestamp,
toConversationStoreEntries,
} from "./conversation-store-helpers.js";
import type {
MSTeamsConversationStore,
MSTeamsConversationStoreEntry,
StoredConversationReference,
} from "./conversation-store.js";
import { resolveMSTeamsStorePath } from "./storage.js";
import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js";
type ConversationStoreData = {
version: 1;
conversations: Record<string, StoredConversationReference>;
};
const STORE_FILENAME = "msteams-conversations.json";
const MAX_CONVERSATIONS = 1000;
const CONVERSATION_TTL_MS = 365 * 24 * 60 * 60 * 1000;
function pruneToLimit(conversations: Record<string, StoredConversationReference>) {
const entries = Object.entries(conversations);
if (entries.length <= MAX_CONVERSATIONS) {
return conversations;
}
entries.sort((a, b) => {
const aTs = parseStoredConversationTimestamp(a[1].lastSeenAt) ?? 0;
const bTs = parseStoredConversationTimestamp(b[1].lastSeenAt) ?? 0;
return aTs - bTs;
});
const keep = entries.slice(entries.length - MAX_CONVERSATIONS);
return Object.fromEntries(keep);
}
function pruneExpired(
conversations: Record<string, StoredConversationReference>,
nowMs: number,
ttlMs: number,
) {
let removed = false;
const kept: typeof conversations = {};
for (const [conversationId, reference] of Object.entries(conversations)) {
const lastSeenAt = parseStoredConversationTimestamp(reference.lastSeenAt);
// Preserve legacy entries that have no lastSeenAt until they're seen again.
if (lastSeenAt != null && nowMs - lastSeenAt > ttlMs) {
removed = true;
continue;
}
kept[conversationId] = reference;
}
return { conversations: kept, removed };
}
export function createMSTeamsConversationStoreFs(params?: {
env?: NodeJS.ProcessEnv;
homedir?: () => string;
ttlMs?: number;
stateDir?: string;
storePath?: string;
}): MSTeamsConversationStore {
const ttlMs = params?.ttlMs ?? CONVERSATION_TTL_MS;
const filePath = resolveMSTeamsStorePath({
filename: STORE_FILENAME,
env: params?.env,
homedir: params?.homedir,
stateDir: params?.stateDir,
storePath: params?.storePath,
});
const empty: ConversationStoreData = { version: 1, conversations: {} };
const readStore = async (): Promise<ConversationStoreData> => {
const { value } = await readJsonFile(filePath, empty);
if (
value.version !== 1 ||
!value.conversations ||
typeof value.conversations !== "object" ||
Array.isArray(value.conversations)
) {
return empty;
}
const nowMs = Date.now();
const pruned = pruneExpired(value.conversations, nowMs, ttlMs).conversations;
return { version: 1, conversations: pruneToLimit(pruned) };
};
const list = async (): Promise<MSTeamsConversationStoreEntry[]> => {
const store = await readStore();
return toConversationStoreEntries(Object.entries(store.conversations));
};
const get = async (conversationId: string): Promise<StoredConversationReference | null> => {
const store = await readStore();
return store.conversations[normalizeStoredConversationId(conversationId)] ?? null;
};
const findPreferredDmByUserId = async (
id: string,
): Promise<MSTeamsConversationStoreEntry | null> => {
return findPreferredDmConversationByUserId(await list(), id);
};
const upsert = async (
conversationId: string,
reference: StoredConversationReference,
): Promise<void> => {
const normalizedId = normalizeStoredConversationId(conversationId);
await withFileLock(filePath, empty, async () => {
const store = await readStore();
store.conversations[normalizedId] = mergeStoredConversationReference(
store.conversations[normalizedId],
reference,
new Date().toISOString(),
);
const nowMs = Date.now();
store.conversations = pruneExpired(store.conversations, nowMs, ttlMs).conversations;
store.conversations = pruneToLimit(store.conversations);
await writeJsonFile(filePath, store);
});
};
const remove = async (conversationId: string): Promise<boolean> => {
const normalizedId = normalizeStoredConversationId(conversationId);
return await withFileLock(filePath, empty, async () => {
const store = await readStore();
if (!(normalizedId in store.conversations)) {
return false;
}
delete store.conversations[normalizedId];
await writeJsonFile(filePath, store);
return true;
});
};
return {
upsert,
get,
list,
remove,
findPreferredDmByUserId,
findByUserId: findPreferredDmByUserId,
};
}

View File

@@ -0,0 +1,266 @@
import crypto from "node:crypto";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import {
createPluginStateKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { beforeEach, describe, expect, it } from "vitest";
import { createMSTeamsConversationStoreState } from "./conversation-store-state.js";
import type { StoredConversationReference } from "./conversation-store.js";
import { setMSTeamsRuntime } from "./runtime.js";
import { msteamsRuntimeStub } from "./test-support/runtime.js";
function conversationStateKey(conversationId: string): string {
return crypto.createHash("sha256").update(conversationId).digest("hex");
}
describe("msteams conversation store (plugin state)", () => {
beforeEach(() => {
resetPluginStateStoreForTests();
setMSTeamsRuntime(msteamsRuntimeStub);
});
it("filters and prunes expired entries while preserving legacy entries without lastSeenAt", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
const env: NodeJS.ProcessEnv = {
...process.env,
OPENCLAW_STATE_DIR: stateDir,
};
const ref: StoredConversationReference = {
conversation: { id: "19:active@thread.tacv2" },
channelId: "msteams",
serviceUrl: "https://service.example.com",
user: { id: "u1", aadObjectId: "aad1" },
};
const filePath = path.join(stateDir, "msteams-conversations.json");
await fs.promises.mkdir(path.dirname(filePath), { recursive: true });
await fs.promises.writeFile(
filePath,
`${JSON.stringify(
{
version: 1,
conversations: {
"19:active@thread.tacv2": ref,
"19:old@thread.tacv2": {
...ref,
conversation: { id: "19:old@thread.tacv2" },
lastSeenAt: new Date(Date.now() - 60_000).toISOString(),
},
"19:legacy@thread.tacv2": {
...ref,
conversation: { id: "19:legacy@thread.tacv2" },
},
},
},
null,
2,
)}\n`,
);
const store = createMSTeamsConversationStoreState({ env, ttlMs: 1_000 });
const ids = (await store.list()).map((entry) => entry.conversationId).toSorted();
expect(ids).toEqual(["19:active@thread.tacv2", "19:legacy@thread.tacv2"]);
await expect(fs.promises.access(filePath)).rejects.toThrow();
expect(await store.get("19:old@thread.tacv2")).toBeNull();
const legacyConversation = await store.get("19:legacy@thread.tacv2");
if (!legacyConversation?.conversation) {
throw new Error("expected migrated legacy Teams conversation payload");
}
expect(legacyConversation.conversation.id).toBe("19:legacy@thread.tacv2");
await store.upsert("19:new@thread.tacv2", {
...ref,
conversation: { id: "19:new@thread.tacv2" },
});
const idsAfter = (await store.list()).map((entry) => entry.conversationId).toSorted();
expect(idsAfter).toEqual([
"19:active@thread.tacv2",
"19:legacy@thread.tacv2",
"19:new@thread.tacv2",
]);
await expect(
fs.promises.access(path.join(stateDir, "state", "openclaw.sqlite")),
).resolves.toBeUndefined();
});
it("does not let a stale legacy JSON file overwrite existing SQLite rows", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
const env: NodeJS.ProcessEnv = {
...process.env,
OPENCLAW_STATE_DIR: stateDir,
};
const ref: StoredConversationReference = {
conversation: { id: "conv-current" },
channelId: "msteams",
serviceUrl: "https://service.example.com/current",
user: { id: "current-user" },
};
const filePath = path.join(stateDir, "msteams-conversations.json");
await fs.promises.writeFile(
filePath,
`${JSON.stringify({
version: 1,
conversations: {
"conv-current": {
...ref,
serviceUrl: "https://service.example.com/stale",
user: { id: "stale-user" },
},
},
})}\n`,
);
const sqliteStore = createPluginStateKeyedStoreForTests<StoredConversationReference>(
"msteams",
{
namespace: "conversations",
maxEntries: 2000,
env,
},
);
await sqliteStore.register(conversationStateKey("conv-current"), ref);
const store = createMSTeamsConversationStoreState({ env });
await expect(store.get("conv-current")).resolves.toEqual(ref);
});
it("hashes external conversation ids before using plugin-state keys", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
const longConversationId = `a:${"x".repeat(900)}`;
const filePath = path.join(stateDir, "msteams-conversations.json");
await fs.promises.writeFile(
filePath,
`${JSON.stringify({
version: 1,
conversations: {
[longConversationId]: {
channelId: "msteams",
serviceUrl: "https://service.example.com",
user: { id: "long-user" },
} satisfies StoredConversationReference,
},
})}\n`,
);
const store = createMSTeamsConversationStoreState({ stateDir });
await expect(store.get(longConversationId)).resolves.toMatchObject({
conversation: { id: longConversationId },
user: { id: "long-user" },
});
await store.upsert(`${longConversationId}-new`, {
conversation: { conversationType: "personal" },
channelId: "msteams",
serviceUrl: "https://service.example.com",
user: { id: "long-user-new" },
});
await expect(store.get(`${longConversationId}-new`)).resolves.toMatchObject({
conversation: { id: `${longConversationId}-new` },
user: { id: "long-user-new" },
});
});
it("serializes concurrent upserts so sparse activities do not drop preserved fields", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
const store = createMSTeamsConversationStoreState({ stateDir });
await store.upsert("conv-race", {
conversation: { id: "conv-race", conversationType: "personal" },
channelId: "msteams",
serviceUrl: "https://service.example.com",
user: { id: "u1" },
graphChatId: "19:resolved@unq.gbl.spaces",
});
await Promise.all([
store.upsert("conv-race", {
conversation: { id: "conv-race", conversationType: "personal" },
channelId: "msteams",
serviceUrl: "https://service.example.com",
user: { id: "u1" },
timezone: "Europe/London",
}),
store.upsert("conv-race", {
conversation: { id: "conv-race", conversationType: "personal" },
channelId: "msteams",
serviceUrl: "https://service.example.com",
user: { id: "u1" },
tenantId: "tenant-1",
}),
]);
await expect(store.get("conv-race")).resolves.toMatchObject({
graphChatId: "19:resolved@unq.gbl.spaces",
timezone: "Europe/London",
tenantId: "tenant-1",
});
});
it("keeps newest legacy conversations by lastSeenAt at the row cap", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
const filePath = path.join(stateDir, "msteams-conversations.json");
const conversations: Record<string, StoredConversationReference> = {
"conv-recent": {
conversation: { id: "conv-recent" },
channelId: "msteams",
serviceUrl: "https://service.example.com",
lastSeenAt: "2026-03-25T20:00:00.000Z",
},
};
for (let index = 0; index < 1000; index += 1) {
const id = `conv-${String(index).padStart(4, "0")}`;
conversations[id] = {
conversation: { id },
channelId: "msteams",
serviceUrl: "https://service.example.com",
lastSeenAt: new Date(Date.UTC(2026, 1, 1, 0, 0, index)).toISOString(),
};
}
await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, conversations })}\n`);
const store = createMSTeamsConversationStoreState({ stateDir });
const ids = (await store.list()).map((entry) => entry.conversationId);
expect(ids).toHaveLength(1000);
expect(ids).toContain("conv-recent");
expect(ids).not.toContain("conv-0000");
});
it("treats timestamp-less legacy conversations as oldest during later cap pruning", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
const filePath = path.join(stateDir, "msteams-conversations.json");
const conversations: Record<string, StoredConversationReference> = {
"conv-legacy": {
conversation: { id: "conv-legacy" },
channelId: "msteams",
serviceUrl: "https://service.example.com",
},
};
for (let index = 0; index < 999; index += 1) {
const id = `conv-seen-${String(index).padStart(4, "0")}`;
conversations[id] = {
conversation: { id },
channelId: "msteams",
serviceUrl: "https://service.example.com",
lastSeenAt: new Date(Date.UTC(2026, 1, 1, 0, 0, index)).toISOString(),
};
}
await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, conversations })}\n`);
const store = createMSTeamsConversationStoreState({ stateDir });
await store.list();
await store.upsert("conv-new", {
conversation: { id: "conv-new" },
channelId: "msteams",
serviceUrl: "https://service.example.com",
});
const ids = (await store.list()).map((entry) => entry.conversationId);
expect(ids).toHaveLength(1000);
expect(ids).toContain("conv-new");
expect(ids).not.toContain("conv-legacy");
});
});

View File

@@ -0,0 +1,296 @@
import crypto from "node:crypto";
import fs from "node:fs/promises";
import {
findPreferredDmConversationByUserId,
mergeStoredConversationReference,
normalizeStoredConversationId,
parseStoredConversationTimestamp,
toConversationStoreEntries,
} from "./conversation-store-helpers.js";
import type {
MSTeamsConversationStore,
MSTeamsConversationStoreEntry,
StoredConversationReference,
} from "./conversation-store.js";
import { getMSTeamsRuntime } from "./runtime.js";
import {
resolveMSTeamsSqliteStateEnv,
toPluginJsonValue,
withMSTeamsSqliteMutationLock,
} from "./sqlite-state.js";
import { resolveMSTeamsStorePath } from "./storage.js";
import { readJsonFile } from "./store-fs.js";
type ConversationStoreData = {
version: 1;
conversations: Record<string, StoredConversationReference>;
};
type ConversationMigrationMarker = {
importedAt: string;
};
const STORE_FILENAME = "msteams-conversations.json";
const CONVERSATIONS_NAMESPACE = "conversations";
const CONVERSATION_MIGRATIONS_NAMESPACE = "conversation-migrations";
const LEGACY_JSON_MIGRATION_KEY = "msteams-conversations-json-v1";
const MAX_CONVERSATIONS = 1000;
const SQLITE_MAX_CONVERSATION_ROWS = MAX_CONVERSATIONS + 1000;
const CONVERSATION_TTL_MS = 365 * 24 * 60 * 60 * 1000;
const CONVERSATION_LOCK_FILENAME = "msteams-conversations.sqlite.lock";
type MSTeamsConversationStoreStateOptions = {
env?: NodeJS.ProcessEnv;
homedir?: () => string;
ttlMs?: number;
stateDir?: string;
storePath?: string;
};
function createConversationStateStore(params?: MSTeamsConversationStoreStateOptions) {
return getMSTeamsRuntime().state.openKeyedStore<StoredConversationReference>({
namespace: CONVERSATIONS_NAMESPACE,
maxEntries: SQLITE_MAX_CONVERSATION_ROWS,
env: resolveMSTeamsSqliteStateEnv(params),
});
}
function createConversationMigrationStore(params?: MSTeamsConversationStoreStateOptions) {
return getMSTeamsRuntime().state.openKeyedStore<ConversationMigrationMarker>({
namespace: CONVERSATION_MIGRATIONS_NAMESPACE,
maxEntries: 100,
env: resolveMSTeamsSqliteStateEnv(params),
});
}
function resolveLegacyStorePath(params?: MSTeamsConversationStoreStateOptions): string {
return resolveMSTeamsStorePath({
filename: STORE_FILENAME,
env: params?.env,
homedir: params?.homedir,
stateDir: params?.stateDir,
storePath: params?.storePath,
});
}
function normalizeLegacyStore(value: ConversationStoreData): ConversationStoreData {
if (
value.version !== 1 ||
!value.conversations ||
typeof value.conversations !== "object" ||
Array.isArray(value.conversations)
) {
return { version: 1, conversations: {} };
}
return value;
}
function buildConversationStateKey(conversationId: string): string {
return crypto.createHash("sha256").update(conversationId).digest("hex");
}
function prepareConversationReferenceForStorage(
conversationId: string,
reference: StoredConversationReference,
): StoredConversationReference {
return {
...reference,
conversation: {
...reference.conversation,
id: conversationId,
},
};
}
function getStoredConversationId(reference: StoredConversationReference): string | null {
const rawId = reference.conversation?.id;
return rawId ? normalizeStoredConversationId(rawId) : null;
}
export function createMSTeamsConversationStoreState(
params?: MSTeamsConversationStoreStateOptions,
): MSTeamsConversationStore {
const ttlMs = params?.ttlMs ?? CONVERSATION_TTL_MS;
const conversationStore = createConversationStateStore(params);
const migrationStore = createConversationMigrationStore(params);
const legacyStorePath = resolveLegacyStorePath(params);
let legacyImportPromise: Promise<void> | null = null;
const isExpired = (reference: StoredConversationReference): boolean => {
const lastSeenAt = parseStoredConversationTimestamp(reference.lastSeenAt);
// Preserve migrated legacy entries that have no lastSeenAt until they're seen again.
return lastSeenAt != null && Date.now() - lastSeenAt > ttlMs;
};
const selectRetainedConversations = (
conversations: Record<string, StoredConversationReference>,
): Array<[string, StoredConversationReference]> => {
const retained = Object.entries(conversations).filter(([, reference]) => !isExpired(reference));
if (retained.length <= MAX_CONVERSATIONS) {
return retained;
}
retained.sort((a, b) => {
const aTs = parseStoredConversationTimestamp(a[1].lastSeenAt) ?? 0;
const bTs = parseStoredConversationTimestamp(b[1].lastSeenAt) ?? 0;
return aTs - bTs || a[0].localeCompare(b[0]);
});
return retained.slice(retained.length - MAX_CONVERSATIONS);
};
const importLegacyStore = async (): Promise<void> => {
if (await migrationStore.lookup(LEGACY_JSON_MIGRATION_KEY)) {
return;
}
const empty: ConversationStoreData = { version: 1, conversations: {} };
const { value, exists } = await readJsonFile<ConversationStoreData>(legacyStorePath, empty);
if (!exists) {
await migrationStore.register(LEGACY_JSON_MIGRATION_KEY, {
importedAt: new Date().toISOString(),
});
return;
}
const legacy = normalizeLegacyStore(value);
for (const [rawConversationId, reference] of selectRetainedConversations(
legacy.conversations,
)) {
const conversationId = normalizeStoredConversationId(rawConversationId);
if (!conversationId) {
continue;
}
await conversationStore.registerIfAbsent(
buildConversationStateKey(conversationId),
toPluginJsonValue(prepareConversationReferenceForStorage(conversationId, reference)),
);
}
await migrationStore.register(LEGACY_JSON_MIGRATION_KEY, {
importedAt: new Date().toISOString(),
});
await fs.rm(legacyStorePath, { force: true }).catch(() => {});
};
const ensureLegacyImported = async (): Promise<void> => {
legacyImportPromise ??= withMSTeamsSqliteMutationLock(
params,
CONVERSATION_LOCK_FILENAME,
importLegacyStore,
);
await legacyImportPromise;
};
const lookupStored = async (
conversationId: string,
): Promise<StoredConversationReference | null> => {
const normalizedId = normalizeStoredConversationId(conversationId);
const value = await conversationStore.lookup(buildConversationStateKey(normalizedId));
if (!value) {
return null;
}
if (isExpired(value)) {
return null;
}
return value;
};
const entries = async (): Promise<Array<[string, StoredConversationReference]>> => {
await ensureLegacyImported();
const rows = await conversationStore.entries();
const kept: Array<[string, StoredConversationReference]> = [];
for (const row of rows) {
if (isExpired(row.value)) {
continue;
}
const conversationId = getStoredConversationId(row.value);
if (conversationId) {
kept.push([conversationId, row.value]);
}
}
return kept;
};
const lookup = async (conversationId: string): Promise<StoredConversationReference | null> => {
await ensureLegacyImported();
return await lookupStored(conversationId);
};
const register = async (
conversationId: string,
reference: StoredConversationReference,
): Promise<void> => {
const normalizedId = normalizeStoredConversationId(conversationId);
await conversationStore.register(
buildConversationStateKey(normalizedId),
toPluginJsonValue(prepareConversationReferenceForStorage(normalizedId, reference)),
);
const rows = [];
for (const row of await conversationStore.entries()) {
if (isExpired(row.value)) {
await conversationStore.delete(row.key);
continue;
}
rows.push(row);
}
if (rows.length <= MAX_CONVERSATIONS) {
return;
}
const sorted = rows.toSorted((a, b) => {
const aTs = parseStoredConversationTimestamp(a.value.lastSeenAt) ?? 0;
const bTs = parseStoredConversationTimestamp(b.value.lastSeenAt) ?? 0;
const aId = getStoredConversationId(a.value) ?? a.key;
const bId = getStoredConversationId(b.value) ?? b.key;
return aTs - bTs || aId.localeCompare(bId);
});
for (const row of sorted.slice(0, rows.length - MAX_CONVERSATIONS)) {
await conversationStore.delete(row.key);
}
};
const list = async (): Promise<MSTeamsConversationStoreEntry[]> => {
return toConversationStoreEntries(await entries());
};
const get = async (conversationId: string): Promise<StoredConversationReference | null> => {
return await lookup(conversationId);
};
const findPreferredDmByUserId = async (
id: string,
): Promise<MSTeamsConversationStoreEntry | null> => {
return findPreferredDmConversationByUserId(await list(), id);
};
const upsert = async (
conversationId: string,
reference: StoredConversationReference,
): Promise<void> => {
const normalizedId = normalizeStoredConversationId(conversationId);
await withMSTeamsSqliteMutationLock(params, CONVERSATION_LOCK_FILENAME, async () => {
await importLegacyStore();
const existing = await lookupStored(normalizedId);
await register(
normalizedId,
mergeStoredConversationReference(
existing ?? undefined,
reference,
new Date().toISOString(),
),
);
});
};
const remove = async (conversationId: string): Promise<boolean> => {
const normalizedId = normalizeStoredConversationId(conversationId);
return await withMSTeamsSqliteMutationLock(params, CONVERSATION_LOCK_FILENAME, async () => {
await importLegacyStore();
return await conversationStore.delete(buildConversationStateKey(normalizedId));
});
};
return {
upsert,
get,
list,
remove,
findPreferredDmByUserId,
findByUserId: findPreferredDmByUserId,
};
}

View File

@@ -1,12 +1,13 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js";
import { createMSTeamsConversationStoreMemory } from "./conversation-store-memory.js";
import { createMSTeamsConversationStoreState } from "./conversation-store-state.js";
import type { MSTeamsConversationStore } from "./conversation-store.js";
import { setMSTeamsRuntime } from "./runtime.js";
import { msteamsRuntimeStub } from "./test-runtime.js";
import { msteamsRuntimeStub } from "./test-support/runtime.js";
type StoreFactory = {
name: string;
@@ -15,10 +16,10 @@ type StoreFactory = {
const storeFactories: StoreFactory[] = [
{
name: "fs",
name: "state",
createStore: async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-store-"));
return createMSTeamsConversationStoreFs({
return createMSTeamsConversationStoreState({
env: { ...process.env, OPENCLAW_STATE_DIR: stateDir },
ttlMs: 60_000,
});
@@ -32,6 +33,7 @@ const storeFactories: StoreFactory[] = [
describe.each(storeFactories)("msteams conversation store ($name)", ({ createStore }) => {
beforeEach(() => {
resetPluginStateStoreForTests();
setMSTeamsRuntime(msteamsRuntimeStub);
});

View File

@@ -27,8 +27,8 @@ vi.mock("./graph.js", async (importOriginal) => {
};
});
vi.mock("./conversation-store-fs.js", () => ({
createMSTeamsConversationStoreFs: () => ({
vi.mock("./conversation-store-state.js", () => ({
createMSTeamsConversationStoreState: () => ({
findPreferredDmByUserId: mockState.findPreferredDmByUserId,
}),
}));

View File

@@ -22,8 +22,8 @@ vi.mock("./graph.js", () => {
};
});
vi.mock("./conversation-store-fs.js", () => ({
createMSTeamsConversationStoreFs: () => ({
vi.mock("./conversation-store-state.js", () => ({
createMSTeamsConversationStoreState: () => ({
findPreferredDmByUserId: graphMessagesMockState.findPreferredDmByUserId,
}),
}));

View File

@@ -1,5 +1,5 @@
import type { OpenClawConfig } from "../runtime-api.js";
import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js";
import { createMSTeamsConversationStoreState } from "./conversation-store-state.js";
import {
type GraphResponse,
deleteGraphRequest,
@@ -75,7 +75,7 @@ export async function resolveGraphConversationId(to: string): Promise<string> {
}
// user:<aadId> — look up the conversation store for the real chat ID
const store = createMSTeamsConversationStoreFs();
const store = createMSTeamsConversationStoreState();
const found = await store.findPreferredDmByUserId(cleaned);
if (!found) {
throw new Error(

View File

@@ -9,7 +9,7 @@ import {
type RuntimeEnv,
} from "../runtime-api.js";
import { resolveMSTeamsSdkCloudOptions } from "./cloud.js";
import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js";
import { createMSTeamsConversationStoreState } from "./conversation-store-state.js";
import type { MSTeamsConversationStore } from "./conversation-store.js";
import { formatUnknownError } from "./errors.js";
import { runMSTeamsFeedbackInvokeHandler } from "./feedback-invoke.js";
@@ -23,7 +23,7 @@ import {
} from "./monitor-handler.js";
import type { MSTeamsMessageHandlerDeps } from "./monitor-handler.types.js";
import {
createMSTeamsPollStoreFs,
createMSTeamsPollStoreState,
extractMSTeamsPollVote,
type MSTeamsPollStore,
} from "./polls.js";
@@ -252,8 +252,8 @@ export async function monitorMSTeamsProvider(
typeof agentDefaults?.mediaMaxMb === "number" && agentDefaults.mediaMaxMb > 0
? Math.floor(agentDefaults.mediaMaxMb * MB)
: 8 * MB;
const conversationStore = opts.conversationStore ?? createMSTeamsConversationStoreFs();
const pollStore = opts.pollStore ?? createMSTeamsPollStoreFs();
const conversationStore = opts.conversationStore ?? createMSTeamsConversationStoreState();
const pollStore = opts.pollStore ?? createMSTeamsPollStoreState();
log.info(`starting provider (port ${port})`);

View File

@@ -15,7 +15,7 @@ vi.mock("./send.js", () => ({
}));
vi.mock("./polls.js", () => ({
createMSTeamsPollStoreFs: () => ({
createMSTeamsPollStoreState: () => ({
createPoll: mocks.createPoll,
}),
}));

View File

@@ -16,7 +16,7 @@ import {
normalizeStringEntries,
type ChannelOutboundAdapter,
} from "../runtime-api.js";
import { createMSTeamsPollStoreFs } from "./polls.js";
import { createMSTeamsPollStoreState } from "./polls.js";
import { buildMSTeamsPresentationCard, MSTEAMS_PRESENTATION_CAPABILITIES } from "./presentation.js";
import { sendAdaptiveCardMSTeams, sendMessageMSTeams, sendPollMSTeams } from "./send.js";
@@ -180,7 +180,7 @@ export const msteamsOutbound: ChannelOutboundAdapter = {
options: poll.options,
maxSelections,
});
const pollStore = createMSTeamsPollStoreFs();
const pollStore = createMSTeamsPollStoreState();
await pollStore.createPoll({
id: result.pollId,
question: poll.question,

View File

@@ -11,7 +11,7 @@ import {
} from "./pending-uploads-fs.js";
import { clearPendingUploads } from "./pending-uploads.js";
import { setMSTeamsRuntime } from "./runtime.js";
import { msteamsRuntimeStub } from "./test-runtime.js";
import { msteamsRuntimeStub } from "./test-support/runtime.js";
// Track temp dirs created by each test so afterEach can clean them up.
const createdTempDirs: string[] = [];

View File

@@ -1,19 +1,26 @@
import crypto from "node:crypto";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import {
createPluginStateKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { beforeEach, describe, expect, it } from "vitest";
import { createMSTeamsPollStoreMemory } from "./polls-store-memory.js";
import {
buildMSTeamsPollCard,
createMSTeamsPollStoreFs,
createMSTeamsPollStoreState,
extractMSTeamsPollVote,
normalizeMSTeamsPollSelections,
type MSTeamsPoll,
} from "./polls.js";
import { setMSTeamsRuntime } from "./runtime.js";
import { msteamsRuntimeStub } from "./test-runtime.js";
import { msteamsRuntimeStub } from "./test-support/runtime.js";
describe("msteams polls", () => {
beforeEach(() => {
resetPluginStateStoreForTests();
setMSTeamsRuntime(msteamsRuntimeStub);
});
@@ -45,7 +52,7 @@ describe("msteams polls", () => {
it("stores and records poll votes", async () => {
const home = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
const store = createMSTeamsPollStoreFs({ homedir: () => home });
const store = createMSTeamsPollStoreState({ homedir: () => home });
await store.createPoll({
id: "poll-2",
question: "Pick one",
@@ -99,17 +106,22 @@ describe("msteams polls", () => {
});
});
const createFsStore = async () => {
const createStateStore = async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
return createMSTeamsPollStoreFs({ stateDir });
return createMSTeamsPollStoreState({ stateDir });
};
const createMemoryStore = () => createMSTeamsPollStoreMemory();
describe.each([
{ name: "memory", createStore: createMemoryStore },
{ name: "fs", createStore: createFsStore },
{ name: "state", createStore: createStateStore },
])("$name poll store", ({ createStore }) => {
beforeEach(() => {
resetPluginStateStoreForTests();
setMSTeamsRuntime(msteamsRuntimeStub);
});
it("stores polls and records normalized votes", async () => {
const store = await createStore();
await store.createPoll({
@@ -134,6 +146,296 @@ describe.each([
});
});
describe("state poll store", () => {
beforeEach(() => {
resetPluginStateStoreForTests();
setMSTeamsRuntime(msteamsRuntimeStub);
});
it("imports legacy JSON polls once and removes the old file", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
const filePath = path.join(stateDir, "msteams-polls.json");
await fs.promises.writeFile(
filePath,
`${JSON.stringify({
version: 1,
polls: {
"poll-legacy": {
id: "poll-legacy",
question: "Legacy?",
options: ["A", "B"],
maxSelections: 1,
createdAt: new Date().toISOString(),
votes: {},
},
},
})}\n`,
);
const store = createMSTeamsPollStoreState({ stateDir });
await expect(store.getPoll("poll-legacy")).resolves.toMatchObject({
id: "poll-legacy",
question: "Legacy?",
});
await expect(fs.promises.access(filePath)).rejects.toThrow();
const updated = await store.recordVote({
pollId: "poll-legacy",
voterId: "user-1",
selections: ["1"],
});
expect(updated?.votes["user-1"]).toEqual(["1"]);
await expect(
fs.promises.access(path.join(stateDir, "state", "openclaw.sqlite")),
).resolves.toBeUndefined();
});
it("hashes external poll ids before using plugin-state keys", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
const store = createMSTeamsPollStoreState({ stateDir });
const longPollId = `poll-${"x".repeat(900)}`;
await store.createPoll({
id: longPollId,
question: "Long id?",
options: ["A", "B"],
maxSelections: 1,
createdAt: new Date().toISOString(),
votes: {},
});
await expect(store.getPoll(longPollId)).resolves.toMatchObject({ id: longPollId });
await expect(
store.recordVote({
pollId: `missing-${"y".repeat(900)}`,
voterId: "user-1",
selections: ["0"],
}),
).resolves.toBeNull();
});
it("serializes concurrent votes for the same poll", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
const store = createMSTeamsPollStoreState({ stateDir });
await store.createPoll({
id: "poll-race",
question: "Pick",
options: ["A", "B"],
maxSelections: 1,
createdAt: new Date().toISOString(),
votes: {},
});
await Promise.all([
store.recordVote({ pollId: "poll-race", voterId: "user-a", selections: ["0"] }),
store.recordVote({ pollId: "poll-race", voterId: "user-b", selections: ["1"] }),
]);
await expect(store.getPoll("poll-race")).resolves.toMatchObject({
votes: {
"user-a": ["0"],
"user-b": ["1"],
},
});
});
it("keeps large vote maps split across bounded rows", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
const store = createMSTeamsPollStoreState({ stateDir });
const votes = Object.fromEntries(
Array.from({ length: 500 }, (_, index) => [
`user-${String(index).padStart(4, "0")}-${"x".repeat(160)}`,
["0"],
]),
);
await store.createPoll({
id: "poll-large",
question: "Pick",
options: ["A", "B"],
maxSelections: 1,
createdAt: new Date().toISOString(),
votes,
});
await store.recordVote({ pollId: "poll-large", voterId: "user-new", selections: ["1"] });
const stored = await store.getPoll("poll-large");
expect(Object.keys(stored?.votes ?? {})).toHaveLength(501);
expect(stored?.votes["user-new"]).toEqual(["1"]);
});
it("fills missing legacy vote buckets after a partial metadata import", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
const env = { ...process.env, OPENCLAW_STATE_DIR: stateDir };
const filePath = path.join(stateDir, "msteams-polls.json");
const metadata = {
id: "poll-partial",
question: "Partial?",
options: ["A", "B"],
maxSelections: 1,
createdAt: new Date().toISOString(),
};
const metadataStore = createPluginStateKeyedStoreForTests<typeof metadata>("msteams", {
namespace: "polls",
maxEntries: 2000,
env,
});
await metadataStore.register("poll-partial", metadata);
const voterHash = crypto
.createHash("sha256")
.update("poll-partial")
.update("\0")
.update("user-legacy")
.digest("hex");
const bucket = String(Number.parseInt(voterHash.slice(0, 8), 16) % 32).padStart(4, "0");
const pollHash = crypto.createHash("sha256").update("poll-partial").digest("hex");
const voteBucketStore = createPluginStateKeyedStoreForTests<{
pollId: string;
bucket: string;
votes: Record<string, string[]>;
updatedAt: string;
}>("msteams", {
namespace: "poll-vote-buckets",
maxEntries: 32_032,
env,
});
await voteBucketStore.register(`${pollHash}:${bucket}`, {
pollId: "poll-partial",
bucket,
votes: { "user-legacy": ["0"] },
updatedAt: metadata.createdAt,
});
await fs.promises.writeFile(
filePath,
`${JSON.stringify({
version: 1,
polls: {
"poll-partial": {
...metadata,
votes: {
"user-legacy": ["1"],
"user-missing": ["1"],
},
},
},
})}\n`,
);
const store = createMSTeamsPollStoreState({ env });
await expect(store.getPoll("poll-partial")).resolves.toMatchObject({
votes: {
"user-legacy": ["0"],
"user-missing": ["1"],
},
});
});
it("keeps newest legacy polls by update timestamp at the row cap", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
const filePath = path.join(stateDir, "msteams-polls.json");
const pollRows: Record<string, MSTeamsPoll> = {};
const baseMs = Date.now() - 60_000;
pollRows["poll-recent"] = {
id: "poll-recent",
question: "Recent?",
options: ["A", "B"],
maxSelections: 1,
createdAt: new Date(baseMs + 2_000_000).toISOString(),
updatedAt: new Date(baseMs + 2_000_000).toISOString(),
votes: {},
};
for (let index = 0; index < 1000; index += 1) {
const id = `poll-${String(index).padStart(4, "0")}`;
pollRows[id] = {
id,
question: "Old?",
options: ["A", "B"],
maxSelections: 1,
createdAt: new Date(baseMs + index).toISOString(),
votes: {},
};
}
await fs.promises.writeFile(filePath, `${JSON.stringify({ version: 1, polls: pollRows })}\n`);
const store = createMSTeamsPollStoreState({ stateDir });
await expect(store.getPoll("poll-recent")).resolves.toMatchObject({ id: "poll-recent" });
await expect(store.getPoll("poll-0000")).resolves.toBeNull();
});
it("deletes vote buckets when pruning over the poll cap", async () => {
const stateDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-polls-"));
const env = { ...process.env, OPENCLAW_STATE_DIR: stateDir };
const metadataStore = createPluginStateKeyedStoreForTests<Omit<MSTeamsPoll, "votes">>(
"msteams",
{
namespace: "polls",
maxEntries: 2000,
env,
},
);
const voteBucketStore = createPluginStateKeyedStoreForTests<{
pollId: string;
bucket: string;
votes: Record<string, string[]>;
updatedAt: string;
}>("msteams", {
namespace: "poll-vote-buckets",
maxEntries: 32_032,
env,
});
const pollStateKey = (pollId: string) =>
crypto.createHash("sha256").update(pollId).digest("hex");
const voteBucket = (pollId: string, voterId: string) => {
const hash = crypto
.createHash("sha256")
.update(pollId)
.update("\0")
.update(voterId)
.digest("hex");
return String(Number.parseInt(hash.slice(0, 8), 16) % 32).padStart(4, "0");
};
const baseMs = Date.now() - 60_000;
const oldPollId = "poll-old";
for (const [index, id] of [
oldPollId,
...Array.from({ length: 999 }, (_, entryIndex) => `poll-existing-${entryIndex}`),
].entries()) {
await metadataStore.register(pollStateKey(id), {
id,
question: "Pick",
options: ["A", "B"],
maxSelections: 1,
createdAt: new Date(baseMs + index).toISOString(),
});
}
const oldBucket = voteBucket(oldPollId, "user-old");
await voteBucketStore.register(`${pollStateKey(oldPollId)}:${oldBucket}`, {
pollId: oldPollId,
bucket: oldBucket,
votes: { "user-old": ["0"] },
updatedAt: new Date(baseMs).toISOString(),
});
const store = createMSTeamsPollStoreState({ env });
await store.createPoll({
id: "poll-new",
question: "New?",
options: ["A", "B"],
maxSelections: 1,
createdAt: new Date(baseMs + 2_000_000).toISOString(),
votes: { "user-new": ["1"] },
});
await expect(store.getPoll(oldPollId)).resolves.toBeNull();
const buckets = await voteBucketStore.entries();
expect(buckets.some((row) => row.value.pollId === oldPollId)).toBe(false);
expect(buckets.some((row) => row.value.pollId === "poll-new")).toBe(true);
});
});
describe("memory poll store", () => {
it("reads seeded polls back, updates timestamps, and returns null for missing polls", async () => {
const store = createMSTeamsPollStoreMemory([

View File

@@ -1,4 +1,5 @@
import crypto from "node:crypto";
import fs from "node:fs/promises";
import { parseStrictNonNegativeInteger } from "openclaw/plugin-sdk/number-runtime";
import {
isRecord,
@@ -6,8 +7,14 @@ import {
normalizeStringEntries,
uniqueStrings,
} from "openclaw/plugin-sdk/string-coerce-runtime";
import { getMSTeamsRuntime } from "./runtime.js";
import {
resolveMSTeamsSqliteStateEnv,
toPluginJsonValue,
withMSTeamsSqliteMutationLock,
} from "./sqlite-state.js";
import { resolveMSTeamsStorePath } from "./storage.js";
import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js";
import { readJsonFile } from "./store-fs.js";
type MSTeamsPollVote = {
pollId: string;
@@ -50,9 +57,27 @@ type PollStoreData = {
polls: Record<string, MSTeamsPoll>;
};
type StoredMSTeamsPoll = Omit<MSTeamsPoll, "votes">;
type StoredMSTeamsPollVoteBucket = {
pollId: string;
bucket: string;
votes: Record<string, string[]>;
updatedAt: string;
};
const STORE_FILENAME = "msteams-polls.json";
const POLLS_NAMESPACE = "polls";
const POLL_VOTE_BUCKETS_NAMESPACE = "poll-vote-buckets";
const POLL_MIGRATIONS_NAMESPACE = "poll-migrations";
const LEGACY_POLLS_MIGRATION_KEY = "msteams-polls-json-v1";
const MAX_POLLS = 1000;
const SQLITE_MAX_POLL_ROWS = MAX_POLLS + 1000;
// Keep worst-case retained vote buckets below plugin-state's per-plugin live row cap.
const POLL_VOTE_BUCKET_COUNT = 32;
const MAX_POLL_VOTE_BUCKET_ROWS = (MAX_POLLS + 1) * POLL_VOTE_BUCKET_COUNT;
const POLL_TTL_MS = 30 * 24 * 60 * 60 * 1000;
const POLL_LOCK_FILENAME = "msteams-polls.sqlite.lock";
function normalizeChoiceValue(value: unknown): string | null {
if (typeof value === "string") {
@@ -216,13 +241,41 @@ export function buildMSTeamsPollCard(params: {
};
}
type MSTeamsPollStoreFsOptions = {
type MSTeamsPollStoreStateOptions = {
env?: NodeJS.ProcessEnv;
homedir?: () => string;
stateDir?: string;
storePath?: string;
};
type PollMigrationMarker = {
importedAt: string;
};
function createPollStateStore(params?: MSTeamsPollStoreStateOptions) {
return getMSTeamsRuntime().state.openKeyedStore<StoredMSTeamsPoll>({
namespace: POLLS_NAMESPACE,
maxEntries: SQLITE_MAX_POLL_ROWS,
env: resolveMSTeamsSqliteStateEnv(params),
});
}
function createPollVoteBucketStateStore(params?: MSTeamsPollStoreStateOptions) {
return getMSTeamsRuntime().state.openKeyedStore<StoredMSTeamsPollVoteBucket>({
namespace: POLL_VOTE_BUCKETS_NAMESPACE,
maxEntries: MAX_POLL_VOTE_BUCKET_ROWS,
env: resolveMSTeamsSqliteStateEnv(params),
});
}
function createPollMigrationStore(params?: MSTeamsPollStoreStateOptions) {
return getMSTeamsRuntime().state.openKeyedStore<PollMigrationMarker>({
namespace: POLL_MIGRATIONS_NAMESPACE,
maxEntries: 100,
env: resolveMSTeamsSqliteStateEnv(params),
});
}
function parseTimestamp(value?: string): number | null {
if (!value) {
return null;
@@ -231,7 +284,9 @@ function parseTimestamp(value?: string): number | null {
return Number.isFinite(parsed) ? parsed : null;
}
function pruneExpired(polls: Record<string, MSTeamsPoll>) {
function pruneExpired<T extends { createdAt: string; updatedAt?: string }>(
polls: Record<string, T>,
) {
const cutoff = Date.now() - POLL_TTL_MS;
const entries = Object.entries(polls).filter(([, poll]) => {
const ts = parseTimestamp(poll.updatedAt ?? poll.createdAt) ?? 0;
@@ -240,18 +295,17 @@ function pruneExpired(polls: Record<string, MSTeamsPoll>) {
return Object.fromEntries(entries);
}
function pruneToLimit(polls: Record<string, MSTeamsPoll>) {
const entries = Object.entries(polls);
if (entries.length <= MAX_POLLS) {
return polls;
function selectRetainedPolls(polls: Record<string, MSTeamsPoll>): Array<[string, MSTeamsPoll]> {
const retained = Object.entries(pruneExpired(polls));
if (retained.length <= MAX_POLLS) {
return retained;
}
entries.sort((a, b) => {
retained.sort((a, b) => {
const aTs = parseTimestamp(a[1].updatedAt ?? a[1].createdAt) ?? 0;
const bTs = parseTimestamp(b[1].updatedAt ?? b[1].createdAt) ?? 0;
return aTs - bTs;
return aTs - bTs || a[0].localeCompare(b[0]);
});
const keep = entries.slice(entries.length - MAX_POLLS);
return Object.fromEntries(keep);
return retained.slice(retained.length - MAX_POLLS);
}
export function normalizeMSTeamsPollSelections(poll: MSTeamsPoll, selections: string[]) {
@@ -265,54 +319,227 @@ export function normalizeMSTeamsPollSelections(poll: MSTeamsPoll, selections: st
return uniqueStrings(limited);
}
export function createMSTeamsPollStoreFs(params?: MSTeamsPollStoreFsOptions): MSTeamsPollStore {
const filePath = resolveMSTeamsStorePath({
export function createMSTeamsPollStoreState(
params?: MSTeamsPollStoreStateOptions,
): MSTeamsPollStore {
const pollStore = createPollStateStore(params);
const voteBucketStore = createPollVoteBucketStateStore(params);
const migrationStore = createPollMigrationStore(params);
const legacyStorePath = resolveMSTeamsStorePath({
filename: STORE_FILENAME,
env: params?.env,
homedir: params?.homedir,
stateDir: params?.stateDir,
storePath: params?.storePath,
});
const empty: PollStoreData = { version: 1, polls: {} };
let legacyImportPromise: Promise<void> | null = null;
const readStore = async (): Promise<PollStoreData> => {
const { value } = await readJsonFile(filePath, empty);
const pruned = pruneToLimit(pruneExpired(value.polls ?? {}));
return { version: 1, polls: pruned };
const splitPoll = (
poll: MSTeamsPoll,
): { metadata: StoredMSTeamsPoll; votes: MSTeamsPoll["votes"] } => {
const { votes, ...metadata } = poll;
return { metadata, votes };
};
const writeStore = async (data: PollStoreData) => {
await writeJsonFile(filePath, data);
const hashVote = (pollId: string, voterId: string): string => {
return crypto.createHash("sha256").update(pollId).update("\0").update(voterId).digest("hex");
};
const buildPollStateKey = (pollId: string): string => {
return crypto.createHash("sha256").update(pollId).digest("hex");
};
const selectVoteBucket = (pollId: string, voterId: string): string => {
const bucket = Number.parseInt(hashVote(pollId, voterId).slice(0, 8), 16);
return String(bucket % POLL_VOTE_BUCKET_COUNT).padStart(4, "0");
};
const buildVoteBucketKey = (pollId: string, bucket: string): string => {
const pollDigest = crypto.createHash("sha256").update(pollId).digest("hex");
return `${pollDigest}:${bucket}`;
};
const readPollVotes = async (pollId: string): Promise<Record<string, string[]>> => {
const votes: Record<string, string[]> = {};
for (const row of await voteBucketStore.entries()) {
if (row.value.pollId === pollId) {
Object.assign(votes, row.value.votes);
}
}
return votes;
};
const deletePollVotes = async (pollId: string): Promise<void> => {
for (const row of await voteBucketStore.entries()) {
if (row.value.pollId === pollId) {
await voteBucketStore.delete(row.key);
}
}
};
const registerPollVotes = async (
pollId: string,
votes: Record<string, string[]>,
updatedAt: string,
): Promise<void> => {
const buckets = new Map<string, Record<string, string[]>>();
for (const [voterId, selections] of Object.entries(votes)) {
const bucket = selectVoteBucket(pollId, voterId);
const bucketVotes = buckets.get(bucket) ?? {};
bucketVotes[voterId] = selections;
buckets.set(bucket, bucketVotes);
}
for (const [bucket, bucketVotes] of buckets) {
const key = buildVoteBucketKey(pollId, bucket);
const existing = await voteBucketStore.lookup(key);
await voteBucketStore.register(
key,
toPluginJsonValue({
pollId,
bucket,
votes: { ...bucketVotes, ...existing?.votes },
updatedAt,
}),
);
}
};
const registerPollVote = async (
pollId: string,
voterId: string,
selections: string[],
updatedAt: string,
): Promise<void> => {
const bucket = selectVoteBucket(pollId, voterId);
const key = buildVoteBucketKey(pollId, bucket);
const existing = await voteBucketStore.lookup(key);
await voteBucketStore.register(
key,
toPluginJsonValue({
pollId,
bucket,
votes: { ...existing?.votes, [voterId]: selections },
updatedAt,
}),
);
};
const reconstructPoll = async (metadata: StoredMSTeamsPoll): Promise<MSTeamsPoll> => {
return { ...metadata, votes: await readPollVotes(metadata.id) };
};
const importLegacyStore = async (): Promise<void> => {
if (await migrationStore.lookup(LEGACY_POLLS_MIGRATION_KEY)) {
return;
}
const empty: PollStoreData = { version: 1, polls: {} };
const { value, exists } = await readJsonFile<PollStoreData>(legacyStorePath, empty);
if (!exists) {
await migrationStore.register(LEGACY_POLLS_MIGRATION_KEY, {
importedAt: new Date().toISOString(),
});
return;
}
const legacyPolls =
value.version === 1 &&
value.polls &&
typeof value.polls === "object" &&
!Array.isArray(value.polls)
? value.polls
: {};
for (const [pollId, poll] of selectRetainedPolls(legacyPolls)) {
if (!pollId) {
continue;
}
const { metadata, votes } = splitPoll(poll);
await pollStore.registerIfAbsent(buildPollStateKey(pollId), toPluginJsonValue(metadata));
await registerPollVotes(pollId, votes, poll.updatedAt ?? poll.createdAt);
}
await migrationStore.register(LEGACY_POLLS_MIGRATION_KEY, {
importedAt: new Date().toISOString(),
});
await fs.rm(legacyStorePath, { force: true }).catch(() => {});
};
const ensureLegacyImported = async (): Promise<void> => {
legacyImportPromise ??= withMSTeamsSqliteMutationLock(
params,
POLL_LOCK_FILENAME,
importLegacyStore,
);
await legacyImportPromise;
};
const prunePollStoreToLimit = async (): Promise<void> => {
const rows = [];
for (const row of await pollStore.entries()) {
if (!pruneExpired({ [row.key]: row.value })[row.key]) {
await pollStore.delete(row.key);
await deletePollVotes(row.value.id);
continue;
}
rows.push(row);
}
if (rows.length <= MAX_POLLS) {
return;
}
const sorted = rows.toSorted((a, b) => {
const aTs = parseTimestamp(a.value.updatedAt ?? a.value.createdAt) ?? 0;
const bTs = parseTimestamp(b.value.updatedAt ?? b.value.createdAt) ?? 0;
return aTs - bTs || a.key.localeCompare(b.key);
});
for (const row of sorted.slice(0, rows.length - MAX_POLLS)) {
await pollStore.delete(row.key);
await deletePollVotes(row.value.id);
}
};
const createPoll = async (poll: MSTeamsPoll) => {
await withFileLock(filePath, empty, async () => {
const data = await readStore();
data.polls[poll.id] = poll;
await writeStore({ version: 1, polls: pruneToLimit(data.polls) });
await withMSTeamsSqliteMutationLock(params, POLL_LOCK_FILENAME, async () => {
await importLegacyStore();
const { metadata, votes } = splitPoll(poll);
await pollStore.register(buildPollStateKey(poll.id), toPluginJsonValue(metadata));
await deletePollVotes(poll.id);
await registerPollVotes(poll.id, votes, poll.updatedAt ?? poll.createdAt);
await prunePollStoreToLimit();
});
};
const getPoll = async (pollId: string) =>
await withFileLock(filePath, empty, async () => {
const data = await readStore();
return data.polls[pollId] ?? null;
});
const getPoll = async (pollId: string) => {
await ensureLegacyImported();
const poll = await pollStore.lookup(buildPollStateKey(pollId));
if (!poll) {
return null;
}
if (!pruneExpired({ [pollId]: poll })[pollId]) {
return null;
}
return await reconstructPoll(poll);
};
const recordVote = async (params: { pollId: string; voterId: string; selections: string[] }) =>
await withFileLock(filePath, empty, async () => {
const data = await readStore();
const poll = data.polls[params.pollId];
const recordVote = async (vote: { pollId: string; voterId: string; selections: string[] }) => {
return await withMSTeamsSqliteMutationLock(params, POLL_LOCK_FILENAME, async () => {
await importLegacyStore();
const pollKey = buildPollStateKey(vote.pollId);
const poll = await pollStore.lookup(pollKey);
if (!poll) {
return null;
}
const normalized = normalizeMSTeamsPollSelections(poll, params.selections);
poll.votes[params.voterId] = normalized;
poll.updatedAt = new Date().toISOString();
data.polls[poll.id] = poll;
await writeStore({ version: 1, polls: pruneToLimit(data.polls) });
return poll;
if (!pruneExpired({ [vote.pollId]: poll })[vote.pollId]) {
await pollStore.delete(pollKey);
await deletePollVotes(vote.pollId);
return null;
}
const currentPoll = await reconstructPoll(poll);
const normalized = normalizeMSTeamsPollSelections(currentPoll, vote.selections);
const updatedAt = new Date().toISOString();
poll.updatedAt = updatedAt;
await pollStore.register(pollKey, toPluginJsonValue(poll));
await registerPollVote(vote.pollId, vote.voterId, normalized, updatedAt);
await prunePollStoreToLimit();
return await reconstructPoll(poll);
});
};
return { createPoll, getPoll, recordVote };
}

View File

@@ -20,8 +20,8 @@ const sendContextMockState = vi.hoisted(() => {
};
});
vi.mock("./conversation-store-fs.js", () => ({
createMSTeamsConversationStoreFs: () => sendContextMockState.store,
vi.mock("./conversation-store-state.js", () => ({
createMSTeamsConversationStoreState: () => sendContextMockState.store,
}));
vi.mock("./runtime.js", () => ({

View File

@@ -17,7 +17,7 @@ import {
validateMSTeamsProactiveServiceUrlBoundary,
type MSTeamsSdkCloudOptions,
} from "./cloud.js";
import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js";
import { createMSTeamsConversationStoreState } from "./conversation-store-state.js";
import type {
MSTeamsConversationStore,
StoredConversationReference,
@@ -159,7 +159,7 @@ export async function resolveMSTeamsSendContext(params: {
throw new Error("msteams credentials not configured");
}
const store = createMSTeamsConversationStoreFs();
const store = createMSTeamsConversationStoreState();
// Parse recipient and find conversation reference
const recipient = parseRecipient(params.to);

View File

@@ -0,0 +1,89 @@
import path from "node:path";
import { getMSTeamsRuntime } from "./runtime.js";
import { withFileLock } from "./store-fs.js";
export type MSTeamsSqliteStateOptions = {
env?: NodeJS.ProcessEnv;
homedir?: () => string;
stateDir?: string;
storePath?: string;
};
function resolveStateDirOverride(
options: MSTeamsSqliteStateOptions | undefined,
): string | undefined {
if (!options) {
return undefined;
}
if (options.stateDir) {
return options.stateDir;
}
if (options.storePath) {
return path.dirname(options.storePath);
}
if (options.homedir) {
return getMSTeamsRuntime().state.resolveStateDir(options.env ?? process.env, options.homedir);
}
return options.env?.OPENCLAW_STATE_DIR?.trim() || undefined;
}
export function resolveMSTeamsSqliteStateEnv(
options: MSTeamsSqliteStateOptions | undefined,
): NodeJS.ProcessEnv | undefined {
const stateDir = resolveStateDirOverride(options);
if (!stateDir) {
return options?.env;
}
return {
...(options?.env ?? process.env),
OPENCLAW_STATE_DIR: stateDir,
};
}
export function toPluginJsonValue<T>(value: T): T {
return JSON.parse(JSON.stringify(value)) as T;
}
export function resolveMSTeamsSqliteStateDir(
options: MSTeamsSqliteStateOptions | undefined,
): string {
return (
resolveStateDirOverride(options) ??
getMSTeamsRuntime().state.resolveStateDir(options?.env ?? process.env, options?.homedir)
);
}
const sqliteMutationLocks = new Map<string, Promise<unknown>>();
async function withProcessMutationLock<T>(lockPath: string, fn: () => Promise<T>): Promise<T> {
const previous = sqliteMutationLocks.get(lockPath) ?? Promise.resolve();
let release: () => void = () => {};
const next = new Promise<void>((resolve) => {
release = resolve;
});
const chained = previous.then(
() => next,
() => next,
);
sqliteMutationLocks.set(lockPath, chained);
await previous.catch(() => undefined);
try {
return await fn();
} finally {
release();
if (sqliteMutationLocks.get(lockPath) === chained) {
sqliteMutationLocks.delete(lockPath);
}
}
}
export async function withMSTeamsSqliteMutationLock<T>(
options: MSTeamsSqliteStateOptions | undefined,
lockFilename: string,
fn: () => Promise<T>,
): Promise<T> {
const lockPath = path.join(resolveMSTeamsSqliteStateDir(options), lockFilename);
return await withProcessMutationLock(lockPath, async () => {
return await withFileLock(lockPath, { version: 1 }, fn);
});
}

View File

@@ -1,9 +1,13 @@
import os from "node:os";
import path from "node:path";
import type { PluginRuntime } from "../runtime-api.js";
import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime";
import { createPluginStateKeyedStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime";
import type { PluginRuntime } from "../../runtime-api.js";
export const msteamsRuntimeStub = {
state: {
openKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateKeyedStoreForTests("msteams", options),
resolveStateDir: (env: NodeJS.ProcessEnv = process.env, homedir?: () => string) => {
const override = env.OPENCLAW_STATE_DIR?.trim() || env.OPENCLAW_STATE_DIR?.trim();
if (override) {