diff --git a/CHANGELOG.md b/CHANGELOG.md index b7323c5c0d2..74523cf9072 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ Docs: https://docs.openclaw.ai - Telegram/native commands: pass persisted session files into plugin commands for topic-bound sessions, so `/codex bind` works from Telegram forum topics. Refs #75845 and #76049. Thanks @MatthewSchleder. - Telegram: honor runtime conversation bindings for native slash commands in bound top-level groups, so commands like `/status@bot` route to the active non-`main` session instead of falling back to the default route. Fixes #75405; supersedes #75558. Thanks @ziptbm and @yfge. +- Gateway/tasks: make task registry maintenance use pass-local backing-session lookups and fresh active child-session indexes, avoiding repeated full task snapshots and session-store clones on large stale registries. Fixes #73517 and #75708; supersedes #74406 and #75709. Thanks @Lightningxxl, @glfruit, and @jared-rebel. - Models CLI: restore `openclaw models list --provider ` catalog and registry fallback rows for unconfigured providers, so provider-specific verification commands no longer report "No models found." Fixes #75517; supersedes #75615. Thanks @lotsoftick and @koshaji. - Gateway/macOS: write LaunchAgent services with a canonical system PATH and stop preserving old plist PATH entries, so Volta, asdf, fnm, and pnpm shell paths no longer affect gateway child-process Node resolution. Fixes #75233; supersedes #75246. Thanks @nphyde2. - Slack/hooks: preserve bot alert attachment text in message-received hook content when command text is blank. Fixes #76035; refs #76036. Thanks @amsminn. diff --git a/src/tasks/runtime-internal.ts b/src/tasks/runtime-internal.ts index 0c45fb8ee79..384ff9b6bec 100644 --- a/src/tasks/runtime-internal.ts +++ b/src/tasks/runtime-internal.ts @@ -7,6 +7,7 @@ export { findLatestTaskForFlowId, finalizeTaskRunByRunId, getTaskById, + hasActiveTaskForChildSessionKey, listTaskRecords, listTasksForFlowId, listTasksForOwnerKey, diff --git a/src/tasks/task-registry.maintenance.issue-60299.test.ts b/src/tasks/task-registry.maintenance.issue-60299.test.ts index ffe9e31f969..9996e776898 100644 --- a/src/tasks/task-registry.maintenance.issue-60299.test.ts +++ b/src/tasks/task-registry.maintenance.issue-60299.test.ts @@ -53,6 +53,9 @@ afterEach(() => { function createTaskRegistryMaintenanceHarness(params: { tasks: TaskRecord[]; sessionStore?: Record; + loadSessionStore?: TaskRegistryMaintenanceRuntime["loadSessionStore"]; + resolveStorePath?: TaskRegistryMaintenanceRuntime["resolveStorePath"]; + deriveSessionChatTypeFromKey?: TaskRegistryMaintenanceRuntime["deriveSessionChatTypeFromKey"]; acpEntry?: AcpSessionStoreEntry["entry"]; activeCronJobIds?: string[]; activeRunIds?: string[]; @@ -87,8 +90,11 @@ function createTaskRegistryMaintenanceHarness(params: { entry: undefined, storeReadFailed: false, } satisfies AcpSessionStoreEntry), - loadSessionStore: () => sessionStore, - resolveStorePath: () => "", + loadSessionStore: params.loadSessionStore ?? (() => sessionStore), + resolveStorePath: params.resolveStorePath ?? (() => ""), + ...(params.deriveSessionChatTypeFromKey + ? { deriveSessionChatTypeFromKey: params.deriveSessionChatTypeFromKey } + : {}), isCronJobActive: (jobId: string) => activeCronJobIds.has(jobId), getAgentRunContext: (runId: string) => activeRunIds.has(runId) ? { sessionKey: "main" } : undefined, @@ -101,6 +107,15 @@ function createTaskRegistryMaintenanceHarness(params: { ? { agentId, rest: rest.join(":") } : null; }, + hasActiveTaskForChildSessionKey: ({ sessionKey, excludeTaskId }) => { + const normalized = sessionKey.trim().toLowerCase(); + return Array.from(currentTasks.values()).some( + (task) => + task.taskId !== excludeTaskId && + (task.status === "queued" || task.status === "running") && + task.childSessionKey?.trim().toLowerCase() === normalized, + ); + }, deleteTaskRecordById: (taskId: string) => currentTasks.delete(taskId), ensureTaskRegistryReady: () => {}, getTaskById: (taskId: string) => currentTasks.get(taskId), @@ -162,6 +177,46 @@ function createTaskRegistryMaintenanceHarness(params: { } describe("task-registry maintenance issue #60299", () => { + it("reuses session store reads across stale subagent task checks in one pass", async () => { + const tasks = Array.from({ length: 10 }, (_, index) => + makeStaleTask({ + runtime: "subagent", + taskId: `task-subagent-stale-${index}`, + childSessionKey: `agent:main:subagent:stale-${index}`, + }), + ); + const loadSessionStoreMock = vi.fn(() => ({})); + + createTaskRegistryMaintenanceHarness({ + tasks, + loadSessionStore: loadSessionStoreMock, + resolveStorePath: () => "/tmp/openclaw-test-sessions-main.json", + }); + + expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: tasks.length }); + expect(loadSessionStoreMock).toHaveBeenCalledTimes(1); + }); + + it("reuses CLI channel session type derivation across duplicate stale task checks", async () => { + const childSessionKey = "agent:main:discord:direct:user-1"; + const tasks = Array.from({ length: 10 }, (_, index) => + makeStaleTask({ + runtime: "cli", + taskId: `task-cli-channel-stale-${index}`, + childSessionKey, + }), + ); + const deriveSessionChatTypeMock = vi.fn(() => "direct" as const); + + createTaskRegistryMaintenanceHarness({ + tasks, + deriveSessionChatTypeFromKey: deriveSessionChatTypeMock, + }); + + expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: tasks.length }); + expect(deriveSessionChatTypeMock).toHaveBeenCalledTimes(1); + }); + it("marks stale cron tasks lost once the runtime no longer tracks the job as active", async () => { const childSessionKey = "agent:main:workspace:channel:test-channel"; const task = makeStaleTask({ diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index a5b43b0240b..3fa3edd2fc5 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -24,16 +24,23 @@ import { sweepExpiredPluginStateEntries, } from "../plugin-state/plugin-state-store.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; -import { deriveSessionChatTypeFromKey } from "../sessions/session-chat-type-shared.js"; +import { + deriveSessionChatTypeFromKey, + type SessionKeyChatType, +} from "../sessions/session-chat-type-shared.js"; import { normalizeLowercaseStringOrEmpty, normalizeOptionalString, } from "../shared/string-coerce.js"; -import { tryRecoverTaskBeforeMarkLost } from "./detached-task-runtime.js"; +import { + getDetachedTaskLifecycleRuntime, + tryRecoverTaskBeforeMarkLost, +} from "./detached-task-runtime.js"; import { deleteTaskRecordById, ensureTaskRegistryReady, getTaskById, + hasActiveTaskForChildSessionKey, listTaskRecords, markTaskLostById, markTaskTerminalById, @@ -79,9 +86,11 @@ type TaskRegistryMaintenanceRuntime = { unbindSessionBindings?: ReturnType["unbind"]; loadSessionStore: typeof loadSessionStore; resolveStorePath: typeof resolveStorePath; + deriveSessionChatTypeFromKey?: typeof deriveSessionChatTypeFromKey; isCronJobActive: typeof isCronJobActive; getAgentRunContext: typeof getAgentRunContext; parseAgentSessionKey: typeof parseAgentSessionKey; + hasActiveTaskForChildSessionKey: typeof hasActiveTaskForChildSessionKey; deleteTaskRecordById: typeof deleteTaskRecordById; ensureTaskRegistryReady: typeof ensureTaskRegistryReady; getTaskById: typeof getTaskById; @@ -117,9 +126,11 @@ const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = { unbindSessionBindings: (input) => getSessionBindingService().unbind(input), loadSessionStore, resolveStorePath, + deriveSessionChatTypeFromKey, isCronJobActive, getAgentRunContext, parseAgentSessionKey, + hasActiveTaskForChildSessionKey, deleteTaskRecordById, ensureTaskRegistryReady, getTaskById, @@ -165,6 +176,16 @@ type CronRecoveryContext = { runLogsByJobId: Map; }; +type SessionStoreLookup = { + store: Record; + normalizedEntries?: Map; +}; + +type BackingSessionLookupContext = { + sessionStoresByPath: Map; + sessionChatTypesByKey: Map; +}; + function createCronRecoveryContext(): CronRecoveryContext { return { storePath: taskRegistryMaintenanceRuntime.resolveCronStorePath(), @@ -172,30 +193,91 @@ function createCronRecoveryContext(): CronRecoveryContext { }; } -function findSessionEntryByKey(store: Record, sessionKey: string): unknown { - const direct = store[sessionKey]; +function createBackingSessionLookupContext(): BackingSessionLookupContext { + return { + sessionStoresByPath: new Map(), + sessionChatTypesByKey: new Map(), + }; +} + +function getSessionStoreLookup( + storePath: string, + context?: BackingSessionLookupContext, +): SessionStoreLookup { + if (!context) { + return { + store: taskRegistryMaintenanceRuntime.loadSessionStore(storePath, { clone: false }), + }; + } + const cached = context.sessionStoresByPath.get(storePath); + if (cached) { + return cached; + } + const lookup = { + store: taskRegistryMaintenanceRuntime.loadSessionStore(storePath, { clone: false }), + }; + context.sessionStoresByPath.set(storePath, lookup); + return lookup; +} + +function getNormalizedSessionEntries(lookup: SessionStoreLookup): Map { + if (lookup.normalizedEntries) { + return lookup.normalizedEntries; + } + const entries = new Map(); + for (const [key, entry] of Object.entries(lookup.store)) { + if (entry) { + entries.set(normalizeLowercaseStringOrEmpty(key), entry); + } + } + lookup.normalizedEntries = entries; + return entries; +} + +function findSessionEntryByKey( + lookup: SessionStoreLookup, + sessionKey: string, +): SessionEntry | undefined { + const direct = lookup.store[sessionKey]; if (direct) { return direct; } const normalized = normalizeLowercaseStringOrEmpty(sessionKey); - for (const [key, entry] of Object.entries(store)) { - if (normalizeLowercaseStringOrEmpty(key) === normalized) { - return entry; - } + if (!normalized) { + return undefined; } - return undefined; + return getNormalizedSessionEntries(lookup).get(normalized); } -function findTaskSessionEntry(task: TaskRecord): SessionEntry | undefined { +function resolveSessionChatType( + sessionKey: string, + context?: BackingSessionLookupContext, +): SessionKeyChatType { + const derive = + taskRegistryMaintenanceRuntime.deriveSessionChatTypeFromKey ?? deriveSessionChatTypeFromKey; + if (!context) { + return derive(sessionKey); + } + const cached = context.sessionChatTypesByKey.get(sessionKey); + if (cached) { + return cached; + } + const chatType = derive(sessionKey); + context.sessionChatTypesByKey.set(sessionKey, chatType); + return chatType; +} + +function findTaskSessionEntry( + task: TaskRecord, + context?: BackingSessionLookupContext, +): SessionEntry | undefined { const childSessionKey = task.childSessionKey?.trim(); if (!childSessionKey) { return undefined; } const agentId = taskRegistryMaintenanceRuntime.parseAgentSessionKey(childSessionKey)?.agentId; const storePath = taskRegistryMaintenanceRuntime.resolveStorePath(undefined, { agentId }); - const store = taskRegistryMaintenanceRuntime.loadSessionStore(storePath); - const entry = findSessionEntryByKey(store, childSessionKey); - return entry && typeof entry === "object" ? (entry as SessionEntry) : undefined; + return findSessionEntryByKey(getSessionStoreLookup(storePath, context), childSessionKey); } function isActiveTask(task: TaskRecord): boolean { @@ -358,7 +440,7 @@ function hasActiveCliRun(task: TaskRecord): boolean { return false; } -function hasBackingSession(task: TaskRecord): boolean { +function hasBackingSession(task: TaskRecord, context?: BackingSessionLookupContext): boolean { if (task.runtime === "cron") { if (!taskRegistryMaintenanceRuntime.isCronRuntimeAuthoritative()) { return true; @@ -386,12 +468,12 @@ function hasBackingSession(task: TaskRecord): boolean { } if (task.runtime === "subagent" || task.runtime === "cli") { if (task.runtime === "cli") { - const chatType = deriveSessionChatTypeFromKey(childSessionKey); + const chatType = resolveSessionChatType(childSessionKey, context); if (chatType === "channel" || chatType === "group" || chatType === "direct") { return false; } } - const entry = findTaskSessionEntry(task); + const entry = findTaskSessionEntry(task, context); if (task.runtime === "subagent" && isSubagentRecoveryWedgedEntry(entry)) { return false; } @@ -401,9 +483,9 @@ function hasBackingSession(task: TaskRecord): boolean { return true; } -function resolveTaskLostError(task: TaskRecord): string { +function resolveTaskLostError(task: TaskRecord, context?: BackingSessionLookupContext): string { if (task.runtime === "subagent") { - const entry = findTaskSessionEntry(task); + const entry = findTaskSessionEntry(task, context); if (entry && isSubagentRecoveryWedgedEntry(entry)) { return formatSubagentRecoveryWedgedReason(entry); } @@ -411,14 +493,35 @@ function resolveTaskLostError(task: TaskRecord): string { return "backing session missing"; } -function shouldMarkLost(task: TaskRecord, now: number): boolean { +function shouldMarkLost( + task: TaskRecord, + now: number, + context?: BackingSessionLookupContext, +): boolean { if (!isActiveTask(task)) { return false; } if (!hasLostGraceExpired(task, now)) { return false; } - return !hasBackingSession(task); + return !hasBackingSession(task, context); +} + +function hasTaskLostDecisionInputChanged(before: TaskRecord, after: TaskRecord): boolean { + return ( + before.status !== after.status || + before.runtime !== after.runtime || + before.childSessionKey !== after.childSessionKey || + before.sourceId !== after.sourceId || + before.runId !== after.runId || + before.createdAt !== after.createdAt || + before.startedAt !== after.startedAt || + before.lastEventAt !== after.lastEventAt + ); +} + +function hasDetachedTaskRecoveryHook(): boolean { + return Boolean(getDetachedTaskLifecycleRuntime().tryRecoverTaskBeforeMarkLost); } function shouldPruneTerminalTask(task: TaskRecord, now: number): boolean { @@ -445,31 +548,6 @@ function getNormalizedTaskChildSessionKey(task: TaskRecord): string | undefined return normalizeOptionalString(task.childSessionKey); } -function isSameTaskChildSession(a: TaskRecord, b: TaskRecord): boolean { - const left = getNormalizedTaskChildSessionKey(a); - return Boolean(left && left === getNormalizedTaskChildSessionKey(b)); -} - -function hasActiveTaskForChildSession(task: TaskRecord, tasks: TaskRecord[]): boolean { - return tasks.some( - (candidate) => - candidate.taskId !== task.taskId && - isActiveTask(candidate) && - isSameTaskChildSession(task, candidate), - ); -} - -function hasActiveTaskForChildSessionKey(sessionKey: string, tasks: TaskRecord[]): boolean { - const normalized = normalizeOptionalString(sessionKey); - if (!normalized) { - return false; - } - return tasks.some( - (candidate) => - isActiveTask(candidate) && getNormalizedTaskChildSessionKey(candidate) === normalized, - ); -} - function getAcpSessionParentKeys(acpEntry: Pick): string[] { return [ normalizeOptionalString(acpEntry.entry?.spawnedBy), @@ -507,12 +585,18 @@ function hasActiveSessionBinding(sessionKey: string): boolean { } } -function shouldCloseTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): boolean { +function shouldCloseTerminalAcpSession(task: TaskRecord): boolean { if (task.runtime !== "acp" || isActiveTask(task)) { return false; } const sessionKey = getNormalizedTaskChildSessionKey(task); - if (!sessionKey || hasActiveTaskForChildSession(task, tasks)) { + if ( + !sessionKey || + taskRegistryMaintenanceRuntime.hasActiveTaskForChildSessionKey({ + sessionKey, + excludeTaskId: task.taskId, + }) + ) { return false; } const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({ sessionKey }); @@ -528,15 +612,15 @@ function shouldCloseTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): b return !hasActiveSessionBinding(sessionKey); } -function shouldCloseOrphanedParentOwnedAcpSession( - acpEntry: AcpSessionStoreEntry, - tasks: TaskRecord[], -): boolean { +function shouldCloseOrphanedParentOwnedAcpSession(acpEntry: AcpSessionStoreEntry): boolean { if (!acpEntry.entry || !acpEntry.acp || !isParentOwnedAcpSessionEntry(acpEntry)) { return false; } const sessionKey = normalizeOptionalString(acpEntry.sessionKey); - if (!sessionKey || hasActiveTaskForChildSessionKey(sessionKey, tasks)) { + if ( + !sessionKey || + taskRegistryMaintenanceRuntime.hasActiveTaskForChildSessionKey({ sessionKey }) + ) { return false; } if (acpEntry.acp.mode === "oneshot") { @@ -545,8 +629,8 @@ function shouldCloseOrphanedParentOwnedAcpSession( return !hasActiveSessionBinding(sessionKey); } -async function cleanupTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): Promise { - if (!shouldCloseTerminalAcpSession(task, tasks)) { +async function cleanupTerminalAcpSession(task: TaskRecord): Promise { + if (!shouldCloseTerminalAcpSession(task)) { return; } const sessionKey = getNormalizedTaskChildSessionKey(task); @@ -586,7 +670,7 @@ async function cleanupTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): } } -async function cleanupOrphanedParentOwnedAcpSessions(tasks: TaskRecord[]): Promise { +async function cleanupOrphanedParentOwnedAcpSessions(): Promise { let acpSessions: AcpSessionStoreEntry[]; try { acpSessions = await taskRegistryMaintenanceRuntime.listAcpSessionEntries({}); @@ -601,7 +685,7 @@ async function cleanupOrphanedParentOwnedAcpSessions(tasks: TaskRecord[]): Promi continue; } seenSessionKeys.add(sessionKey); - if (!shouldCloseOrphanedParentOwnedAcpSession(acpEntry, tasks)) { + if (!shouldCloseOrphanedParentOwnedAcpSession(acpEntry)) { continue; } const closeAcpSession = taskRegistryMaintenanceRuntime.closeAcpSession; @@ -635,14 +719,19 @@ async function cleanupOrphanedParentOwnedAcpSessions(tasks: TaskRecord[]): Promi } } -function markTaskLost(task: TaskRecord, now: number): TaskRecord { - const cleanupAfter = task.cleanupAfter ?? projectTaskLost(task, now).cleanupAfter; +function markTaskLost( + task: TaskRecord, + now: number, + context?: BackingSessionLookupContext, +): TaskRecord { + const cleanupAfter = + task.cleanupAfter ?? resolveCleanupAfter({ ...task, endedAt: task.endedAt ?? now }); const updated = taskRegistryMaintenanceRuntime.markTaskLostById({ taskId: task.taskId, endedAt: task.endedAt ?? now, lastEventAt: now, - error: task.error ?? resolveTaskLostError(task), + error: task.error ?? resolveTaskLostError(task, context), cleanupAfter, }) ?? task; void taskRegistryMaintenanceRuntime.maybeDeliverTaskTerminalUpdate(updated.taskId); @@ -684,13 +773,17 @@ function projectTaskRecovered(task: TaskRecord, recovery: CronTerminalRecovery): }; } -function projectTaskLost(task: TaskRecord, now: number): TaskRecord { +function projectTaskLost( + task: TaskRecord, + now: number, + context?: BackingSessionLookupContext, +): TaskRecord { const projected: TaskRecord = { ...task, status: "lost", endedAt: task.endedAt ?? now, lastEventAt: now, - error: task.error ?? resolveTaskLostError(task), + error: task.error ?? resolveTaskLostError(task, context), }; return { ...projected, @@ -700,27 +793,46 @@ function projectTaskLost(task: TaskRecord, now: number): TaskRecord { }; } -export function reconcileTaskRecordForOperatorInspection( +function reconcileTaskRecordForOperatorInspectionWithContexts( task: TaskRecord, - context: CronRecoveryContext = createCronRecoveryContext(), + context: CronRecoveryContext, + backingSessionContext: BackingSessionLookupContext, ): TaskRecord { const cronRecovery = resolveDurableCronTaskRecovery(task, context); if (cronRecovery) { return projectTaskRecovered(task, cronRecovery); } const now = Date.now(); - if (!shouldMarkLost(task, now)) { + if (!shouldMarkLost(task, now, backingSessionContext)) { return task; } - return projectTaskLost(task, now); + return projectTaskLost(task, now, backingSessionContext); +} + +export function reconcileTaskRecordForOperatorInspection( + task: TaskRecord, + context: CronRecoveryContext = createCronRecoveryContext(), +): TaskRecord { + return reconcileTaskRecordForOperatorInspectionWithContexts( + task, + context, + createBackingSessionLookupContext(), + ); } export function reconcileInspectableTasks(): TaskRecord[] { taskRegistryMaintenanceRuntime.ensureTaskRegistryReady(); const cronRecoveryContext = createCronRecoveryContext(); + const backingSessionContext = createBackingSessionLookupContext(); return taskRegistryMaintenanceRuntime .listTaskRecords() - .map((task) => reconcileTaskRecordForOperatorInspection(task, cronRecoveryContext)); + .map((task) => + reconcileTaskRecordForOperatorInspectionWithContexts( + task, + cronRecoveryContext, + backingSessionContext, + ), + ); } configureTaskAuditTaskProvider(reconcileInspectableTasks); @@ -751,12 +863,13 @@ export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary let cleanupStamped = 0; let pruned = 0; const cronRecoveryContext = createCronRecoveryContext(); + const backingSessionContext = createBackingSessionLookupContext(); for (const task of taskRegistryMaintenanceRuntime.listTaskRecords()) { if (resolveDurableCronTaskRecovery(task, cronRecoveryContext)) { recovered += 1; continue; } - if (shouldMarkLost(task, now)) { + if (shouldMarkLost(task, now, backingSessionContext)) { reconciled += 1; continue; } @@ -800,6 +913,8 @@ export async function runTaskRegistryMaintenance(): Promise ({ function configureTaskRegistryMaintenanceRuntimeForTest(params: { currentTasks: Map>; snapshotTasks: ReturnType[]; + listTaskRecords?: () => ReturnType[]; acpEntry?: AcpSessionStoreEntry; acpEntries?: AcpSessionStoreEntry[]; sessionBindings?: SessionBindingRecord[]; @@ -123,10 +124,19 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: { parseAgentSessionKey: () => null as ParsedAgentSessionKey | null, isCronJobActive: () => false, getAgentRunContext: () => undefined, + hasActiveTaskForChildSessionKey: ({ sessionKey, excludeTaskId }) => { + const normalized = sessionKey.trim().toLowerCase(); + return Array.from(params.currentTasks.values()).some( + (task) => + task.taskId !== excludeTaskId && + (task.status === "queued" || task.status === "running") && + task.childSessionKey?.trim().toLowerCase() === normalized, + ); + }, deleteTaskRecordById: (taskId: string) => params.currentTasks.delete(taskId), ensureTaskRegistryReady: () => {}, getTaskById: (taskId: string) => params.currentTasks.get(taskId), - listTaskRecords: () => params.snapshotTasks, + listTaskRecords: params.listTaskRecords ?? (() => params.snapshotTasks), markTaskLostById: (patch: { taskId: string; endedAt: number; @@ -1628,6 +1638,103 @@ describe("task-registry", () => { }); }); + it("does not relist task records for each terminal ACP cleanup check", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + const now = Date.now(); + const tasks = Array.from({ length: 20 }, (_, index) => { + const task = createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + scopeKind: "session", + childSessionKey: `agent:claude:acp:terminal-${index}`, + runId: `run-terminal-acp-snapshot-${index}`, + task: `Terminal ACP task ${index}`, + status: "succeeded", + deliveryStatus: "delivered", + }); + return { + ...task, + endedAt: now - 60_000, + lastEventAt: now - 60_000, + }; + }); + const currentTasks = new Map(tasks.map((task) => [task.taskId, task])); + let listCalls = 0; + + configureTaskRegistryMaintenanceRuntimeForTest({ + currentTasks, + snapshotTasks: tasks, + listTaskRecords: () => { + listCalls += 1; + return tasks; + }, + }); + + await runTaskRegistryMaintenance(); + + expect(listCalls).toBe(1); + }); + }); + + it("keeps terminal ACP cleanup from closing a child session with fresh active work", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + const now = Date.now(); + const parentSessionKey = "agent:main:telegram:direct:owner"; + const childSessionKey = "agent:claude:acp:shared-child"; + const terminal = createTaskRecord({ + runtime: "acp", + ownerKey: parentSessionKey, + requesterSessionKey: parentSessionKey, + scopeKind: "session", + childSessionKey, + runId: "run-terminal-acp-shared", + task: "Old ACP task", + status: "succeeded", + deliveryStatus: "delivered", + }); + const terminalCurrent = { + ...terminal, + endedAt: now - 60_000, + lastEventAt: now - 60_000, + }; + const active = createTaskRecord({ + runtime: "acp", + ownerKey: parentSessionKey, + requesterSessionKey: parentSessionKey, + scopeKind: "session", + childSessionKey, + runId: "run-active-acp-shared", + task: "Current ACP task", + status: "running", + deliveryStatus: "pending", + }); + const closeAcpSession = vi.fn().mockResolvedValue(undefined); + + configureTaskRegistryMaintenanceRuntimeForTest({ + currentTasks: new Map([ + [terminal.taskId, terminalCurrent], + [active.taskId, active], + ]), + snapshotTasks: [terminalCurrent], + acpEntry: createAcpSessionStoreEntry({ + sessionKey: childSessionKey, + parentSessionKey, + mode: "oneshot", + }), + closeAcpSession, + }); + + await runTaskRegistryMaintenance(); + + expect(closeAcpSession).not.toHaveBeenCalled(); + }); + }); + it("closes stale terminal persistent ACP sessions only when no binding remains", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; @@ -1977,6 +2084,7 @@ describe("task-registry", () => { parseAgentSessionKey: () => null, isCronJobActive: () => false, getAgentRunContext: () => undefined, + hasActiveTaskForChildSessionKey: () => false, deleteTaskRecordById: () => false, ensureTaskRegistryReady: () => {}, getTaskById: () => undefined, diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 45de0684c81..e359f5b6098 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -1945,6 +1945,35 @@ export function listTaskRecords(): TaskRecord[] { .map(({ insertionIndex: _, ...task }) => task); } +export function hasActiveTaskForChildSessionKey(params: { + sessionKey: string; + excludeTaskId?: string; +}): boolean { + ensureTaskRegistryReady(); + const sessionKey = normalizeOptionalString(params.sessionKey); + if (!sessionKey) { + return false; + } + const ids = taskIdsByRelatedSessionKey.get(sessionKey); + if (!ids) { + return false; + } + for (const taskId of ids) { + if (taskId === params.excludeTaskId) { + continue; + } + const task = tasks.get(taskId); + if ( + task && + isActiveTaskStatus(task.status) && + normalizeOptionalString(task.childSessionKey) === sessionKey + ) { + return true; + } + } + return false; +} + export function getTaskRegistrySummary(): TaskRegistrySummary { ensureTaskRegistryReady(); return summarizeTaskRecords(tasks.values());