mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 19:10:58 +00:00
Run context-engine turn maintenance as idle-aware background work (#65233)
Merged via squash.
Prepared head SHA: e9f6c679ba
Co-authored-by: 100yenadmin <239388517+100yenadmin@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
@@ -4,6 +4,12 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
## Unreleased
|
||||
|
||||
### Changes
|
||||
|
||||
### Fixes
|
||||
|
||||
- Agents/context engines: run opt-in turn maintenance as idle-aware background work so the next foreground turn no longer waits on proactive maintenance. (#65233) thanks @100yenadmin
|
||||
|
||||
## 2026.4.12
|
||||
|
||||
### Changes
|
||||
@@ -44,6 +50,7 @@ Docs: https://docs.openclaw.ai
|
||||
- WhatsApp/outbound: fall back to the first `mediaUrls` entry when `mediaUrl` is empty so gateway media sends stop silently dropping attachments that already have a resolved media list. (#64394) Thanks @eric-fr4 and @vincentkoc.
|
||||
- Doctor/Discord: stop `openclaw doctor --fix` from rewriting legacy Discord preview-streaming config into the nested modern shape, so downgrades can still recover without hand-editing `channels.discord.streaming`. (#65035) Thanks @vincentkoc.
|
||||
- Gateway/auth: blank the shipped example gateway credential in `.env.example` and fail startup when a copied placeholder token or password is still configured, so operators cannot accidentally launch with a publicly known secret. (#64586) Thanks @navarrotech and @vincentkoc.
|
||||
|
||||
- Memory/active-memory+dreaming: keep active-memory recall runs on the strongest resolved channel, consume managed dreaming heartbeat events exactly once, stop dreaming from re-ingesting its own narrative transcripts, and add explicit repair/dedupe recovery flows in CLI, doctor, and the Dreams UI.
|
||||
- Agents/queueing: carry orphaned active-turn user text into the next prompt before repairing transcript ordering, so follow-up messages that arrive mid-run are no longer silently dropped. (#65388) Thanks @adminfedres and @vincentkoc.
|
||||
- Gateway/keepalive: stop marking WebSocket tick broadcasts as droppable so slow or backpressured clients do not self-disconnect with `tick timeout` while long-running work is still alive. (#65256) Thanks @100yenadmin and @vincentkoc.
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
fd679707dd78dbf63460876ea137ada61c536d7815ff8f6eb02e4c4b40a765cb plugin-sdk-api-baseline.json
|
||||
bd52b020f75ef21f49b8934bc142a7cf877844791d9dfcda8577281e99a753f2 plugin-sdk-api-baseline.jsonl
|
||||
600f05b14825fa01eb9d63ab6cab5f33c74ff44a48cab5c65457ab08e5b0e91a plugin-sdk-api-baseline.json
|
||||
99d649a86a30756b18b91686f3683e6e829c5e316e1370266ec4fee344bc55cb plugin-sdk-api-baseline.jsonl
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,14 +1,268 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import type {
|
||||
ContextEngine,
|
||||
ContextEngineMaintenanceResult,
|
||||
ContextEngineRuntimeContext,
|
||||
} from "../../context-engine/types.js";
|
||||
import { sleepWithAbort } from "../../infra/backoff.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { enqueueCommandInLane, getQueueSize } from "../../process/command-queue.js";
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
import {
|
||||
completeTaskRunByRunId,
|
||||
createQueuedTaskRun,
|
||||
failTaskRunByRunId,
|
||||
recordTaskRunProgressByRunId,
|
||||
setDetachedTaskDeliveryStatusByRunId,
|
||||
startTaskRunByRunId,
|
||||
} from "../../tasks/task-executor.js";
|
||||
import {
|
||||
cancelTaskByIdForOwner,
|
||||
findTaskByRunIdForOwner,
|
||||
updateTaskNotifyPolicyForOwner,
|
||||
} from "../../tasks/task-owner-access.js";
|
||||
import { findActiveSessionTask } from "../session-async-task-status.js";
|
||||
import { resolveSessionLane } from "./lanes.js";
|
||||
import { log } from "./logger.js";
|
||||
import {
|
||||
rewriteTranscriptEntriesInSessionFile,
|
||||
rewriteTranscriptEntriesInSessionManager,
|
||||
} from "./transcript-rewrite.js";
|
||||
|
||||
const TURN_MAINTENANCE_TASK_KIND = "context_engine_turn_maintenance";
|
||||
const TURN_MAINTENANCE_TASK_LABEL = "Context engine turn maintenance";
|
||||
const TURN_MAINTENANCE_TASK_TASK = "Deferred context-engine maintenance after turn.";
|
||||
const TURN_MAINTENANCE_LANE_PREFIX = "context-engine-turn-maintenance:";
|
||||
const TURN_MAINTENANCE_WAIT_POLL_MS = 100;
|
||||
const TURN_MAINTENANCE_LONG_WAIT_MS = 10_000;
|
||||
const DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY = Symbol.for(
|
||||
"openclaw.contextEngineTurnMaintenanceAbortState",
|
||||
);
|
||||
type DeferredTurnMaintenanceScheduleParams = {
|
||||
contextEngine: ContextEngine;
|
||||
sessionId: string;
|
||||
sessionKey: string;
|
||||
sessionFile: string;
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
};
|
||||
|
||||
type DeferredTurnMaintenanceRunState = {
|
||||
promise: Promise<void>;
|
||||
rerunRequested: boolean;
|
||||
latestParams: DeferredTurnMaintenanceScheduleParams;
|
||||
};
|
||||
|
||||
const activeDeferredTurnMaintenanceRuns = new Map<string, DeferredTurnMaintenanceRunState>();
|
||||
|
||||
type DeferredTurnMaintenanceSignal = "SIGINT" | "SIGTERM";
|
||||
type DeferredTurnMaintenanceProcessLike = Pick<NodeJS.Process, "on" | "off"> &
|
||||
Partial<Pick<NodeJS.Process, "listenerCount" | "kill" | "pid">> & {
|
||||
[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY]?: DeferredTurnMaintenanceAbortState;
|
||||
};
|
||||
type DeferredTurnMaintenanceAbortState = {
|
||||
registered: boolean;
|
||||
controllers: Set<AbortController>;
|
||||
cleanupHandlers: Map<DeferredTurnMaintenanceSignal, () => void>;
|
||||
};
|
||||
|
||||
function resolveDeferredTurnMaintenanceAbortState(
|
||||
processLike: DeferredTurnMaintenanceProcessLike,
|
||||
): DeferredTurnMaintenanceAbortState {
|
||||
const existing = processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY];
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const created: DeferredTurnMaintenanceAbortState = {
|
||||
registered: false,
|
||||
controllers: new Set<AbortController>(),
|
||||
cleanupHandlers: new Map<DeferredTurnMaintenanceSignal, () => void>(),
|
||||
};
|
||||
processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY] = created;
|
||||
return created;
|
||||
}
|
||||
|
||||
function unregisterDeferredTurnMaintenanceAbortSignalHandlers(
|
||||
processLike: DeferredTurnMaintenanceProcessLike,
|
||||
state: DeferredTurnMaintenanceAbortState,
|
||||
): void {
|
||||
if (!state.registered) {
|
||||
return;
|
||||
}
|
||||
for (const [signal, handler] of state.cleanupHandlers) {
|
||||
processLike.off(signal, handler);
|
||||
}
|
||||
state.cleanupHandlers.clear();
|
||||
state.registered = false;
|
||||
}
|
||||
|
||||
function normalizeSessionKey(sessionKey?: string): string | undefined {
|
||||
return normalizeOptionalString(sessionKey) || undefined;
|
||||
}
|
||||
|
||||
function resolveDeferredTurnMaintenanceLane(sessionKey: string): string {
|
||||
return `${TURN_MAINTENANCE_LANE_PREFIX}${sessionKey}`;
|
||||
}
|
||||
|
||||
export function createDeferredTurnMaintenanceAbortSignal(params?: {
|
||||
processLike?: DeferredTurnMaintenanceProcessLike;
|
||||
}): {
|
||||
abortSignal?: AbortSignal;
|
||||
dispose: () => void;
|
||||
} {
|
||||
if (typeof AbortController === "undefined") {
|
||||
return { abortSignal: undefined, dispose: () => {} };
|
||||
}
|
||||
|
||||
const processLike = (params?.processLike ?? process) as DeferredTurnMaintenanceProcessLike;
|
||||
const state = resolveDeferredTurnMaintenanceAbortState(processLike);
|
||||
const handleTerminationSignal = (signalName: DeferredTurnMaintenanceSignal) => {
|
||||
const shouldReraise =
|
||||
typeof processLike.listenerCount === "function"
|
||||
? processLike.listenerCount(signalName) === 1
|
||||
: false;
|
||||
for (const activeController of state.controllers) {
|
||||
if (!activeController.signal.aborted) {
|
||||
activeController.abort(
|
||||
new Error(`received ${signalName} while waiting for deferred maintenance`),
|
||||
);
|
||||
}
|
||||
}
|
||||
state.controllers.clear();
|
||||
unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state);
|
||||
if (shouldReraise && typeof processLike.kill === "function") {
|
||||
try {
|
||||
processLike.kill(processLike.pid ?? process.pid, signalName);
|
||||
} catch {
|
||||
// Ignore shutdown-path failures.
|
||||
}
|
||||
}
|
||||
};
|
||||
if (!state.registered) {
|
||||
state.registered = true;
|
||||
const onSigint = () => handleTerminationSignal("SIGINT");
|
||||
const onSigterm = () => handleTerminationSignal("SIGTERM");
|
||||
state.cleanupHandlers.set("SIGINT", onSigint);
|
||||
state.cleanupHandlers.set("SIGTERM", onSigterm);
|
||||
processLike.on("SIGINT", onSigint);
|
||||
processLike.on("SIGTERM", onSigterm);
|
||||
}
|
||||
|
||||
const controller = new AbortController();
|
||||
state.controllers.add(controller);
|
||||
let disposed = false;
|
||||
|
||||
const cleanup = () => {
|
||||
if (disposed) {
|
||||
return;
|
||||
}
|
||||
disposed = true;
|
||||
state.controllers.delete(controller);
|
||||
if (state.controllers.size === 0) {
|
||||
unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state);
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
abortSignal: controller.signal,
|
||||
dispose: cleanup,
|
||||
};
|
||||
}
|
||||
|
||||
export function resetDeferredTurnMaintenanceStateForTest(): void {
|
||||
activeDeferredTurnMaintenanceRuns.clear();
|
||||
const processLike = process as DeferredTurnMaintenanceProcessLike;
|
||||
const state = processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY];
|
||||
if (!state) {
|
||||
return;
|
||||
}
|
||||
state.controllers.clear();
|
||||
unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state);
|
||||
delete processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY];
|
||||
}
|
||||
|
||||
function markDeferredTurnMaintenanceTaskScheduleFailure(params: {
|
||||
sessionKey: string;
|
||||
taskId: string;
|
||||
error: unknown;
|
||||
}): void {
|
||||
const errorMessage = formatErrorMessage(params.error);
|
||||
log.warn(`failed to schedule deferred context engine maintenance: ${errorMessage}`);
|
||||
cancelTaskByIdForOwner({
|
||||
taskId: params.taskId,
|
||||
callerOwnerKey: params.sessionKey,
|
||||
endedAt: Date.now(),
|
||||
terminalSummary: `Deferred maintenance could not be scheduled: ${errorMessage}`,
|
||||
});
|
||||
}
|
||||
|
||||
function buildTurnMaintenanceTaskDescriptor(params: { sessionKey: string }) {
|
||||
const runId = `turn-maint:${params.sessionKey}:${Date.now().toString(36)}:${randomUUID().slice(
|
||||
0,
|
||||
8,
|
||||
)}`;
|
||||
return createQueuedTaskRun({
|
||||
runtime: "acp",
|
||||
taskKind: TURN_MAINTENANCE_TASK_KIND,
|
||||
sourceId: TURN_MAINTENANCE_TASK_KIND,
|
||||
requesterSessionKey: params.sessionKey,
|
||||
ownerKey: params.sessionKey,
|
||||
scopeKind: "session",
|
||||
runId,
|
||||
label: TURN_MAINTENANCE_TASK_LABEL,
|
||||
task: TURN_MAINTENANCE_TASK_TASK,
|
||||
notifyPolicy: "silent",
|
||||
deliveryStatus: "pending",
|
||||
preferMetadata: true,
|
||||
});
|
||||
}
|
||||
|
||||
function promoteTurnMaintenanceTaskVisibility(params: {
|
||||
sessionKey: string;
|
||||
runId: string;
|
||||
notifyPolicy: "done_only" | "state_changes";
|
||||
}) {
|
||||
const task = findTaskByRunIdForOwner({
|
||||
runId: params.runId,
|
||||
callerOwnerKey: params.sessionKey,
|
||||
});
|
||||
if (!task) {
|
||||
return createQueuedTaskRun({
|
||||
runtime: "acp",
|
||||
taskKind: TURN_MAINTENANCE_TASK_KIND,
|
||||
sourceId: TURN_MAINTENANCE_TASK_KIND,
|
||||
requesterSessionKey: params.sessionKey,
|
||||
ownerKey: params.sessionKey,
|
||||
scopeKind: "session",
|
||||
runId: params.runId,
|
||||
label: TURN_MAINTENANCE_TASK_LABEL,
|
||||
task: TURN_MAINTENANCE_TASK_TASK,
|
||||
notifyPolicy: params.notifyPolicy,
|
||||
deliveryStatus: "pending",
|
||||
preferMetadata: true,
|
||||
});
|
||||
}
|
||||
setDetachedTaskDeliveryStatusByRunId({
|
||||
runId: params.runId,
|
||||
runtime: "acp",
|
||||
sessionKey: params.sessionKey,
|
||||
deliveryStatus: "pending",
|
||||
});
|
||||
if (task.notifyPolicy !== params.notifyPolicy) {
|
||||
updateTaskNotifyPolicyForOwner({
|
||||
taskId: task.taskId,
|
||||
callerOwnerKey: params.sessionKey,
|
||||
notifyPolicy: params.notifyPolicy,
|
||||
});
|
||||
}
|
||||
return (
|
||||
findTaskByRunIdForOwner({
|
||||
runId: params.runId,
|
||||
callerOwnerKey: params.sessionKey,
|
||||
}) ?? task
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attach runtime-owned transcript rewrite helpers to an existing
|
||||
* context-engine runtime context payload.
|
||||
@@ -19,9 +273,12 @@ export function buildContextEngineMaintenanceRuntimeContext(params: {
|
||||
sessionFile: string;
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
allowDeferredCompactionExecution?: boolean;
|
||||
deferTranscriptRewriteToSessionLane?: boolean;
|
||||
}): ContextEngineRuntimeContext {
|
||||
return {
|
||||
...params.runtimeContext,
|
||||
...(params.allowDeferredCompactionExecution ? { allowDeferredCompactionExecution: true } : {}),
|
||||
rewriteTranscriptEntries: async (request) => {
|
||||
if (params.sessionManager) {
|
||||
return rewriteTranscriptEntriesInSessionManager({
|
||||
@@ -29,16 +286,314 @@ export function buildContextEngineMaintenanceRuntimeContext(params: {
|
||||
replacements: request.replacements,
|
||||
});
|
||||
}
|
||||
return await rewriteTranscriptEntriesInSessionFile({
|
||||
sessionFile: params.sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
request,
|
||||
});
|
||||
const rewriteTranscriptEntriesInFile = async () =>
|
||||
await rewriteTranscriptEntriesInSessionFile({
|
||||
sessionFile: params.sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
request,
|
||||
});
|
||||
const rewriteSessionKey = normalizeSessionKey(params.sessionKey ?? params.sessionId);
|
||||
if (params.deferTranscriptRewriteToSessionLane && rewriteSessionKey) {
|
||||
return await enqueueCommandInLane(
|
||||
resolveSessionLane(rewriteSessionKey),
|
||||
async () => await rewriteTranscriptEntriesInFile(),
|
||||
);
|
||||
}
|
||||
return await rewriteTranscriptEntriesInFile();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function executeContextEngineMaintenance(params: {
|
||||
contextEngine: ContextEngine;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
sessionFile: string;
|
||||
reason: "bootstrap" | "compaction" | "turn";
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
executionMode: "foreground" | "background";
|
||||
}): Promise<ContextEngineMaintenanceResult | undefined> {
|
||||
if (typeof params.contextEngine.maintain !== "function") {
|
||||
return undefined;
|
||||
}
|
||||
const result = await params.contextEngine.maintain({
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
runtimeContext: buildContextEngineMaintenanceRuntimeContext({
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
sessionManager: params.executionMode === "background" ? undefined : params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
allowDeferredCompactionExecution: params.executionMode === "background",
|
||||
deferTranscriptRewriteToSessionLane: params.executionMode === "background",
|
||||
}),
|
||||
});
|
||||
if (result.changed) {
|
||||
log.info(
|
||||
`[context-engine] maintenance(${params.reason}) changed transcript ` +
|
||||
`rewrittenEntries=${result.rewrittenEntries} bytesFreed=${result.bytesFreed} ` +
|
||||
`sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`,
|
||||
);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async function runDeferredTurnMaintenanceWorker(params: {
|
||||
contextEngine: ContextEngine;
|
||||
sessionId: string;
|
||||
sessionKey: string;
|
||||
sessionFile: string;
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
runId: string;
|
||||
}): Promise<void> {
|
||||
let surfacedUserNotice = false;
|
||||
let longRunningTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
const shutdownAbort = createDeferredTurnMaintenanceAbortSignal();
|
||||
const surfaceMaintenanceUpdate = (summary: string, eventSummary: string) => {
|
||||
promoteTurnMaintenanceTaskVisibility({
|
||||
sessionKey: params.sessionKey,
|
||||
runId: params.runId,
|
||||
notifyPolicy: "state_changes",
|
||||
});
|
||||
surfacedUserNotice = true;
|
||||
recordTaskRunProgressByRunId({
|
||||
runId: params.runId,
|
||||
runtime: "acp",
|
||||
sessionKey: params.sessionKey,
|
||||
lastEventAt: Date.now(),
|
||||
progressSummary: summary,
|
||||
eventSummary,
|
||||
});
|
||||
};
|
||||
|
||||
try {
|
||||
const sessionLane = resolveSessionLane(params.sessionKey);
|
||||
const startedWaitingAt = Date.now();
|
||||
let lastWaitNoticeAt = 0;
|
||||
|
||||
for (;;) {
|
||||
while (getQueueSize(sessionLane) > 0) {
|
||||
const now = Date.now();
|
||||
if (
|
||||
now - startedWaitingAt >= TURN_MAINTENANCE_LONG_WAIT_MS &&
|
||||
now - lastWaitNoticeAt >= TURN_MAINTENANCE_LONG_WAIT_MS
|
||||
) {
|
||||
lastWaitNoticeAt = now;
|
||||
surfaceMaintenanceUpdate(
|
||||
"Waiting for the session lane to go idle.",
|
||||
surfacedUserNotice
|
||||
? "Still waiting for the session lane to go idle."
|
||||
: "Deferred maintenance is waiting for the session lane to go idle.",
|
||||
);
|
||||
}
|
||||
await sleepWithAbort(TURN_MAINTENANCE_WAIT_POLL_MS, shutdownAbort.abortSignal);
|
||||
}
|
||||
await Promise.resolve();
|
||||
if (getQueueSize(sessionLane) === 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const runningAt = Date.now();
|
||||
startTaskRunByRunId({
|
||||
runId: params.runId,
|
||||
runtime: "acp",
|
||||
sessionKey: params.sessionKey,
|
||||
startedAt: runningAt,
|
||||
lastEventAt: runningAt,
|
||||
progressSummary: "Running deferred maintenance.",
|
||||
eventSummary: "Starting deferred maintenance.",
|
||||
});
|
||||
longRunningTimer = setTimeout(() => {
|
||||
try {
|
||||
surfaceMaintenanceUpdate(
|
||||
"Deferred maintenance is still running.",
|
||||
"Deferred maintenance is still running.",
|
||||
);
|
||||
} catch (error) {
|
||||
log.warn(`failed to surface deferred maintenance progress: ${String(error)}`);
|
||||
}
|
||||
}, TURN_MAINTENANCE_LONG_WAIT_MS);
|
||||
|
||||
const result = await executeContextEngineMaintenance({
|
||||
contextEngine: params.contextEngine,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
reason: "turn",
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
executionMode: "background",
|
||||
});
|
||||
if (longRunningTimer) {
|
||||
clearTimeout(longRunningTimer);
|
||||
longRunningTimer = null;
|
||||
}
|
||||
|
||||
const endedAt = Date.now();
|
||||
completeTaskRunByRunId({
|
||||
runId: params.runId,
|
||||
runtime: "acp",
|
||||
sessionKey: params.sessionKey,
|
||||
endedAt,
|
||||
lastEventAt: endedAt,
|
||||
progressSummary: result?.changed
|
||||
? "Deferred maintenance completed with transcript changes."
|
||||
: "Deferred maintenance completed.",
|
||||
terminalSummary: result?.changed
|
||||
? `Rewrote ${result.rewrittenEntries} transcript entr${result.rewrittenEntries === 1 ? "y" : "ies"} and freed ${result.bytesFreed} bytes.`
|
||||
: "No transcript changes were needed.",
|
||||
});
|
||||
} catch (err) {
|
||||
if (shutdownAbort.abortSignal?.aborted) {
|
||||
if (longRunningTimer) {
|
||||
clearTimeout(longRunningTimer);
|
||||
longRunningTimer = null;
|
||||
}
|
||||
const task = findTaskByRunIdForOwner({
|
||||
runId: params.runId,
|
||||
callerOwnerKey: params.sessionKey,
|
||||
});
|
||||
if (task) {
|
||||
cancelTaskByIdForOwner({
|
||||
taskId: task.taskId,
|
||||
callerOwnerKey: params.sessionKey,
|
||||
endedAt: Date.now(),
|
||||
terminalSummary: "Deferred maintenance cancelled during shutdown.",
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (longRunningTimer) {
|
||||
clearTimeout(longRunningTimer);
|
||||
longRunningTimer = null;
|
||||
}
|
||||
const endedAt = Date.now();
|
||||
const reason = formatErrorMessage(err);
|
||||
if (!surfacedUserNotice) {
|
||||
promoteTurnMaintenanceTaskVisibility({
|
||||
sessionKey: params.sessionKey,
|
||||
runId: params.runId,
|
||||
notifyPolicy: "done_only",
|
||||
});
|
||||
}
|
||||
failTaskRunByRunId({
|
||||
runId: params.runId,
|
||||
runtime: "acp",
|
||||
sessionKey: params.sessionKey,
|
||||
endedAt,
|
||||
lastEventAt: endedAt,
|
||||
error: reason,
|
||||
progressSummary: "Deferred maintenance failed.",
|
||||
terminalSummary: reason,
|
||||
});
|
||||
log.warn(`deferred context engine maintenance failed: ${reason}`);
|
||||
} finally {
|
||||
shutdownAbort.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceScheduleParams): void {
|
||||
const sessionKey = normalizeSessionKey(params.sessionKey);
|
||||
if (!sessionKey) {
|
||||
return;
|
||||
}
|
||||
const activeRun = activeDeferredTurnMaintenanceRuns.get(sessionKey);
|
||||
if (activeRun) {
|
||||
activeRun.rerunRequested = true;
|
||||
activeRun.latestParams = { ...params, sessionKey };
|
||||
return;
|
||||
}
|
||||
|
||||
const existingTask = findActiveSessionTask({
|
||||
sessionKey,
|
||||
runtime: "acp",
|
||||
taskKind: TURN_MAINTENANCE_TASK_KIND,
|
||||
});
|
||||
const reusableTask = existingTask?.runId?.trim() ? existingTask : undefined;
|
||||
if (existingTask && !reusableTask) {
|
||||
updateTaskNotifyPolicyForOwner({
|
||||
taskId: existingTask.taskId,
|
||||
callerOwnerKey: sessionKey,
|
||||
notifyPolicy: "silent",
|
||||
});
|
||||
cancelTaskByIdForOwner({
|
||||
taskId: existingTask.taskId,
|
||||
callerOwnerKey: sessionKey,
|
||||
endedAt: Date.now(),
|
||||
terminalSummary: "Superseded by refreshed deferred maintenance task.",
|
||||
});
|
||||
}
|
||||
const task =
|
||||
reusableTask ??
|
||||
buildTurnMaintenanceTaskDescriptor({
|
||||
sessionKey,
|
||||
});
|
||||
log.info(
|
||||
`[context-engine] deferred turn maintenance ${reusableTask ? "resuming" : "queued"} ` +
|
||||
`taskId=${task.taskId} sessionKey=${sessionKey} lane=${resolveDeferredTurnMaintenanceLane(sessionKey)}`,
|
||||
);
|
||||
|
||||
const schedulerAbort = createDeferredTurnMaintenanceAbortSignal();
|
||||
let runPromise: Promise<void>;
|
||||
try {
|
||||
runPromise = enqueueCommandInLane(resolveDeferredTurnMaintenanceLane(sessionKey), async () =>
|
||||
runDeferredTurnMaintenanceWorker({
|
||||
contextEngine: params.contextEngine,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
runId: task.runId!,
|
||||
}),
|
||||
);
|
||||
} catch (err) {
|
||||
schedulerAbort.dispose();
|
||||
markDeferredTurnMaintenanceTaskScheduleFailure({
|
||||
sessionKey,
|
||||
taskId: task.taskId,
|
||||
error: err,
|
||||
});
|
||||
return;
|
||||
}
|
||||
let state!: DeferredTurnMaintenanceRunState;
|
||||
const trackedPromise = runPromise
|
||||
.catch((err) => {
|
||||
markDeferredTurnMaintenanceTaskScheduleFailure({
|
||||
sessionKey,
|
||||
taskId: task.taskId,
|
||||
error: err,
|
||||
});
|
||||
})
|
||||
.finally(() => {
|
||||
schedulerAbort.dispose();
|
||||
const current = activeDeferredTurnMaintenanceRuns.get(sessionKey);
|
||||
if (current !== state) {
|
||||
return;
|
||||
}
|
||||
const shutdownTriggered = schedulerAbort.abortSignal?.aborted === true;
|
||||
const rerunParams =
|
||||
current.rerunRequested && !shutdownTriggered ? current.latestParams : undefined;
|
||||
activeDeferredTurnMaintenanceRuns.delete(sessionKey);
|
||||
if (rerunParams) {
|
||||
scheduleDeferredTurnMaintenance(rerunParams);
|
||||
}
|
||||
});
|
||||
state = {
|
||||
promise: trackedPromise,
|
||||
rerunRequested: false,
|
||||
latestParams: { ...params, sessionKey },
|
||||
};
|
||||
activeDeferredTurnMaintenanceRuns.set(sessionKey, state);
|
||||
void trackedPromise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run optional context-engine transcript maintenance and normalize the result.
|
||||
*/
|
||||
@@ -50,32 +605,45 @@ export async function runContextEngineMaintenance(params: {
|
||||
reason: "bootstrap" | "compaction" | "turn";
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
executionMode?: "foreground" | "background";
|
||||
}): Promise<ContextEngineMaintenanceResult | undefined> {
|
||||
if (typeof params.contextEngine?.maintain !== "function") {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await params.contextEngine.maintain({
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
runtimeContext: buildContextEngineMaintenanceRuntimeContext({
|
||||
const executionMode = params.executionMode ?? "foreground";
|
||||
const shouldDefer =
|
||||
params.reason === "turn" &&
|
||||
executionMode !== "background" &&
|
||||
params.contextEngine.info.turnMaintenanceMode === "background";
|
||||
|
||||
if (shouldDefer) {
|
||||
try {
|
||||
scheduleDeferredTurnMaintenance({
|
||||
contextEngine: params.contextEngine,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionKey: params.sessionKey ?? params.sessionId,
|
||||
sessionFile: params.sessionFile,
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
}),
|
||||
});
|
||||
if (result.changed) {
|
||||
log.info(
|
||||
`[context-engine] maintenance(${params.reason}) changed transcript ` +
|
||||
`rewrittenEntries=${result.rewrittenEntries} bytesFreed=${result.bytesFreed} ` +
|
||||
`sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`,
|
||||
);
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`);
|
||||
}
|
||||
return result;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
try {
|
||||
return await executeContextEngineMaintenance({
|
||||
contextEngine: params.contextEngine,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
reason: params.reason,
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
executionMode,
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn(`context engine maintain failed (${params.reason}): ${String(err)}`);
|
||||
return undefined;
|
||||
|
||||
@@ -50,6 +50,13 @@ export type ContextEngineInfo = {
|
||||
version?: string;
|
||||
/** True when the engine manages its own compaction lifecycle. */
|
||||
ownsCompaction?: boolean;
|
||||
/**
|
||||
* Controls how turn-triggered maintenance should be executed.
|
||||
*
|
||||
* Engines remain compatible by default unless the host explicitly opts into
|
||||
* background turn maintenance.
|
||||
*/
|
||||
turnMaintenanceMode?: "foreground" | "background";
|
||||
};
|
||||
|
||||
export type SubagentSpawnPreparation = {
|
||||
@@ -128,6 +135,11 @@ export type ContextEnginePromptCacheInfo = {
|
||||
};
|
||||
|
||||
export type ContextEngineRuntimeContext = Record<string, unknown> & {
|
||||
/**
|
||||
* True when the host has explicitly opted this maintenance run into
|
||||
* consuming deferred compaction debt.
|
||||
*/
|
||||
allowDeferredCompactionExecution?: boolean;
|
||||
/** Optional prompt-cache telemetry for cache-aware engines. */
|
||||
promptCache?: ContextEnginePromptCacheInfo;
|
||||
/**
|
||||
|
||||
@@ -43,4 +43,39 @@ describe("backoff helpers", () => {
|
||||
cause: expect.anything(),
|
||||
});
|
||||
});
|
||||
|
||||
it("advances with fake timers", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const sleeper = sleepWithAbort(50);
|
||||
await vi.advanceTimersByTimeAsync(49);
|
||||
await expect(
|
||||
Promise.race([sleeper.then(() => "done"), Promise.resolve("pending")]),
|
||||
).resolves.toBe("pending");
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
await expect(sleeper).resolves.toBeUndefined();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects if the signal aborts during listener registration", async () => {
|
||||
let aborted = false;
|
||||
const signal = {
|
||||
get aborted() {
|
||||
return aborted;
|
||||
},
|
||||
get reason() {
|
||||
return new Error("listener-registration-race");
|
||||
},
|
||||
addEventListener(_event: string, _listener: EventListenerOrEventListenerObject) {
|
||||
aborted = true;
|
||||
},
|
||||
removeEventListener() {},
|
||||
} as unknown as AbortSignal;
|
||||
|
||||
await expect(sleepWithAbort(50, signal)).rejects.toMatchObject({
|
||||
message: "aborted",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import { setTimeout as delay } from "node:timers/promises";
|
||||
|
||||
export type BackoffPolicy = {
|
||||
initialMs: number;
|
||||
maxMs: number;
|
||||
@@ -17,12 +15,45 @@ export async function sleepWithAbort(ms: number, abortSignal?: AbortSignal) {
|
||||
if (ms <= 0) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await delay(ms, undefined, { signal: abortSignal });
|
||||
} catch (err) {
|
||||
if (abortSignal?.aborted) {
|
||||
throw new Error("aborted", { cause: err });
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
let settled = false;
|
||||
let timer: ReturnType<typeof setTimeout> | null = null;
|
||||
const onAbort = () => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
timer = null;
|
||||
}
|
||||
if (abortSignal) {
|
||||
abortSignal.removeEventListener("abort", onAbort);
|
||||
}
|
||||
reject(new Error("aborted", { cause: abortSignal?.reason ?? new Error("aborted") }));
|
||||
};
|
||||
|
||||
if (abortSignal) {
|
||||
abortSignal.addEventListener("abort", onAbort, { once: true });
|
||||
if (abortSignal.aborted) {
|
||||
onAbort();
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
|
||||
timer = setTimeout(() => {
|
||||
settled = true;
|
||||
if (abortSignal) {
|
||||
abortSignal.removeEventListener("abort", onAbort);
|
||||
}
|
||||
timer = null;
|
||||
resolve();
|
||||
}, ms);
|
||||
|
||||
if (abortSignal) {
|
||||
if (abortSignal.aborted) {
|
||||
onAbort();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3,9 +3,11 @@ import {
|
||||
findTaskByRunId,
|
||||
getTaskById,
|
||||
listTasksForRelatedSessionKey,
|
||||
markTaskTerminalById as markTaskTerminalRecordById,
|
||||
resolveTaskForLookupToken,
|
||||
updateTaskNotifyPolicyById,
|
||||
} from "./task-registry.js";
|
||||
import type { TaskRecord } from "./task-registry.types.js";
|
||||
import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js";
|
||||
import { buildTaskStatusSnapshot } from "./task-status.js";
|
||||
|
||||
function canOwnerAccessTask(task: TaskRecord, callerOwnerKey: string): boolean {
|
||||
@@ -31,6 +33,47 @@ export function findTaskByRunIdForOwner(params: {
|
||||
return task && canOwnerAccessTask(task, params.callerOwnerKey) ? task : undefined;
|
||||
}
|
||||
|
||||
/** Update an owner-visible task's notification policy. */
|
||||
export function updateTaskNotifyPolicyForOwner(params: {
|
||||
taskId: string;
|
||||
callerOwnerKey: string;
|
||||
notifyPolicy: TaskNotifyPolicy;
|
||||
}): TaskRecord | null {
|
||||
const task = getTaskByIdForOwner({
|
||||
taskId: params.taskId,
|
||||
callerOwnerKey: params.callerOwnerKey,
|
||||
});
|
||||
if (!task) {
|
||||
return null;
|
||||
}
|
||||
return updateTaskNotifyPolicyById({
|
||||
taskId: task.taskId,
|
||||
notifyPolicy: params.notifyPolicy,
|
||||
});
|
||||
}
|
||||
|
||||
/** Mark an owner-visible task as cancelled with a caller-provided summary. */
|
||||
export function cancelTaskByIdForOwner(params: {
|
||||
taskId: string;
|
||||
callerOwnerKey: string;
|
||||
endedAt: number;
|
||||
terminalSummary?: string | null;
|
||||
}): TaskRecord | null {
|
||||
const task = getTaskByIdForOwner({
|
||||
taskId: params.taskId,
|
||||
callerOwnerKey: params.callerOwnerKey,
|
||||
});
|
||||
if (!task) {
|
||||
return null;
|
||||
}
|
||||
return markTaskTerminalRecordById({
|
||||
taskId: task.taskId,
|
||||
status: "cancelled",
|
||||
endedAt: params.endedAt,
|
||||
terminalSummary: params.terminalSummary,
|
||||
});
|
||||
}
|
||||
|
||||
export function listTasksForRelatedSessionKeyForOwner(params: {
|
||||
relatedSessionKey: string;
|
||||
callerOwnerKey: string;
|
||||
|
||||
Reference in New Issue
Block a user