fix(tasks): migrate legacy agent attribution

This commit is contained in:
Peter Steinberger
2026-06-15 09:42:41 -04:00
parent 1fef20c96b
commit 0bbac63d00
4 changed files with 435 additions and 16 deletions

View File

@@ -431,9 +431,9 @@ function writeLegacyTaskStateSidecars(root: string): {
.prepare(
`
INSERT INTO task_runs (
task_id, runtime, source_id, requester_session_key, child_session_key, run_id, task,
status, delivery_status, notify_policy, created_at, last_event_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
task_id, runtime, source_id, requester_session_key, child_session_key, agent_id, run_id,
task, status, delivery_status, notify_policy, created_at, last_event_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
)
.run(
@@ -442,6 +442,7 @@ function writeLegacyTaskStateSidecars(root: string): {
"nightly",
"",
"agent:main:cron:nightly",
"ops",
"legacy-task-run",
"Legacy cron task",
"running",
@@ -507,6 +508,36 @@ function writeLegacyTaskStateSidecars(root: string): {
return { taskRunsPath, flowRunsPath };
}
/**
 * Appends one legacy cross-agent subagent row to the task_runs sidecar database.
 *
 * The row stores "main" in agent_id while its child session key names the
 * "worker" agent, and the INSERT lists no requester_agent_id column.
 *
 * @param taskRunsPath Path of the SQLite sidecar database to open and write to.
 */
function appendLegacyCrossAgentTask(taskRunsPath: string): void {
  const insertSql = `
INSERT INTO task_runs (
task_id, runtime, requester_session_key, child_session_key, agent_id, run_id, task,
status, delivery_status, notify_policy, created_at, last_event_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`;
  // One value per placeholder, in the column order of the INSERT above.
  const rowValues: Array<string | number> = [
    "legacy-cross-agent", // task_id
    "subagent", // runtime
    "agent:main:main", // requester_session_key
    "agent:worker:subagent:child", // child_session_key
    "main", // agent_id
    "legacy-cross-agent-run", // run_id
    "Inspect worker state", // task
    "running", // status
    "pending", // delivery_status
    "done_only", // notify_policy
    130, // created_at
    140, // last_event_at
  ];

  const { DatabaseSync } = requireNodeSqlite();
  const sidecar = new DatabaseSync(taskRunsPath);
  try {
    sidecar.prepare(insertSql).run(...rowValues);
  } finally {
    // Release the handle even when the insert throws.
    sidecar.close();
  }
}
async function detectAndRunMigrations(params: {
root: string;
cfg: OpenClawConfig;
@@ -2271,6 +2302,7 @@ describe("doctor legacy state migrations", () => {
ownerKey: "system:cron:nightly",
scopeKind: "system",
requesterSessionKey: "",
agentId: "ops",
runId: "legacy-task-run",
});
expect(taskState.deliveryStates.get("legacy-task")).toMatchObject({
@@ -2345,6 +2377,90 @@ describe("doctor legacy state migrations", () => {
});
});
// Imports a sidecar that contains the legacy cross-agent row and checks that the
// migrated task reports "worker" as agentId and "main" as requesterAgentId.
it("canonicalizes cross-agent attribution while importing task sidecars", async () => {
  const root = await makeTempRoot();
  const sidecars = writeLegacyTaskStateSidecars(root);
  appendLegacyCrossAgentTask(sidecars.taskRunsPath);

  const env = { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv;
  const { warnings, changes } = await autoMigrateLegacyTaskStateSidecars({ env });

  // The import must be clean and must report both sidecar rows as migrated.
  expect(warnings).toStrictEqual([]);
  expect(changes).toContain("Migrated 2 task registry sidecar rows → shared SQLite state");

  await withStateDir(root, async () => {
    const registry = loadTaskRegistryStateFromSqlite();
    const migratedTask = registry.tasks.get("legacy-cross-agent");
    expect(migratedTask).toMatchObject({
      taskId: "legacy-cross-agent",
      agentId: "worker",
      requesterAgentId: "main",
      requesterSessionKey: "agent:main:main",
      childSessionKey: "agent:worker:subagent:child",
    });
  });
});
// Seeds the shared state database with a row for "legacy-cross-agent" whose
// requester_agent_id is "other-requester", then runs the sidecar migration and
// expects it to warn about the existing row and leave the sidecar file on disk.
it("keeps task sidecars when only requester attribution conflicts", async () => {
const root = await makeTempRoot();
const { taskRunsPath } = writeLegacyTaskStateSidecars(root);
appendLegacyCrossAgentTask(taskRunsPath);
await withStateDir(root, async () => {
// NOTE(review): presumably this call creates the shared state database so that
// task_runs exists before the raw INSERT below — confirm against the loader.
loadTaskRegistryStateFromSqlite();
// Close the managed handle before opening the same file with a raw connection.
closeOpenClawStateDatabaseForTest();
const sqlite = requireNodeSqlite();
const db = new sqlite.DatabaseSync(path.join(root, "state", "openclaw.sqlite"));
try {
db.prepare(
`INSERT INTO task_runs (
task_id,
runtime,
requester_session_key,
owner_key,
scope_kind,
child_session_key,
agent_id,
requester_agent_id,
run_id,
task,
status,
delivery_status,
notify_policy,
created_at,
last_event_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
).run(
// Values follow the column order of the INSERT above. Apart from the extra
// owner_key, scope_kind and requester_agent_id columns, they match the sidecar
// row written by appendLegacyCrossAgentTask except for agent_id ("worker" here).
"legacy-cross-agent",
"subagent",
"agent:main:main",
"agent:main:main",
"session",
"agent:worker:subagent:child",
"worker",
"other-requester",
"legacy-cross-agent-run",
"Inspect worker state",
"running",
"pending",
"done_only",
130,
140,
);
} finally {
db.close();
}
});
const result = await autoMigrateLegacyTaskStateSidecars({
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
});
// Exactly one sidecar row (the cross-agent one) is reported as already present.
expect(result.warnings).toContain(
"Left task registry sidecar in place because 1 row already existed in shared state: legacy-cross-agent",
);
// The sidecar file must still exist after the migration attempt.
expect(fs.existsSync(taskRunsPath)).toBe(true);
});
it("keeps task sidecars when shared state already has conflicting task rows", async () => {
const root = await makeTempRoot();
const { taskRunsPath, flowRunsPath } = writeLegacyTaskStateSidecars(root);

View File

@@ -658,6 +658,19 @@ function normalizeLegacyTaskRow(row: Record<string, unknown>): SqliteBindRow {
const ownerKey = ownerRaw || requesterRaw || `system:${runtime}:${sourceId || taskId}`;
const scopeRaw = typeof row.scope_kind === "string" ? row.scope_kind : "";
const scopeKind = scopeRaw === "system" || ownerKey.startsWith("system:") ? "system" : "session";
const childSessionKey =
typeof row.child_session_key === "string" ? row.child_session_key.trim() : "";
const persistedAgentId = typeof row.agent_id === "string" ? row.agent_id.trim() : "";
const isSpawnRuntime = runtime === "subagent" || runtime === "acp";
const childAgentId = isSpawnRuntime ? parseAgentSessionKey(childSessionKey)?.agentId : undefined;
const requesterAgentId =
(typeof row.requester_agent_id === "string" ? row.requester_agent_id.trim() : "") ||
(isSpawnRuntime
? (parseAgentSessionKey(ownerKey)?.agentId ??
parseAgentSessionKey(requesterRaw)?.agentId ??
(childAgentId && persistedAgentId !== childAgentId ? persistedAgentId : ""))
: "");
const executorAgentId = requesterAgentId ? childAgentId || persistedAgentId : persistedAgentId;
return {
task_id: taskId,
runtime,
@@ -666,10 +679,11 @@ function normalizeLegacyTaskRow(row: Record<string, unknown>): SqliteBindRow {
requester_session_key: scopeKind === "system" ? "" : requesterRaw || ownerKey,
owner_key: ownerKey,
scope_kind: scopeKind,
child_session_key: legacyBindValue(row.child_session_key),
child_session_key: childSessionKey || null,
parent_flow_id: legacyBindValue(row.parent_flow_id),
parent_task_id: legacyBindValue(row.parent_task_id),
agent_id: legacyBindValue(row.agent_id),
agent_id: executorAgentId || null,
requester_agent_id: requesterAgentId || null,
run_id: legacyBindValue(row.run_id),
label: legacyBindValue(row.label),
task: legacyBindValue(row.task ?? ""),
@@ -760,6 +774,7 @@ function readLegacyTaskRows(sourcePath: string): SqliteBindRow[] {
pickLegacyColumn(columns, "parent_flow_id"),
pickLegacyColumn(columns, "parent_task_id"),
pickLegacyColumn(columns, "agent_id"),
pickLegacyColumn(columns, "requester_agent_id"),
pickLegacyColumn(columns, "run_id"),
pickLegacyColumn(columns, "label"),
"task",
@@ -851,15 +866,15 @@ function insertTaskRunRowSql(db: DatabaseSync, row: SqliteBindRow): void {
`
INSERT INTO task_runs (
task_id, runtime, task_kind, source_id, requester_session_key, owner_key, scope_kind,
child_session_key, parent_flow_id, parent_task_id, agent_id, run_id, label, task, status,
delivery_status, notify_policy, created_at, started_at, ended_at, last_event_at,
cleanup_after, error, progress_summary, terminal_summary, terminal_outcome
child_session_key, parent_flow_id, parent_task_id, agent_id, requester_agent_id, run_id,
label, task, status, delivery_status, notify_policy, created_at, started_at, ended_at,
last_event_at, cleanup_after, error, progress_summary, terminal_summary, terminal_outcome
) VALUES (
@task_id, @runtime, @task_kind, @source_id, @requester_session_key, @owner_key,
@scope_kind, @child_session_key, @parent_flow_id, @parent_task_id, @agent_id, @run_id,
@label, @task, @status, @delivery_status, @notify_policy, @created_at, @started_at,
@ended_at, @last_event_at, @cleanup_after, @error, @progress_summary, @terminal_summary,
@terminal_outcome
@scope_kind, @child_session_key, @parent_flow_id, @parent_task_id, @agent_id,
@requester_agent_id, @run_id, @label, @task, @status, @delivery_status, @notify_policy,
@created_at, @started_at, @ended_at, @last_event_at, @cleanup_after, @error,
@progress_summary, @terminal_summary, @terminal_outcome
)
`,
).run(row);
@@ -933,6 +948,7 @@ async function migrateLegacyTaskRunsSidecar(params: {
"parent_flow_id",
"parent_task_id",
"agent_id",
"requester_agent_id",
"run_id",
"label",
"task",

View File

@@ -79,7 +79,7 @@ describe("openclaw state database", () => {
expect(database.path).toBe(path.join(stateDir, "state", "openclaw.sqlite"));
});
it("adds requester agent attribution to existing task tables", () => {
it("migrates requester and executor attribution for existing cross-agent tasks", () => {
const stateDir = createTempStateDir();
const database = openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
@@ -90,6 +90,72 @@ describe("openclaw state database", () => {
const { DatabaseSync } = requireNodeSqlite();
const legacyDb = new DatabaseSync(databasePath);
legacyDb.exec("ALTER TABLE task_runs DROP COLUMN requester_agent_id");
legacyDb
.prepare(
`INSERT INTO task_runs (
task_id,
runtime,
requester_session_key,
owner_key,
scope_kind,
child_session_key,
agent_id,
task,
status,
delivery_status,
notify_policy,
created_at,
last_event_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
)
.run(
"legacy-cross-agent",
"subagent",
"agent:main:main",
"agent:main:main",
"session",
"agent:worker:subagent:child",
"main",
"Inspect worker state",
"running",
"pending",
"done_only",
100,
100,
);
legacyDb
.prepare(
`INSERT INTO task_runs (
task_id,
runtime,
requester_session_key,
owner_key,
scope_kind,
child_session_key,
agent_id,
task,
status,
delivery_status,
notify_policy,
created_at,
last_event_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
)
.run(
"legacy-global-cross-agent",
"subagent",
"global",
"global",
"session",
"agent:worker:subagent:global-child",
null,
"Inspect global worker state",
"running",
"pending",
"done_only",
110,
110,
);
legacyDb.close();
const reopened = openOpenClawStateDatabase({
@@ -99,6 +165,169 @@ describe("openclaw state database", () => {
name?: string;
}>;
expect(columns.some((column) => column.name === "requester_agent_id")).toBe(true);
expect(
reopened.db
.prepare(
`SELECT agent_id, requester_agent_id
FROM task_runs
WHERE task_id = ?`,
)
.get("legacy-cross-agent"),
).toEqual({
agent_id: "worker",
requester_agent_id: "main",
});
expect(
reopened.db
.prepare(
`SELECT agent_id, requester_agent_id
FROM task_runs
WHERE task_id = ?`,
)
.get("legacy-global-cross-agent"),
).toEqual({
agent_id: null,
requester_agent_id: null,
});
reopened.db
.prepare(
`INSERT INTO task_runs (
task_id,
runtime,
requester_session_key,
owner_key,
scope_kind,
child_session_key,
agent_id,
requester_agent_id,
task,
status,
delivery_status,
notify_policy,
created_at,
last_event_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
)
.run(
"current-explicit-attribution",
"subagent",
"global",
"global",
"session",
"agent:worker:subagent:current",
"main",
null,
"Current explicit attribution",
"running",
"pending",
"done_only",
200,
200,
);
closeOpenClawStateDatabaseForTest();
const currentReopened = openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
});
expect(
currentReopened.db
.prepare(
`SELECT agent_id, requester_agent_id
FROM task_runs
WHERE task_id = ?`,
)
.get("current-explicit-attribution"),
).toEqual({
agent_id: "main",
requester_agent_id: null,
});
});
// Verifies that adding requester_agent_id and backfilling it are all-or-nothing:
// when the backfill UPDATE is blocked, the column must not remain, and a later
// open (with the blocker removed) must add the column and repair the row.
it("rolls back the requester attribution column when its backfill fails", () => {
const stateDir = createTempStateDir();
// Create the state database once, then close it so it can be edited directly.
const database = openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
});
const databasePath = database.path;
closeOpenClawStateDatabaseForTest();
const { DatabaseSync } = requireNodeSqlite();
const legacyDb = new DatabaseSync(databasePath);
// Simulate the pre-migration schema by dropping the column, and install a
// trigger that aborts every UPDATE on task_runs with a recognizable message.
legacyDb.exec(`
ALTER TABLE task_runs DROP COLUMN requester_agent_id;
CREATE TRIGGER reject_task_attribution_repair
BEFORE UPDATE ON task_runs
BEGIN
SELECT RAISE(ABORT, 'blocked task attribution repair');
END;
`);
// Legacy cross-agent row: agent_id holds "main" while the child session key
// names the "worker" agent.
legacyDb
.prepare(
`INSERT INTO task_runs (
task_id,
runtime,
requester_session_key,
owner_key,
scope_kind,
child_session_key,
agent_id,
task,
status,
delivery_status,
notify_policy,
created_at,
last_event_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
)
.run(
"blocked-cross-agent",
"subagent",
"agent:main:main",
"agent:main:main",
"session",
"agent:worker:subagent:blocked",
"main",
"Inspect blocked worker state",
"running",
"pending",
"done_only",
100,
100,
);
legacyDb.close();
// Opening the database must surface the trigger's abort message.
expect(() =>
openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
}),
).toThrow(/blocked task attribution repair/);
// Inspect the file with a raw handle: the column must be absent after the failure.
const interruptedDb = new DatabaseSync(databasePath);
const interruptedColumns = interruptedDb
.prepare("PRAGMA table_info(task_runs)")
.all() as Array<{
name?: string;
}>;
expect(interruptedColumns.some((column) => column.name === "requester_agent_id")).toBe(false);
// Remove the blocker so the next open can complete.
interruptedDb.exec("DROP TRIGGER reject_task_attribution_repair");
interruptedDb.close();
const reopened = openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
});
// After a successful open the row carries the repaired attribution.
expect(
reopened.db
.prepare(
`SELECT agent_id, requester_agent_id
FROM task_runs
WHERE task_id = ?`,
)
.get("blocked-cross-agent"),
).toEqual({
agent_id: "worker",
requester_agent_id: "main",
});
});
it("opens databases with early cron tables before creating cron indexes", () => {

View File

@@ -230,13 +230,66 @@ function tableExists(db: DatabaseSync, tableName: string): boolean {
return row?.ok === 1;
}
/**
 * Adds a column to an existing table when that column is not already present.
 *
 * @param db Database handle the ALTER TABLE statement is executed on.
 * @param tableName Table to alter; it is interpolated into the SQL text unquoted.
 * @param columnSql Column definition, e.g. "requester_agent_id TEXT"; its first
 *   whitespace-separated token is taken as the column name.
 * @returns In the boolean-returning form, `true` only when ALTER TABLE ran and
 *   `false` when the name is empty, the table is missing, or the column exists.
 */
function ensureColumn(db: DatabaseSync, tableName: string, columnSql: string): void {
function ensureColumn(db: DatabaseSync, tableName: string, columnSql: string): boolean {
// The column name is the first whitespace-delimited token of the definition.
const columnName = columnSql.trim().split(/\s+/, 1)[0];
// Nothing to do without a name, without the table, or when the column is already there.
if (!columnName || !tableExists(db, tableName) || tableHasColumn(db, tableName, columnName)) {
return;
return false;
}
// State migrations are additive here; destructive or shape-changing repairs belong in doctor.
db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnSql};`);
return true;
}
/**
 * Backfills requester/executor attribution on legacy subagent and ACP task rows.
 *
 * Rows written before requester_agent_id existed kept the requester in agent_id.
 * For each eligible row this statement:
 * - sets requester_agent_id from the first available source: the agent segment
 *   of owner_key, then of requester_session_key, then the previous agent_id when
 *   it differs from the child session's agent;
 * - overwrites agent_id with the agent segment of child_session_key.
 *
 * A row is eligible only when requester_agent_id is NULL, runtime is 'subagent'
 * or 'acp', child_session_key matches 'agent:*:*' with a non-empty agent
 * segment, and at least one of the three requester sources is present. Rows
 * without recoverable requester provenance (for example global owners) are left
 * untouched so they keep their existing fallback behavior.
 *
 * The function is a no-op when task_runs or its requester_agent_id column is
 * missing.
 *
 * @param db Database handle the UPDATE is executed on.
 */
function repairLegacyTaskAgentAttribution(db: DatabaseSync): void {
  const canRepair =
    tableExists(db, "task_runs") && tableHasColumn(db, "task_runs", "requester_agent_id");
  if (!canRepair) {
    return;
  }

  // substr(key, 7, instr(substr(key, 7), ':') - 1) yields the text between the
  // six-character "agent:" prefix and the next colon, i.e. the agent id segment.
  // Both SET expressions read the row's pre-update values, so the CASE still sees
  // the old agent_id even though agent_id is reassigned in the same statement.
  // NOTE(review): only the child_session_key branch guards against an empty agent
  // segment (instr(...) > 1). An owner_key or requester_session_key such as
  // 'agent::x' would appear to match the GLOB and store '' as requester_agent_id;
  // confirm whether such keys can occur.
  const repairSql = `
UPDATE task_runs
SET
requester_agent_id = CASE
WHEN owner_key GLOB 'agent:*:*' THEN substr(
owner_key,
7,
instr(substr(owner_key, 7), ':') - 1
)
WHEN requester_session_key GLOB 'agent:*:*' THEN substr(
requester_session_key,
7,
instr(substr(requester_session_key, 7), ':') - 1
)
WHEN agent_id <> substr(
child_session_key,
7,
instr(substr(child_session_key, 7), ':') - 1
) THEN agent_id
ELSE NULL
END,
agent_id = substr(
child_session_key,
7,
instr(substr(child_session_key, 7), ':') - 1
)
WHERE requester_agent_id IS NULL
AND runtime IN ('subagent', 'acp')
AND child_session_key GLOB 'agent:*:*'
AND instr(substr(child_session_key, 7), ':') > 1
AND (
owner_key GLOB 'agent:*:*'
OR requester_session_key GLOB 'agent:*:*'
OR (
agent_id IS NOT NULL
AND agent_id <> substr(
child_session_key,
7,
instr(substr(child_session_key, 7), ':') - 1
)
)
);
`;
  db.exec(repairSql);
}
function hasCanonicalAgentDatabasesPrimaryKey(db: DatabaseSync): boolean {
@@ -857,7 +910,12 @@ function ensureAdditiveStateColumns(db: DatabaseSync): void {
ensureColumn(db, "gateway_restart_sentinel", "continuation_json TEXT");
ensureColumn(db, "gateway_restart_sentinel", "doctor_hint TEXT");
ensureColumn(db, "gateway_restart_sentinel", "stats_json TEXT");
ensureColumn(db, "task_runs", "requester_agent_id TEXT");
runSqliteImmediateTransactionSync(db, () => {
const addedTaskRequesterAgentId = ensureColumn(db, "task_runs", "requester_agent_id TEXT");
if (addedTaskRequesterAgentId) {
repairLegacyTaskAgentAttribution(db);
}
});
ensureColumn(db, "subagent_runs", "task_name TEXT");
}