mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:20:43 +00:00
refactor(gateway): unify startup task execution
This commit is contained in:
@@ -28,6 +28,7 @@ import {
|
||||
} from "../utils/delivery-context.shared.js";
|
||||
import { injectTimestamp, timestampOptsFromConfig } from "./server-methods/agent-timestamp.js";
|
||||
import { loadSessionEntry } from "./session-utils.js";
|
||||
import { runStartupTasks, type StartupTask } from "./startup-tasks.js";
|
||||
|
||||
const log = createSubsystemLogger("gateway/restart-sentinel");
|
||||
const OUTBOUND_RETRY_DELAY_MS = 1_000;
|
||||
@@ -299,10 +300,12 @@ async function dispatchRestartSentinelContinuation(params: {
|
||||
}
|
||||
}
|
||||
|
||||
export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
|
||||
async function loadRestartSentinelStartupTask(params: {
|
||||
deps: CliDeps;
|
||||
}): Promise<StartupTask | null> {
|
||||
const sentinel = await consumeRestartSentinel();
|
||||
if (!sentinel) {
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
const payload = sentinel.payload;
|
||||
const sessionKey = payload.sessionKey?.trim();
|
||||
@@ -315,123 +318,138 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
|
||||
undefined,
|
||||
);
|
||||
|
||||
if (!sessionKey) {
|
||||
const mainSessionKey = resolveMainSessionKeyFromConfig();
|
||||
enqueueSystemEvent(message, { sessionKey: mainSessionKey });
|
||||
if (payload.continuation) {
|
||||
log.warn(`${summary}: continuation skipped: restart sentinel sessionKey unavailable`, {
|
||||
sessionKey: mainSessionKey,
|
||||
const run = async () => {
|
||||
if (!sessionKey) {
|
||||
const mainSessionKey = resolveMainSessionKeyFromConfig();
|
||||
enqueueSystemEvent(message, { sessionKey: mainSessionKey });
|
||||
if (payload.continuation) {
|
||||
log.warn(`${summary}: continuation skipped: restart sentinel sessionKey unavailable`, {
|
||||
sessionKey: mainSessionKey,
|
||||
continuationKind: payload.continuation.kind,
|
||||
});
|
||||
}
|
||||
return { status: "ran" as const };
|
||||
}
|
||||
|
||||
const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey);
|
||||
|
||||
const { cfg, entry, canonicalKey, storePath } = loadSessionEntry(sessionKey);
|
||||
|
||||
const sentinelContext = payload.deliveryContext;
|
||||
let sessionDeliveryContext = deliveryContextFromSession(entry);
|
||||
if (
|
||||
!hasRoutableDeliveryContext(sessionDeliveryContext) &&
|
||||
baseSessionKey &&
|
||||
baseSessionKey !== sessionKey
|
||||
) {
|
||||
const { entry: baseEntry } = loadSessionEntry(baseSessionKey);
|
||||
sessionDeliveryContext = mergeDeliveryContext(
|
||||
sessionDeliveryContext,
|
||||
deliveryContextFromSession(baseEntry),
|
||||
);
|
||||
}
|
||||
|
||||
const origin = mergeDeliveryContext(sentinelContext, sessionDeliveryContext);
|
||||
|
||||
enqueueRestartSentinelWake(message, sessionKey, wakeDeliveryContext);
|
||||
|
||||
const channelRaw = origin?.channel;
|
||||
const channel = channelRaw ? normalizeChannelId(channelRaw) : null;
|
||||
const to = origin?.to;
|
||||
const threadId =
|
||||
payload.threadId ??
|
||||
sessionThreadId ??
|
||||
(origin?.threadId != null ? String(origin.threadId) : undefined);
|
||||
let resolvedTo: string | undefined;
|
||||
let replyToId: string | undefined;
|
||||
let resolvedThreadId = threadId;
|
||||
|
||||
if (channel && to) {
|
||||
const resolved = resolveOutboundTarget({
|
||||
channel,
|
||||
to,
|
||||
cfg,
|
||||
accountId: origin?.accountId,
|
||||
mode: "implicit",
|
||||
});
|
||||
if (resolved.ok) {
|
||||
resolvedTo = resolved.to;
|
||||
const replyTransport =
|
||||
getChannelPlugin(channel)?.threading?.resolveReplyTransport?.({
|
||||
cfg,
|
||||
accountId: origin?.accountId,
|
||||
threadId,
|
||||
}) ?? null;
|
||||
replyToId = replyTransport?.replyToId ?? undefined;
|
||||
resolvedThreadId =
|
||||
replyTransport && Object.hasOwn(replyTransport, "threadId")
|
||||
? replyTransport.threadId != null
|
||||
? String(replyTransport.threadId)
|
||||
: undefined
|
||||
: threadId;
|
||||
const outboundSession = buildOutboundSessionContext({
|
||||
cfg,
|
||||
sessionKey: canonicalKey,
|
||||
});
|
||||
|
||||
await deliverRestartSentinelNotice({
|
||||
deps: params.deps,
|
||||
cfg,
|
||||
sessionKey: canonicalKey,
|
||||
summary,
|
||||
message,
|
||||
channel,
|
||||
to: resolvedTo,
|
||||
accountId: origin?.accountId,
|
||||
replyToId,
|
||||
threadId: resolvedThreadId,
|
||||
session: outboundSession,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (!payload.continuation) {
|
||||
return { status: "ran" as const };
|
||||
}
|
||||
|
||||
try {
|
||||
await dispatchRestartSentinelContinuation({
|
||||
deps: params.deps,
|
||||
cfg,
|
||||
storePath,
|
||||
sessionKey: canonicalKey,
|
||||
continuation: payload.continuation,
|
||||
ts: payload.ts,
|
||||
route: resolveRestartContinuationRoute({
|
||||
channel: channel ?? undefined,
|
||||
to: resolvedTo,
|
||||
accountId: origin?.accountId,
|
||||
replyToId,
|
||||
threadId: resolvedThreadId,
|
||||
}),
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn(`${summary}: continuation delivery failed: ${String(err)}`, {
|
||||
sessionKey: canonicalKey,
|
||||
continuationKind: payload.continuation.kind,
|
||||
});
|
||||
}
|
||||
return { status: "ran" as const };
|
||||
};
|
||||
|
||||
return {
|
||||
source: "restart-sentinel",
|
||||
...(sessionKey ? { sessionKey } : {}),
|
||||
run,
|
||||
};
|
||||
}
|
||||
|
||||
export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
|
||||
const task = await loadRestartSentinelStartupTask(params);
|
||||
if (!task) {
|
||||
return;
|
||||
}
|
||||
|
||||
const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey);
|
||||
|
||||
const { cfg, entry, canonicalKey, storePath } = loadSessionEntry(sessionKey);
|
||||
|
||||
// Prefer delivery context from sentinel (captured at restart) over session store
|
||||
// Handles race condition where store wasn't flushed before restart
|
||||
const sentinelContext = payload.deliveryContext;
|
||||
let sessionDeliveryContext = deliveryContextFromSession(entry);
|
||||
if (
|
||||
!hasRoutableDeliveryContext(sessionDeliveryContext) &&
|
||||
baseSessionKey &&
|
||||
baseSessionKey !== sessionKey
|
||||
) {
|
||||
const { entry: baseEntry } = loadSessionEntry(baseSessionKey);
|
||||
sessionDeliveryContext = mergeDeliveryContext(
|
||||
sessionDeliveryContext,
|
||||
deliveryContextFromSession(baseEntry),
|
||||
);
|
||||
}
|
||||
|
||||
const origin = mergeDeliveryContext(sentinelContext, sessionDeliveryContext);
|
||||
|
||||
enqueueRestartSentinelWake(message, sessionKey, wakeDeliveryContext);
|
||||
|
||||
const channelRaw = origin?.channel;
|
||||
const channel = channelRaw ? normalizeChannelId(channelRaw) : null;
|
||||
const to = origin?.to;
|
||||
const threadId =
|
||||
payload.threadId ??
|
||||
sessionThreadId ??
|
||||
(origin?.threadId != null ? String(origin.threadId) : undefined);
|
||||
let resolvedTo: string | undefined;
|
||||
let replyToId: string | undefined;
|
||||
let resolvedThreadId = threadId;
|
||||
|
||||
if (channel && to) {
|
||||
const resolved = resolveOutboundTarget({
|
||||
channel,
|
||||
to,
|
||||
cfg,
|
||||
accountId: origin?.accountId,
|
||||
mode: "implicit",
|
||||
});
|
||||
if (resolved.ok) {
|
||||
resolvedTo = resolved.to;
|
||||
const replyTransport =
|
||||
getChannelPlugin(channel)?.threading?.resolveReplyTransport?.({
|
||||
cfg,
|
||||
accountId: origin?.accountId,
|
||||
threadId,
|
||||
}) ?? null;
|
||||
replyToId = replyTransport?.replyToId ?? undefined;
|
||||
resolvedThreadId =
|
||||
replyTransport && Object.hasOwn(replyTransport, "threadId")
|
||||
? replyTransport.threadId != null
|
||||
? String(replyTransport.threadId)
|
||||
: undefined
|
||||
: threadId;
|
||||
const outboundSession = buildOutboundSessionContext({
|
||||
cfg,
|
||||
sessionKey: canonicalKey,
|
||||
});
|
||||
|
||||
await deliverRestartSentinelNotice({
|
||||
deps: params.deps,
|
||||
cfg,
|
||||
sessionKey: canonicalKey,
|
||||
summary,
|
||||
message,
|
||||
channel,
|
||||
to: resolvedTo,
|
||||
accountId: origin?.accountId,
|
||||
replyToId,
|
||||
threadId: resolvedThreadId,
|
||||
session: outboundSession,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (!payload.continuation) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await dispatchRestartSentinelContinuation({
|
||||
deps: params.deps,
|
||||
cfg,
|
||||
storePath,
|
||||
sessionKey: canonicalKey,
|
||||
continuation: payload.continuation,
|
||||
ts: payload.ts,
|
||||
route: resolveRestartContinuationRoute({
|
||||
channel: channel ?? undefined,
|
||||
to: resolvedTo,
|
||||
accountId: origin?.accountId,
|
||||
replyToId,
|
||||
threadId: resolvedThreadId,
|
||||
}),
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn(`${summary}: continuation delivery failed: ${String(err)}`, {
|
||||
sessionKey: canonicalKey,
|
||||
continuationKind: payload.continuation.kind,
|
||||
});
|
||||
}
|
||||
await runStartupTasks({ tasks: [task], log });
|
||||
}
|
||||
|
||||
export function shouldWakeFromRestartSentinel() {
|
||||
|
||||
65
src/gateway/startup-tasks.test.ts
Normal file
65
src/gateway/startup-tasks.test.ts
Normal file
@@ -0,0 +1,65 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { runStartupTasks, type StartupTask } from "./startup-tasks.js";
|
||||
|
||||
function createLogger() {
|
||||
return {
|
||||
debug: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
describe("runStartupTasks", () => {
|
||||
it("runs tasks in order and logs skipped/failed outcomes with task identity", async () => {
|
||||
const log = createLogger();
|
||||
const events: string[] = [];
|
||||
const tasks: StartupTask[] = [
|
||||
{
|
||||
source: "boot-md",
|
||||
agentId: "main",
|
||||
workspaceDir: "/ws/main",
|
||||
run: async () => {
|
||||
events.push("boot");
|
||||
return { status: "skipped", reason: "missing" };
|
||||
},
|
||||
},
|
||||
{
|
||||
source: "restart-sentinel",
|
||||
sessionKey: "agent:main:telegram:chat",
|
||||
run: async () => {
|
||||
events.push("restart");
|
||||
return { status: "ran" };
|
||||
},
|
||||
},
|
||||
{
|
||||
source: "boot-md",
|
||||
agentId: "ops",
|
||||
workspaceDir: "/ws/ops",
|
||||
run: async () => {
|
||||
events.push("ops");
|
||||
throw new Error("boom");
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
const results = await runStartupTasks({ tasks, log });
|
||||
|
||||
expect(events).toEqual(["boot", "restart", "ops"]);
|
||||
expect(results).toEqual([
|
||||
{ status: "skipped", reason: "missing" },
|
||||
{ status: "ran" },
|
||||
{ status: "failed", reason: "boom" },
|
||||
]);
|
||||
expect(log.debug).toHaveBeenCalledWith("startup task skipped", {
|
||||
source: "boot-md",
|
||||
agentId: "main",
|
||||
workspaceDir: "/ws/main",
|
||||
reason: "missing",
|
||||
});
|
||||
expect(log.warn).toHaveBeenCalledWith("startup task failed", {
|
||||
source: "boot-md",
|
||||
agentId: "ops",
|
||||
workspaceDir: "/ws/ops",
|
||||
reason: "boom",
|
||||
});
|
||||
});
|
||||
});
|
||||
55
src/gateway/startup-tasks.ts
Normal file
55
src/gateway/startup-tasks.ts
Normal file
@@ -0,0 +1,55 @@
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
|
||||
export type StartupTaskResult =
|
||||
| { status: "skipped"; reason: string }
|
||||
| { status: "ran" }
|
||||
| { status: "failed"; reason: string };
|
||||
|
||||
export type StartupTask = {
|
||||
source: string;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
workspaceDir?: string;
|
||||
run: () => Promise<StartupTaskResult>;
|
||||
};
|
||||
|
||||
export type StartupTaskLogger = {
|
||||
debug: (message: string, meta?: Record<string, unknown>) => void;
|
||||
warn: (message: string, meta?: Record<string, unknown>) => void;
|
||||
};
|
||||
|
||||
function taskMeta(task: StartupTask, result?: StartupTaskResult): Record<string, unknown> {
|
||||
return {
|
||||
source: task.source,
|
||||
...(task.agentId ? { agentId: task.agentId } : {}),
|
||||
...(task.sessionKey ? { sessionKey: task.sessionKey } : {}),
|
||||
...(task.workspaceDir ? { workspaceDir: task.workspaceDir } : {}),
|
||||
...(result?.status === "failed" || result?.status === "skipped"
|
||||
? { reason: result.reason }
|
||||
: {}),
|
||||
};
|
||||
}
|
||||
|
||||
export async function runStartupTasks(params: {
|
||||
tasks: StartupTask[];
|
||||
log: StartupTaskLogger;
|
||||
}): Promise<StartupTaskResult[]> {
|
||||
const results: StartupTaskResult[] = [];
|
||||
for (const task of params.tasks) {
|
||||
let result: StartupTaskResult;
|
||||
try {
|
||||
result = await task.run();
|
||||
} catch (err) {
|
||||
result = { status: "failed", reason: formatErrorMessage(err) };
|
||||
}
|
||||
results.push(result);
|
||||
if (result.status === "failed") {
|
||||
params.log.warn("startup task failed", taskMeta(task, result));
|
||||
continue;
|
||||
}
|
||||
if (result.status === "skipped") {
|
||||
params.log.debug("startup task skipped", taskMeta(task, result));
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
@@ -113,7 +113,8 @@ describe("boot-md handler", () => {
|
||||
await runBootChecklist(makeEvent({ context: { cfg } }));
|
||||
|
||||
expect(logWarn).toHaveBeenCalledTimes(1);
|
||||
expect(logWarn).toHaveBeenCalledWith("boot-md failed for agent startup run", {
|
||||
expect(logWarn).toHaveBeenCalledWith("startup task failed", {
|
||||
source: "boot-md",
|
||||
agentId: "ops",
|
||||
workspaceDir: OPS_WORKSPACE_DIR,
|
||||
reason: "agent failed",
|
||||
@@ -126,7 +127,8 @@ describe("boot-md handler", () => {
|
||||
|
||||
await runBootChecklist(makeEvent({ context: { cfg } }));
|
||||
|
||||
expect(logDebug).toHaveBeenCalledWith("boot-md skipped for agent startup run", {
|
||||
expect(logDebug).toHaveBeenCalledWith("startup task skipped", {
|
||||
source: "boot-md",
|
||||
agentId: "main",
|
||||
workspaceDir: MAIN_WORKSPACE_DIR,
|
||||
reason: "missing",
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { listAgentIds, resolveAgentWorkspaceDir } from "../../../agents/agent-scope.js";
|
||||
import { createDefaultDeps } from "../../../cli/deps.js";
|
||||
import { runBootOnce } from "../../../gateway/boot.js";
|
||||
import { runStartupTasks, type StartupTask } from "../../../gateway/startup-tasks.js";
|
||||
import { createSubsystemLogger } from "../../../logging/subsystem.js";
|
||||
import type { HookHandler } from "../../hooks.js";
|
||||
import { isGatewayStartupEvent } from "../../internal-hooks.js";
|
||||
@@ -18,27 +19,17 @@ const runBootChecklist: HookHandler = async (event) => {
|
||||
|
||||
const cfg = event.context.cfg;
|
||||
const deps = event.context.deps ?? createDefaultDeps();
|
||||
const agentIds = listAgentIds(cfg);
|
||||
|
||||
for (const agentId of agentIds) {
|
||||
const tasks: StartupTask[] = listAgentIds(cfg).map((agentId) => {
|
||||
const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId);
|
||||
const result = await runBootOnce({ cfg, deps, workspaceDir, agentId });
|
||||
if (result.status === "failed") {
|
||||
log.warn("boot-md failed for agent startup run", {
|
||||
agentId,
|
||||
workspaceDir,
|
||||
reason: result.reason,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
if (result.status === "skipped") {
|
||||
log.debug("boot-md skipped for agent startup run", {
|
||||
agentId,
|
||||
workspaceDir,
|
||||
reason: result.reason,
|
||||
});
|
||||
}
|
||||
}
|
||||
return {
|
||||
source: "boot-md",
|
||||
agentId,
|
||||
workspaceDir,
|
||||
run: () => runBootOnce({ cfg, deps, workspaceDir, agentId }),
|
||||
};
|
||||
});
|
||||
|
||||
await runStartupTasks({ tasks, log });
|
||||
};
|
||||
|
||||
export default runBootChecklist;
|
||||
|
||||
Reference in New Issue
Block a user