fix(tasks): speed up registry maintenance

Co-authored-by: Lightningxxl <yuanhangxurobin@gmail.com>
Co-authored-by: Gorin Lee <glfruit80@gmail.com>
This commit is contained in:
Peter Steinberger
2026-05-02 13:01:37 +01:00
parent a254fdaf42
commit f46c699887
6 changed files with 397 additions and 75 deletions

View File

@@ -34,6 +34,7 @@ Docs: https://docs.openclaw.ai
- Telegram/native commands: pass persisted session files into plugin commands for topic-bound sessions, so `/codex bind` works from Telegram forum topics. Refs #75845 and #76049. Thanks @MatthewSchleder.
- Telegram: honor runtime conversation bindings for native slash commands in bound top-level groups, so commands like `/status@bot` route to the active non-`main` session instead of falling back to the default route. Fixes #75405; supersedes #75558. Thanks @ziptbm and @yfge.
- Gateway/tasks: make task registry maintenance use pass-local backing-session lookups and fresh active child-session indexes, avoiding repeated full task snapshots and session-store clones on large stale registries. Fixes #73517 and #75708; supersedes #74406 and #75709. Thanks @Lightningxxl, @glfruit, and @jared-rebel.
- Models CLI: restore `openclaw models list --provider <id>` catalog and registry fallback rows for unconfigured providers, so provider-specific verification commands no longer report "No models found." Fixes #75517; supersedes #75615. Thanks @lotsoftick and @koshaji.
- Gateway/macOS: write LaunchAgent services with a canonical system PATH and stop preserving old plist PATH entries, so Volta, asdf, fnm, and pnpm shell paths no longer affect gateway child-process Node resolution. Fixes #75233; supersedes #75246. Thanks @nphyde2.
- Slack/hooks: preserve bot alert attachment text in message-received hook content when command text is blank. Fixes #76035; refs #76036. Thanks @amsminn.

View File

