diff --git a/src/agents/session-tool-result-guard.ts b/src/agents/session-tool-result-guard.ts index 31e79eec34f..4ec5fe6c8cb 100644 --- a/src/agents/session-tool-result-guard.ts +++ b/src/agents/session-tool-result-guard.ts @@ -9,6 +9,7 @@ import { HARD_MAX_TOOL_RESULT_CHARS, truncateToolResultMessage, } from "./pi-embedded-runner/tool-result-truncation.js"; +import { createPendingToolCallState } from "./session-tool-result-state.js"; import { makeMissingToolResult, sanitizeToolCallInputs } from "./session-transcript-repair.js"; import { extractToolCallsFromAssistant, extractToolResultId } from "./tool-call-id.js"; @@ -106,7 +107,7 @@ export function installSessionToolResultGuard( getPendingIds: () => string[]; } { const originalAppend = sessionManager.appendMessage.bind(sessionManager); - const pending = new Map(); + const pendingState = createPendingToolCallState(); const persistMessage = (message: AgentMessage) => { const transformer = opts?.transformMessageForPersistence; return transformer ? transformer(message) : message; @@ -142,11 +143,11 @@ export function installSessionToolResultGuard( }; const flushPendingToolResults = () => { - if (pending.size === 0) { + if (pendingState.size() === 0) { return; } if (allowSyntheticToolResults) { - for (const [id, name] of pending.entries()) { + for (const [id, name] of pendingState.entries()) { const synthetic = makeMissingToolResult({ toolCallId: id, toolName: name }); const flushed = applyBeforeWriteHook( persistToolResult(persistMessage(synthetic), { @@ -160,7 +161,7 @@ export function installSessionToolResultGuard( } } } - pending.clear(); + pendingState.clear(); }; const guardedAppend = (message: AgentMessage) => { @@ -171,7 +172,7 @@ export function installSessionToolResultGuard( allowedToolNames: opts?.allowedToolNames, }); if (sanitized.length === 0) { - if (pending.size > 0) { + if (pendingState.shouldFlushForSanitizedDrop()) { flushPendingToolResults(); } return undefined; @@ -182,9 +183,9 @@ export function installSessionToolResultGuard( if (nextRole === "toolResult") { const id = extractToolResultId(nextMessage as Extract); - const toolName = id ? pending.get(id) : undefined; + const toolName = id ? pendingState.getToolName(id) : undefined; if (id) { - pending.delete(id); + pendingState.delete(id); } const normalizedToolResult = normalizePersistedToolResultName(nextMessage, toolName); // Apply hard size cap before persistence to prevent oversized tool results @@ -221,11 +222,11 @@ export function installSessionToolResultGuard( // synthetic results (e.g. OpenAI) accumulate stale pending state when a user message // interrupts in-flight tool calls, leaving orphaned tool_use blocks in the transcript // that cause API 400 errors on subsequent requests. - if (pending.size > 0 && (toolCalls.length === 0 || nextRole !== "assistant")) { + if (pendingState.shouldFlushBeforeNonToolResult(nextRole, toolCalls.length)) { flushPendingToolResults(); } // If new tool calls arrive while older ones are pending, flush the old ones first. - if (pending.size > 0 && toolCalls.length > 0) { + if (pendingState.shouldFlushBeforeNewToolCalls(toolCalls.length)) { flushPendingToolResults(); } @@ -243,9 +244,7 @@ export function installSessionToolResultGuard( } if (toolCalls.length > 0) { - for (const call of toolCalls) { - pending.set(call.id, call.name); - } + pendingState.trackToolCalls(toolCalls); } return result; @@ -256,6 +255,6 @@ export function installSessionToolResultGuard( return { flushPendingToolResults, - getPendingIds: () => Array.from(pending.keys()), + getPendingIds: pendingState.getPendingIds, }; } diff --git a/src/agents/session-tool-result-state.ts b/src/agents/session-tool-result-state.ts new file mode 100644 index 00000000000..430883e691b --- /dev/null +++ b/src/agents/session-tool-result-state.ts @@ -0,0 +1,40 @@ +export type PendingToolCall = { id: string; name?: string }; + +export type PendingToolCallState = { + size: () => number; + entries: () => IterableIterator<[string, string | undefined]>; + getToolName: (id: string) => string | undefined; + delete: (id: string) => void; + clear: () => void; + trackToolCalls: (calls: PendingToolCall[]) => void; + getPendingIds: () => string[]; + shouldFlushForSanitizedDrop: () => boolean; + shouldFlushBeforeNonToolResult: (nextRole: unknown, toolCallCount: number) => boolean; + shouldFlushBeforeNewToolCalls: (toolCallCount: number) => boolean; +}; + +export function createPendingToolCallState(): PendingToolCallState { + const pending = new Map(); + + return { + size: () => pending.size, + entries: () => pending.entries(), + getToolName: (id: string) => pending.get(id), + delete: (id: string) => { + pending.delete(id); + }, + clear: () => { + pending.clear(); + }, + trackToolCalls: (calls: PendingToolCall[]) => { + for (const call of calls) { + pending.set(call.id, call.name); + } + }, + getPendingIds: () => Array.from(pending.keys()), + shouldFlushForSanitizedDrop: () => pending.size > 0, + shouldFlushBeforeNonToolResult: (nextRole: unknown, toolCallCount: number) => + pending.size > 0 && (toolCallCount === 0 || nextRole !== "assistant"), + shouldFlushBeforeNewToolCalls: (toolCallCount: number) => pending.size > 0 && toolCallCount > 0, + }; +} diff --git a/src/gateway/config-reload-plan.ts b/src/gateway/config-reload-plan.ts new file mode 100644 index 00000000000..1af87d25020 --- /dev/null +++ b/src/gateway/config-reload-plan.ts @@ -0,0 +1,210 @@ +import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js"; +import { getActivePluginRegistry } from "../plugins/runtime.js"; + +export type ChannelKind = ChannelId; + +export type GatewayReloadPlan = { + changedPaths: string[]; + restartGateway: boolean; + restartReasons: string[]; + hotReasons: string[]; + reloadHooks: boolean; + restartGmailWatcher: boolean; + restartBrowserControl: boolean; + restartCron: boolean; + restartHeartbeat: boolean; + restartHealthMonitor: boolean; + restartChannels: Set; + noopPaths: string[]; +}; + +type ReloadRule = { + prefix: string; + kind: "restart" | "hot" | "none"; + actions?: ReloadAction[]; +}; + +type ReloadAction = + | "reload-hooks" + | "restart-gmail-watcher" + | "restart-browser-control" + | "restart-cron" + | "restart-heartbeat" + | "restart-health-monitor" + | `restart-channel:${ChannelId}`; + +const BASE_RELOAD_RULES: ReloadRule[] = [ + { prefix: "gateway.remote", kind: "none" }, + { prefix: "gateway.reload", kind: "none" }, + { + prefix: "gateway.channelHealthCheckMinutes", + kind: "hot", + actions: ["restart-health-monitor"], + }, + // Stuck-session warning threshold is read by the diagnostics heartbeat loop. + { prefix: "diagnostics.stuckSessionWarnMs", kind: "none" }, + { prefix: "hooks.gmail", kind: "hot", actions: ["restart-gmail-watcher"] }, + { prefix: "hooks", kind: "hot", actions: ["reload-hooks"] }, + { + prefix: "agents.defaults.heartbeat", + kind: "hot", + actions: ["restart-heartbeat"], + }, + { + prefix: "agents.defaults.model", + kind: "hot", + actions: ["restart-heartbeat"], + }, + { + prefix: "models", + kind: "hot", + actions: ["restart-heartbeat"], + }, + { prefix: "agent.heartbeat", kind: "hot", actions: ["restart-heartbeat"] }, + { prefix: "cron", kind: "hot", actions: ["restart-cron"] }, + { + prefix: "browser", + kind: "hot", + actions: ["restart-browser-control"], + }, +]; + +const BASE_RELOAD_RULES_TAIL: ReloadRule[] = [ + { prefix: "meta", kind: "none" }, + { prefix: "identity", kind: "none" }, + { prefix: "wizard", kind: "none" }, + { prefix: "logging", kind: "none" }, + { prefix: "agents", kind: "none" }, + { prefix: "tools", kind: "none" }, + { prefix: "bindings", kind: "none" }, + { prefix: "audio", kind: "none" }, + { prefix: "agent", kind: "none" }, + { prefix: "routing", kind: "none" }, + { prefix: "messages", kind: "none" }, + { prefix: "session", kind: "none" }, + { prefix: "talk", kind: "none" }, + { prefix: "skills", kind: "none" }, + { prefix: "secrets", kind: "none" }, + { prefix: "plugins", kind: "restart" }, + { prefix: "ui", kind: "none" }, + { prefix: "gateway", kind: "restart" }, + { prefix: "discovery", kind: "restart" }, + { prefix: "canvasHost", kind: "restart" }, +]; + +let cachedReloadRules: ReloadRule[] | null = null; +let cachedRegistry: ReturnType | null = null; + +function listReloadRules(): ReloadRule[] { + const registry = getActivePluginRegistry(); + if (registry !== cachedRegistry) { + cachedReloadRules = null; + cachedRegistry = registry; + } + if (cachedReloadRules) { + return cachedReloadRules; + } + // Channel docking: plugins contribute hot reload/no-op prefixes here. + const channelReloadRules: ReloadRule[] = listChannelPlugins().flatMap((plugin) => [ + ...(plugin.reload?.configPrefixes ?? []).map( + (prefix): ReloadRule => ({ + prefix, + kind: "hot", + actions: [`restart-channel:${plugin.id}` as ReloadAction], + }), + ), + ...(plugin.reload?.noopPrefixes ?? []).map( + (prefix): ReloadRule => ({ + prefix, + kind: "none", + }), + ), + ]); + const rules = [...BASE_RELOAD_RULES, ...channelReloadRules, ...BASE_RELOAD_RULES_TAIL]; + cachedReloadRules = rules; + return rules; +} + +function matchRule(path: string): ReloadRule | null { + for (const rule of listReloadRules()) { + if (path === rule.prefix || path.startsWith(`${rule.prefix}.`)) { + return rule; + } + } + return null; +} + +export function buildGatewayReloadPlan(changedPaths: string[]): GatewayReloadPlan { + const plan: GatewayReloadPlan = { + changedPaths, + restartGateway: false, + restartReasons: [], + hotReasons: [], + reloadHooks: false, + restartGmailWatcher: false, + restartBrowserControl: false, + restartCron: false, + restartHeartbeat: false, + restartHealthMonitor: false, + restartChannels: new Set(), + noopPaths: [], + }; + + const applyAction = (action: ReloadAction) => { + if (action.startsWith("restart-channel:")) { + const channel = action.slice("restart-channel:".length) as ChannelId; + plan.restartChannels.add(channel); + return; + } + switch (action) { + case "reload-hooks": + plan.reloadHooks = true; + break; + case "restart-gmail-watcher": + plan.restartGmailWatcher = true; + break; + case "restart-browser-control": + plan.restartBrowserControl = true; + break; + case "restart-cron": + plan.restartCron = true; + break; + case "restart-heartbeat": + plan.restartHeartbeat = true; + break; + case "restart-health-monitor": + plan.restartHealthMonitor = true; + break; + default: + break; + } + }; + + for (const path of changedPaths) { + const rule = matchRule(path); + if (!rule) { + plan.restartGateway = true; + plan.restartReasons.push(path); + continue; + } + if (rule.kind === "restart") { + plan.restartGateway = true; + plan.restartReasons.push(path); + continue; + } + if (rule.kind === "none") { + plan.noopPaths.push(path); + continue; + } + plan.hotReasons.push(path); + for (const action of rule.actions ?? []) { + applyAction(action); + } + } + + if (plan.restartGmailWatcher) { + plan.reloadHooks = true; + } + + return plan; +} diff --git a/src/gateway/config-reload.test.ts b/src/gateway/config-reload.test.ts index c3261bec976..e45347b0040 100644 --- a/src/gateway/config-reload.test.ts +++ b/src/gateway/config-reload.test.ts @@ -188,6 +188,53 @@ describe("buildGatewayReloadPlan", () => { const plan = buildGatewayReloadPlan(["unknownField"]); expect(plan.restartGateway).toBe(true); }); + + it.each([ + { + path: "gateway.channelHealthCheckMinutes", + expectRestartGateway: false, + expectHotPath: "gateway.channelHealthCheckMinutes", + expectRestartHealthMonitor: true, + }, + { + path: "hooks.gmail.account", + expectRestartGateway: false, + expectHotPath: "hooks.gmail.account", + expectRestartGmailWatcher: true, + expectReloadHooks: true, + }, + { + path: "gateway.remote.url", + expectRestartGateway: false, + expectNoopPath: "gateway.remote.url", + }, + { + path: "unknownField", + expectRestartGateway: true, + expectRestartReason: "unknownField", + }, + ])("classifies reload path: $path", (testCase) => { + const plan = buildGatewayReloadPlan([testCase.path]); + expect(plan.restartGateway).toBe(testCase.expectRestartGateway); + if (testCase.expectHotPath) { + expect(plan.hotReasons).toContain(testCase.expectHotPath); + } + if (testCase.expectNoopPath) { + expect(plan.noopPaths).toContain(testCase.expectNoopPath); + } + if (testCase.expectRestartReason) { + expect(plan.restartReasons).toContain(testCase.expectRestartReason); + } + if (testCase.expectRestartHealthMonitor) { + expect(plan.restartHealthMonitor).toBe(true); + } + if (testCase.expectRestartGmailWatcher) { + expect(plan.restartGmailWatcher).toBe(true); + } + if (testCase.expectReloadHooks) { + expect(plan.reloadHooks).toBe(true); + } + }); }); describe("resolveGatewayReloadSettings", () => { diff --git a/src/gateway/config-reload.ts b/src/gateway/config-reload.ts index bd496cbafd9..dfd9ebdaf93 100644 --- a/src/gateway/config-reload.ts +++ b/src/gateway/config-reload.ts @@ -1,47 +1,17 @@ import { isDeepStrictEqual } from "node:util"; import chokidar from "chokidar"; -import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js"; import type { OpenClawConfig, ConfigFileSnapshot, GatewayReloadMode } from "../config/config.js"; -import { getActivePluginRegistry } from "../plugins/runtime.js"; import { isPlainObject } from "../utils.js"; +import { buildGatewayReloadPlan, type GatewayReloadPlan } from "./config-reload-plan.js"; + +export { buildGatewayReloadPlan }; +export type { GatewayReloadPlan } from "./config-reload-plan.js"; export type GatewayReloadSettings = { mode: GatewayReloadMode; debounceMs: number; }; -export type ChannelKind = ChannelId; - -export type GatewayReloadPlan = { - changedPaths: string[]; - restartGateway: boolean; - restartReasons: string[]; - hotReasons: string[]; - reloadHooks: boolean; - restartGmailWatcher: boolean; - restartBrowserControl: boolean; - restartCron: boolean; - restartHeartbeat: boolean; - restartHealthMonitor: boolean; - restartChannels: Set; - noopPaths: string[]; -}; - -type ReloadRule = { - prefix: string; - kind: "restart" | "hot" | "none"; - actions?: ReloadAction[]; -}; - -type ReloadAction = - | "reload-hooks" - | "restart-gmail-watcher" - | "restart-browser-control" - | "restart-cron" - | "restart-heartbeat" - | "restart-health-monitor" - | `restart-channel:${ChannelId}`; - const DEFAULT_RELOAD_SETTINGS: GatewayReloadSettings = { mode: "hybrid", debounceMs: 300, @@ -49,107 +19,6 @@ const DEFAULT_RELOAD_SETTINGS: GatewayReloadSettings = { const MISSING_CONFIG_RETRY_DELAY_MS = 150; const MISSING_CONFIG_MAX_RETRIES = 2; -const BASE_RELOAD_RULES: ReloadRule[] = [ - { prefix: "gateway.remote", kind: "none" }, - { prefix: "gateway.reload", kind: "none" }, - { - prefix: "gateway.channelHealthCheckMinutes", - kind: "hot", - actions: ["restart-health-monitor"], - }, - // Stuck-session warning threshold is read by the diagnostics heartbeat loop. - { prefix: "diagnostics.stuckSessionWarnMs", kind: "none" }, - { prefix: "hooks.gmail", kind: "hot", actions: ["restart-gmail-watcher"] }, - { prefix: "hooks", kind: "hot", actions: ["reload-hooks"] }, - { - prefix: "agents.defaults.heartbeat", - kind: "hot", - actions: ["restart-heartbeat"], - }, - { - prefix: "agents.defaults.model", - kind: "hot", - actions: ["restart-heartbeat"], - }, - { - prefix: "models", - kind: "hot", - actions: ["restart-heartbeat"], - }, - { prefix: "agent.heartbeat", kind: "hot", actions: ["restart-heartbeat"] }, - { prefix: "cron", kind: "hot", actions: ["restart-cron"] }, - { - prefix: "browser", - kind: "hot", - actions: ["restart-browser-control"], - }, -]; - -const BASE_RELOAD_RULES_TAIL: ReloadRule[] = [ - { prefix: "meta", kind: "none" }, - { prefix: "identity", kind: "none" }, - { prefix: "wizard", kind: "none" }, - { prefix: "logging", kind: "none" }, - { prefix: "agents", kind: "none" }, - { prefix: "tools", kind: "none" }, - { prefix: "bindings", kind: "none" }, - { prefix: "audio", kind: "none" }, - { prefix: "agent", kind: "none" }, - { prefix: "routing", kind: "none" }, - { prefix: "messages", kind: "none" }, - { prefix: "session", kind: "none" }, - { prefix: "talk", kind: "none" }, - { prefix: "skills", kind: "none" }, - { prefix: "secrets", kind: "none" }, - { prefix: "plugins", kind: "restart" }, - { prefix: "ui", kind: "none" }, - { prefix: "gateway", kind: "restart" }, - { prefix: "discovery", kind: "restart" }, - { prefix: "canvasHost", kind: "restart" }, -]; - -let cachedReloadRules: ReloadRule[] | null = null; -let cachedRegistry: ReturnType | null = null; - -function listReloadRules(): ReloadRule[] { - const registry = getActivePluginRegistry(); - if (registry !== cachedRegistry) { - cachedReloadRules = null; - cachedRegistry = registry; - } - if (cachedReloadRules) { - return cachedReloadRules; - } - // Channel docking: plugins contribute hot reload/no-op prefixes here. - const channelReloadRules: ReloadRule[] = listChannelPlugins().flatMap((plugin) => [ - ...(plugin.reload?.configPrefixes ?? []).map( - (prefix): ReloadRule => ({ - prefix, - kind: "hot", - actions: [`restart-channel:${plugin.id}` as ReloadAction], - }), - ), - ...(plugin.reload?.noopPrefixes ?? []).map( - (prefix): ReloadRule => ({ - prefix, - kind: "none", - }), - ), - ]); - const rules = [...BASE_RELOAD_RULES, ...channelReloadRules, ...BASE_RELOAD_RULES_TAIL]; - cachedReloadRules = rules; - return rules; -} - -function matchRule(path: string): ReloadRule | null { - for (const rule of listReloadRules()) { - if (path === rule.prefix || path.startsWith(`${rule.prefix}.`)) { - return rule; - } - } - return null; -} - export function diffConfigPaths(prev: unknown, next: unknown, prefix = ""): string[] { if (prev === next) { return []; @@ -195,81 +64,6 @@ export function resolveGatewayReloadSettings(cfg: OpenClawConfig): GatewayReload return { mode, debounceMs }; } -export function buildGatewayReloadPlan(changedPaths: string[]): GatewayReloadPlan { - const plan: GatewayReloadPlan = { - changedPaths, - restartGateway: false, - restartReasons: [], - hotReasons: [], - reloadHooks: false, - restartGmailWatcher: false, - restartBrowserControl: false, - restartCron: false, - restartHeartbeat: false, - restartHealthMonitor: false, - restartChannels: new Set(), - noopPaths: [], - }; - - const applyAction = (action: ReloadAction) => { - if (action.startsWith("restart-channel:")) { - const channel = action.slice("restart-channel:".length) as ChannelId; - plan.restartChannels.add(channel); - return; - } - switch (action) { - case "reload-hooks": - plan.reloadHooks = true; - break; - case "restart-gmail-watcher": - plan.restartGmailWatcher = true; - break; - case "restart-browser-control": - plan.restartBrowserControl = true; - break; - case "restart-cron": - plan.restartCron = true; - break; - case "restart-heartbeat": - plan.restartHeartbeat = true; - break; - case "restart-health-monitor": - plan.restartHealthMonitor = true; - break; - default: - break; - } - }; - - for (const path of changedPaths) { - const rule = matchRule(path); - if (!rule) { - plan.restartGateway = true; - plan.restartReasons.push(path); - continue; - } - if (rule.kind === "restart") { - plan.restartGateway = true; - plan.restartReasons.push(path); - continue; - } - if (rule.kind === "none") { - plan.noopPaths.push(path); - continue; - } - plan.hotReasons.push(path); - for (const action of rule.actions ?? []) { - applyAction(action); - } - } - - if (plan.restartGmailWatcher) { - plan.reloadHooks = true; - } - - return plan; -} - export type GatewayConfigReloader = { stop: () => Promise; }; diff --git a/src/infra/exec-allowlist-pattern.ts b/src/infra/exec-allowlist-pattern.ts new file mode 100644 index 00000000000..df05a2ae1d9 --- /dev/null +++ b/src/infra/exec-allowlist-pattern.ts @@ -0,0 +1,83 @@ +import fs from "node:fs"; +import { expandHomePrefix } from "./home-dir.js"; + +const GLOB_REGEX_CACHE_LIMIT = 512; +const globRegexCache = new Map(); + +function normalizeMatchTarget(value: string): string { + if (process.platform === "win32") { + const stripped = value.replace(/^\\\\[?.]\\/, ""); + return stripped.replace(/\\/g, "/").toLowerCase(); + } + return value.replace(/\\\\/g, "/").toLowerCase(); +} + +function tryRealpath(value: string): string | null { + try { + return fs.realpathSync(value); + } catch { + return null; + } +} + +function escapeRegExpLiteral(input: string): string { + return input.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); +} + +function compileGlobRegex(pattern: string): RegExp { + const cached = globRegexCache.get(pattern); + if (cached) { + return cached; + } + + let regex = "^"; + let i = 0; + while (i < pattern.length) { + const ch = pattern[i]; + if (ch === "*") { + const next = pattern[i + 1]; + if (next === "*") { + regex += ".*"; + i += 2; + continue; + } + regex += "[^/]*"; + i += 1; + continue; + } + if (ch === "?") { + regex += "."; + i += 1; + continue; + } + regex += escapeRegExpLiteral(ch); + i += 1; + } + regex += "$"; + + const compiled = new RegExp(regex, "i"); + if (globRegexCache.size >= GLOB_REGEX_CACHE_LIMIT) { + globRegexCache.clear(); + } + globRegexCache.set(pattern, compiled); + return compiled; +} + +export function matchesExecAllowlistPattern(pattern: string, target: string): boolean { + const trimmed = pattern.trim(); + if (!trimmed) { + return false; + } + + const expanded = trimmed.startsWith("~") ? expandHomePrefix(trimmed) : trimmed; + const hasWildcard = /[*?]/.test(expanded); + let normalizedPattern = expanded; + let normalizedTarget = target; + if (process.platform === "win32" && !hasWildcard) { + normalizedPattern = tryRealpath(expanded) ?? expanded; + normalizedTarget = tryRealpath(target) ?? target; + } + normalizedPattern = normalizeMatchTarget(normalizedPattern); + normalizedTarget = normalizeMatchTarget(normalizedTarget); + return compileGlobRegex(normalizedPattern).test(normalizedTarget); +} diff --git a/src/infra/exec-command-resolution.ts b/src/infra/exec-command-resolution.ts index 1c961059080..d87b9a264dc 100644 --- a/src/infra/exec-command-resolution.ts +++ b/src/infra/exec-command-resolution.ts @@ -1,5 +1,6 @@ import fs from "node:fs"; import path from "node:path"; +import { matchesExecAllowlistPattern } from "./exec-allowlist-pattern.js"; import type { ExecAllowlistEntry } from "./exec-approvals.js"; import { resolveDispatchWrapperExecutionPlan } from "./exec-wrapper-resolution.js"; import { resolveExecutablePath as resolveExecutableCandidatePath } from "./executable-path.js"; @@ -114,73 +115,6 @@ export function resolveCommandResolutionFromArgv( }); } -function normalizeMatchTarget(value: string): string { - if (process.platform === "win32") { - const stripped = value.replace(/^\\\\[?.]\\/, ""); - return stripped.replace(/\\/g, "/").toLowerCase(); - } - return value.replace(/\\\\/g, "/").toLowerCase(); -} - -function tryRealpath(value: string): string | null { - try { - return fs.realpathSync(value); - } catch { - return null; - } -} - -function escapeRegExpLiteral(input: string): string { - return input.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); -} - -function globToRegExp(pattern: string): RegExp { - let regex = "^"; - let i = 0; - while (i < pattern.length) { - const ch = pattern[i]; - if (ch === "*") { - const next = pattern[i + 1]; - if (next === "*") { - regex += ".*"; - i += 2; - continue; - } - regex += "[^/]*"; - i += 1; - continue; - } - if (ch === "?") { - regex += "."; - i += 1; - continue; - } - regex += escapeRegExpLiteral(ch); - i += 1; - } - regex += "$"; - return new RegExp(regex, "i"); -} - -function matchesPattern(pattern: string, target: string): boolean { - const trimmed = pattern.trim(); - if (!trimmed) { - return false; - } - const expanded = trimmed.startsWith("~") ? expandHomePrefix(trimmed) : trimmed; - const hasWildcard = /[*?]/.test(expanded); - let normalizedPattern = expanded; - let normalizedTarget = target; - if (process.platform === "win32" && !hasWildcard) { - normalizedPattern = tryRealpath(expanded) ?? expanded; - normalizedTarget = tryRealpath(target) ?? target; - } - normalizedPattern = normalizeMatchTarget(normalizedPattern); - normalizedTarget = normalizeMatchTarget(normalizedTarget); - const regex = globToRegExp(normalizedPattern); - return regex.test(normalizedTarget); -} - export function resolveAllowlistCandidatePath( resolution: CommandResolution | null, cwd?: string, @@ -233,7 +167,7 @@ export function matchAllowlist( if (!hasPath) { continue; } - if (matchesPattern(pattern, resolvedPath)) { + if (matchesExecAllowlistPattern(pattern, resolvedPath)) { return entry; } } diff --git a/src/slack/monitor/message-handler/prepare.ts b/src/slack/monitor/message-handler/prepare.ts index 4bf192858e1..eeb5787c5e5 100644 --- a/src/slack/monitor/message-handler/prepare.ts +++ b/src/slack/monitor/message-handler/prepare.ts @@ -77,13 +77,47 @@ function resolveCachedMentionRegexes( return built; } -export async function prepareSlackMessage(params: { +type SlackConversationContext = { + channelInfo: { + name?: string; + type?: SlackMessageEvent["channel_type"]; + topic?: string; + purpose?: string; + }; + channelName?: string; + resolvedChannelType: ReturnType; + isDirectMessage: boolean; + isGroupDm: boolean; + isRoom: boolean; + isRoomish: boolean; + channelConfig: ReturnType | null; + allowBots: boolean; + isBotMessage: boolean; +}; + +type SlackAuthorizationContext = { + senderId: string; + allowFromLower: string[]; +}; + +type SlackRoutingContext = { + route: ReturnType; + chatType: "direct" | "group" | "channel"; + replyToMode: ReturnType; + threadContext: ReturnType; + threadTs: string | undefined; + isThreadReply: boolean; + threadKeys: ReturnType; + sessionKey: string; + historyKey: string; +}; + +async function resolveSlackConversationContext(params: { ctx: SlackMonitorContext; account: ResolvedSlackAccount; message: SlackMessageEvent; - opts: { source: "message" | "app_mention"; wasMentioned?: boolean }; -}): Promise { - const { ctx, account, message, opts } = params; +}): Promise { + const { ctx, account, message } = params; const cfg = ctx.cfg; let channelInfo: { @@ -107,7 +141,6 @@ export async function prepareSlackMessage(params: { const isGroupDm = resolvedChannelType === "mpim"; const isRoom = resolvedChannelType === "channel" || resolvedChannelType === "group"; const isRoomish = isRoom || isGroupDm; - const channelConfig = isRoom ? resolveSlackChannelConfig({ channelId: message.channel, @@ -117,14 +150,36 @@ export async function prepareSlackMessage(params: { defaultRequireMention: ctx.defaultRequireMention, }) : null; - const allowBots = channelConfig?.allowBots ?? account.config?.allowBots ?? cfg.channels?.slack?.allowBots ?? false; - const isBotMessage = Boolean(message.bot_id); + return { + channelInfo, + channelName, + resolvedChannelType, + isDirectMessage, + isGroupDm, + isRoom, + isRoomish, + channelConfig, + allowBots, + isBotMessage: Boolean(message.bot_id), + }; +} + +async function authorizeSlackInboundMessage(params: { + ctx: SlackMonitorContext; + account: ResolvedSlackAccount; + message: SlackMessageEvent; + conversation: SlackConversationContext; +}): Promise { + const { ctx, account, message, conversation } = params; + const { isDirectMessage, channelName, resolvedChannelType, isBotMessage, allowBots } = + conversation; + if (isBotMessage) { if (message.user && ctx.botUserId && message.user === ctx.botUserId) { return null; @@ -195,8 +250,24 @@ export async function prepareSlackMessage(params: { } } + return { + senderId, + allowFromLower, + }; +} + +function resolveSlackRoutingContext(params: { + ctx: SlackMonitorContext; + account: ResolvedSlackAccount; + message: SlackMessageEvent; + isDirectMessage: boolean; + isGroupDm: boolean; + isRoom: boolean; + isRoomish: boolean; +}): SlackRoutingContext { + const { ctx, account, message, isDirectMessage, isGroupDm, isRoom, isRoomish } = params; const route = resolveAgentRoute({ - cfg, + cfg: ctx.cfg, channel: "slack", accountId: account.accountId, teamId: ctx.teamId || undefined, @@ -206,7 +277,6 @@ export async function prepareSlackMessage(params: { }, }); - const baseSessionKey = route.sessionKey; const chatType = isDirectMessage ? "direct" : isGroupDm ? "group" : "channel"; const replyToMode = resolveSlackReplyToMode(account, chatType); const threadContext = resolveSlackThreadContext({ message, replyToMode }); @@ -224,14 +294,76 @@ export async function prepareSlackMessage(params: { ? threadTs : autoThreadId; const threadKeys = resolveThreadSessionKeys({ - baseSessionKey, + baseSessionKey: route.sessionKey, threadId: canonicalThreadId, - parentSessionKey: canonicalThreadId && ctx.threadInheritParent ? baseSessionKey : undefined, + parentSessionKey: canonicalThreadId && ctx.threadInheritParent ? route.sessionKey : undefined, }); const sessionKey = threadKeys.sessionKey; const historyKey = isThreadReply && ctx.threadHistoryScope === "thread" ? sessionKey : message.channel; + return { + route, + chatType, + replyToMode, + threadContext, + threadTs, + isThreadReply, + threadKeys, + sessionKey, + historyKey, + }; +} + +export async function prepareSlackMessage(params: { + ctx: SlackMonitorContext; + account: ResolvedSlackAccount; + message: SlackMessageEvent; + opts: { source: "message" | "app_mention"; wasMentioned?: boolean }; +}): Promise { + const { ctx, account, message, opts } = params; + const cfg = ctx.cfg; + const conversation = await resolveSlackConversationContext({ ctx, account, message }); + const { + channelInfo, + channelName, + isDirectMessage, + isGroupDm, + isRoom, + isRoomish, + channelConfig, + isBotMessage, + } = conversation; + const authorization = await authorizeSlackInboundMessage({ + ctx, + account, + message, + conversation, + }); + if (!authorization) { + return null; + } + const { senderId, allowFromLower } = authorization; + const routing = resolveSlackRoutingContext({ + ctx, + account, + message, + isDirectMessage, + isGroupDm, + isRoom, + isRoomish, + }); + const { + route, + replyToMode, + threadContext, + threadTs, + isThreadReply, + threadKeys, + sessionKey, + historyKey, + } = routing; + const mentionRegexes = resolveCachedMentionRegexes(ctx, route.agentId); const hasAnyMention = /<@[^>]+>/.test(message.text ?? ""); const explicitlyMentioned = Boolean( diff --git a/src/web/inbound/monitor.ts b/src/web/inbound/monitor.ts index e1cd6f2981f..6dc2ce5f521 100644 --- a/src/web/inbound/monitor.ts +++ b/src/web/inbound/monitor.ts @@ -151,6 +151,249 @@ export async function monitorWebInbox(options: { } }; + type NormalizedInboundMessage = { + id?: string; + remoteJid: string; + group: boolean; + participantJid?: string; + from: string; + senderE164: string | null; + groupSubject?: string; + groupParticipants?: string[]; + messageTimestampMs?: number; + access: Awaited>; + }; + + const normalizeInboundMessage = async ( + msg: WAMessage, + ): Promise => { + const id = msg.key?.id ?? undefined; + const remoteJid = msg.key?.remoteJid; + if (!remoteJid) { + return null; + } + if (remoteJid.endsWith("@status") || remoteJid.endsWith("@broadcast")) { + return null; + } + + const group = isJidGroup(remoteJid) === true; + if (id) { + const dedupeKey = `${options.accountId}:${remoteJid}:${id}`; + if (isRecentInboundMessage(dedupeKey)) { + return null; + } + } + const participantJid = msg.key?.participant ?? undefined; + const from = group ? remoteJid : await resolveInboundJid(remoteJid); + if (!from) { + return null; + } + const senderE164 = group + ? participantJid + ? await resolveInboundJid(participantJid) + : null + : from; + + let groupSubject: string | undefined; + let groupParticipants: string[] | undefined; + if (group) { + const meta = await getGroupMeta(remoteJid); + groupSubject = meta.subject; + groupParticipants = meta.participants; + } + const messageTimestampMs = msg.messageTimestamp + ? Number(msg.messageTimestamp) * 1000 + : undefined; + + const access = await checkInboundAccessControl({ + accountId: options.accountId, + from, + selfE164, + senderE164, + group, + pushName: msg.pushName ?? undefined, + isFromMe: Boolean(msg.key?.fromMe), + messageTimestampMs, + connectedAtMs, + sock: { sendMessage: (jid, content) => sock.sendMessage(jid, content) }, + remoteJid, + }); + if (!access.allowed) { + return null; + } + + return { + id, + remoteJid, + group, + participantJid, + from, + senderE164, + groupSubject, + groupParticipants, + messageTimestampMs, + access, + }; + }; + + const maybeMarkInboundAsRead = async (inbound: NormalizedInboundMessage) => { + const { id, remoteJid, participantJid, access } = inbound; + if (id && !access.isSelfChat && options.sendReadReceipts !== false) { + try { + await sock.readMessages([{ remoteJid, id, participant: participantJid, fromMe: false }]); + if (shouldLogVerbose()) { + const suffix = participantJid ? ` (participant ${participantJid})` : ""; + logVerbose(`Marked message ${id} as read for ${remoteJid}${suffix}`); + } + } catch (err) { + logVerbose(`Failed to mark message ${id} read: ${String(err)}`); + } + } else if (id && access.isSelfChat && shouldLogVerbose()) { + // Self-chat mode: never auto-send read receipts (blue ticks) on behalf of the owner. + logVerbose(`Self-chat mode: skipping read receipt for ${id}`); + } + }; + + type EnrichedInboundMessage = { + body: string; + location?: ReturnType; + replyContext?: ReturnType; + mediaPath?: string; + mediaType?: string; + mediaFileName?: string; + }; + + const enrichInboundMessage = async (msg: WAMessage): Promise => { + const location = extractLocationData(msg.message ?? undefined); + const locationText = location ? formatLocationText(location) : undefined; + let body = extractText(msg.message ?? undefined); + if (locationText) { + body = [body, locationText].filter(Boolean).join("\n").trim(); + } + if (!body) { + body = extractMediaPlaceholder(msg.message ?? undefined); + if (!body) { + return null; + } + } + const replyContext = describeReplyContext(msg.message as proto.IMessage | undefined); + + let mediaPath: string | undefined; + let mediaType: string | undefined; + let mediaFileName: string | undefined; + try { + const inboundMedia = await downloadInboundMedia(msg as proto.IWebMessageInfo, sock); + if (inboundMedia) { + const maxMb = + typeof options.mediaMaxMb === "number" && options.mediaMaxMb > 0 + ? options.mediaMaxMb + : 50; + const maxBytes = maxMb * 1024 * 1024; + const saved = await saveMediaBuffer( + inboundMedia.buffer, + inboundMedia.mimetype, + "inbound", + maxBytes, + inboundMedia.fileName, + ); + mediaPath = saved.path; + mediaType = inboundMedia.mimetype; + mediaFileName = inboundMedia.fileName; + } + } catch (err) { + logVerbose(`Inbound media download failed: ${String(err)}`); + } + + return { + body, + location: location ?? undefined, + replyContext, + mediaPath, + mediaType, + mediaFileName, + }; + }; + + const enqueueInboundMessage = async ( + msg: WAMessage, + inbound: NormalizedInboundMessage, + enriched: EnrichedInboundMessage, + ) => { + const chatJid = inbound.remoteJid; + const sendComposing = async () => { + try { + await sock.sendPresenceUpdate("composing", chatJid); + } catch (err) { + logVerbose(`Presence update failed: ${String(err)}`); + } + }; + const reply = async (text: string) => { + await sock.sendMessage(chatJid, { text }); + }; + const sendMedia = async (payload: AnyMessageContent) => { + await sock.sendMessage(chatJid, payload); + }; + const timestamp = inbound.messageTimestampMs; + const mentionedJids = extractMentionedJids(msg.message as proto.IMessage | undefined); + const senderName = msg.pushName ?? undefined; + + inboundLogger.info( + { + from: inbound.from, + to: selfE164 ?? "me", + body: enriched.body, + mediaPath: enriched.mediaPath, + mediaType: enriched.mediaType, + mediaFileName: enriched.mediaFileName, + timestamp, + }, + "inbound message", + ); + const inboundMessage: WebInboundMessage = { + id: inbound.id, + from: inbound.from, + conversationId: inbound.from, + to: selfE164 ?? "me", + accountId: inbound.access.resolvedAccountId, + body: enriched.body, + pushName: senderName, + timestamp, + chatType: inbound.group ? "group" : "direct", + chatId: inbound.remoteJid, + senderJid: inbound.participantJid, + senderE164: inbound.senderE164 ?? undefined, + senderName, + replyToId: enriched.replyContext?.id, + replyToBody: enriched.replyContext?.body, + replyToSender: enriched.replyContext?.sender, + replyToSenderJid: enriched.replyContext?.senderJid, + replyToSenderE164: enriched.replyContext?.senderE164, + groupSubject: inbound.groupSubject, + groupParticipants: inbound.groupParticipants, + mentionedJids: mentionedJids ?? undefined, + selfJid, + selfE164, + fromMe: Boolean(msg.key?.fromMe), + location: enriched.location ?? undefined, + sendComposing, + reply, + sendMedia, + mediaPath: enriched.mediaPath, + mediaType: enriched.mediaType, + mediaFileName: enriched.mediaFileName, + }; + try { + const task = Promise.resolve(debouncer.enqueue(inboundMessage)); + void task.catch((err) => { + inboundLogger.error({ error: String(err) }, "failed handling inbound web message"); + inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`); + }); + } catch (err) { + inboundLogger.error({ error: String(err) }, "failed handling inbound web message"); + inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`); + } + }; + const handleMessagesUpsert = async (upsert: { type?: string; messages?: Array }) => { if (upsert.type !== "notify" && upsert.type !== "append") { return; @@ -161,187 +404,24 @@ export async function monitorWebInbox(options: { accountId: options.accountId, direction: "inbound", }); - const id = msg.key?.id ?? undefined; - const remoteJid = msg.key?.remoteJid; - if (!remoteJid) { - continue; - } - if (remoteJid.endsWith("@status") || remoteJid.endsWith("@broadcast")) { + const inbound = await normalizeInboundMessage(msg); + if (!inbound) { continue; } - const group = isJidGroup(remoteJid) === true; - if (id) { - const dedupeKey = `${options.accountId}:${remoteJid}:${id}`; - if (isRecentInboundMessage(dedupeKey)) { - continue; - } - } - const participantJid = msg.key?.participant ?? undefined; - const from = group ? remoteJid : await resolveInboundJid(remoteJid); - if (!from) { - continue; - } - const senderE164 = group - ? participantJid - ? await resolveInboundJid(participantJid) - : null - : from; - - let groupSubject: string | undefined; - let groupParticipants: string[] | undefined; - if (group) { - const meta = await getGroupMeta(remoteJid); - groupSubject = meta.subject; - groupParticipants = meta.participants; - } - const messageTimestampMs = msg.messageTimestamp - ? Number(msg.messageTimestamp) * 1000 - : undefined; - - const access = await checkInboundAccessControl({ - accountId: options.accountId, - from, - selfE164, - senderE164, - group, - pushName: msg.pushName ?? undefined, - isFromMe: Boolean(msg.key?.fromMe), - messageTimestampMs, - connectedAtMs, - sock: { sendMessage: (jid, content) => sock.sendMessage(jid, content) }, - remoteJid, - }); - if (!access.allowed) { - continue; - } - - if (id && !access.isSelfChat && options.sendReadReceipts !== false) { - const participant = msg.key?.participant; - try { - await sock.readMessages([{ remoteJid, id, participant, fromMe: false }]); - if (shouldLogVerbose()) { - const suffix = participant ? ` (participant ${participant})` : ""; - logVerbose(`Marked message ${id} as read for ${remoteJid}${suffix}`); - } - } catch (err) { - logVerbose(`Failed to mark message ${id} read: ${String(err)}`); - } - } else if (id && access.isSelfChat && shouldLogVerbose()) { - // Self-chat mode: never auto-send read receipts (blue ticks) on behalf of the owner. - logVerbose(`Self-chat mode: skipping read receipt for ${id}`); - } + await maybeMarkInboundAsRead(inbound); // If this is history/offline catch-up, mark read above but skip auto-reply. if (upsert.type === "append") { continue; } - const location = extractLocationData(msg.message ?? undefined); - const locationText = location ? formatLocationText(location) : undefined; - let body = extractText(msg.message ?? undefined); - if (locationText) { - body = [body, locationText].filter(Boolean).join("\n").trim(); - } - if (!body) { - body = extractMediaPlaceholder(msg.message ?? undefined); - if (!body) { - continue; - } - } - const replyContext = describeReplyContext(msg.message as proto.IMessage | undefined); - - let mediaPath: string | undefined; - let mediaType: string | undefined; - let mediaFileName: string | undefined; - try { - const inboundMedia = await downloadInboundMedia(msg as proto.IWebMessageInfo, sock); - if (inboundMedia) { - const maxMb = - typeof options.mediaMaxMb === "number" && options.mediaMaxMb > 0 - ? options.mediaMaxMb - : 50; - const maxBytes = maxMb * 1024 * 1024; - const saved = await saveMediaBuffer( - inboundMedia.buffer, - inboundMedia.mimetype, - "inbound", - maxBytes, - inboundMedia.fileName, - ); - mediaPath = saved.path; - mediaType = inboundMedia.mimetype; - mediaFileName = inboundMedia.fileName; - } - } catch (err) { - logVerbose(`Inbound media download failed: ${String(err)}`); + const enriched = await enrichInboundMessage(msg); + if (!enriched) { + continue; } - const chatJid = remoteJid; - const sendComposing = async () => { - try { - await sock.sendPresenceUpdate("composing", chatJid); - } catch (err) { - logVerbose(`Presence update failed: ${String(err)}`); - } - }; - const reply = async (text: string) => { - await sock.sendMessage(chatJid, { text }); - }; - const sendMedia = async (payload: AnyMessageContent) => { - await sock.sendMessage(chatJid, payload); - }; - const timestamp = messageTimestampMs; - const mentionedJids = extractMentionedJids(msg.message as proto.IMessage | undefined); - const senderName = msg.pushName ?? undefined; - - inboundLogger.info( - { from, to: selfE164 ?? "me", body, mediaPath, mediaType, mediaFileName, timestamp }, - "inbound message", - ); - const inboundMessage: WebInboundMessage = { - id, - from, - conversationId: from, - to: selfE164 ?? "me", - accountId: access.resolvedAccountId, - body, - pushName: senderName, - timestamp, - chatType: group ? "group" : "direct", - chatId: remoteJid, - senderJid: participantJid, - senderE164: senderE164 ?? undefined, - senderName, - replyToId: replyContext?.id, - replyToBody: replyContext?.body, - replyToSender: replyContext?.sender, - replyToSenderJid: replyContext?.senderJid, - replyToSenderE164: replyContext?.senderE164, - groupSubject, - groupParticipants, - mentionedJids: mentionedJids ?? undefined, - selfJid, - selfE164, - fromMe: Boolean(msg.key?.fromMe), - location: location ?? undefined, - sendComposing, - reply, - sendMedia, - mediaPath, - mediaType, - mediaFileName, - }; - try { - const task = Promise.resolve(debouncer.enqueue(inboundMessage)); - void task.catch((err) => { - inboundLogger.error({ error: String(err) }, "failed handling inbound web message"); - inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`); - }); - } catch (err) { - inboundLogger.error({ error: String(err) }, "failed handling inbound web message"); - inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`); - } + await enqueueInboundMessage(msg, inbound, enriched); } }; sock.ev.on("messages.upsert", handleMessagesUpsert);