diff --git a/src/agents/subagent-control.test.ts b/src/agents/subagent-control.test.ts index 878b3d91c14..b8b691040e2 100644 --- a/src/agents/subagent-control.test.ts +++ b/src/agents/subagent-control.test.ts @@ -217,6 +217,81 @@ describe("sendControlledSubagentMessage", () => { replyText: undefined, }); }); + + it("sends follow-up messages to the newest finished run when stale active rows still exist", async () => { + const childSessionKey = "agent:main:subagent:finished-stale-worker"; + addSubagentRunForTests({ + runId: "run-stale-active-send", + childSessionKey, + controllerSessionKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "stale active task", + cleanup: "keep", + createdAt: Date.now() - 9_000, + startedAt: Date.now() - 8_000, + }); + addSubagentRunForTests({ + runId: "run-current-finished-send", + childSessionKey, + controllerSessionKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "finished task", + cleanup: "keep", + createdAt: Date.now() - 5_000, + startedAt: Date.now() - 4_000, + endedAt: Date.now() - 1_000, + outcome: { status: "ok" }, + }); + + __testing.setDepsForTest({ + callGateway: async >(request: CallGatewayOptions) => { + if (request.method === "agent") { + return { runId: "run-followup-stale-send" } as T; + } + if (request.method === "agent.wait") { + return { status: "done" } as T; + } + if (request.method === "chat.history") { + return { messages: [] } as T; + } + throw new Error(`unexpected method: ${request.method}`); + }, + }); + + const result = await sendControlledSubagentMessage({ + cfg: { + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig, + controller: { + controllerSessionKey: "agent:main:main", + callerSessionKey: "agent:main:main", + callerIsSubagent: false, + controlScope: "children", + }, + entry: { + runId: "run-current-finished-send", + childSessionKey, + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + controllerSessionKey: "agent:main:main", + task: "finished task", + cleanup: "keep", + createdAt: Date.now() - 5_000, + startedAt: Date.now() - 4_000, + endedAt: Date.now() - 1_000, + outcome: { status: "ok" }, + }, + message: "continue", + }); + + expect(result).toEqual({ + status: "ok", + runId: "run-followup-stale-send", + replyText: undefined, + }); + }); }); describe("killSubagentRunAdmin", () => { diff --git a/src/agents/subagent-control.ts b/src/agents/subagent-control.ts index bd45dd558b8..f7b2f3a3e02 100644 --- a/src/agents/subagent-control.ts +++ b/src/agents/subagent-control.ts @@ -29,6 +29,7 @@ import { resolveStoredSubagentCapabilities } from "./subagent-capabilities.js"; import { clearSubagentRunSteerRestart, countPendingDescendantRuns, + getLatestSubagentRunByChildSessionKey, getSubagentRunByChildSessionKey, getSubagentSessionRuntimeMs, getSubagentSessionStartedAt, @@ -523,7 +524,7 @@ export async function killControlledSubagentRun(params: { error: "Leaf subagents cannot control other sessions.", }; } - const currentEntry = getSubagentRunByChildSessionKey(params.entry.childSessionKey); + const currentEntry = getLatestSubagentRunByChildSessionKey(params.entry.childSessionKey); if (!currentEntry || currentEntry.runId !== params.entry.runId) { return { status: "done" as const, @@ -814,7 +815,7 @@ export async function sendControlledSubagentMessage(params: { error: "Leaf subagents cannot control other sessions.", }; } - const currentEntry = getSubagentRunByChildSessionKey(params.entry.childSessionKey); + const currentEntry = getLatestSubagentRunByChildSessionKey(params.entry.childSessionKey); if (!currentEntry || currentEntry.runId !== params.entry.runId) { return { status: "done" as const, diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts index 36663b709b0..ea9780462cc 100644 --- a/src/agents/subagent-registry.persistence.test.ts +++ b/src/agents/subagent-registry.persistence.test.ts @@ -15,6 +15,7 @@ vi.mock("./subagent-announce.js", () => ({ let addSubagentRunForTests: typeof import("./subagent-registry.js").addSubagentRunForTests; let clearSubagentRunSteerRestart: typeof import("./subagent-registry.js").clearSubagentRunSteerRestart; let getSubagentRunByChildSessionKey: typeof import("./subagent-registry.js").getSubagentRunByChildSessionKey; +let getLatestSubagentRunByChildSessionKey: typeof import("./subagent-registry.js").getLatestSubagentRunByChildSessionKey; let initSubagentRegistry: typeof import("./subagent-registry.js").initSubagentRegistry; let listSubagentRunsForRequester: typeof import("./subagent-registry.js").listSubagentRunsForRequester; let registerSubagentRun: typeof import("./subagent-registry.js").registerSubagentRun; @@ -26,6 +27,7 @@ async function loadSubagentRegistryModules(): Promise { ({ addSubagentRunForTests, clearSubagentRunSteerRestart, + getLatestSubagentRunByChildSessionKey, getSubagentRunByChildSessionKey, initSubagentRegistry, listSubagentRunsForRequester, @@ -584,6 +586,52 @@ describe("subagent registry persistence", () => { expect(resolved?.endedAt).toBeUndefined(); }); + it("can resolve the newest child-session row even when an older stale row is still active", async () => { + const childSessionKey = "agent:main:subagent:disk-latest"; + await writePersistedRegistry( + { + version: 2, + runs: { + "run-current-ended": { + runId: "run-current-ended", + childSessionKey, + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "completed latest", + cleanup: "keep", + createdAt: 200, + startedAt: 210, + endedAt: 220, + outcome: { status: "ok" }, + }, + "run-stale-active": { + runId: "run-stale-active", + childSessionKey, + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "stale active", + cleanup: "keep", + createdAt: 100, + startedAt: 110, + }, + }, + }, + { seedChildSessions: false }, + ); + + resetSubagentRegistryForTests({ persist: false }); + + const resolved = withEnv({ VITEST: undefined, NODE_ENV: "development" }, () => + getLatestSubagentRunByChildSessionKey(childSessionKey), + ); + + expect(resolved).toMatchObject({ + runId: "run-current-ended", + childSessionKey, + }); + expect(resolved?.endedAt).toBe(220); + }); + it("resume guard prunes orphan runs before announce retry", async () => { tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-")); process.env.OPENCLAW_STATE_DIR = tempStateDir; diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index fdde8cc5fea..fb1567b2db2 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -1780,6 +1780,27 @@ export function getSubagentRunByChildSessionKey(childSessionKey: string): Subage return latestActive ?? latestEnded; } +export function getLatestSubagentRunByChildSessionKey( + childSessionKey: string, +): SubagentRunRecord | null { + const key = childSessionKey.trim(); + if (!key) { + return null; + } + + let latest: SubagentRunRecord | null = null; + for (const entry of getSubagentRunsSnapshotForRead(subagentRuns).values()) { + if (entry.childSessionKey !== key) { + continue; + } + if (!latest || entry.createdAt > latest.createdAt) { + latest = entry; + } + } + + return latest; +} + export function initSubagentRegistry() { restoreSubagentRunsOnce(); } diff --git a/src/gateway/session-kill-http.test.ts b/src/gateway/session-kill-http.test.ts index b313b289383..a049b4c428e 100644 --- a/src/gateway/session-kill-http.test.ts +++ b/src/gateway/session-kill-http.test.ts @@ -8,7 +8,7 @@ let cfg: Record = {}; const authMock = vi.fn(async () => ({ ok: true }) as { ok: boolean; rateLimited?: boolean }); const isLocalDirectRequestMock = vi.fn(() => true); const loadSessionEntryMock = vi.fn(); -const getSubagentRunByChildSessionKeyMock = vi.fn(); +const getLatestSubagentRunByChildSessionKeyMock = vi.fn(); const resolveSubagentControllerMock = vi.fn(); const killControlledSubagentRunMock = vi.fn(); const killSubagentRunAdminMock = vi.fn(); @@ -27,7 +27,7 @@ vi.mock("./session-utils.js", () => ({ })); vi.mock("../agents/subagent-registry.js", () => ({ - getSubagentRunByChildSessionKey: getSubagentRunByChildSessionKeyMock, + getLatestSubagentRunByChildSessionKey: getLatestSubagentRunByChildSessionKeyMock, })); vi.mock("../agents/subagent-control.js", () => ({ @@ -80,7 +80,7 @@ beforeEach(() => { isLocalDirectRequestMock.mockReset(); isLocalDirectRequestMock.mockReturnValue(true); loadSessionEntryMock.mockReset(); - getSubagentRunByChildSessionKeyMock.mockReset(); + getLatestSubagentRunByChildSessionKeyMock.mockReset(); resolveSubagentControllerMock.mockReset(); resolveSubagentControllerMock.mockReturnValue({ controllerSessionKey: "agent:main:main" }); killControlledSubagentRunMock.mockReset(); @@ -190,7 +190,7 @@ describe("POST /sessions/:sessionKey/kill", () => { entry: { sessionId: "sess-worker", updatedAt: Date.now() }, canonicalKey: "agent:main:subagent:worker", }); - getSubagentRunByChildSessionKeyMock.mockReturnValue({ + getLatestSubagentRunByChildSessionKeyMock.mockReturnValue({ runId: "run-1", childSessionKey: "agent:main:subagent:worker", }); @@ -205,10 +205,41 @@ describe("POST /sessions/:sessionKey/kill", () => { cfg, agentSessionKey: "agent:main:main", }); - expect(getSubagentRunByChildSessionKeyMock).toHaveBeenCalledWith("agent:main:subagent:worker"); + expect(getLatestSubagentRunByChildSessionKeyMock).toHaveBeenCalledWith( + "agent:main:subagent:worker", + ); expect(killSubagentRunAdminMock).not.toHaveBeenCalled(); }); + it("uses the newest child-session row for requester-owned kills when stale rows still exist", async () => { + isLocalDirectRequestMock.mockReturnValue(false); + authMock.mockResolvedValueOnce({ ok: true }); + loadSessionEntryMock.mockReturnValue({ + entry: { sessionId: "sess-worker", updatedAt: Date.now() }, + canonicalKey: "agent:main:subagent:worker", + }); + getLatestSubagentRunByChildSessionKeyMock.mockReturnValue({ + runId: "run-current-ended", + childSessionKey: "agent:main:subagent:worker", + endedAt: Date.now() - 1, + }); + killControlledSubagentRunMock.mockResolvedValue({ status: "done" }); + + const response = await post("/sessions/agent%3Amain%3Asubagent%3Aworker/kill", "", { + "x-openclaw-requester-session-key": "agent:main:main", + }); + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual({ ok: true, killed: false }); + expect(killControlledSubagentRunMock).toHaveBeenCalledWith({ + cfg, + controller: { controllerSessionKey: "agent:main:main" }, + entry: expect.objectContaining({ + runId: "run-current-ended", + childSessionKey: "agent:main:subagent:worker", + }), + }); + }); + it("prefers admin kill when a valid bearer token is present alongside requester headers", async () => { isLocalDirectRequestMock.mockReturnValue(false); loadSessionEntryMock.mockReturnValue({ diff --git a/src/gateway/session-kill-http.ts b/src/gateway/session-kill-http.ts index 8e255b4e387..0d0b6e6ca70 100644 --- a/src/gateway/session-kill-http.ts +++ b/src/gateway/session-kill-http.ts @@ -4,7 +4,7 @@ import { killSubagentRunAdmin, resolveSubagentController, } from "../agents/subagent-control.js"; -import { getSubagentRunByChildSessionKey } from "../agents/subagent-registry.js"; +import { getLatestSubagentRunByChildSessionKey } from "../agents/subagent-registry.js"; import { loadConfig } from "../config/config.js"; import type { AuthRateLimiter } from "./auth-rate-limit.js"; import { isLocalDirectRequest, type ResolvedGatewayAuth } from "./auth.js"; @@ -112,7 +112,7 @@ export async function handleSessionKillHttpRequest( let killed = false; if (!allowAdminKill && requesterSessionKey) { - const runEntry = getSubagentRunByChildSessionKey(canonicalKey); + const runEntry = getLatestSubagentRunByChildSessionKey(canonicalKey); if (runEntry) { const result = await killControlledSubagentRun({ cfg, diff --git a/src/gateway/session-subagent-reactivation.test.ts b/src/gateway/session-subagent-reactivation.test.ts new file mode 100644 index 00000000000..6a5357b8ef2 --- /dev/null +++ b/src/gateway/session-subagent-reactivation.test.ts @@ -0,0 +1,57 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const getLatestSubagentRunByChildSessionKeyMock = vi.fn(); +const replaceSubagentRunAfterSteerMock = vi.fn(); + +vi.mock("../agents/subagent-registry.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getLatestSubagentRunByChildSessionKey: (...args: unknown[]) => + getLatestSubagentRunByChildSessionKeyMock(...args), + replaceSubagentRunAfterSteer: (...args: unknown[]) => replaceSubagentRunAfterSteerMock(...args), + }; +}); + +import { reactivateCompletedSubagentSession } from "./session-subagent-reactivation.js"; + +describe("reactivateCompletedSubagentSession", () => { + beforeEach(() => { + getLatestSubagentRunByChildSessionKeyMock.mockReset(); + replaceSubagentRunAfterSteerMock.mockReset(); + }); + + it("reactivates the newest ended row even when stale active rows still exist for the same child session", () => { + const childSessionKey = "agent:main:subagent:followup-race"; + const latestEndedRun = { + runId: "run-current-ended", + childSessionKey, + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "current ended task", + cleanup: "keep" as const, + createdAt: 20, + startedAt: 21, + endedAt: 22, + outcome: { status: "ok" as const }, + }; + + getLatestSubagentRunByChildSessionKeyMock.mockReturnValue(latestEndedRun); + replaceSubagentRunAfterSteerMock.mockReturnValue(true); + + expect( + reactivateCompletedSubagentSession({ + sessionKey: childSessionKey, + runId: "run-next", + }), + ).toBe(true); + + expect(getLatestSubagentRunByChildSessionKeyMock).toHaveBeenCalledWith(childSessionKey); + expect(replaceSubagentRunAfterSteerMock).toHaveBeenCalledWith({ + previousRunId: "run-current-ended", + nextRunId: "run-next", + fallback: latestEndedRun, + runTimeoutSeconds: 0, + }); + }); +}); diff --git a/src/gateway/session-subagent-reactivation.ts b/src/gateway/session-subagent-reactivation.ts index 3664739a1e5..5bc52999da7 100644 --- a/src/gateway/session-subagent-reactivation.ts +++ b/src/gateway/session-subagent-reactivation.ts @@ -1,5 +1,5 @@ import { - getSubagentRunByChildSessionKey, + getLatestSubagentRunByChildSessionKey, replaceSubagentRunAfterSteer, } from "../agents/subagent-registry.js"; @@ -11,7 +11,7 @@ export function reactivateCompletedSubagentSession(params: { if (!runId) { return false; } - const existing = getSubagentRunByChildSessionKey(params.sessionKey); + const existing = getLatestSubagentRunByChildSessionKey(params.sessionKey); if (!existing || typeof existing.endedAt !== "number") { return false; } diff --git a/src/gateway/session-utils.test.ts b/src/gateway/session-utils.test.ts index 2fcbdd1c4ec..72c652ad916 100644 --- a/src/gateway/session-utils.test.ts +++ b/src/gateway/session-utils.test.ts @@ -1317,6 +1317,60 @@ describe("listSessionsFromStore subagent metadata", () => { expect(followup?.runtimeMs).toBeGreaterThanOrEqual(150_000); }); + test("uses the newest child-session row for stale/current replacement pairs", () => { + const now = Date.now(); + const childSessionKey = "agent:main:subagent:stale-current"; + const store: Record = { + [childSessionKey]: { + sessionId: "sess-stale-current", + updatedAt: now, + spawnedBy: "agent:main:main", + } as SessionEntry, + }; + + addSubagentRunForTests({ + runId: "run-stale-active", + childSessionKey, + controllerSessionKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "stale active row", + cleanup: "keep", + createdAt: now - 5_000, + startedAt: now - 4_500, + model: "openai/gpt-5.4", + }); + addSubagentRunForTests({ + runId: "run-current-ended", + childSessionKey, + controllerSessionKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "current ended row", + cleanup: "keep", + createdAt: now - 1_000, + startedAt: now - 900, + endedAt: now - 200, + outcome: { status: "ok" }, + model: "openai/gpt-5.4", + }); + + const result = listSessionsFromStore({ + cfg, + storePath: "/tmp/sessions.json", + store, + opts: {}, + }); + + expect(result.sessions).toHaveLength(1); + expect(result.sessions[0]).toMatchObject({ + key: childSessionKey, + status: "done", + startedAt: now - 900, + endedAt: now - 200, + }); + }); + test("uses persisted active subagent runs when the local worker only has terminal snapshots", async () => { const tempRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-session-utils-subagent-")); const stateDir = path.join(tempRoot, "state"); diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 235d608d89f..d162dc7a65c 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -10,7 +10,7 @@ import { resolveDefaultModelForAgent, } from "../agents/model-selection.js"; import { - getSubagentRunByChildSessionKey, + getLatestSubagentRunByChildSessionKey, getSubagentSessionRuntimeMs, getSubagentSessionStartedAt, listSubagentRunsForController, @@ -1055,7 +1055,7 @@ export function buildGatewaySessionRow(params: { const deliveryFields = normalizeSessionDeliveryFields(entry); const parsedAgent = parseAgentSessionKey(key); const sessionAgentId = normalizeAgentId(parsedAgent?.agentId ?? resolveDefaultAgentId(cfg)); - const subagentRun = getSubagentRunByChildSessionKey(key); + const subagentRun = getLatestSubagentRunByChildSessionKey(key); const subagentStatus = subagentRun ? resolveSubagentSessionStatus(subagentRun) : undefined; const subagentStartedAt = subagentRun ? getSubagentSessionStartedAt(subagentRun) : undefined; const subagentEndedAt = subagentRun ? subagentRun.endedAt : undefined;