fix(tasks): validate persisted requester origins

This commit is contained in:
Vincent Koc
2026-05-16 09:28:35 +08:00
parent 2c9f284c1e
commit 24e4dc68b7
5 changed files with 117 additions and 2 deletions

View File

@@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai
- Update: carry plugin-validation bypasses into config mutation pre-write reads, so package update doctor repairs can finish while externalized plugin schemas are converging.
- Agents/subagents: warn and continue completion announce cleanup when lifecycle cleanup fails, preventing ended subagent runs from becoming silent ghosts. Fixes #82306. Thanks @SebTardif.
- Telegram: let authorized text `/stop` commands use the fast-abort path before queued agent work, so active turns stop immediately instead of processing the abort after the turn finishes; foreign-bot `/stop@otherbot` mentions now stay on the regular topic lane instead of being routed into our control lane. Fixes #82162. Thanks @civiltox.
- Task persistence: drop malformed array/scalar requester-origin JSON from task and task-flow SQLite sidecars instead of restoring it as delivery metadata.
- Agents/timeouts: clarify model idle-timeout errors and docs so provider `timeoutSeconds` is shown as bounded by the whole agent/run timeout ceiling.
- Release tooling: align the published launcher Node floor, `npm start`, package script checks, sharded lint locking, Vitest root project coverage, and plugin-SDK declaration build cache metadata so release/package validation does not silently skip or ship stale surfaces.
- Cron/agents: honor configured subagent model fallbacks for isolated scheduled runs and forward that fallback policy into embedded agent timeout failover. Fixes #74985. Thanks @chrisgwynne.

View File

