fix: prefer latest subagent rows for session control

This commit is contained in:
Tak Hoffman
2026-03-24 17:12:26 -05:00
parent b8a0258618
commit c90ae1ee7f
10 changed files with 300 additions and 13 deletions

View File

@@ -217,6 +217,81 @@ describe("sendControlledSubagentMessage", () => {
replyText: undefined,
});
});
it("sends follow-up messages to the newest finished run when stale active rows still exist", async () => {
const childSessionKey = "agent:main:subagent:finished-stale-worker";
addSubagentRunForTests({
runId: "run-stale-active-send",
childSessionKey,
controllerSessionKey: "agent:main:main",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "stale active task",
cleanup: "keep",
createdAt: Date.now() - 9_000,
startedAt: Date.now() - 8_000,
});
addSubagentRunForTests({
runId: "run-current-finished-send",
childSessionKey,
controllerSessionKey: "agent:main:main",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "finished task",
cleanup: "keep",
createdAt: Date.now() - 5_000,
startedAt: Date.now() - 4_000,
endedAt: Date.now() - 1_000,
outcome: { status: "ok" },
});
__testing.setDepsForTest({
callGateway: async <T = Record<string, unknown>>(request: CallGatewayOptions) => {
if (request.method === "agent") {
return { runId: "run-followup-stale-send" } as T;
}
if (request.method === "agent.wait") {
return { status: "done" } as T;
}
if (request.method === "chat.history") {
return { messages: [] } as T;
}
throw new Error(`unexpected method: ${request.method}`);
},
});
const result = await sendControlledSubagentMessage({
cfg: {
channels: { whatsapp: { allowFrom: ["*"] } },
} as OpenClawConfig,
controller: {
controllerSessionKey: "agent:main:main",
callerSessionKey: "agent:main:main",
callerIsSubagent: false,
controlScope: "children",
},
entry: {
runId: "run-current-finished-send",
childSessionKey,
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
controllerSessionKey: "agent:main:main",
task: "finished task",
cleanup: "keep",
createdAt: Date.now() - 5_000,
startedAt: Date.now() - 4_000,
endedAt: Date.now() - 1_000,
outcome: { status: "ok" },
},
message: "continue",
});
expect(result).toEqual({
status: "ok",
runId: "run-followup-stale-send",
replyText: undefined,
});
});
});
describe("killSubagentRunAdmin", () => {

View File

@@ -29,6 +29,7 @@ import { resolveStoredSubagentCapabilities } from "./subagent-capabilities.js";
import {
clearSubagentRunSteerRestart,
countPendingDescendantRuns,
getLatestSubagentRunByChildSessionKey,
getSubagentRunByChildSessionKey,
getSubagentSessionRuntimeMs,
getSubagentSessionStartedAt,
@@ -523,7 +524,7 @@ export async function killControlledSubagentRun(params: {
error: "Leaf subagents cannot control other sessions.",
};
}
const currentEntry = getSubagentRunByChildSessionKey(params.entry.childSessionKey);
const currentEntry = getLatestSubagentRunByChildSessionKey(params.entry.childSessionKey);
if (!currentEntry || currentEntry.runId !== params.entry.runId) {
return {
status: "done" as const,
@@ -814,7 +815,7 @@ export async function sendControlledSubagentMessage(params: {
error: "Leaf subagents cannot control other sessions.",
};
}
const currentEntry = getSubagentRunByChildSessionKey(params.entry.childSessionKey);
const currentEntry = getLatestSubagentRunByChildSessionKey(params.entry.childSessionKey);
if (!currentEntry || currentEntry.runId !== params.entry.runId) {
return {
status: "done" as const,

View File

@@ -15,6 +15,7 @@ vi.mock("./subagent-announce.js", () => ({
let addSubagentRunForTests: typeof import("./subagent-registry.js").addSubagentRunForTests;
let clearSubagentRunSteerRestart: typeof import("./subagent-registry.js").clearSubagentRunSteerRestart;
let getSubagentRunByChildSessionKey: typeof import("./subagent-registry.js").getSubagentRunByChildSessionKey;
let getLatestSubagentRunByChildSessionKey: typeof import("./subagent-registry.js").getLatestSubagentRunByChildSessionKey;
let initSubagentRegistry: typeof import("./subagent-registry.js").initSubagentRegistry;
let listSubagentRunsForRequester: typeof import("./subagent-registry.js").listSubagentRunsForRequester;
let registerSubagentRun: typeof import("./subagent-registry.js").registerSubagentRun;
@@ -26,6 +27,7 @@ async function loadSubagentRegistryModules(): Promise<void> {
({
addSubagentRunForTests,
clearSubagentRunSteerRestart,
getLatestSubagentRunByChildSessionKey,
getSubagentRunByChildSessionKey,
initSubagentRegistry,
listSubagentRunsForRequester,
@@ -584,6 +586,52 @@ describe("subagent registry persistence", () => {
expect(resolved?.endedAt).toBeUndefined();
});
it("can resolve the newest child-session row even when an older stale row is still active", async () => {
const childSessionKey = "agent:main:subagent:disk-latest";
await writePersistedRegistry(
{
version: 2,
runs: {
"run-current-ended": {
runId: "run-current-ended",
childSessionKey,
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "completed latest",
cleanup: "keep",
createdAt: 200,
startedAt: 210,
endedAt: 220,
outcome: { status: "ok" },
},
"run-stale-active": {
runId: "run-stale-active",
childSessionKey,
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "stale active",
cleanup: "keep",
createdAt: 100,
startedAt: 110,
},
},
},
{ seedChildSessions: false },
);
resetSubagentRegistryForTests({ persist: false });
const resolved = withEnv({ VITEST: undefined, NODE_ENV: "development" }, () =>
getLatestSubagentRunByChildSessionKey(childSessionKey),
);
expect(resolved).toMatchObject({
runId: "run-current-ended",
childSessionKey,
});
expect(resolved?.endedAt).toBe(220);
});
it("resume guard prunes orphan runs before announce retry", async () => {
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-"));
process.env.OPENCLAW_STATE_DIR = tempStateDir;

View File

@@ -1780,6 +1780,27 @@ export function getSubagentRunByChildSessionKey(childSessionKey: string): Subage
return latestActive ?? latestEnded;
}
export function getLatestSubagentRunByChildSessionKey(
childSessionKey: string,
): SubagentRunRecord | null {
const key = childSessionKey.trim();
if (!key) {
return null;
}
let latest: SubagentRunRecord | null = null;
for (const entry of getSubagentRunsSnapshotForRead(subagentRuns).values()) {
if (entry.childSessionKey !== key) {
continue;
}
if (!latest || entry.createdAt > latest.createdAt) {
latest = entry;
}
}
return latest;
}
export function initSubagentRegistry() {
restoreSubagentRunsOnce();
}

View File

@@ -8,7 +8,7 @@ let cfg: Record<string, unknown> = {};
const authMock = vi.fn(async () => ({ ok: true }) as { ok: boolean; rateLimited?: boolean });
const isLocalDirectRequestMock = vi.fn(() => true);
const loadSessionEntryMock = vi.fn();
const getSubagentRunByChildSessionKeyMock = vi.fn();
const getLatestSubagentRunByChildSessionKeyMock = vi.fn();
const resolveSubagentControllerMock = vi.fn();
const killControlledSubagentRunMock = vi.fn();
const killSubagentRunAdminMock = vi.fn();
@@ -27,7 +27,7 @@ vi.mock("./session-utils.js", () => ({
}));
vi.mock("../agents/subagent-registry.js", () => ({
getSubagentRunByChildSessionKey: getSubagentRunByChildSessionKeyMock,
getLatestSubagentRunByChildSessionKey: getLatestSubagentRunByChildSessionKeyMock,
}));
vi.mock("../agents/subagent-control.js", () => ({
@@ -80,7 +80,7 @@ beforeEach(() => {
isLocalDirectRequestMock.mockReset();
isLocalDirectRequestMock.mockReturnValue(true);
loadSessionEntryMock.mockReset();
getSubagentRunByChildSessionKeyMock.mockReset();
getLatestSubagentRunByChildSessionKeyMock.mockReset();
resolveSubagentControllerMock.mockReset();
resolveSubagentControllerMock.mockReturnValue({ controllerSessionKey: "agent:main:main" });
killControlledSubagentRunMock.mockReset();
@@ -190,7 +190,7 @@ describe("POST /sessions/:sessionKey/kill", () => {
entry: { sessionId: "sess-worker", updatedAt: Date.now() },
canonicalKey: "agent:main:subagent:worker",
});
getSubagentRunByChildSessionKeyMock.mockReturnValue({
getLatestSubagentRunByChildSessionKeyMock.mockReturnValue({
runId: "run-1",
childSessionKey: "agent:main:subagent:worker",
});
@@ -205,10 +205,41 @@ describe("POST /sessions/:sessionKey/kill", () => {
cfg,
agentSessionKey: "agent:main:main",
});
expect(getSubagentRunByChildSessionKeyMock).toHaveBeenCalledWith("agent:main:subagent:worker");
expect(getLatestSubagentRunByChildSessionKeyMock).toHaveBeenCalledWith(
"agent:main:subagent:worker",
);
expect(killSubagentRunAdminMock).not.toHaveBeenCalled();
});
it("uses the newest child-session row for requester-owned kills when stale rows still exist", async () => {
isLocalDirectRequestMock.mockReturnValue(false);
authMock.mockResolvedValueOnce({ ok: true });
loadSessionEntryMock.mockReturnValue({
entry: { sessionId: "sess-worker", updatedAt: Date.now() },
canonicalKey: "agent:main:subagent:worker",
});
getLatestSubagentRunByChildSessionKeyMock.mockReturnValue({
runId: "run-current-ended",
childSessionKey: "agent:main:subagent:worker",
endedAt: Date.now() - 1,
});
killControlledSubagentRunMock.mockResolvedValue({ status: "done" });
const response = await post("/sessions/agent%3Amain%3Asubagent%3Aworker/kill", "", {
"x-openclaw-requester-session-key": "agent:main:main",
});
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual({ ok: true, killed: false });
expect(killControlledSubagentRunMock).toHaveBeenCalledWith({
cfg,
controller: { controllerSessionKey: "agent:main:main" },
entry: expect.objectContaining({
runId: "run-current-ended",
childSessionKey: "agent:main:subagent:worker",
}),
});
});
it("prefers admin kill when a valid bearer token is present alongside requester headers", async () => {
isLocalDirectRequestMock.mockReturnValue(false);
loadSessionEntryMock.mockReturnValue({

View File

@@ -4,7 +4,7 @@ import {
killSubagentRunAdmin,
resolveSubagentController,
} from "../agents/subagent-control.js";
import { getSubagentRunByChildSessionKey } from "../agents/subagent-registry.js";
import { getLatestSubagentRunByChildSessionKey } from "../agents/subagent-registry.js";
import { loadConfig } from "../config/config.js";
import type { AuthRateLimiter } from "./auth-rate-limit.js";
import { isLocalDirectRequest, type ResolvedGatewayAuth } from "./auth.js";
@@ -112,7 +112,7 @@ export async function handleSessionKillHttpRequest(
let killed = false;
if (!allowAdminKill && requesterSessionKey) {
const runEntry = getSubagentRunByChildSessionKey(canonicalKey);
const runEntry = getLatestSubagentRunByChildSessionKey(canonicalKey);
if (runEntry) {
const result = await killControlledSubagentRun({
cfg,

View File

@@ -0,0 +1,57 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const getLatestSubagentRunByChildSessionKeyMock = vi.fn();
const replaceSubagentRunAfterSteerMock = vi.fn();
vi.mock("../agents/subagent-registry.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../agents/subagent-registry.js")>();
return {
...actual,
getLatestSubagentRunByChildSessionKey: (...args: unknown[]) =>
getLatestSubagentRunByChildSessionKeyMock(...args),
replaceSubagentRunAfterSteer: (...args: unknown[]) => replaceSubagentRunAfterSteerMock(...args),
};
});
import { reactivateCompletedSubagentSession } from "./session-subagent-reactivation.js";
describe("reactivateCompletedSubagentSession", () => {
beforeEach(() => {
getLatestSubagentRunByChildSessionKeyMock.mockReset();
replaceSubagentRunAfterSteerMock.mockReset();
});
it("reactivates the newest ended row even when stale active rows still exist for the same child session", () => {
const childSessionKey = "agent:main:subagent:followup-race";
const latestEndedRun = {
runId: "run-current-ended",
childSessionKey,
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "current ended task",
cleanup: "keep" as const,
createdAt: 20,
startedAt: 21,
endedAt: 22,
outcome: { status: "ok" as const },
};
getLatestSubagentRunByChildSessionKeyMock.mockReturnValue(latestEndedRun);
replaceSubagentRunAfterSteerMock.mockReturnValue(true);
expect(
reactivateCompletedSubagentSession({
sessionKey: childSessionKey,
runId: "run-next",
}),
).toBe(true);
expect(getLatestSubagentRunByChildSessionKeyMock).toHaveBeenCalledWith(childSessionKey);
expect(replaceSubagentRunAfterSteerMock).toHaveBeenCalledWith({
previousRunId: "run-current-ended",
nextRunId: "run-next",
fallback: latestEndedRun,
runTimeoutSeconds: 0,
});
});
});

View File

@@ -1,5 +1,5 @@
import {
getSubagentRunByChildSessionKey,
getLatestSubagentRunByChildSessionKey,
replaceSubagentRunAfterSteer,
} from "../agents/subagent-registry.js";
@@ -11,7 +11,7 @@ export function reactivateCompletedSubagentSession(params: {
if (!runId) {
return false;
}
const existing = getSubagentRunByChildSessionKey(params.sessionKey);
const existing = getLatestSubagentRunByChildSessionKey(params.sessionKey);
if (!existing || typeof existing.endedAt !== "number") {
return false;
}

View File

@@ -1317,6 +1317,60 @@ describe("listSessionsFromStore subagent metadata", () => {
expect(followup?.runtimeMs).toBeGreaterThanOrEqual(150_000);
});
test("uses the newest child-session row for stale/current replacement pairs", () => {
const now = Date.now();
const childSessionKey = "agent:main:subagent:stale-current";
const store: Record<string, SessionEntry> = {
[childSessionKey]: {
sessionId: "sess-stale-current",
updatedAt: now,
spawnedBy: "agent:main:main",
} as SessionEntry,
};
addSubagentRunForTests({
runId: "run-stale-active",
childSessionKey,
controllerSessionKey: "agent:main:main",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "stale active row",
cleanup: "keep",
createdAt: now - 5_000,
startedAt: now - 4_500,
model: "openai/gpt-5.4",
});
addSubagentRunForTests({
runId: "run-current-ended",
childSessionKey,
controllerSessionKey: "agent:main:main",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "current ended row",
cleanup: "keep",
createdAt: now - 1_000,
startedAt: now - 900,
endedAt: now - 200,
outcome: { status: "ok" },
model: "openai/gpt-5.4",
});
const result = listSessionsFromStore({
cfg,
storePath: "/tmp/sessions.json",
store,
opts: {},
});
expect(result.sessions).toHaveLength(1);
expect(result.sessions[0]).toMatchObject({
key: childSessionKey,
status: "done",
startedAt: now - 900,
endedAt: now - 200,
});
});
test("uses persisted active subagent runs when the local worker only has terminal snapshots", async () => {
const tempRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-utils-subagent-"));
const stateDir = path.join(tempRoot, "state");

View File

@@ -10,7 +10,7 @@ import {
resolveDefaultModelForAgent,
} from "../agents/model-selection.js";
import {
getSubagentRunByChildSessionKey,
getLatestSubagentRunByChildSessionKey,
getSubagentSessionRuntimeMs,
getSubagentSessionStartedAt,
listSubagentRunsForController,
@@ -1055,7 +1055,7 @@ export function buildGatewaySessionRow(params: {
const deliveryFields = normalizeSessionDeliveryFields(entry);
const parsedAgent = parseAgentSessionKey(key);
const sessionAgentId = normalizeAgentId(parsedAgent?.agentId ?? resolveDefaultAgentId(cfg));
const subagentRun = getSubagentRunByChildSessionKey(key);
const subagentRun = getLatestSubagentRunByChildSessionKey(key);
const subagentStatus = subagentRun ? resolveSubagentSessionStatus(subagentRun) : undefined;
const subagentStartedAt = subagentRun ? getSubagentSessionStartedAt(subagentRun) : undefined;
const subagentEndedAt = subagentRun ? subagentRun.endedAt : undefined;