Files
openclaw/src/shared/store-writer-queue.ts
Peter Steinberger 0b8aabe864 docs: document auth profile failure policy contract (#89613)
* docs: document markdown marker renderer

* docs: document rendered markdown chunking

* docs: document markdown text chunking

* docs: document shared text chunking

* docs: document plugin text chunking exports

* docs: document avatar policy constants

* docs: document node match candidates

* docs: document scoped expiring id cache

* docs: document runtime import normalization

* docs: document string sample summaries

* docs: document session usage timeseries types

* docs: document session usage response types

* docs: document manifest frontmatter shapes

* docs: document channel route input metadata

* docs: document pair loop guard settings

* docs: document migration config patch helpers

* docs: document api provider registry

* docs: document tool call repair payloads

* docs: document plugin tool payload helpers

* docs: document lazy promise loader

* docs: document store writer queue state

* docs: document thread binding lifecycle

* docs: document concurrency helper contract

* docs: document gateway client info contract

* docs: document delivery context contracts

* docs: document secret ref defaults contract

* docs: document command gating contract

* docs: document avatar policy contract

* docs: document node match policy

* docs: document message channel normalization

* docs: document boolean parsing contract

* docs: document zod parse helpers

* docs: document direct dm guard policy

* docs: document fixed window limiter contract

* docs: document node presence event contract

* docs: document secret normalization contract

* docs: document progress draft line removal

* docs: document usage formatting contracts

* docs: document agent run status contract

* docs: document runtime import helpers

* docs: document provider utility ownership

* docs: document invalid config helpers

* docs: document json compat parser

* docs: document channel config metadata ownership

* docs: document channel logging helpers

* docs: document sender identity validation ownership

* docs: document string sampling helper

* docs: document global singleton helpers

* docs: document transcript tool helpers

* docs: document exec safe-bin normalization

* docs: document reaction level resolver

* docs: document account snapshot redaction boundary

* docs: document messaging target helpers

* docs: document thread binding messages

* docs: document conversation binding context

* docs: document conversation resolution helper

* docs: document owner display secret retention

* docs: document provider request config types

* docs: document skills config types

* docs: document memory config types

* docs: document imessage config types

* docs: document crestodian config types

* docs: document tools config policies

* docs: document shared config base types

* docs: document channel config contracts

* docs: document openclaw config state types

* docs: document model config contracts

* docs: document shared agent config types

* docs: document agent defaults config types

* docs: document secret input contracts

* docs: document auth config contracts

* docs: document gateway config contracts

* docs: document tool call stream repair contracts

* docs: document memory host facades

* docs: document llm core contracts

* docs: document markdown core contracts

* docs: document gateway connect error contracts

* docs: document gateway protocol primitives

* docs: document gateway frame schemas

* docs: document gateway device schemas

* docs: document gateway environment schemas

* docs: document gateway push schemas

* docs: document gateway plugin schemas

* docs: document gateway artifact schemas

* docs: document gateway command schemas

* docs: document gateway task schemas

* docs: document gateway exec approval schemas

* docs: document gateway secret schemas

* docs: document gateway config schemas

* docs: document gateway snapshot schemas

* docs: document gateway chat schemas

* docs: document gateway wizard schemas

* docs: document gateway node schemas

* docs: document gateway plugin approval schemas

* docs: document gateway talk schemas

* docs: document gateway agent schemas

* docs: document gateway session schemas

* docs: document gateway cron schemas

* docs: document gateway agent model skill schemas

* docs: document gateway skill proposal tool schemas

* docs: document gateway protocol registry

* docs: document gateway channel status schemas

* docs: document gateway schema regression tests

* docs: document gateway schema barrel

* docs: document gateway validator tests

* docs: document gateway primitive push tests

* docs: document gateway contract tests

* docs: document native protocol guard

* docs: document channel schema tests

* docs: document gateway protocol smoke tests

* docs: document gateway protocol entrypoint

* docs: document gateway protocol type exports

* docs: document gateway error codes

* docs: document protocol schema registry

* docs: document talk audio codec

* docs: document talk activation names

* docs: document talk consult questions

* docs: document talk consult tool

* docs: document talk run control contracts

* docs: document talk run control adapter

* docs: document talkback consult queue

* docs: document talk consult transcript guard

* docs: document talk fast context runtime

* docs: document forced talk consult coordinator

* docs: document talk output activity tracker

* docs: document talk event metrics

* docs: document talk diagnostics

* docs: document talk observability hook

* docs: document talk provider resolver

* docs: document talk provider registry

* docs: document talk runtime primitives

* docs: document talk consult controller logs

* docs: document channel identity helpers

* docs: document channel account allowlist helpers

* docs: document channel metadata draft controls

* docs: document channel ingress policy

* docs: document channel sender access gates

* docs: document channel catalog message contracts

* docs: document channel account plugin helpers

* docs: document configured binding helpers

* docs: document channel acp approval config helpers

* docs: document channel bundled config write helpers

* docs: document channel plugin utility contracts

* docs: document channel config access helpers

* docs: document channel message action helpers

* docs: document channel outbound runtime helpers

* docs: document channel pairing promotion helpers

* docs: document channel registry helpers

* docs: document channel setup wizard helpers

* docs: document channel lifecycle status helpers

* docs: document channel target thread helpers

* docs: document channel session binding helpers

* docs: document channel package module probes

* docs: document channel setup wizard contracts

* docs: document channel plugin API barrels

* docs: document channel contract test helpers

* docs: document channel core helpers

* docs: document small core facades

* docs: document provider runtime helpers

* docs: document persistence and realtime helpers

* docs: document mcp and state helpers

* docs: document tool planner contracts

* docs: document music generation runtime

* docs: document crestodian command flow

* docs: document utility helpers

* docs: document node host helpers

* docs: document transcript contracts

* docs: document trajectory export contracts

* docs: document image generation contracts

* docs: document routing helper contracts

* docs: document session helper contracts

* docs: document video generation contracts

* docs: document model catalog contracts

* docs: document proxy capture contracts

* docs: document status rendering contracts

* docs: document test helper contracts

* docs: document wizard setup contracts

* docs: document process contracts

* docs: document memory host sdk contracts

* docs: document tts contracts

* docs: document secrets runtime contracts

* docs: document shared helper contracts

* docs: document hook runtime contracts

* docs: document security audit contracts

* docs: document flow contracts

* docs: document media understanding contracts

* docs: document tui contracts

* docs: document logging contracts

* docs: document llm contracts

* docs: document cron contracts

* docs: document daemon contracts

* docs: document task contracts

* docs: document acp contracts

* docs: document test utility contracts

* docs: document skill contracts

* docs: document config contracts

* docs: document outbound infra contracts

* docs: document command analysis contracts

* docs: document provider usage infra contracts

* docs: document file safety infra contracts

* docs: document exec approval infra contracts

* docs: document gateway runtime infra contracts

* docs: document infra utility contracts

* docs: document infra queue storage contracts

* docs: document heartbeat infra contracts

* docs: document remaining infra contracts

* docs: document gateway auth contracts

* docs: document gateway display helpers

* docs: document gateway http helpers

* docs: document gateway node helpers

* docs: document gateway mcp helpers

* docs: document gateway support helpers

* docs: document gateway server runtime helpers

* docs: document gateway runtime bootstrap helpers

* docs: document gateway session events

* docs: document gateway utility helpers

* docs: document gateway talk helpers

* docs: document gateway helper contracts

* docs: document gateway server method helpers

* docs: document gateway server auth helpers

* docs: document gateway server tests

* docs: document gateway test helpers

* docs: document gateway node tests

* docs: document gateway channel tests

* docs: document gateway session tests

* docs: document gateway server startup tests

* docs: document gateway tool test helpers

* docs: document gateway server test helpers

* docs: document gateway server method tests

* docs: document remaining gateway tests

* docs: document plugin sdk public subpaths

* docs: document plugin sdk runtime helpers

* docs: document plugin sdk memory provider helpers

* docs: document plugin sdk runtime facades

* docs: document plugin sdk command approval helpers

* docs: document plugin sdk runtime types

* docs: document plugin sdk browser account helpers

* docs: document plugin sdk media memory helpers

* docs: document plugin sdk core tests

* docs: document plugin sdk contract helpers

* docs: document plugin sdk test helpers

* docs: document remaining plugin sdk tests

* docs: document cli utility helpers

* docs: document cli runtime helpers

* docs: document cli command registration helpers

* docs: document node cli helpers

* docs: document cli program registration

* docs: document message cli registration

* docs: document daemon cli helpers

* docs: document cli route parsers
2026-06-03 15:20:39 -07:00

145 lines
4.3 KiB
TypeScript

/** Pending exclusive store write plus the promise hooks for its caller. */
export type StoreWriterTask = {
/** Write operation to run once earlier tasks for the same store path finish. */
fn: () => Promise<unknown>;
/** Resolves the caller's promise with the write result. */
resolve: (value: unknown) => void;
/** Rejects the caller's promise with the write failure or test cleanup error. */
reject: (reason: unknown) => void;
};
/** Per-store-path FIFO queue that serializes file writes within one process. */
export type StoreWriterQueue = {
/** True while a drain loop owns this queue. */
running: boolean;
/** Writes waiting behind the active drain. */
pending: StoreWriterTask[];
/** Active drain promise, reused by waiters until the current batch settles. */
drainPromise: Promise<void> | null;
};
/** Store writer queues keyed by the canonical store path. */
export type StoreWriterQueues = Map<string, StoreWriterQueue>;
function getOrCreateStoreWriterQueue(
queues: StoreWriterQueues,
storePath: string,
): StoreWriterQueue {
const existing = queues.get(storePath);
if (existing) {
return existing;
}
const created: StoreWriterQueue = { running: false, pending: [], drainPromise: null };
queues.set(storePath, created);
return created;
}
async function drainStoreWriterQueue(queues: StoreWriterQueues, storePath: string): Promise<void> {
const queue = queues.get(storePath);
if (!queue) {
return;
}
if (queue.drainPromise) {
await queue.drainPromise;
return;
}
queue.running = true;
queue.drainPromise = (async () => {
try {
while (queue.pending.length > 0) {
const task = queue.pending.shift();
if (!task) {
continue;
}
let result: unknown;
let failed: unknown;
let hasFailure = false;
try {
result = await task.fn();
} catch (err) {
hasFailure = true;
failed = err;
}
if (hasFailure) {
task.reject(failed);
continue;
}
task.resolve(result);
}
} finally {
queue.running = false;
queue.drainPromise = null;
if (queue.pending.length === 0) {
queues.delete(storePath);
} else {
// Late enqueues after the loop drained run in a fresh microtask so this
// drainPromise can settle before the next writer batch starts.
queueMicrotask(() => {
void drainStoreWriterQueue(queues, storePath);
});
}
}
})();
await queue.drainPromise;
}
/** Runs one store write after prior writes for the same store path have finished. */
export async function runQueuedStoreWrite<T>(params: {
queues: StoreWriterQueues;
storePath: string;
label: string;
fn: () => Promise<T>;
}): Promise<T> {
if (!params.storePath || typeof params.storePath !== "string") {
throw new Error(
`${params.label}: storePath must be a non-empty string, got ${JSON.stringify(
params.storePath,
)}`,
);
}
const queue = getOrCreateStoreWriterQueue(params.queues, params.storePath);
return await new Promise<T>((resolve, reject) => {
const task: StoreWriterTask = {
fn: async () => await params.fn(),
resolve: (value) => resolve(value as T),
reject,
};
queue.pending.push(task);
void drainStoreWriterQueue(params.queues, params.storePath);
});
}
/** Rejects pending queued writes and clears queue state for test cleanup. */
export function clearStoreWriterQueuesForTest(queues: StoreWriterQueues, message: string): void {
for (const queue of queues.values()) {
for (const task of queue.pending) {
task.reject(new Error(message));
}
}
queues.clear();
}
/** Waits for active drains to settle while rejecting still-pending test writes. */
export async function drainStoreWriterQueuesForTest(
queues: StoreWriterQueues,
message: string,
): Promise<void> {
while (queues.size > 0) {
const activeQueues = [...queues.values()];
for (const queue of activeQueues) {
for (const task of queue.pending) {
task.reject(new Error(message));
}
queue.pending.length = 0;
}
const activeDrains = activeQueues.flatMap((queue) =>
queue.drainPromise ? [queue.drainPromise] : [],
);
if (activeDrains.length === 0) {
queues.clear();
return;
}
await Promise.allSettled(activeDrains);
}
}