@@ -7,6 +7,7 @@ export {
findLatestTaskForFlowId,
finalizeTaskRunByRunId,
getTaskById,
hasActiveTaskForChildSessionKey,
listTaskRecords,
listTasksForFlowId,
listTasksForOwnerKey,

View File

@@ -53,6 +53,9 @@ afterEach(() => {
function createTaskRegistryMaintenanceHarness(params: {
tasks: TaskRecord[];
sessionStore?: Record<string, SessionEntry>;
loadSessionStore?: TaskRegistryMaintenanceRuntime["loadSessionStore"];
resolveStorePath?: TaskRegistryMaintenanceRuntime["resolveStorePath"];
deriveSessionChatTypeFromKey?: TaskRegistryMaintenanceRuntime["deriveSessionChatTypeFromKey"];
acpEntry?: AcpSessionStoreEntry["entry"];
activeCronJobIds?: string[];
activeRunIds?: string[];
@@ -87,8 +90,11 @@ function createTaskRegistryMaintenanceHarness(params: {
entry: undefined,
storeReadFailed: false,
} satisfies AcpSessionStoreEntry),
loadSessionStore: () => sessionStore,
resolveStorePath: () => "",
loadSessionStore: params.loadSessionStore ?? (() => sessionStore),
resolveStorePath: params.resolveStorePath ?? (() => ""),
...(params.deriveSessionChatTypeFromKey
? { deriveSessionChatTypeFromKey: params.deriveSessionChatTypeFromKey }
: {}),
isCronJobActive: (jobId: string) => activeCronJobIds.has(jobId),
getAgentRunContext: (runId: string) =>
activeRunIds.has(runId) ? { sessionKey: "main" } : undefined,
@@ -101,6 +107,15 @@ function createTaskRegistryMaintenanceHarness(params: {
? { agentId, rest: rest.join(":") }
: null;
},
hasActiveTaskForChildSessionKey: ({ sessionKey, excludeTaskId }) => {
const normalized = sessionKey.trim().toLowerCase();
return Array.from(currentTasks.values()).some(
(task) =>
task.taskId !== excludeTaskId &&
(task.status === "queued" || task.status === "running") &&
task.childSessionKey?.trim().toLowerCase() === normalized,
);
},
deleteTaskRecordById: (taskId: string) => currentTasks.delete(taskId),
ensureTaskRegistryReady: () => {},
getTaskById: (taskId: string) => currentTasks.get(taskId),
@@ -162,6 +177,46 @@ function createTaskRegistryMaintenanceHarness(params: {
}
describe("task-registry maintenance issue #60299", () => {
it("reuses session store reads across stale subagent task checks in one pass", async () => {
const tasks = Array.from({ length: 10 }, (_, index) =>
makeStaleTask({
runtime: "subagent",
taskId: `task-subagent-stale-${index}`,
childSessionKey: `agent:main:subagent:stale-${index}`,
}),
);
const loadSessionStoreMock = vi.fn(() => ({}));
createTaskRegistryMaintenanceHarness({
tasks,
loadSessionStore: loadSessionStoreMock,
resolveStorePath: () => "/tmp/openclaw-test-sessions-main.json",
});
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: tasks.length });
expect(loadSessionStoreMock).toHaveBeenCalledTimes(1);
});
it("reuses CLI channel session type derivation across duplicate stale task checks", async () => {
const childSessionKey = "agent:main:discord:direct:user-1";
const tasks = Array.from({ length: 10 }, (_, index) =>
makeStaleTask({
runtime: "cli",
taskId: `task-cli-channel-stale-${index}`,
childSessionKey,
}),
);
const deriveSessionChatTypeMock = vi.fn(() => "direct" as const);
createTaskRegistryMaintenanceHarness({
tasks,
deriveSessionChatTypeFromKey: deriveSessionChatTypeMock,
});
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: tasks.length });
expect(deriveSessionChatTypeMock).toHaveBeenCalledTimes(1);
});
it("marks stale cron tasks lost once the runtime no longer tracks the job as active", async () => {
const childSessionKey = "agent:main:workspace:channel:test-channel";
const task = makeStaleTask({

View File

@@ -24,16 +24,23 @@ import {
sweepExpiredPluginStateEntries,
} from "../plugin-state/plugin-state-store.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { deriveSessionChatTypeFromKey } from "../sessions/session-chat-type-shared.js";
import {
deriveSessionChatTypeFromKey,
type SessionKeyChatType,
} from "../sessions/session-chat-type-shared.js";
import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalString,
} from "../shared/string-coerce.js";
import { tryRecoverTaskBeforeMarkLost } from "./detached-task-runtime.js";
import {
getDetachedTaskLifecycleRuntime,
tryRecoverTaskBeforeMarkLost,
} from "./detached-task-runtime.js";
import {
deleteTaskRecordById,
ensureTaskRegistryReady,
getTaskById,
hasActiveTaskForChildSessionKey,
listTaskRecords,
markTaskLostById,
markTaskTerminalById,
@@ -79,9 +86,11 @@ type TaskRegistryMaintenanceRuntime = {
unbindSessionBindings?: ReturnType<typeof getSessionBindingService>["unbind"];
loadSessionStore: typeof loadSessionStore;
resolveStorePath: typeof resolveStorePath;
deriveSessionChatTypeFromKey?: typeof deriveSessionChatTypeFromKey;
isCronJobActive: typeof isCronJobActive;
getAgentRunContext: typeof getAgentRunContext;
parseAgentSessionKey: typeof parseAgentSessionKey;
hasActiveTaskForChildSessionKey: typeof hasActiveTaskForChildSessionKey;
deleteTaskRecordById: typeof deleteTaskRecordById;
ensureTaskRegistryReady: typeof ensureTaskRegistryReady;
getTaskById: typeof getTaskById;
@@ -117,9 +126,11 @@ const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
unbindSessionBindings: (input) => getSessionBindingService().unbind(input),
loadSessionStore,
resolveStorePath,
deriveSessionChatTypeFromKey,
isCronJobActive,
getAgentRunContext,
parseAgentSessionKey,
hasActiveTaskForChildSessionKey,
deleteTaskRecordById,
ensureTaskRegistryReady,
getTaskById,
@@ -165,6 +176,16 @@ type CronRecoveryContext = {
runLogsByJobId: Map<string, CronRunLogEntry[]>;
};
type SessionStoreLookup = {
store: Record<string, SessionEntry>;
normalizedEntries?: Map<string, SessionEntry>;
};
type BackingSessionLookupContext = {
sessionStoresByPath: Map<string, SessionStoreLookup>;
sessionChatTypesByKey: Map<string, SessionKeyChatType>;
};
function createCronRecoveryContext(): CronRecoveryContext {
return {
storePath: taskRegistryMaintenanceRuntime.resolveCronStorePath(),
@@ -172,30 +193,91 @@ function createCronRecoveryContext(): CronRecoveryContext {
};
}
function findSessionEntryByKey(store: Record<string, unknown>, sessionKey: string): unknown {
const direct = store[sessionKey];
function createBackingSessionLookupContext(): BackingSessionLookupContext {
return {
sessionStoresByPath: new Map<string, SessionStoreLookup>(),
sessionChatTypesByKey: new Map<string, SessionKeyChatType>(),
};
}
function getSessionStoreLookup(
storePath: string,
context?: BackingSessionLookupContext,
): SessionStoreLookup {
if (!context) {
return {
store: taskRegistryMaintenanceRuntime.loadSessionStore(storePath, { clone: false }),
};
}
const cached = context.sessionStoresByPath.get(storePath);
if (cached) {
return cached;
}
const lookup = {
store: taskRegistryMaintenanceRuntime.loadSessionStore(storePath, { clone: false }),
};
context.sessionStoresByPath.set(storePath, lookup);
return lookup;
}
function getNormalizedSessionEntries(lookup: SessionStoreLookup): Map<string, SessionEntry> {
if (lookup.normalizedEntries) {
return lookup.normalizedEntries;
}
const entries = new Map<string, SessionEntry>();
for (const [key, entry] of Object.entries(lookup.store)) {
if (entry) {
entries.set(normalizeLowercaseStringOrEmpty(key), entry);
}
}
lookup.normalizedEntries = entries;
return entries;
}
function findSessionEntryByKey(
lookup: SessionStoreLookup,
sessionKey: string,
): SessionEntry | undefined {
const direct = lookup.store[sessionKey];
if (direct) {
return direct;
}
const normalized = normalizeLowercaseStringOrEmpty(sessionKey);
for (const [key, entry] of Object.entries(store)) {
if (normalizeLowercaseStringOrEmpty(key) === normalized) {
return entry;
}
if (!normalized) {
return undefined;
}
return undefined;
return getNormalizedSessionEntries(lookup).get(normalized);
}
function findTaskSessionEntry(task: TaskRecord): SessionEntry | undefined {
function resolveSessionChatType(
sessionKey: string,
context?: BackingSessionLookupContext,
): SessionKeyChatType {
const derive =
taskRegistryMaintenanceRuntime.deriveSessionChatTypeFromKey ?? deriveSessionChatTypeFromKey;
if (!context) {
return derive(sessionKey);
}
const cached = context.sessionChatTypesByKey.get(sessionKey);
if (cached) {
return cached;
}
const chatType = derive(sessionKey);
context.sessionChatTypesByKey.set(sessionKey, chatType);
return chatType;
}
function findTaskSessionEntry(
task: TaskRecord,
context?: BackingSessionLookupContext,
): SessionEntry | undefined {
const childSessionKey = task.childSessionKey?.trim();
if (!childSessionKey) {
return undefined;
}
const agentId = taskRegistryMaintenanceRuntime.parseAgentSessionKey(childSessionKey)?.agentId;
const storePath = taskRegistryMaintenanceRuntime.resolveStorePath(undefined, { agentId });
const store = taskRegistryMaintenanceRuntime.loadSessionStore(storePath);
const entry = findSessionEntryByKey(store, childSessionKey);
return entry && typeof entry === "object" ? (entry as SessionEntry) : undefined;
return findSessionEntryByKey(getSessionStoreLookup(storePath, context), childSessionKey);
}
function isActiveTask(task: TaskRecord): boolean {
@@ -358,7 +440,7 @@ function hasActiveCliRun(task: TaskRecord): boolean {
return false;
}
function hasBackingSession(task: TaskRecord): boolean {
function hasBackingSession(task: TaskRecord, context?: BackingSessionLookupContext): boolean {
if (task.runtime === "cron") {
if (!taskRegistryMaintenanceRuntime.isCronRuntimeAuthoritative()) {
return true;
@@ -386,12 +468,12 @@ function hasBackingSession(task: TaskRecord): boolean {
}
if (task.runtime === "subagent" || task.runtime === "cli") {
if (task.runtime === "cli") {
const chatType = deriveSessionChatTypeFromKey(childSessionKey);
const chatType = resolveSessionChatType(childSessionKey, context);
if (chatType === "channel" || chatType === "group" || chatType === "direct") {
return false;
}
}
const entry = findTaskSessionEntry(task);
const entry = findTaskSessionEntry(task, context);
if (task.runtime === "subagent" && isSubagentRecoveryWedgedEntry(entry)) {
return false;
}
@@ -401,9 +483,9 @@ function hasBackingSession(task: TaskRecord): boolean {
return true;
}
function resolveTaskLostError(task: TaskRecord): string {
function resolveTaskLostError(task: TaskRecord, context?: BackingSessionLookupContext): string {
if (task.runtime === "subagent") {
const entry = findTaskSessionEntry(task);
const entry = findTaskSessionEntry(task, context);
if (entry && isSubagentRecoveryWedgedEntry(entry)) {
return formatSubagentRecoveryWedgedReason(entry);
}
@@ -411,14 +493,35 @@ function resolveTaskLostError(task: TaskRecord): string {
return "backing session missing";
}
function shouldMarkLost(task: TaskRecord, now: number): boolean {
function shouldMarkLost(
task: TaskRecord,
now: number,
context?: BackingSessionLookupContext,
): boolean {
if (!isActiveTask(task)) {
return false;
}
if (!hasLostGraceExpired(task, now)) {
return false;
}
return !hasBackingSession(task);
return !hasBackingSession(task, context);
}
function hasTaskLostDecisionInputChanged(before: TaskRecord, after: TaskRecord): boolean {
return (
before.status !== after.status ||
before.runtime !== after.runtime ||
before.childSessionKey !== after.childSessionKey ||
before.sourceId !== after.sourceId ||
before.runId !== after.runId ||
before.createdAt !== after.createdAt ||
before.startedAt !== after.startedAt ||
before.lastEventAt !== after.lastEventAt
);
}
function hasDetachedTaskRecoveryHook(): boolean {
return Boolean(getDetachedTaskLifecycleRuntime().tryRecoverTaskBeforeMarkLost);
}
function shouldPruneTerminalTask(task: TaskRecord, now: number): boolean {
@@ -445,31 +548,6 @@ function getNormalizedTaskChildSessionKey(task: TaskRecord): string | undefined
return normalizeOptionalString(task.childSessionKey);
}
function isSameTaskChildSession(a: TaskRecord, b: TaskRecord): boolean {
const left = getNormalizedTaskChildSessionKey(a);
return Boolean(left && left === getNormalizedTaskChildSessionKey(b));
}
function hasActiveTaskForChildSession(task: TaskRecord, tasks: TaskRecord[]): boolean {
return tasks.some(
(candidate) =>
candidate.taskId !== task.taskId &&
isActiveTask(candidate) &&
isSameTaskChildSession(task, candidate),
);
}
function hasActiveTaskForChildSessionKey(sessionKey: string, tasks: TaskRecord[]): boolean {
const normalized = normalizeOptionalString(sessionKey);
if (!normalized) {
return false;
}
return tasks.some(
(candidate) =>
isActiveTask(candidate) && getNormalizedTaskChildSessionKey(candidate) === normalized,
);
}
function getAcpSessionParentKeys(acpEntry: Pick<AcpSessionStoreEntry, "entry">): string[] {
return [
normalizeOptionalString(acpEntry.entry?.spawnedBy),
@@ -507,12 +585,18 @@ function hasActiveSessionBinding(sessionKey: string): boolean {
}
}
function shouldCloseTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): boolean {
function shouldCloseTerminalAcpSession(task: TaskRecord): boolean {
if (task.runtime !== "acp" || isActiveTask(task)) {
return false;
}
const sessionKey = getNormalizedTaskChildSessionKey(task);
if (!sessionKey || hasActiveTaskForChildSession(task, tasks)) {
if (
!sessionKey ||
taskRegistryMaintenanceRuntime.hasActiveTaskForChildSessionKey({
sessionKey,
excludeTaskId: task.taskId,
})
) {
return false;
}
const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({ sessionKey });
@@ -528,15 +612,15 @@ function shouldCloseTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): b
return !hasActiveSessionBinding(sessionKey);
}
function shouldCloseOrphanedParentOwnedAcpSession(
acpEntry: AcpSessionStoreEntry,
tasks: TaskRecord[],
): boolean {
function shouldCloseOrphanedParentOwnedAcpSession(acpEntry: AcpSessionStoreEntry): boolean {
if (!acpEntry.entry || !acpEntry.acp || !isParentOwnedAcpSessionEntry(acpEntry)) {
return false;
}
const sessionKey = normalizeOptionalString(acpEntry.sessionKey);
if (!sessionKey || hasActiveTaskForChildSessionKey(sessionKey, tasks)) {
if (
!sessionKey ||
taskRegistryMaintenanceRuntime.hasActiveTaskForChildSessionKey({ sessionKey })
) {
return false;
}
if (acpEntry.acp.mode === "oneshot") {
@@ -545,8 +629,8 @@ function shouldCloseOrphanedParentOwnedAcpSession(
return !hasActiveSessionBinding(sessionKey);
}
async function cleanupTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): Promise<void> {
if (!shouldCloseTerminalAcpSession(task, tasks)) {
async function cleanupTerminalAcpSession(task: TaskRecord): Promise<void> {
if (!shouldCloseTerminalAcpSession(task)) {
return;
}
const sessionKey = getNormalizedTaskChildSessionKey(task);
@@ -586,7 +670,7 @@ async function cleanupTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]):
}
}
async function cleanupOrphanedParentOwnedAcpSessions(tasks: TaskRecord[]): Promise<void> {
async function cleanupOrphanedParentOwnedAcpSessions(): Promise<void> {
let acpSessions: AcpSessionStoreEntry[];
try {
acpSessions = await taskRegistryMaintenanceRuntime.listAcpSessionEntries({});
@@ -601,7 +685,7 @@ async function cleanupOrphanedParentOwnedAcpSessions(tasks: TaskRecord[]): Promi
continue;
}
seenSessionKeys.add(sessionKey);
if (!shouldCloseOrphanedParentOwnedAcpSession(acpEntry, tasks)) {
if (!shouldCloseOrphanedParentOwnedAcpSession(acpEntry)) {
continue;
}
const closeAcpSession = taskRegistryMaintenanceRuntime.closeAcpSession;
@@ -635,14 +719,19 @@ async function cleanupOrphanedParentOwnedAcpSessions(tasks: TaskRecord[]): Promi
}
}
function markTaskLost(task: TaskRecord, now: number): TaskRecord {
const cleanupAfter = task.cleanupAfter ?? projectTaskLost(task, now).cleanupAfter;
function markTaskLost(
task: TaskRecord,
now: number,
context?: BackingSessionLookupContext,
): TaskRecord {
const cleanupAfter =
task.cleanupAfter ?? resolveCleanupAfter({ ...task, endedAt: task.endedAt ?? now });
const updated =
taskRegistryMaintenanceRuntime.markTaskLostById({
taskId: task.taskId,
endedAt: task.endedAt ?? now,
lastEventAt: now,
error: task.error ?? resolveTaskLostError(task),
error: task.error ?? resolveTaskLostError(task, context),
cleanupAfter,
}) ?? task;
void taskRegistryMaintenanceRuntime.maybeDeliverTaskTerminalUpdate(updated.taskId);
@@ -684,13 +773,17 @@ function projectTaskRecovered(task: TaskRecord, recovery: CronTerminalRecovery):
};
}
function projectTaskLost(task: TaskRecord, now: number): TaskRecord {
function projectTaskLost(
task: TaskRecord,
now: number,
context?: BackingSessionLookupContext,
): TaskRecord {
const projected: TaskRecord = {
...task,
status: "lost",
endedAt: task.endedAt ?? now,
lastEventAt: now,
error: task.error ?? resolveTaskLostError(task),
error: task.error ?? resolveTaskLostError(task, context),
};
return {
...projected,
@@ -700,27 +793,46 @@ function projectTaskLost(task: TaskRecord, now: number): TaskRecord {
};
}
export function reconcileTaskRecordForOperatorInspection(
function reconcileTaskRecordForOperatorInspectionWithContexts(
task: TaskRecord,
context: CronRecoveryContext = createCronRecoveryContext(),
context: CronRecoveryContext,
backingSessionContext: BackingSessionLookupContext,
): TaskRecord {
const cronRecovery = resolveDurableCronTaskRecovery(task, context);
if (cronRecovery) {
return projectTaskRecovered(task, cronRecovery);
}
const now = Date.now();
if (!shouldMarkLost(task, now)) {
if (!shouldMarkLost(task, now, backingSessionContext)) {
return task;
}
return projectTaskLost(task, now);
return projectTaskLost(task, now, backingSessionContext);
}
export function reconcileTaskRecordForOperatorInspection(
task: TaskRecord,
context: CronRecoveryContext = createCronRecoveryContext(),
): TaskRecord {
return reconcileTaskRecordForOperatorInspectionWithContexts(
task,
context,
createBackingSessionLookupContext(),
);
}
export function reconcileInspectableTasks(): TaskRecord[] {
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
const cronRecoveryContext = createCronRecoveryContext();
const backingSessionContext = createBackingSessionLookupContext();
return taskRegistryMaintenanceRuntime
.listTaskRecords()
.map((task) => reconcileTaskRecordForOperatorInspection(task, cronRecoveryContext));
.map((task) =>
reconcileTaskRecordForOperatorInspectionWithContexts(
task,
cronRecoveryContext,
backingSessionContext,
),
);
}
configureTaskAuditTaskProvider(reconcileInspectableTasks);
@@ -751,12 +863,13 @@ export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary
let cleanupStamped = 0;
let pruned = 0;
const cronRecoveryContext = createCronRecoveryContext();
const backingSessionContext = createBackingSessionLookupContext();
for (const task of taskRegistryMaintenanceRuntime.listTaskRecords()) {
if (resolveDurableCronTaskRecovery(task, cronRecoveryContext)) {
recovered += 1;
continue;
}
if (shouldMarkLost(task, now)) {
if (shouldMarkLost(task, now, backingSessionContext)) {
reconciled += 1;
continue;
}
@@ -800,6 +913,8 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
let pruned = 0;
const tasks = taskRegistryMaintenanceRuntime.listTaskRecords();
const cronRecoveryContext = createCronRecoveryContext();
const backingSessionContext = createBackingSessionLookupContext();
const recoveryHookRegistered = hasDetachedTaskRecoveryHook();
let processed = 0;
for (const task of tasks) {
const current = taskRegistryMaintenanceRuntime.getTaskById(task.taskId);
@@ -818,7 +933,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
}
continue;
}
if (shouldMarkLost(current, now)) {
if (shouldMarkLost(current, now, backingSessionContext)) {
const recovery = await tryRecoverTaskBeforeMarkLost({
taskId: current.taskId,
runtime: current.runtime,
@@ -826,13 +941,26 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
now,
});
const freshAfterHook = taskRegistryMaintenanceRuntime.getTaskById(current.taskId);
if (!freshAfterHook || !shouldMarkLost(freshAfterHook, now)) {
if (!freshAfterHook) {
processed += 1;
if (processed % SWEEP_YIELD_BATCH_SIZE === 0) {
await yieldToEventLoop();
}
continue;
}
const shouldRecheckFreshTask =
recoveryHookRegistered || hasTaskLostDecisionInputChanged(current, freshAfterHook);
let lostContext = backingSessionContext;
if (shouldRecheckFreshTask) {
lostContext = createBackingSessionLookupContext();
if (!shouldMarkLost(freshAfterHook, now, lostContext)) {
processed += 1;
if (processed % SWEEP_YIELD_BATCH_SIZE === 0) {
await yieldToEventLoop();
}
continue;
}
}
if (recovery.recovered) {
recovered += 1;
processed += 1;
@@ -841,7 +969,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
}
continue;
}
const next = markTaskLost(freshAfterHook, now);
const next = markTaskLost(freshAfterHook, now, lostContext);
if (next.status === "lost") {
reconciled += 1;
}
@@ -851,7 +979,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
}
continue;
}
await cleanupTerminalAcpSession(current, taskRegistryMaintenanceRuntime.listTaskRecords());
await cleanupTerminalAcpSession(current);
if (
shouldPruneTerminalTask(current, now) &&
taskRegistryMaintenanceRuntime.deleteTaskRecordById(current.taskId)
@@ -877,7 +1005,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
await yieldToEventLoop();
}
}
await cleanupOrphanedParentOwnedAcpSessions(taskRegistryMaintenanceRuntime.listTaskRecords());
await cleanupOrphanedParentOwnedAcpSessions();
if (isPluginStateDatabaseOpen()) {
try {
sweepExpiredPluginStateEntries();

View File

@@ -90,6 +90,7 @@ vi.mock("../utils/message-channel.js", () => ({
function configureTaskRegistryMaintenanceRuntimeForTest(params: {
currentTasks: Map<string, ReturnType<typeof createTaskRecord>>;
snapshotTasks: ReturnType<typeof createTaskRecord>[];
listTaskRecords?: () => ReturnType<typeof createTaskRecord>[];
acpEntry?: AcpSessionStoreEntry;
acpEntries?: AcpSessionStoreEntry[];
sessionBindings?: SessionBindingRecord[];
@@ -123,10 +124,19 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
parseAgentSessionKey: () => null as ParsedAgentSessionKey | null,
isCronJobActive: () => false,
getAgentRunContext: () => undefined,
hasActiveTaskForChildSessionKey: ({ sessionKey, excludeTaskId }) => {
const normalized = sessionKey.trim().toLowerCase();
return Array.from(params.currentTasks.values()).some(
(task) =>
task.taskId !== excludeTaskId &&
(task.status === "queued" || task.status === "running") &&
task.childSessionKey?.trim().toLowerCase() === normalized,
);
},
deleteTaskRecordById: (taskId: string) => params.currentTasks.delete(taskId),
ensureTaskRegistryReady: () => {},
getTaskById: (taskId: string) => params.currentTasks.get(taskId),
listTaskRecords: () => params.snapshotTasks,
listTaskRecords: params.listTaskRecords ?? (() => params.snapshotTasks),
markTaskLostById: (patch: {
taskId: string;
endedAt: number;
@@ -1628,6 +1638,103 @@ describe("task-registry", () => {
});
});
it("does not relist task records for each terminal ACP cleanup check", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const now = Date.now();
const tasks = Array.from({ length: 20 }, (_, index) => {
const task = createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
requesterSessionKey: "agent:main:main",
scopeKind: "session",
childSessionKey: `agent:claude:acp:terminal-${index}`,
runId: `run-terminal-acp-snapshot-${index}`,
task: `Terminal ACP task ${index}`,
status: "succeeded",
deliveryStatus: "delivered",
});
return {
...task,
endedAt: now - 60_000,
lastEventAt: now - 60_000,
};
});
const currentTasks = new Map(tasks.map((task) => [task.taskId, task]));
let listCalls = 0;
configureTaskRegistryMaintenanceRuntimeForTest({
currentTasks,
snapshotTasks: tasks,
listTaskRecords: () => {
listCalls += 1;
return tasks;
},
});
await runTaskRegistryMaintenance();
expect(listCalls).toBe(1);
});
});
it("keeps terminal ACP cleanup from closing a child session with fresh active work", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const now = Date.now();
const parentSessionKey = "agent:main:telegram:direct:owner";
const childSessionKey = "agent:claude:acp:shared-child";
const terminal = createTaskRecord({
runtime: "acp",
ownerKey: parentSessionKey,
requesterSessionKey: parentSessionKey,
scopeKind: "session",
childSessionKey,
runId: "run-terminal-acp-shared",
task: "Old ACP task",
status: "succeeded",
deliveryStatus: "delivered",
});
const terminalCurrent = {
...terminal,
endedAt: now - 60_000,
lastEventAt: now - 60_000,
};
const active = createTaskRecord({
runtime: "acp",
ownerKey: parentSessionKey,
requesterSessionKey: parentSessionKey,
scopeKind: "session",
childSessionKey,
runId: "run-active-acp-shared",
task: "Current ACP task",
status: "running",
deliveryStatus: "pending",
});
const closeAcpSession = vi.fn().mockResolvedValue(undefined);
configureTaskRegistryMaintenanceRuntimeForTest({
currentTasks: new Map([
[terminal.taskId, terminalCurrent],
[active.taskId, active],
]),
snapshotTasks: [terminalCurrent],
acpEntry: createAcpSessionStoreEntry({
sessionKey: childSessionKey,
parentSessionKey,
mode: "oneshot",
}),
closeAcpSession,
});
await runTaskRegistryMaintenance();
expect(closeAcpSession).not.toHaveBeenCalled();
});
});
it("closes stale terminal persistent ACP sessions only when no binding remains", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
@@ -1977,6 +2084,7 @@ describe("task-registry", () => {
parseAgentSessionKey: () => null,
isCronJobActive: () => false,
getAgentRunContext: () => undefined,
hasActiveTaskForChildSessionKey: () => false,
deleteTaskRecordById: () => false,
ensureTaskRegistryReady: () => {},
getTaskById: () => undefined,

View File

@@ -1945,6 +1945,35 @@ export function listTaskRecords(): TaskRecord[] {
.map(({ insertionIndex: _, ...task }) => task);
}
export function hasActiveTaskForChildSessionKey(params: {
sessionKey: string;
excludeTaskId?: string;
}): boolean {
ensureTaskRegistryReady();
const sessionKey = normalizeOptionalString(params.sessionKey);
if (!sessionKey) {
return false;
}
const ids = taskIdsByRelatedSessionKey.get(sessionKey);
if (!ids) {
return false;
}
for (const taskId of ids) {
if (taskId === params.excludeTaskId) {
continue;
}
const task = tasks.get(taskId);
if (
task &&
isActiveTaskStatus(task.status) &&
normalizeOptionalString(task.childSessionKey) === sessionKey
) {
return true;
}
}
return false;
}
export function getTaskRegistrySummary(): TaskRegistrySummary {
ensureTaskRegistryReady();
return summarizeTaskRecords(tasks.values());