mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-22 22:52:03 +00:00
* fix(cron): pass agentId to runHeartbeatOnce for main-session jobs

  Main-session cron jobs with agentId always ran the heartbeat under the default agent, ignoring the job's agent binding. enqueueSystemEvent correctly routed the system event to the bound agent's session, but runHeartbeatOnce was called without agentId, so the heartbeat ran under the default agent and never picked up the event.

  Thread agentId from job.agentId through the CronServiceDeps type, timer execution, and the gateway wrapper so heartbeat-runner uses the correct agent.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* cron: add heartbeat agentId propagation regression test (#14140) (thanks @ishikawa-pro)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
121 lines
4.3 KiB
TypeScript
121 lines
4.3 KiB
TypeScript
import type { CliDeps } from "../cli/deps.js";
|
|
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
|
|
import { loadConfig } from "../config/config.js";
|
|
import { resolveAgentMainSessionKey } from "../config/sessions.js";
|
|
import { resolveStorePath } from "../config/sessions/paths.js";
|
|
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
|
|
import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js";
|
|
import { CronService } from "../cron/service.js";
|
|
import { resolveCronStorePath } from "../cron/store.js";
|
|
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
|
|
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
|
import { enqueueSystemEvent } from "../infra/system-events.js";
|
|
import { getChildLogger } from "../logging.js";
|
|
import { normalizeAgentId } from "../routing/session-key.js";
|
|
import { defaultRuntime } from "../runtime.js";
|
|
|
|
/**
 * Result of {@link buildGatewayCronService}: the constructed cron service
 * together with the settings that were resolved while building it.
 */
export type GatewayCronState = {
  /** The cron service instance created for the gateway. */
  cron: CronService;
  /** Cron store path as resolved from the gateway config's `cron.store`. */
  storePath: string;
  /**
   * `false` when `OPENCLAW_SKIP_CRON` is `"1"` or the config sets
   * `cron.enabled` to `false`; `true` otherwise.
   */
  cronEnabled: boolean;
};
|
|
|
|
/**
 * Creates the gateway's {@link CronService} and wires its callbacks to the
 * gateway's system-event queue, heartbeat runner, isolated agent runner,
 * broadcast channel and per-job run log.
 *
 * @param params.cfg - Config snapshot used for the cron store path, the
 *   enabled flag, the default agent id and session store paths.
 * @param params.deps - CLI dependencies forwarded to the heartbeat and
 *   isolated-agent runners.
 * @param params.broadcast - Sink for cron events; invoked with the `"cron"`
 *   event name and `dropIfSlow: true`.
 * @returns The cron service plus the resolved store path and enabled flag.
 */
export function buildGatewayCronService(params: {
  cfg: ReturnType<typeof loadConfig>;
  deps: CliDeps;
  broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
}): GatewayCronState {
  const cronLogger = getChildLogger({ module: "cron" });
  const storePath = resolveCronStorePath(params.cfg.cron?.store);
  // Cron is on unless the environment opts out or the config explicitly disables it.
  const cronEnabled = !(
    process.env.OPENCLAW_SKIP_CRON === "1" || params.cfg.cron?.enabled === false
  );

  /**
   * Picks the agent a cron action should run under, against a freshly loaded
   * config. The requested id is honoured only when, after normalization, it
   * matches an entry in `agents.list`; otherwise the config's default agent
   * is used. The loaded config is returned alongside the id.
   */
  const resolveCronAgent = (requested?: string | null) => {
    const liveConfig = loadConfig();
    const wanted =
      typeof requested === "string" && requested.trim() ? normalizeAgentId(requested) : undefined;
    if (wanted !== undefined) {
      const configuredAgents = liveConfig.agents?.list;
      const isKnownAgent =
        Array.isArray(configuredAgents) &&
        configuredAgents.some(
          (entry) => entry && typeof entry.id === "string" && normalizeAgentId(entry.id) === wanted,
        );
      if (isKnownAgent) {
        return { agentId: wanted, cfg: liveConfig };
      }
    }
    return { agentId: resolveDefaultAgentId(liveConfig), cfg: liveConfig };
  };

  const defaultAgentId = resolveDefaultAgentId(params.cfg);
  // Session store location for an agent; the default agent is used when none is given.
  const resolveSessionStorePath = (agentId?: string) => {
    return resolveStorePath(params.cfg.session?.store, { agentId: agentId ?? defaultAgentId });
  };
  const sessionStorePath = resolveSessionStorePath(defaultAgentId);

  const cron = new CronService({
    storePath,
    cronEnabled,
    cronConfig: params.cfg.cron,
    defaultAgentId,
    resolveSessionStorePath,
    sessionStorePath,
    // Queue the event on the main session of the agent resolved for this request.
    enqueueSystemEvent: (text, opts) => {
      const target = resolveCronAgent(opts?.agentId);
      const sessionKey = resolveAgentMainSessionKey({
        cfg: target.cfg,
        agentId: target.agentId,
      });
      enqueueSystemEvent(text, { sessionKey });
    },
    requestHeartbeatNow,
    // Run one heartbeat; the agent id is forwarded only when the caller supplied one.
    runHeartbeatOnce: async (opts) => {
      // The config is loaded here first; resolveCronAgent loads it again for its lookup.
      const liveConfig = loadConfig();
      const requestedAgent = opts?.agentId;
      const agentId = requestedAgent ? resolveCronAgent(requestedAgent).agentId : undefined;
      return await runHeartbeatOnce({
        cfg: liveConfig,
        reason: opts?.reason,
        agentId,
        deps: { ...params.deps, runtime: defaultRuntime },
      });
    },
    // Run the job as an isolated agent turn on the "cron" lane, keyed by job id.
    runIsolatedAgentJob: async ({ job, message }) => {
      const target = resolveCronAgent(job.agentId);
      return await runCronIsolatedAgentTurn({
        cfg: target.cfg,
        deps: params.deps,
        job,
        message,
        agentId: target.agentId,
        sessionKey: `cron:${job.id}`,
        lane: "cron",
      });
    },
    log: getChildLogger({ module: "cron", storePath }),
    onEvent: (evt) => {
      params.broadcast("cron", evt, { dropIfSlow: true });
      // Only finished runs are written to the per-job run log.
      if (evt.action !== "finished") {
        return;
      }
      const logPath = resolveCronRunLogPath({ storePath, jobId: evt.jobId });
      // Fire-and-forget: a failed append is logged as a warning and otherwise ignored.
      void appendCronRunLog(logPath, {
        ts: Date.now(),
        jobId: evt.jobId,
        action: "finished",
        status: evt.status,
        error: evt.error,
        summary: evt.summary,
        sessionId: evt.sessionId,
        sessionKey: evt.sessionKey,
        runAtMs: evt.runAtMs,
        durationMs: evt.durationMs,
        nextRunAtMs: evt.nextRunAtMs,
      }).catch((err) => {
        cronLogger.warn({ err: String(err), logPath }, "cron: run log append failed");
      });
    },
  });

  return { cron, storePath, cronEnabled };
}
|