mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:10:43 +00:00
fix: route deferred maintenance through task seams
This commit is contained in:
@@ -13,14 +13,14 @@ import {
|
||||
createQueuedTaskRun,
|
||||
failTaskRunByRunId,
|
||||
recordTaskRunProgressByRunId,
|
||||
setDetachedTaskDeliveryStatusByRunId,
|
||||
startTaskRunByRunId,
|
||||
} from "../../tasks/task-executor.js";
|
||||
import {
|
||||
findTaskByRunId,
|
||||
markTaskTerminalById,
|
||||
setTaskRunDeliveryStatusByRunId,
|
||||
updateTaskNotifyPolicyById,
|
||||
} from "../../tasks/task-registry.js";
|
||||
cancelTaskByIdForOwner,
|
||||
findTaskByRunIdForOwner,
|
||||
updateTaskNotifyPolicyForOwner,
|
||||
} from "../../tasks/task-owner-access.js";
|
||||
import { findActiveSessionTask } from "../session-async-task-status.js";
|
||||
import { resolveSessionLane } from "./lanes.js";
|
||||
import { log } from "./logger.js";
|
||||
@@ -182,14 +182,15 @@ export function resetDeferredTurnMaintenanceStateForTest(): void {
|
||||
}
|
||||
|
||||
function markDeferredTurnMaintenanceTaskScheduleFailure(params: {
|
||||
sessionKey: string;
|
||||
taskId: string;
|
||||
error: unknown;
|
||||
}): void {
|
||||
const errorMessage = formatErrorMessage(params.error);
|
||||
log.warn(`failed to schedule deferred context engine maintenance: ${errorMessage}`);
|
||||
markTaskTerminalById({
|
||||
cancelTaskByIdForOwner({
|
||||
taskId: params.taskId,
|
||||
status: "cancelled",
|
||||
callerOwnerKey: params.sessionKey,
|
||||
endedAt: Date.now(),
|
||||
terminalSummary: `Deferred maintenance could not be scheduled: ${errorMessage}`,
|
||||
});
|
||||
@@ -221,7 +222,10 @@ function promoteTurnMaintenanceTaskVisibility(params: {
|
||||
runId: string;
|
||||
notifyPolicy: "done_only" | "state_changes";
|
||||
}) {
|
||||
const task = findTaskByRunId(params.runId);
|
||||
const task = findTaskByRunIdForOwner({
|
||||
runId: params.runId,
|
||||
callerOwnerKey: params.sessionKey,
|
||||
});
|
||||
if (!task) {
|
||||
return createQueuedTaskRun({
|
||||
runtime: "acp",
|
||||
@@ -238,19 +242,25 @@ function promoteTurnMaintenanceTaskVisibility(params: {
|
||||
preferMetadata: true,
|
||||
});
|
||||
}
|
||||
setTaskRunDeliveryStatusByRunId({
|
||||
setDetachedTaskDeliveryStatusByRunId({
|
||||
runId: params.runId,
|
||||
runtime: "acp",
|
||||
sessionKey: params.sessionKey,
|
||||
deliveryStatus: "pending",
|
||||
});
|
||||
if (task.notifyPolicy !== params.notifyPolicy) {
|
||||
updateTaskNotifyPolicyById({
|
||||
updateTaskNotifyPolicyForOwner({
|
||||
taskId: task.taskId,
|
||||
callerOwnerKey: params.sessionKey,
|
||||
notifyPolicy: params.notifyPolicy,
|
||||
});
|
||||
}
|
||||
return findTaskByRunId(params.runId) ?? task;
|
||||
return (
|
||||
findTaskByRunIdForOwner({
|
||||
runId: params.runId,
|
||||
callerOwnerKey: params.sessionKey,
|
||||
}) ?? task
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -285,8 +295,9 @@ export function buildContextEngineMaintenanceRuntimeContext(params: {
|
||||
});
|
||||
const rewriteSessionKey = normalizeSessionKey(params.sessionKey ?? params.sessionId);
|
||||
if (params.deferTranscriptRewriteToSessionLane && rewriteSessionKey) {
|
||||
return await enqueueCommandInLane(resolveSessionLane(rewriteSessionKey), async () =>
|
||||
await rewriteTranscriptEntriesInFile(),
|
||||
return await enqueueCommandInLane(
|
||||
resolveSessionLane(rewriteSessionKey),
|
||||
async () => await rewriteTranscriptEntriesInFile(),
|
||||
);
|
||||
}
|
||||
return await rewriteTranscriptEntriesInFile();
|
||||
@@ -443,11 +454,14 @@ async function runDeferredTurnMaintenanceWorker(params: {
|
||||
clearTimeout(longRunningTimer);
|
||||
longRunningTimer = null;
|
||||
}
|
||||
const task = findTaskByRunId(params.runId);
|
||||
const task = findTaskByRunIdForOwner({
|
||||
runId: params.runId,
|
||||
callerOwnerKey: params.sessionKey,
|
||||
});
|
||||
if (task) {
|
||||
markTaskTerminalById({
|
||||
cancelTaskByIdForOwner({
|
||||
taskId: task.taskId,
|
||||
status: "cancelled",
|
||||
callerOwnerKey: params.sessionKey,
|
||||
endedAt: Date.now(),
|
||||
terminalSummary: "Deferred maintenance cancelled during shutdown.",
|
||||
});
|
||||
@@ -502,13 +516,14 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule
|
||||
});
|
||||
const reusableTask = existingTask?.runId?.trim() ? existingTask : undefined;
|
||||
if (existingTask && !reusableTask) {
|
||||
updateTaskNotifyPolicyById({
|
||||
updateTaskNotifyPolicyForOwner({
|
||||
taskId: existingTask.taskId,
|
||||
callerOwnerKey: sessionKey,
|
||||
notifyPolicy: "silent",
|
||||
});
|
||||
markTaskTerminalById({
|
||||
cancelTaskByIdForOwner({
|
||||
taskId: existingTask.taskId,
|
||||
status: "cancelled",
|
||||
callerOwnerKey: sessionKey,
|
||||
endedAt: Date.now(),
|
||||
terminalSummary: "Superseded by refreshed deferred maintenance task.",
|
||||
});
|
||||
@@ -538,6 +553,7 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule
|
||||
);
|
||||
} catch (err) {
|
||||
markDeferredTurnMaintenanceTaskScheduleFailure({
|
||||
sessionKey,
|
||||
taskId: task.taskId,
|
||||
error: err,
|
||||
});
|
||||
@@ -547,6 +563,7 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule
|
||||
const trackedPromise = runPromise
|
||||
.catch((err) => {
|
||||
markDeferredTurnMaintenanceTaskScheduleFailure({
|
||||
sessionKey,
|
||||
taskId: task.taskId,
|
||||
error: err,
|
||||
});
|
||||
|
||||
@@ -3,9 +3,11 @@ import {
|
||||
findTaskByRunId,
|
||||
getTaskById,
|
||||
listTasksForRelatedSessionKey,
|
||||
markTaskTerminalById as markTaskTerminalRecordById,
|
||||
resolveTaskForLookupToken,
|
||||
updateTaskNotifyPolicyById,
|
||||
} from "./task-registry.js";
|
||||
import type { TaskRecord } from "./task-registry.types.js";
|
||||
import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js";
|
||||
import { buildTaskStatusSnapshot } from "./task-status.js";
|
||||
|
||||
function canOwnerAccessTask(task: TaskRecord, callerOwnerKey: string): boolean {
|
||||
@@ -31,6 +33,47 @@ export function findTaskByRunIdForOwner(params: {
|
||||
return task && canOwnerAccessTask(task, params.callerOwnerKey) ? task : undefined;
|
||||
}
|
||||
|
||||
/** Update an owner-visible task's notification policy. */
|
||||
export function updateTaskNotifyPolicyForOwner(params: {
|
||||
taskId: string;
|
||||
callerOwnerKey: string;
|
||||
notifyPolicy: TaskNotifyPolicy;
|
||||
}): TaskRecord | null {
|
||||
const task = getTaskByIdForOwner({
|
||||
taskId: params.taskId,
|
||||
callerOwnerKey: params.callerOwnerKey,
|
||||
});
|
||||
if (!task) {
|
||||
return null;
|
||||
}
|
||||
return updateTaskNotifyPolicyById({
|
||||
taskId: task.taskId,
|
||||
notifyPolicy: params.notifyPolicy,
|
||||
});
|
||||
}
|
||||
|
||||
/** Mark an owner-visible task as cancelled with a caller-provided summary. */
|
||||
export function cancelTaskByIdForOwner(params: {
|
||||
taskId: string;
|
||||
callerOwnerKey: string;
|
||||
endedAt: number;
|
||||
terminalSummary?: string | null;
|
||||
}): TaskRecord | null {
|
||||
const task = getTaskByIdForOwner({
|
||||
taskId: params.taskId,
|
||||
callerOwnerKey: params.callerOwnerKey,
|
||||
});
|
||||
if (!task) {
|
||||
return null;
|
||||
}
|
||||
return markTaskTerminalRecordById({
|
||||
taskId: task.taskId,
|
||||
status: "cancelled",
|
||||
endedAt: params.endedAt,
|
||||
terminalSummary: params.terminalSummary,
|
||||
});
|
||||
}
|
||||
|
||||
export function listTasksForRelatedSessionKeyForOwner(params: {
|
||||
relatedSessionKey: string;
|
||||
callerOwnerKey: string;
|
||||
|
||||
Reference in New Issue
Block a user