Files
openclaw/src/auto-reply/reply/queue/state.ts
Peter Steinberger 694ca50e97 Revert "refactor: move runtime state to SQLite"
This reverts commit f91de52f0d.
2026-05-13 13:33:38 +01:00

159 lines
4.7 KiB
TypeScript

import { resolveGlobalMap } from "../../../shared/global-singleton.js";
import { normalizeOptionalString } from "../../../shared/string-coerce.js";
import { applyQueueRuntimeSettings } from "../../../utils/queue-helpers.js";
import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./types.js";
export type FollowupQueueState = {
items: FollowupRun[];
draining: boolean;
lastEnqueuedAt: number;
mode: QueueMode;
debounceMs: number;
cap: number;
dropPolicy: QueueDropPolicy;
droppedCount: number;
summaryLines: string[];
lastRun?: FollowupRun["run"];
};
export const DEFAULT_QUEUE_DEBOUNCE_MS = 500;
export const DEFAULT_QUEUE_CAP = 20;
export const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize";
/**
* Share followup queues across bundled chunks so busy-session enqueue/drain
* logic observes one queue registry per process.
*/
const FOLLOWUP_QUEUES_KEY = Symbol.for("openclaw.followupQueues");
export const FOLLOWUP_QUEUES = resolveGlobalMap<string, FollowupQueueState>(FOLLOWUP_QUEUES_KEY);
export function getExistingFollowupQueue(key: string): FollowupQueueState | undefined {
const cleaned = key.trim();
if (!cleaned) {
return undefined;
}
return FOLLOWUP_QUEUES.get(cleaned);
}
export function getFollowupQueue(key: string, settings: QueueSettings): FollowupQueueState {
const existing = FOLLOWUP_QUEUES.get(key);
if (existing) {
applyQueueRuntimeSettings({
target: existing,
settings,
});
return existing;
}
const created: FollowupQueueState = {
items: [],
draining: false,
lastEnqueuedAt: 0,
mode: settings.mode,
debounceMs:
typeof settings.debounceMs === "number"
? Math.max(0, settings.debounceMs)
: DEFAULT_QUEUE_DEBOUNCE_MS,
cap:
typeof settings.cap === "number" && settings.cap > 0
? Math.floor(settings.cap)
: DEFAULT_QUEUE_CAP,
dropPolicy: settings.dropPolicy ?? DEFAULT_QUEUE_DROP,
droppedCount: 0,
summaryLines: [],
};
applyQueueRuntimeSettings({
target: created,
settings,
});
FOLLOWUP_QUEUES.set(key, created);
return created;
}
export function clearFollowupQueue(key: string): number {
const cleaned = key.trim();
const queue = getExistingFollowupQueue(cleaned);
if (!queue) {
return 0;
}
const cleared = queue.items.length + queue.droppedCount;
queue.items.length = 0;
queue.droppedCount = 0;
queue.summaryLines = [];
queue.lastRun = undefined;
queue.lastEnqueuedAt = 0;
FOLLOWUP_QUEUES.delete(cleaned);
return cleared;
}
export function refreshQueuedFollowupSession(params: {
key: string;
previousSessionId?: string;
nextSessionId?: string;
nextSessionFile?: string;
nextProvider?: string;
nextModel?: string;
nextModelOverrideSource?: "auto" | "user";
nextAuthProfileId?: string;
nextAuthProfileIdSource?: "auto" | "user";
}): void {
const cleaned = params.key.trim();
if (!cleaned) {
return;
}
const queue = getExistingFollowupQueue(cleaned);
if (!queue) {
return;
}
const shouldRewriteSession =
Boolean(params.previousSessionId) &&
Boolean(params.nextSessionId) &&
params.previousSessionId !== params.nextSessionId;
const shouldRewriteSelection =
typeof params.nextProvider === "string" ||
typeof params.nextModel === "string" ||
Object.hasOwn(params, "nextModelOverrideSource") ||
Object.hasOwn(params, "nextAuthProfileId") ||
Object.hasOwn(params, "nextAuthProfileIdSource");
if (!shouldRewriteSession && !shouldRewriteSelection) {
return;
}
const rewriteRun = (run?: FollowupRun["run"]) => {
if (!run) {
return;
}
if (shouldRewriteSession && run.sessionId === params.previousSessionId) {
run.sessionId = params.nextSessionId!;
const nextSessionFile = normalizeOptionalString(params.nextSessionFile);
if (nextSessionFile) {
run.sessionFile = nextSessionFile;
}
}
if (shouldRewriteSelection) {
if (typeof params.nextProvider === "string") {
run.provider = params.nextProvider;
}
if (typeof params.nextModel === "string") {
run.model = params.nextModel;
}
if (Object.hasOwn(params, "nextModelOverrideSource")) {
run.hasSessionModelOverride = Boolean(run.provider || run.model);
run.modelOverrideSource = params.nextModelOverrideSource;
}
if (Object.hasOwn(params, "nextAuthProfileId")) {
run.authProfileId = normalizeOptionalString(params.nextAuthProfileId);
}
if (Object.hasOwn(params, "nextAuthProfileIdSource")) {
run.authProfileIdSource = run.authProfileId ? params.nextAuthProfileIdSource : undefined;
}
}
};
rewriteRun(queue.lastRun);
for (const item of queue.items) {
rewriteRun(item.run);
}
}