diff --git a/CHANGELOG.md b/CHANGELOG.md index db6ff573f26..83f13a103dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai - Update: carry plugin-validation bypasses into config mutation pre-write reads, so package update doctor repairs can finish while externalized plugin schemas are converging. - Agents/subagents: warn and continue completion announce cleanup when lifecycle cleanup fails, preventing ended subagent runs from becoming silent ghosts. Fixes #82306. Thanks @SebTardif. - Telegram: let authorized text `/stop` commands use the fast-abort path before queued agent work, so active turns stop immediately instead of processing the abort after the turn finishes; foreign-bot `/stop@otherbot` mentions now stay on the regular topic lane instead of being routed into our control lane. Fixes #82162. Thanks @civiltox. +- Task persistence: drop malformed array/scalar requester-origin JSON from task and task-flow SQLite sidecars instead of restoring it as delivery metadata. - Agents/timeouts: clarify model idle-timeout errors and docs so provider `timeoutSeconds` is shown as bounded by the whole agent/run timeout ceiling. - Release tooling: align the published launcher Node floor, `npm start`, package script checks, sharded lint locking, Vitest root project coverage, and plugin-SDK declaration build cache metadata so release/package validation does not silently skip or ship stale surfaces. - Cron/agents: honor configured subagent model fallbacks for isolated scheduled runs and forward that fallback policy into embedded agent timeout failover. Fixes #74985. Thanks @chrisgwynne. diff --git a/src/tasks/task-flow-registry.store.sqlite.ts b/src/tasks/task-flow-registry.store.sqlite.ts index 5dbeb7e6ed0..bb13022393b 100644 --- a/src/tasks/task-flow-registry.store.sqlite.ts +++ b/src/tasks/task-flow-registry.store.sqlite.ts @@ -2,6 +2,8 @@ import { chmodSync, existsSync, mkdirSync } from "node:fs"; import type { DatabaseSync, StatementSync } from "node:sqlite"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; import { configureSqliteWalMaintenance, type SqliteWalMaintenance } from "../infra/sqlite-wal.js"; +import { isRecord } from "../utils.js"; +import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; import type { DeliveryContext } from "../utils/delivery-context.types.js"; import { resolveTaskFlowRegistryDir, @@ -74,6 +76,22 @@ function parseJsonValue(raw: string | null): T | undefined { } } +function parseDeliveryContextJson(raw: string | null): DeliveryContext | undefined { + const parsed = parseJsonValue(raw); + if (!isRecord(parsed)) { + return undefined; + } + return normalizeDeliveryContext({ + channel: typeof parsed.channel === "string" ? parsed.channel : undefined, + to: typeof parsed.to === "string" ? parsed.to : undefined, + accountId: typeof parsed.accountId === "string" ? parsed.accountId : undefined, + threadId: + typeof parsed.threadId === "string" || typeof parsed.threadId === "number" + ? parsed.threadId + : undefined, + }); +} + function rowToSyncMode(row: FlowRegistryRow): TaskFlowSyncMode { if (row.sync_mode === "task_mirrored" || row.sync_mode === "managed") { return row.sync_mode; @@ -84,7 +102,7 @@ function rowToSyncMode(row: FlowRegistryRow): TaskFlowSyncMode { function rowToFlowRecord(row: FlowRegistryRow): TaskFlowRecord { const endedAt = normalizeNumber(row.ended_at); const cancelRequestedAt = normalizeNumber(row.cancel_requested_at); - const requesterOrigin = parseJsonValue(row.requester_origin_json); + const requesterOrigin = parseDeliveryContextJson(row.requester_origin_json); const stateJson = parseJsonValue(row.state_json); const waitJson = parseJsonValue(row.wait_json); return { diff --git a/src/tasks/task-flow-registry.store.test.ts b/src/tasks/task-flow-registry.store.test.ts index e0fb882b4c4..dbe0d1a9d4b 100644 --- a/src/tasks/task-flow-registry.store.test.ts +++ b/src/tasks/task-flow-registry.store.test.ts @@ -1,5 +1,6 @@ import { statSync } from "node:fs"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js"; import { createManagedTaskFlow, @@ -192,6 +193,39 @@ describe("task-flow-registry store runtime", () => { }); }); + it("drops malformed requester origin json from sqlite flow state", async () => { + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskFlowRegistryForTests(); + + const created = createManagedTaskFlow({ + ownerKey: "agent:main:main", + requesterOrigin: { + channel: "notifychat", + to: "notifychat:123", + }, + controllerId: "tests/malformed-origin", + goal: "Restore malformed origin", + status: "running", + }); + + const sqlitePath = resolveTaskFlowRegistrySqlitePath(process.env); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(sqlitePath); + db.prepare(`UPDATE flow_runs SET requester_origin_json = ? WHERE flow_id = ?`).run( + JSON.stringify(["notifychat", "123"]), + created.flowId, + ); + db.close(); + + resetTaskFlowRegistryForTests({ persist: false }); + + const restored = getTaskFlowById(created.flowId); + expect(restored?.flowId).toBe(created.flowId); + expect(restored?.requesterOrigin).toBeUndefined(); + }); + }); + it("hardens the sqlite flow store directory and file modes", async () => { if (process.platform === "win32") { return; diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts index 55c82a61195..4a2a92e9324 100644 --- a/src/tasks/task-registry.store.sqlite.ts +++ b/src/tasks/task-registry.store.sqlite.ts @@ -2,6 +2,8 @@ import { chmodSync, existsSync, mkdirSync } from "node:fs"; import type { DatabaseSync, StatementSync } from "node:sqlite"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; import { configureSqliteWalMaintenance, type SqliteWalMaintenance } from "../infra/sqlite-wal.js"; +import { isRecord } from "../utils.js"; +import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; import type { DeliveryContext } from "../utils/delivery-context.types.js"; import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js"; import type { TaskRegistryStoreSnapshot } from "./task-registry.store.types.js"; @@ -92,6 +94,22 @@ function parseJsonValue(raw: string | null): T | undefined { } } +function parseDeliveryContextJson(raw: string | null): DeliveryContext | undefined { + const parsed = parseJsonValue(raw); + if (!isRecord(parsed)) { + return undefined; + } + return normalizeDeliveryContext({ + channel: typeof parsed.channel === "string" ? parsed.channel : undefined, + to: typeof parsed.to === "string" ? parsed.to : undefined, + accountId: typeof parsed.accountId === "string" ? parsed.accountId : undefined, + threadId: + typeof parsed.threadId === "string" || typeof parsed.threadId === "number" + ? parsed.threadId + : undefined, + }); +} + function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { const startedAt = normalizeNumber(row.started_at); const endedAt = normalizeNumber(row.ended_at); @@ -130,7 +148,7 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { } function rowToTaskDeliveryState(row: TaskDeliveryStateRow): TaskDeliveryState { - const requesterOrigin = parseJsonValue(row.requester_origin_json); + const requesterOrigin = parseDeliveryContextJson(row.requester_origin_json); const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at); return { taskId: row.task_id, diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index aebb7575e99..1cd312e9c7e 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -8,6 +8,7 @@ import { createTaskRecord, deleteTaskRecordById, findTaskByRunId, + getTaskRegistrySnapshot, markTaskLostById, maybeDeliverTaskStateChangeUpdate, resetTaskRegistryForTests, @@ -274,6 +275,49 @@ describe("task-registry store runtime", () => { expect(restored?.childSessionKey).toBe("agent:main:workspace:channel:C1234567890"); }); + it("drops malformed requester origin json from sqlite delivery state", async () => { + await withOpenClawTestState( + { layout: "state-only", prefix: "openclaw-task-store-origin-shape-" }, + async () => { + const created = createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + requesterOrigin: { + channel: "notifychat", + to: "notifychat:123", + }, + childSessionKey: "agent:main:acp:origin-shape", + runId: "run-origin-shape", + task: "Restore malformed origin", + status: "running", + deliveryStatus: "pending", + notifyPolicy: "state_changes", + }); + + const sqlitePath = resolveTaskRegistrySqlitePath(process.env); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(sqlitePath); + db.prepare( + `INSERT OR REPLACE INTO task_delivery_state ( + task_id, + requester_origin_json, + last_notified_event_at + ) VALUES (?, ?, ?)`, + ).run(created.taskId, JSON.stringify(["notifychat", "123"]), 321); + db.close(); + + resetTaskRegistryForTests({ persist: false }); + + const deliveryState = getTaskRegistrySnapshot().deliveryStates.find( + (state) => state.taskId === created.taskId, + ); + expect(deliveryState?.lastNotifiedEventAt).toBe(321); + expect(deliveryState?.requesterOrigin).toBeUndefined(); + }, + ); + }); + it("preserves taskKind across sqlite restore", () => { const created = createTaskRecord({ runtime: "acp",