mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:40:44 +00:00
fix(cron): persist manual run ids in history (#76288)
Summary: - The PR carries manual `cron.run` acknowledgement IDs into finished cron events and `cron.runs` history, upda ... surfaces, adds regression coverage, refreshes the SDK baseline hash, and records the fix in the changelog. - Reproducibility: yes. Current main can be reproduced by source inspection: `cron.run` returns a `manual:...` ... r path omits it; the PR adds targeted assertions for the missing correlation and the task-ledger invariant. ClawSweeper fixups: - Included follow-up commit: chore(protocol): update generated cron models - Included follow-up commit: chore(cron): document manual run id protocol surface - Included follow-up commit: Preserve cron task ledger run IDs Validation: - ClawSweeper review passed for head04ce879858. - Required merge gates passed before the squash merge. Prepared head SHA:04ce879858Review: https://github.com/openclaw/openclaw/pull/76288#issuecomment-4364868383 Co-authored-by: Paul Frederiksen <paul@paulfrederiksen.com>
This commit is contained in:
@@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Control UI/Gateway: avoid full session-list reloads for locally applied message-phase session updates, carry known session keys through transcript-file update events, and defer media provider listing when explicit generation model config is present. Refs #76236, #76203, #76188, #76107, and #76166. Thanks @BunsDev.
|
||||
- Install/update: prune the obsolete `plugin-runtime-deps` state directory during packaged postinstall so upgrades from pre-2026.5.2 releases reclaim old bundled-plugin dependency caches without touching external plugin installs.
|
||||
- Auto-reply/queue: treat reset-triggered `/new` and `/reset` turns as interrupt runs across active-run queue handling, so steer/followup modes cannot delay a fresh session behind existing work. Fixes #74093. (#74144) Thanks @ruji9527 and @yelog.
|
||||
- Cron: preserve manual `cron.run` IDs in `cron.runs` history so manual run acknowledgements can be correlated with finished run records. Fixes #76276.
|
||||
- Gateway: keep directly requested plugin tools invokable under restrictive tool profiles while preserving explicit deny lists and the HTTP safety deny list, preventing catalog/invoke mismatches that surface as "Tool not available". Thanks @BunsDev.
|
||||
- Gateway/update: allow beta binaries to refresh gateway services when the config was last written by the matching stable release version, avoiding false newer-config downgrade blocks during beta channel updates.
|
||||
- Channels: keep Matrix and Mattermost bundled in the core package instead of advertising external npm installs before those channels are cut over. Thanks @vincentkoc.
|
||||
|
||||
@@ -4324,6 +4324,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
public let deliveryerror: String?
|
||||
public let sessionid: String?
|
||||
public let sessionkey: String?
|
||||
public let runid: String?
|
||||
public let runatms: Int?
|
||||
public let durationms: Int?
|
||||
public let nextrunatms: Int?
|
||||
@@ -4344,6 +4345,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
deliveryerror: String?,
|
||||
sessionid: String?,
|
||||
sessionkey: String?,
|
||||
runid: String?,
|
||||
runatms: Int?,
|
||||
durationms: Int?,
|
||||
nextrunatms: Int?,
|
||||
@@ -4363,6 +4365,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
self.deliveryerror = deliveryerror
|
||||
self.sessionid = sessionid
|
||||
self.sessionkey = sessionkey
|
||||
self.runid = runid
|
||||
self.runatms = runatms
|
||||
self.durationms = durationms
|
||||
self.nextrunatms = nextrunatms
|
||||
@@ -4384,6 +4387,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
case deliveryerror = "deliveryError"
|
||||
case sessionid = "sessionId"
|
||||
case sessionkey = "sessionKey"
|
||||
case runid = "runId"
|
||||
case runatms = "runAtMs"
|
||||
case durationms = "durationMs"
|
||||
case nextrunatms = "nextRunAtMs"
|
||||
|
||||
@@ -4324,6 +4324,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
public let deliveryerror: String?
|
||||
public let sessionid: String?
|
||||
public let sessionkey: String?
|
||||
public let runid: String?
|
||||
public let runatms: Int?
|
||||
public let durationms: Int?
|
||||
public let nextrunatms: Int?
|
||||
@@ -4344,6 +4345,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
deliveryerror: String?,
|
||||
sessionid: String?,
|
||||
sessionkey: String?,
|
||||
runid: String?,
|
||||
runatms: Int?,
|
||||
durationms: Int?,
|
||||
nextrunatms: Int?,
|
||||
@@ -4363,6 +4365,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
self.deliveryerror = deliveryerror
|
||||
self.sessionid = sessionid
|
||||
self.sessionkey = sessionkey
|
||||
self.runid = runid
|
||||
self.runatms = runatms
|
||||
self.durationms = durationms
|
||||
self.nextrunatms = nextrunatms
|
||||
@@ -4384,6 +4387,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
case deliveryerror = "deliveryError"
|
||||
case sessionid = "sessionId"
|
||||
case sessionkey = "sessionKey"
|
||||
case runid = "runId"
|
||||
case runatms = "runAtMs"
|
||||
case durationms = "durationMs"
|
||||
case nextrunatms = "nextRunAtMs"
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
5f5db635a11240bee5e6eec95d13ff5ab388f3a5477c2f17fd762b5ed5b3dbae plugin-sdk-api-baseline.json
|
||||
463c3bc12bf78ec6fc9350909fb3076967d276944da14343015f0dfae6ea48ed plugin-sdk-api-baseline.jsonl
|
||||
f829dd720df7c9c8eb9d59eda3b3f879bff278f74b4c00d8d788c1483865b649 plugin-sdk-api-baseline.json
|
||||
1b3504c8f9ddd00801f095f94f417d469b47370064478eae389d33f4b8e10c76 plugin-sdk-api-baseline.jsonl
|
||||
|
||||
@@ -29,6 +29,7 @@ export type CronRunLogEntry = {
|
||||
delivery?: CronDeliveryTrace;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
runId?: string;
|
||||
runAtMs?: number;
|
||||
durationMs?: number;
|
||||
nextRunAtMs?: number;
|
||||
@@ -310,6 +311,7 @@ function parseAllRunLogEntries(raw: string, opts?: { jobId?: string }): CronRunL
|
||||
status: obj.status,
|
||||
error: obj.error,
|
||||
summary: obj.summary,
|
||||
runId: typeof obj.runId === "string" && obj.runId.trim() ? obj.runId : undefined,
|
||||
runAtMs: obj.runAtMs,
|
||||
durationMs: obj.durationMs,
|
||||
nextRunAtMs: obj.nextRunAtMs,
|
||||
|
||||
@@ -182,6 +182,35 @@ describe("cron service ops seam coverage", () => {
|
||||
stop(state);
|
||||
});
|
||||
|
||||
it("keeps manual acknowledgement IDs separate from recoverable task run IDs", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.parse("2026-03-23T12:00:00.000Z");
|
||||
const restoreStateDir = withStateDirForStorePath(storePath);
|
||||
|
||||
try {
|
||||
await writeDueIsolatedJobSnapshot(storePath, now);
|
||||
|
||||
const state = createOkIsolatedCronState({ storePath, now, summary: "done" });
|
||||
const manualRunId = `manual:isolated-timeout:${now}:1`;
|
||||
|
||||
await expect(
|
||||
run(state, "isolated-timeout", "force", { runId: manualRunId }),
|
||||
).resolves.toEqual({
|
||||
ok: true,
|
||||
ran: true,
|
||||
});
|
||||
|
||||
expect(findTaskByRunId(`cron:isolated-timeout:${now}`)).toMatchObject({
|
||||
runtime: "cron",
|
||||
status: "succeeded",
|
||||
sourceId: "isolated-timeout",
|
||||
});
|
||||
expect(findTaskByRunId(manualRunId)).toBeUndefined();
|
||||
} finally {
|
||||
restoreStateDir();
|
||||
}
|
||||
});
|
||||
|
||||
it("records timed out manual runs as timed_out in the shared task registry", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.parse("2026-03-23T12:00:00.000Z");
|
||||
|
||||
@@ -439,6 +439,7 @@ type PreparedManualRun =
|
||||
ok: true;
|
||||
ran: true;
|
||||
jobId: string;
|
||||
runId?: string;
|
||||
taskRunId?: string;
|
||||
startedAt: number;
|
||||
executionJob: CronJob;
|
||||
@@ -630,6 +631,7 @@ async function prepareManualRun(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
opts?: { runId?: string },
|
||||
): Promise<PreparedManualRun> {
|
||||
const preflight = await inspectManualRunPreflight(state, id, mode);
|
||||
if (!preflight.ok) {
|
||||
@@ -665,6 +667,7 @@ async function prepareManualRun(
|
||||
ok: true,
|
||||
ran: true,
|
||||
jobId: job.id,
|
||||
runId: opts?.runId ?? taskRunId,
|
||||
taskRunId,
|
||||
startedAt: preflight.now,
|
||||
executionJob,
|
||||
@@ -681,6 +684,7 @@ async function finishPreparedManualRun(
|
||||
const startedAt = prepared.startedAt;
|
||||
const jobId = prepared.jobId;
|
||||
const taskRunId = prepared.taskRunId;
|
||||
const runId = prepared.runId;
|
||||
|
||||
let coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
|
||||
try {
|
||||
@@ -728,6 +732,7 @@ async function finishPreparedManualRun(
|
||||
delivery: coreResult.delivery,
|
||||
sessionId: coreResult.sessionId,
|
||||
sessionKey: coreResult.sessionKey,
|
||||
runId,
|
||||
runAtMs: startedAt,
|
||||
durationMs: job.state.lastDurationMs,
|
||||
nextRunAtMs: job.state.nextRunAtMs,
|
||||
@@ -767,8 +772,13 @@ async function finishPreparedManualRun(
|
||||
});
|
||||
}
|
||||
|
||||
export async function run(state: CronServiceState, id: string, mode?: "due" | "force") {
|
||||
const prepared = await prepareManualRun(state, id, mode);
|
||||
export async function run(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
opts?: { runId?: string },
|
||||
) {
|
||||
const prepared = await prepareManualRun(state, id, mode, opts);
|
||||
if (!prepared.ok || !prepared.ran) {
|
||||
return prepared;
|
||||
}
|
||||
@@ -786,7 +796,7 @@ export async function enqueueRun(state: CronServiceState, id: string, mode?: "du
|
||||
void enqueueCommandInLane(
|
||||
CommandLane.Cron,
|
||||
async () => {
|
||||
const result = await run(state, id, mode);
|
||||
const result = await run(state, id, mode, { runId });
|
||||
if (result.ok && "ran" in result && !result.ran) {
|
||||
state.deps.log.info(
|
||||
{ jobId: id, runId, reason: result.reason },
|
||||
|
||||
@@ -30,6 +30,7 @@ export type CronEvent = {
|
||||
delivery?: CronDeliveryTrace;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
runId?: string;
|
||||
nextRunAtMs?: number;
|
||||
} & CronRunTelemetry;
|
||||
|
||||
|
||||
@@ -365,6 +365,7 @@ export const CronRunLogEntrySchema = Type.Object(
|
||||
deliveryError: Type.Optional(Type.String()),
|
||||
sessionId: Type.Optional(NonEmptyString),
|
||||
sessionKey: Type.Optional(NonEmptyString),
|
||||
runId: Type.Optional(NonEmptyString),
|
||||
runAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
durationMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
|
||||
@@ -371,6 +371,7 @@ export function buildGatewayCronService(params: {
|
||||
"deliveryError",
|
||||
"sessionId",
|
||||
"sessionKey",
|
||||
"runId",
|
||||
"nextRunAtMs",
|
||||
"model",
|
||||
"provider",
|
||||
@@ -410,6 +411,7 @@ export function buildGatewayCronService(params: {
|
||||
delivery: evt.delivery,
|
||||
sessionId: evt.sessionId,
|
||||
sessionKey: evt.sessionKey,
|
||||
runId: evt.runId,
|
||||
runAtMs: evt.runAtMs,
|
||||
durationMs: evt.durationMs,
|
||||
nextRunAtMs: evt.nextRunAtMs,
|
||||
|
||||
@@ -861,6 +861,8 @@ describe("gateway server cron", () => {
|
||||
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000);
|
||||
expect(runRes.ok).toBe(true);
|
||||
expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
|
||||
const manualRunId = (runRes.payload as { runId?: unknown } | null)?.runId;
|
||||
expect(typeof manualRunId).toBe("string");
|
||||
const finishedPayload = await finishedRun;
|
||||
expect(finishedPayload).toMatchObject({
|
||||
jobId,
|
||||
@@ -879,6 +881,7 @@ describe("gateway server cron", () => {
|
||||
expect((entries as Array<{ deliveryStatus?: unknown }>).at(-1)?.deliveryStatus).toBe(
|
||||
"not-requested",
|
||||
);
|
||||
expect((entries as Array<{ runId?: unknown }>).at(-1)?.runId).toBe(manualRunId);
|
||||
const allRunsRes = await rpcReq(ws, "cron.runs", {
|
||||
scope: "all",
|
||||
limit: 50,
|
||||
|
||||
@@ -662,6 +662,7 @@ export type PluginHookCronChangedEvent = {
|
||||
deliveryError?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
runId?: string;
|
||||
nextRunAtMs?: number;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
|
||||
Reference in New Issue
Block a user