fix(cron): emit message.queued/processed for isolated-agent turns

This commit is contained in:
Arnab Saha
2026-05-07 20:01:02 -07:00
committed by Peter Steinberger
parent b5ada806dd
commit 6ccd4e72f0

View File

@@ -15,6 +15,8 @@ import {
type SourceDeliveryPlan,
type SourceDeliveryVisibleDelivery,
} from "../../infra/outbound/source-delivery-plan.js";
import { logMessageProcessed, logMessageQueued } from "../../logging/diagnostic.js";
import { stringifyRouteThreadId } from "../../plugin-sdk/channel-route.js";
import { isCommandLaneTaskTimeoutError } from "../../process/command-queue.js";
import { CommandLane } from "../../process/lanes.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
@@ -1184,6 +1186,21 @@ export async function runCronIsolatedAgentTurn(params: {
// the correct context even if adoptCronRunSessionMetadata() rotates it.
const initialSessionId = prepared.context.cronSession.sessionEntry.sessionId;
const turnStartedAtMs = Date.now();
const taskLabel =
params.job.payload.kind === "agentTurn"
? params.job.name?.trim() || params.job.id
: params.job.id;
logMessageQueued({
sessionId: prepared.context.runSessionId,
sessionKey: prepared.context.runSessionKey,
channel: "cron",
source: "cron-isolated",
taskLabel,
});
let outcome: "completed" | "error" = "completed";
let outcomeError: string | undefined;
try {
const { executeCronRun } = await loadCronExecutorRuntime();
const execution = await executeCronRun({
@@ -1222,21 +1239,30 @@ export async function runCronIsolatedAgentTurn(params: {
suppressExecNotifyOnExit: prepared.context.suppressExecNotifyOnExit,
});
if (isAborted()) {
outcome = "error";
outcomeError = abortReason();
return prepared.context.withRunSession({
status: "error",
error: abortReason(),
diagnostics: createCronRunDiagnosticsFromError("cron-setup", abortReason()),
});
}
return await finalizeCronRun({
const finalized = await finalizeCronRun({
prepared: prepared.context,
execution,
abortReason,
isAborted,
});
if (finalized.status === "error") {
outcome = "error";
outcomeError = finalized.error;
}
return finalized;
} catch (err) {
const isCronLaneTimeout = isAborted() || isCronNestedLaneTaskTimeoutError(err);
const error = isCronLaneTimeout ? abortReason() : String(err);
outcome = "error";
outcomeError = error;
return prepared.context.withRunSession({
status: "error",
error,
@@ -1246,6 +1272,14 @@ export async function runCronIsolatedAgentTurn(params: {
),
});
} finally {
logMessageProcessed({
channel: "cron",
sessionId: prepared.context.runSessionId,
sessionKey: prepared.context.runSessionKey,
durationMs: Date.now() - turnStartedAtMs,
outcome,
error: outcomeError,
});
// Release runtime references after the run completes (success or failure).
// The session entry has already been persisted to disk by this point,
// so the in-memory store and run context can be safely dropped.