diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index cb39204c840..bcf93796856 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -28,6 +28,7 @@ import { } from "../utils/delivery-context.shared.js"; import { injectTimestamp, timestampOptsFromConfig } from "./server-methods/agent-timestamp.js"; import { loadSessionEntry } from "./session-utils.js"; +import { runStartupTasks, type StartupTask } from "./startup-tasks.js"; const log = createSubsystemLogger("gateway/restart-sentinel"); const OUTBOUND_RETRY_DELAY_MS = 1_000; @@ -299,10 +300,12 @@ async function dispatchRestartSentinelContinuation(params: { } } -export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { +async function loadRestartSentinelStartupTask(params: { + deps: CliDeps; +}): Promise { const sentinel = await consumeRestartSentinel(); if (!sentinel) { - return; + return null; } const payload = sentinel.payload; const sessionKey = payload.sessionKey?.trim(); @@ -315,123 +318,138 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { undefined, ); - if (!sessionKey) { - const mainSessionKey = resolveMainSessionKeyFromConfig(); - enqueueSystemEvent(message, { sessionKey: mainSessionKey }); - if (payload.continuation) { - log.warn(`${summary}: continuation skipped: restart sentinel sessionKey unavailable`, { - sessionKey: mainSessionKey, + const run = async () => { + if (!sessionKey) { + const mainSessionKey = resolveMainSessionKeyFromConfig(); + enqueueSystemEvent(message, { sessionKey: mainSessionKey }); + if (payload.continuation) { + log.warn(`${summary}: continuation skipped: restart sentinel sessionKey unavailable`, { + sessionKey: mainSessionKey, + continuationKind: payload.continuation.kind, + }); + } + return { status: "ran" as const }; + } + + const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey); + + const { cfg, entry, canonicalKey, storePath } = loadSessionEntry(sessionKey); + + const sentinelContext = payload.deliveryContext; + let sessionDeliveryContext = deliveryContextFromSession(entry); + if ( + !hasRoutableDeliveryContext(sessionDeliveryContext) && + baseSessionKey && + baseSessionKey !== sessionKey + ) { + const { entry: baseEntry } = loadSessionEntry(baseSessionKey); + sessionDeliveryContext = mergeDeliveryContext( + sessionDeliveryContext, + deliveryContextFromSession(baseEntry), + ); + } + + const origin = mergeDeliveryContext(sentinelContext, sessionDeliveryContext); + + enqueueRestartSentinelWake(message, sessionKey, wakeDeliveryContext); + + const channelRaw = origin?.channel; + const channel = channelRaw ? normalizeChannelId(channelRaw) : null; + const to = origin?.to; + const threadId = + payload.threadId ?? + sessionThreadId ?? + (origin?.threadId != null ? String(origin.threadId) : undefined); + let resolvedTo: string | undefined; + let replyToId: string | undefined; + let resolvedThreadId = threadId; + + if (channel && to) { + const resolved = resolveOutboundTarget({ + channel, + to, + cfg, + accountId: origin?.accountId, + mode: "implicit", + }); + if (resolved.ok) { + resolvedTo = resolved.to; + const replyTransport = + getChannelPlugin(channel)?.threading?.resolveReplyTransport?.({ + cfg, + accountId: origin?.accountId, + threadId, + }) ?? null; + replyToId = replyTransport?.replyToId ?? undefined; + resolvedThreadId = + replyTransport && Object.hasOwn(replyTransport, "threadId") + ? replyTransport.threadId != null + ? String(replyTransport.threadId) + : undefined + : threadId; + const outboundSession = buildOutboundSessionContext({ + cfg, + sessionKey: canonicalKey, + }); + + await deliverRestartSentinelNotice({ + deps: params.deps, + cfg, + sessionKey: canonicalKey, + summary, + message, + channel, + to: resolvedTo, + accountId: origin?.accountId, + replyToId, + threadId: resolvedThreadId, + session: outboundSession, + }); + } + } + + if (!payload.continuation) { + return { status: "ran" as const }; + } + + try { + await dispatchRestartSentinelContinuation({ + deps: params.deps, + cfg, + storePath, + sessionKey: canonicalKey, + continuation: payload.continuation, + ts: payload.ts, + route: resolveRestartContinuationRoute({ + channel: channel ?? undefined, + to: resolvedTo, + accountId: origin?.accountId, + replyToId, + threadId: resolvedThreadId, + }), + }); + } catch (err) { + log.warn(`${summary}: continuation delivery failed: ${String(err)}`, { + sessionKey: canonicalKey, continuationKind: payload.continuation.kind, }); } + return { status: "ran" as const }; + }; + + return { + source: "restart-sentinel", + ...(sessionKey ? { sessionKey } : {}), + run, + }; +} + +export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { + const task = await loadRestartSentinelStartupTask(params); + if (!task) { return; } - - const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey); - - const { cfg, entry, canonicalKey, storePath } = loadSessionEntry(sessionKey); - - // Prefer delivery context from sentinel (captured at restart) over session store - // Handles race condition where store wasn't flushed before restart - const sentinelContext = payload.deliveryContext; - let sessionDeliveryContext = deliveryContextFromSession(entry); - if ( - !hasRoutableDeliveryContext(sessionDeliveryContext) && - baseSessionKey && - baseSessionKey !== sessionKey - ) { - const { entry: baseEntry } = loadSessionEntry(baseSessionKey); - sessionDeliveryContext = mergeDeliveryContext( - sessionDeliveryContext, - deliveryContextFromSession(baseEntry), - ); - } - - const origin = mergeDeliveryContext(sentinelContext, sessionDeliveryContext); - - enqueueRestartSentinelWake(message, sessionKey, wakeDeliveryContext); - - const channelRaw = origin?.channel; - const channel = channelRaw ? normalizeChannelId(channelRaw) : null; - const to = origin?.to; - const threadId = - payload.threadId ?? - sessionThreadId ?? - (origin?.threadId != null ? String(origin.threadId) : undefined); - let resolvedTo: string | undefined; - let replyToId: string | undefined; - let resolvedThreadId = threadId; - - if (channel && to) { - const resolved = resolveOutboundTarget({ - channel, - to, - cfg, - accountId: origin?.accountId, - mode: "implicit", - }); - if (resolved.ok) { - resolvedTo = resolved.to; - const replyTransport = - getChannelPlugin(channel)?.threading?.resolveReplyTransport?.({ - cfg, - accountId: origin?.accountId, - threadId, - }) ?? null; - replyToId = replyTransport?.replyToId ?? undefined; - resolvedThreadId = - replyTransport && Object.hasOwn(replyTransport, "threadId") - ? replyTransport.threadId != null - ? String(replyTransport.threadId) - : undefined - : threadId; - const outboundSession = buildOutboundSessionContext({ - cfg, - sessionKey: canonicalKey, - }); - - await deliverRestartSentinelNotice({ - deps: params.deps, - cfg, - sessionKey: canonicalKey, - summary, - message, - channel, - to: resolvedTo, - accountId: origin?.accountId, - replyToId, - threadId: resolvedThreadId, - session: outboundSession, - }); - } - } - - if (!payload.continuation) { - return; - } - - try { - await dispatchRestartSentinelContinuation({ - deps: params.deps, - cfg, - storePath, - sessionKey: canonicalKey, - continuation: payload.continuation, - ts: payload.ts, - route: resolveRestartContinuationRoute({ - channel: channel ?? undefined, - to: resolvedTo, - accountId: origin?.accountId, - replyToId, - threadId: resolvedThreadId, - }), - }); - } catch (err) { - log.warn(`${summary}: continuation delivery failed: ${String(err)}`, { - sessionKey: canonicalKey, - continuationKind: payload.continuation.kind, - }); - } + await runStartupTasks({ tasks: [task], log }); } export function shouldWakeFromRestartSentinel() { diff --git a/src/gateway/startup-tasks.test.ts b/src/gateway/startup-tasks.test.ts new file mode 100644 index 00000000000..9c51182b624 --- /dev/null +++ b/src/gateway/startup-tasks.test.ts @@ -0,0 +1,65 @@ +import { describe, expect, it, vi } from "vitest"; +import { runStartupTasks, type StartupTask } from "./startup-tasks.js"; + +function createLogger() { + return { + debug: vi.fn(), + warn: vi.fn(), + }; +} + +describe("runStartupTasks", () => { + it("runs tasks in order and logs skipped/failed outcomes with task identity", async () => { + const log = createLogger(); + const events: string[] = []; + const tasks: StartupTask[] = [ + { + source: "boot-md", + agentId: "main", + workspaceDir: "/ws/main", + run: async () => { + events.push("boot"); + return { status: "skipped", reason: "missing" }; + }, + }, + { + source: "restart-sentinel", + sessionKey: "agent:main:telegram:chat", + run: async () => { + events.push("restart"); + return { status: "ran" }; + }, + }, + { + source: "boot-md", + agentId: "ops", + workspaceDir: "/ws/ops", + run: async () => { + events.push("ops"); + throw new Error("boom"); + }, + }, + ]; + + const results = await runStartupTasks({ tasks, log }); + + expect(events).toEqual(["boot", "restart", "ops"]); + expect(results).toEqual([ + { status: "skipped", reason: "missing" }, + { status: "ran" }, + { status: "failed", reason: "boom" }, + ]); + expect(log.debug).toHaveBeenCalledWith("startup task skipped", { + source: "boot-md", + agentId: "main", + workspaceDir: "/ws/main", + reason: "missing", + }); + expect(log.warn).toHaveBeenCalledWith("startup task failed", { + source: "boot-md", + agentId: "ops", + workspaceDir: "/ws/ops", + reason: "boom", + }); + }); +}); diff --git a/src/gateway/startup-tasks.ts b/src/gateway/startup-tasks.ts new file mode 100644 index 00000000000..a70b6679019 --- /dev/null +++ b/src/gateway/startup-tasks.ts @@ -0,0 +1,55 @@ +import { formatErrorMessage } from "../infra/errors.js"; + +export type StartupTaskResult = + | { status: "skipped"; reason: string } + | { status: "ran" } + | { status: "failed"; reason: string }; + +export type StartupTask = { + source: string; + agentId?: string; + sessionKey?: string; + workspaceDir?: string; + run: () => Promise; +}; + +export type StartupTaskLogger = { + debug: (message: string, meta?: Record) => void; + warn: (message: string, meta?: Record) => void; +}; + +function taskMeta(task: StartupTask, result?: StartupTaskResult): Record { + return { + source: task.source, + ...(task.agentId ? { agentId: task.agentId } : {}), + ...(task.sessionKey ? { sessionKey: task.sessionKey } : {}), + ...(task.workspaceDir ? { workspaceDir: task.workspaceDir } : {}), + ...(result?.status === "failed" || result?.status === "skipped" + ? { reason: result.reason } + : {}), + }; +} + +export async function runStartupTasks(params: { + tasks: StartupTask[]; + log: StartupTaskLogger; +}): Promise { + const results: StartupTaskResult[] = []; + for (const task of params.tasks) { + let result: StartupTaskResult; + try { + result = await task.run(); + } catch (err) { + result = { status: "failed", reason: formatErrorMessage(err) }; + } + results.push(result); + if (result.status === "failed") { + params.log.warn("startup task failed", taskMeta(task, result)); + continue; + } + if (result.status === "skipped") { + params.log.debug("startup task skipped", taskMeta(task, result)); + } + } + return results; +} diff --git a/src/hooks/bundled/boot-md/handler.test.ts b/src/hooks/bundled/boot-md/handler.test.ts index de2abd6475f..a07dfb1bc67 100644 --- a/src/hooks/bundled/boot-md/handler.test.ts +++ b/src/hooks/bundled/boot-md/handler.test.ts @@ -113,7 +113,8 @@ describe("boot-md handler", () => { await runBootChecklist(makeEvent({ context: { cfg } })); expect(logWarn).toHaveBeenCalledTimes(1); - expect(logWarn).toHaveBeenCalledWith("boot-md failed for agent startup run", { + expect(logWarn).toHaveBeenCalledWith("startup task failed", { + source: "boot-md", agentId: "ops", workspaceDir: OPS_WORKSPACE_DIR, reason: "agent failed", @@ -126,7 +127,8 @@ describe("boot-md handler", () => { await runBootChecklist(makeEvent({ context: { cfg } })); - expect(logDebug).toHaveBeenCalledWith("boot-md skipped for agent startup run", { + expect(logDebug).toHaveBeenCalledWith("startup task skipped", { + source: "boot-md", agentId: "main", workspaceDir: MAIN_WORKSPACE_DIR, reason: "missing", diff --git a/src/hooks/bundled/boot-md/handler.ts b/src/hooks/bundled/boot-md/handler.ts index b5fcb065ac6..97aaa19c9e8 100644 --- a/src/hooks/bundled/boot-md/handler.ts +++ b/src/hooks/bundled/boot-md/handler.ts @@ -1,6 +1,7 @@ import { listAgentIds, resolveAgentWorkspaceDir } from "../../../agents/agent-scope.js"; import { createDefaultDeps } from "../../../cli/deps.js"; import { runBootOnce } from "../../../gateway/boot.js"; +import { runStartupTasks, type StartupTask } from "../../../gateway/startup-tasks.js"; import { createSubsystemLogger } from "../../../logging/subsystem.js"; import type { HookHandler } from "../../hooks.js"; import { isGatewayStartupEvent } from "../../internal-hooks.js"; @@ -18,27 +19,17 @@ const runBootChecklist: HookHandler = async (event) => { const cfg = event.context.cfg; const deps = event.context.deps ?? createDefaultDeps(); - const agentIds = listAgentIds(cfg); - - for (const agentId of agentIds) { + const tasks: StartupTask[] = listAgentIds(cfg).map((agentId) => { const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId); - const result = await runBootOnce({ cfg, deps, workspaceDir, agentId }); - if (result.status === "failed") { - log.warn("boot-md failed for agent startup run", { - agentId, - workspaceDir, - reason: result.reason, - }); - continue; - } - if (result.status === "skipped") { - log.debug("boot-md skipped for agent startup run", { - agentId, - workspaceDir, - reason: result.reason, - }); - } - } + return { + source: "boot-md", + agentId, + workspaceDir, + run: () => runBootOnce({ cfg, deps, workspaceDir, agentId }), + }; + }); + + await runStartupTasks({ tasks, log }); }; export default runBootChecklist;