From 0bbac63d00bc0959dccf177fa037589c248f83d2 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 15 Jun 2026 09:42:41 -0400 Subject: [PATCH] fix(tasks): migrate legacy agent attribution --- src/commands/doctor-state-migrations.test.ts | 122 +++++++++- src/infra/state-migrations.ts | 34 ++- src/state/openclaw-state-db.test.ts | 231 ++++++++++++++++++- src/state/openclaw-state-db.ts | 64 ++++- 4 files changed, 435 insertions(+), 16 deletions(-) diff --git a/src/commands/doctor-state-migrations.test.ts b/src/commands/doctor-state-migrations.test.ts index c2ae3163008..ca7d31373bb 100644 --- a/src/commands/doctor-state-migrations.test.ts +++ b/src/commands/doctor-state-migrations.test.ts @@ -431,9 +431,9 @@ function writeLegacyTaskStateSidecars(root: string): { .prepare( ` INSERT INTO task_runs ( - task_id, runtime, source_id, requester_session_key, child_session_key, run_id, task, - status, delivery_status, notify_policy, created_at, last_event_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + task_id, runtime, source_id, requester_session_key, child_session_key, agent_id, run_id, + task, status, delivery_status, notify_policy, created_at, last_event_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, ) .run( @@ -442,6 +442,7 @@ function writeLegacyTaskStateSidecars(root: string): { "nightly", "", "agent:main:cron:nightly", + "ops", "legacy-task-run", "Legacy cron task", "running", @@ -507,6 +508,36 @@ function writeLegacyTaskStateSidecars(root: string): { return { taskRunsPath, flowRunsPath }; } +function appendLegacyCrossAgentTask(taskRunsPath: string): void { + const sqlite = requireNodeSqlite(); + const db = new sqlite.DatabaseSync(taskRunsPath); + try { + db.prepare( + ` + INSERT INTO task_runs ( + task_id, runtime, requester_session_key, child_session_key, agent_id, run_id, task, + status, delivery_status, notify_policy, created_at, last_event_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, + ).run( + "legacy-cross-agent", + "subagent", + "agent:main:main", + "agent:worker:subagent:child", + "main", + "legacy-cross-agent-run", + "Inspect worker state", + "running", + "pending", + "done_only", + 130, + 140, + ); + } finally { + db.close(); + } +} + async function detectAndRunMigrations(params: { root: string; cfg: OpenClawConfig; @@ -2271,6 +2302,7 @@ describe("doctor legacy state migrations", () => { ownerKey: "system:cron:nightly", scopeKind: "system", requesterSessionKey: "", + agentId: "ops", runId: "legacy-task-run", }); expect(taskState.deliveryStates.get("legacy-task")).toMatchObject({ @@ -2345,6 +2377,90 @@ describe("doctor legacy state migrations", () => { }); }); + it("canonicalizes cross-agent attribution while importing task sidecars", async () => { + const root = await makeTempRoot(); + const { taskRunsPath } = writeLegacyTaskStateSidecars(root); + appendLegacyCrossAgentTask(taskRunsPath); + + const result = await autoMigrateLegacyTaskStateSidecars({ + env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv, + }); + + expect(result.warnings).toStrictEqual([]); + expect(result.changes).toContain("Migrated 2 task registry sidecar rows → shared SQLite state"); + + await withStateDir(root, async () => { + expect(loadTaskRegistryStateFromSqlite().tasks.get("legacy-cross-agent")).toMatchObject({ + taskId: "legacy-cross-agent", + agentId: "worker", + requesterAgentId: "main", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:worker:subagent:child", + }); + }); + }); + + it("keeps task sidecars when only requester attribution conflicts", async () => { + const root = await makeTempRoot(); + const { taskRunsPath } = writeLegacyTaskStateSidecars(root); + appendLegacyCrossAgentTask(taskRunsPath); + + await withStateDir(root, async () => { + loadTaskRegistryStateFromSqlite(); + closeOpenClawStateDatabaseForTest(); + const sqlite = requireNodeSqlite(); + const db = new sqlite.DatabaseSync(path.join(root, "state", "openclaw.sqlite")); + try { + db.prepare( + `INSERT INTO task_runs ( + task_id, + runtime, + requester_session_key, + owner_key, + scope_kind, + child_session_key, + agent_id, + requester_agent_id, + run_id, + task, + status, + delivery_status, + notify_policy, + created_at, + last_event_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ).run( + "legacy-cross-agent", + "subagent", + "agent:main:main", + "agent:main:main", + "session", + "agent:worker:subagent:child", + "worker", + "other-requester", + "legacy-cross-agent-run", + "Inspect worker state", + "running", + "pending", + "done_only", + 130, + 140, + ); + } finally { + db.close(); + } + }); + + const result = await autoMigrateLegacyTaskStateSidecars({ + env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv, + }); + + expect(result.warnings).toContain( + "Left task registry sidecar in place because 1 row already existed in shared state: legacy-cross-agent", + ); + expect(fs.existsSync(taskRunsPath)).toBe(true); + }); + it("keeps task sidecars when shared state already has conflicting task rows", async () => { const root = await makeTempRoot(); const { taskRunsPath, flowRunsPath } = writeLegacyTaskStateSidecars(root); diff --git a/src/infra/state-migrations.ts b/src/infra/state-migrations.ts index d0896c9c320..3a3c3e18859 100644 --- a/src/infra/state-migrations.ts +++ b/src/infra/state-migrations.ts @@ -658,6 +658,19 @@ function normalizeLegacyTaskRow(row: Record): SqliteBindRow { const ownerKey = ownerRaw || requesterRaw || `system:${runtime}:${sourceId || taskId}`; const scopeRaw = typeof row.scope_kind === "string" ? row.scope_kind : ""; const scopeKind = scopeRaw === "system" || ownerKey.startsWith("system:") ? "system" : "session"; + const childSessionKey = + typeof row.child_session_key === "string" ? row.child_session_key.trim() : ""; + const persistedAgentId = typeof row.agent_id === "string" ? row.agent_id.trim() : ""; + const isSpawnRuntime = runtime === "subagent" || runtime === "acp"; + const childAgentId = isSpawnRuntime ? parseAgentSessionKey(childSessionKey)?.agentId : undefined; + const requesterAgentId = + (typeof row.requester_agent_id === "string" ? row.requester_agent_id.trim() : "") || + (isSpawnRuntime + ? (parseAgentSessionKey(ownerKey)?.agentId ?? + parseAgentSessionKey(requesterRaw)?.agentId ?? + (childAgentId && persistedAgentId !== childAgentId ? persistedAgentId : "")) + : ""); + const executorAgentId = requesterAgentId ? childAgentId || persistedAgentId : persistedAgentId; return { task_id: taskId, runtime, @@ -666,10 +679,11 @@ function normalizeLegacyTaskRow(row: Record): SqliteBindRow { requester_session_key: scopeKind === "system" ? "" : requesterRaw || ownerKey, owner_key: ownerKey, scope_kind: scopeKind, - child_session_key: legacyBindValue(row.child_session_key), + child_session_key: childSessionKey || null, parent_flow_id: legacyBindValue(row.parent_flow_id), parent_task_id: legacyBindValue(row.parent_task_id), - agent_id: legacyBindValue(row.agent_id), + agent_id: executorAgentId || null, + requester_agent_id: requesterAgentId || null, run_id: legacyBindValue(row.run_id), label: legacyBindValue(row.label), task: legacyBindValue(row.task ?? ""), @@ -760,6 +774,7 @@ function readLegacyTaskRows(sourcePath: string): SqliteBindRow[] { pickLegacyColumn(columns, "parent_flow_id"), pickLegacyColumn(columns, "parent_task_id"), pickLegacyColumn(columns, "agent_id"), + pickLegacyColumn(columns, "requester_agent_id"), pickLegacyColumn(columns, "run_id"), pickLegacyColumn(columns, "label"), "task", @@ -851,15 +866,15 @@ function insertTaskRunRowSql(db: DatabaseSync, row: SqliteBindRow): void { ` INSERT INTO task_runs ( task_id, runtime, task_kind, source_id, requester_session_key, owner_key, scope_kind, - child_session_key, parent_flow_id, parent_task_id, agent_id, run_id, label, task, status, - delivery_status, notify_policy, created_at, started_at, ended_at, last_event_at, - cleanup_after, error, progress_summary, terminal_summary, terminal_outcome + child_session_key, parent_flow_id, parent_task_id, agent_id, requester_agent_id, run_id, + label, task, status, delivery_status, notify_policy, created_at, started_at, ended_at, + last_event_at, cleanup_after, error, progress_summary, terminal_summary, terminal_outcome ) VALUES ( @task_id, @runtime, @task_kind, @source_id, @requester_session_key, @owner_key, - @scope_kind, @child_session_key, @parent_flow_id, @parent_task_id, @agent_id, @run_id, - @label, @task, @status, @delivery_status, @notify_policy, @created_at, @started_at, - @ended_at, @last_event_at, @cleanup_after, @error, @progress_summary, @terminal_summary, - @terminal_outcome + @scope_kind, @child_session_key, @parent_flow_id, @parent_task_id, @agent_id, + @requester_agent_id, @run_id, @label, @task, @status, @delivery_status, @notify_policy, + @created_at, @started_at, @ended_at, @last_event_at, @cleanup_after, @error, + @progress_summary, @terminal_summary, @terminal_outcome ) `, ).run(row); @@ -933,6 +948,7 @@ async function migrateLegacyTaskRunsSidecar(params: { "parent_flow_id", "parent_task_id", "agent_id", + "requester_agent_id", "run_id", "label", "task", diff --git a/src/state/openclaw-state-db.test.ts b/src/state/openclaw-state-db.test.ts index 7730fd91c63..4c20f4e8de6 100644 --- a/src/state/openclaw-state-db.test.ts +++ b/src/state/openclaw-state-db.test.ts @@ -79,7 +79,7 @@ describe("openclaw state database", () => { expect(database.path).toBe(path.join(stateDir, "state", "openclaw.sqlite")); }); - it("adds requester agent attribution to existing task tables", () => { + it("migrates requester and executor attribution for existing cross-agent tasks", () => { const stateDir = createTempStateDir(); const database = openOpenClawStateDatabase({ env: { OPENCLAW_STATE_DIR: stateDir }, @@ -90,6 +90,72 @@ describe("openclaw state database", () => { const { DatabaseSync } = requireNodeSqlite(); const legacyDb = new DatabaseSync(databasePath); legacyDb.exec("ALTER TABLE task_runs DROP COLUMN requester_agent_id"); + legacyDb + .prepare( + `INSERT INTO task_runs ( + task_id, + runtime, + requester_session_key, + owner_key, + scope_kind, + child_session_key, + agent_id, + task, + status, + delivery_status, + notify_policy, + created_at, + last_event_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ) + .run( + "legacy-cross-agent", + "subagent", + "agent:main:main", + "agent:main:main", + "session", + "agent:worker:subagent:child", + "main", + "Inspect worker state", + "running", + "pending", + "done_only", + 100, + 100, + ); + legacyDb + .prepare( + `INSERT INTO task_runs ( + task_id, + runtime, + requester_session_key, + owner_key, + scope_kind, + child_session_key, + agent_id, + task, + status, + delivery_status, + notify_policy, + created_at, + last_event_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ) + .run( + "legacy-global-cross-agent", + "subagent", + "global", + "global", + "session", + "agent:worker:subagent:global-child", + null, + "Inspect global worker state", + "running", + "pending", + "done_only", + 110, + 110, + ); legacyDb.close(); const reopened = openOpenClawStateDatabase({ @@ -99,6 +165,169 @@ describe("openclaw state database", () => { name?: string; }>; expect(columns.some((column) => column.name === "requester_agent_id")).toBe(true); + expect( + reopened.db + .prepare( + `SELECT agent_id, requester_agent_id + FROM task_runs + WHERE task_id = ?`, + ) + .get("legacy-cross-agent"), + ).toEqual({ + agent_id: "worker", + requester_agent_id: "main", + }); + expect( + reopened.db + .prepare( + `SELECT agent_id, requester_agent_id + FROM task_runs + WHERE task_id = ?`, + ) + .get("legacy-global-cross-agent"), + ).toEqual({ + agent_id: null, + requester_agent_id: null, + }); + + reopened.db + .prepare( + `INSERT INTO task_runs ( + task_id, + runtime, + requester_session_key, + owner_key, + scope_kind, + child_session_key, + agent_id, + requester_agent_id, + task, + status, + delivery_status, + notify_policy, + created_at, + last_event_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ) + .run( + "current-explicit-attribution", + "subagent", + "global", + "global", + "session", + "agent:worker:subagent:current", + "main", + null, + "Current explicit attribution", + "running", + "pending", + "done_only", + 200, + 200, + ); + closeOpenClawStateDatabaseForTest(); + + const currentReopened = openOpenClawStateDatabase({ + env: { OPENCLAW_STATE_DIR: stateDir }, + }); + expect( + currentReopened.db + .prepare( + `SELECT agent_id, requester_agent_id + FROM task_runs + WHERE task_id = ?`, + ) + .get("current-explicit-attribution"), + ).toEqual({ + agent_id: "main", + requester_agent_id: null, + }); + }); + + it("rolls back the requester attribution column when its backfill fails", () => { + const stateDir = createTempStateDir(); + const database = openOpenClawStateDatabase({ + env: { OPENCLAW_STATE_DIR: stateDir }, + }); + const databasePath = database.path; + closeOpenClawStateDatabaseForTest(); + + const { DatabaseSync } = requireNodeSqlite(); + const legacyDb = new DatabaseSync(databasePath); + legacyDb.exec(` + ALTER TABLE task_runs DROP COLUMN requester_agent_id; + CREATE TRIGGER reject_task_attribution_repair + BEFORE UPDATE ON task_runs + BEGIN + SELECT RAISE(ABORT, 'blocked task attribution repair'); + END; + `); + legacyDb + .prepare( + `INSERT INTO task_runs ( + task_id, + runtime, + requester_session_key, + owner_key, + scope_kind, + child_session_key, + agent_id, + task, + status, + delivery_status, + notify_policy, + created_at, + last_event_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ) + .run( + "blocked-cross-agent", + "subagent", + "agent:main:main", + "agent:main:main", + "session", + "agent:worker:subagent:blocked", + "main", + "Inspect blocked worker state", + "running", + "pending", + "done_only", + 100, + 100, + ); + legacyDb.close(); + + expect(() => + openOpenClawStateDatabase({ + env: { OPENCLAW_STATE_DIR: stateDir }, + }), + ).toThrow(/blocked task attribution repair/); + + const interruptedDb = new DatabaseSync(databasePath); + const interruptedColumns = interruptedDb + .prepare("PRAGMA table_info(task_runs)") + .all() as Array<{ + name?: string; + }>; + expect(interruptedColumns.some((column) => column.name === "requester_agent_id")).toBe(false); + interruptedDb.exec("DROP TRIGGER reject_task_attribution_repair"); + interruptedDb.close(); + + const reopened = openOpenClawStateDatabase({ + env: { OPENCLAW_STATE_DIR: stateDir }, + }); + expect( + reopened.db + .prepare( + `SELECT agent_id, requester_agent_id + FROM task_runs + WHERE task_id = ?`, + ) + .get("blocked-cross-agent"), + ).toEqual({ + agent_id: "worker", + requester_agent_id: "main", + }); }); it("opens databases with early cron tables before creating cron indexes", () => { diff --git a/src/state/openclaw-state-db.ts b/src/state/openclaw-state-db.ts index e75b260a905..67ed7279f30 100644 --- a/src/state/openclaw-state-db.ts +++ b/src/state/openclaw-state-db.ts @@ -230,13 +230,66 @@ function tableExists(db: DatabaseSync, tableName: string): boolean { return row?.ok === 1; } -function ensureColumn(db: DatabaseSync, tableName: string, columnSql: string): void { +function ensureColumn(db: DatabaseSync, tableName: string, columnSql: string): boolean { const columnName = columnSql.trim().split(/\s+/, 1)[0]; if (!columnName || !tableExists(db, tableName) || tableHasColumn(db, tableName, columnName)) { - return; + return false; } // State migrations are additive here; destructive or shape-changing repairs belong in doctor. db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnSql};`); + return true; +} + +function repairLegacyTaskAgentAttribution(db: DatabaseSync): void { + if (!tableExists(db, "task_runs") || !tableHasColumn(db, "task_runs", "requester_agent_id")) { + return; + } + // Before requester_agent_id existed, scoped subagent/ACP rows stored the + // requester in agent_id. Repair only rows with recoverable requester + // provenance; global legacy rows must keep the existing fallback behavior. + db.exec(` + UPDATE task_runs + SET + requester_agent_id = CASE + WHEN owner_key GLOB 'agent:*:*' THEN substr( + owner_key, + 7, + instr(substr(owner_key, 7), ':') - 1 + ) + WHEN requester_session_key GLOB 'agent:*:*' THEN substr( + requester_session_key, + 7, + instr(substr(requester_session_key, 7), ':') - 1 + ) + WHEN agent_id <> substr( + child_session_key, + 7, + instr(substr(child_session_key, 7), ':') - 1 + ) THEN agent_id + ELSE NULL + END, + agent_id = substr( + child_session_key, + 7, + instr(substr(child_session_key, 7), ':') - 1 + ) + WHERE requester_agent_id IS NULL + AND runtime IN ('subagent', 'acp') + AND child_session_key GLOB 'agent:*:*' + AND instr(substr(child_session_key, 7), ':') > 1 + AND ( + owner_key GLOB 'agent:*:*' + OR requester_session_key GLOB 'agent:*:*' + OR ( + agent_id IS NOT NULL + AND agent_id <> substr( + child_session_key, + 7, + instr(substr(child_session_key, 7), ':') - 1 + ) + ) + ); + `); } function hasCanonicalAgentDatabasesPrimaryKey(db: DatabaseSync): boolean { @@ -857,7 +910,12 @@ function ensureAdditiveStateColumns(db: DatabaseSync): void { ensureColumn(db, "gateway_restart_sentinel", "continuation_json TEXT"); ensureColumn(db, "gateway_restart_sentinel", "doctor_hint TEXT"); ensureColumn(db, "gateway_restart_sentinel", "stats_json TEXT"); - ensureColumn(db, "task_runs", "requester_agent_id TEXT"); + runSqliteImmediateTransactionSync(db, () => { + const addedTaskRequesterAgentId = ensureColumn(db, "task_runs", "requester_agent_id TEXT"); + if (addedTaskRequesterAgentId) { + repairLegacyTaskAgentAttribution(db); + } + }); ensureColumn(db, "subagent_runs", "task_name TEXT"); }