@@ -2,6 +2,8 @@ import { chmodSync, existsSync, mkdirSync } from "node:fs";
import type { DatabaseSync, StatementSync } from "node:sqlite";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { configureSqliteWalMaintenance, type SqliteWalMaintenance } from "../infra/sqlite-wal.js";
import { isRecord } from "../utils.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
import type { DeliveryContext } from "../utils/delivery-context.types.js";
import {
resolveTaskFlowRegistryDir,
@@ -74,6 +76,22 @@ function parseJsonValue<T>(raw: string | null): T | undefined {
}
}
function parseDeliveryContextJson(raw: string | null): DeliveryContext | undefined {
const parsed = parseJsonValue<unknown>(raw);
if (!isRecord(parsed)) {
return undefined;
}
return normalizeDeliveryContext({
channel: typeof parsed.channel === "string" ? parsed.channel : undefined,
to: typeof parsed.to === "string" ? parsed.to : undefined,
accountId: typeof parsed.accountId === "string" ? parsed.accountId : undefined,
threadId:
typeof parsed.threadId === "string" || typeof parsed.threadId === "number"
? parsed.threadId
: undefined,
});
}
function rowToSyncMode(row: FlowRegistryRow): TaskFlowSyncMode {
if (row.sync_mode === "task_mirrored" || row.sync_mode === "managed") {
return row.sync_mode;
@@ -84,7 +102,7 @@ function rowToSyncMode(row: FlowRegistryRow): TaskFlowSyncMode {
function rowToFlowRecord(row: FlowRegistryRow): TaskFlowRecord {
const endedAt = normalizeNumber(row.ended_at);
const cancelRequestedAt = normalizeNumber(row.cancel_requested_at);
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
const requesterOrigin = parseDeliveryContextJson(row.requester_origin_json);
const stateJson = parseJsonValue<JsonValue>(row.state_json);
const waitJson = parseJsonValue<JsonValue>(row.wait_json);
return {

View File

@@ -1,5 +1,6 @@
import { statSync } from "node:fs";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
import {
createManagedTaskFlow,
@@ -192,6 +193,39 @@ describe("task-flow-registry store runtime", () => {
});
});
it("drops malformed requester origin json from sqlite flow state", async () => {
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskFlowRegistryForTests();
const created = createManagedTaskFlow({
ownerKey: "agent:main:main",
requesterOrigin: {
channel: "notifychat",
to: "notifychat:123",
},
controllerId: "tests/malformed-origin",
goal: "Restore malformed origin",
status: "running",
});
const sqlitePath = resolveTaskFlowRegistrySqlitePath(process.env);
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(sqlitePath);
db.prepare(`UPDATE flow_runs SET requester_origin_json = ? WHERE flow_id = ?`).run(
JSON.stringify(["notifychat", "123"]),
created.flowId,
);
db.close();
resetTaskFlowRegistryForTests({ persist: false });
const restored = getTaskFlowById(created.flowId);
expect(restored?.flowId).toBe(created.flowId);
expect(restored?.requesterOrigin).toBeUndefined();
});
});
it("hardens the sqlite flow store directory and file modes", async () => {
if (process.platform === "win32") {
return;

View File

@@ -2,6 +2,8 @@ import { chmodSync, existsSync, mkdirSync } from "node:fs";
import type { DatabaseSync, StatementSync } from "node:sqlite";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { configureSqliteWalMaintenance, type SqliteWalMaintenance } from "../infra/sqlite-wal.js";
import { isRecord } from "../utils.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
import type { DeliveryContext } from "../utils/delivery-context.types.js";
import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js";
import type { TaskRegistryStoreSnapshot } from "./task-registry.store.types.js";
@@ -92,6 +94,22 @@ function parseJsonValue<T>(raw: string | null): T | undefined {
}
}
function parseDeliveryContextJson(raw: string | null): DeliveryContext | undefined {
const parsed = parseJsonValue<unknown>(raw);
if (!isRecord(parsed)) {
return undefined;
}
return normalizeDeliveryContext({
channel: typeof parsed.channel === "string" ? parsed.channel : undefined,
to: typeof parsed.to === "string" ? parsed.to : undefined,
accountId: typeof parsed.accountId === "string" ? parsed.accountId : undefined,
threadId:
typeof parsed.threadId === "string" || typeof parsed.threadId === "number"
? parsed.threadId
: undefined,
});
}
function rowToTaskRecord(row: TaskRegistryRow): TaskRecord {
const startedAt = normalizeNumber(row.started_at);
const endedAt = normalizeNumber(row.ended_at);
@@ -130,7 +148,7 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord {
}
function rowToTaskDeliveryState(row: TaskDeliveryStateRow): TaskDeliveryState {
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
const requesterOrigin = parseDeliveryContextJson(row.requester_origin_json);
const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at);
return {
taskId: row.task_id,

View File

@@ -8,6 +8,7 @@ import {
createTaskRecord,
deleteTaskRecordById,
findTaskByRunId,
getTaskRegistrySnapshot,
markTaskLostById,
maybeDeliverTaskStateChangeUpdate,
resetTaskRegistryForTests,
@@ -274,6 +275,49 @@ describe("task-registry store runtime", () => {
expect(restored?.childSessionKey).toBe("agent:main:workspace:channel:C1234567890");
});
it("drops malformed requester origin json from sqlite delivery state", async () => {
await withOpenClawTestState(
{ layout: "state-only", prefix: "openclaw-task-store-origin-shape-" },
async () => {
const created = createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "notifychat",
to: "notifychat:123",
},
childSessionKey: "agent:main:acp:origin-shape",
runId: "run-origin-shape",
task: "Restore malformed origin",
status: "running",
deliveryStatus: "pending",
notifyPolicy: "state_changes",
});
const sqlitePath = resolveTaskRegistrySqlitePath(process.env);
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(sqlitePath);
db.prepare(
`INSERT OR REPLACE INTO task_delivery_state (
task_id,
requester_origin_json,
last_notified_event_at
) VALUES (?, ?, ?)`,
).run(created.taskId, JSON.stringify(["notifychat", "123"]), 321);
db.close();
resetTaskRegistryForTests({ persist: false });
const deliveryState = getTaskRegistrySnapshot().deliveryStates.find(
(state) => state.taskId === created.taskId,
);
expect(deliveryState?.lastNotifiedEventAt).toBe(321);
expect(deliveryState?.requesterOrigin).toBeUndefined();
},
);
});
it("preserves taskKind across sqlite restore", () => {
const created = createTaskRecord({
runtime: "acp",