Mirror of https://github.com/openclaw/openclaw.git, synced 2026-05-06 05:30:42 +00:00.
fix(memory): reindex archived session transcript updates
This commit is contained in:
@@ -66,6 +66,7 @@ Docs: https://docs.openclaw.ai
|
||||
- CLI/devices: request `operator.admin` for `openclaw devices approve <requestId>` only when the exact pending device request would mint or inherit admin-scoped operator access, while keeping lower-scope approvals on the pairing scope.
|
||||
- Memory/embedding: broaden the embedding reindex retry classifier to include transient socket-layer errors (`fetch failed`, `ECONNRESET`, `socket hang up`, `UND_ERR_*`, `closed`) so memory reindex survives provider network hiccups instead of aborting mid-run. Related #56815, #44166. (#76311) Thanks @buyitsydney.
|
||||
- Memory/sessions: keep rotated and deleted session transcripts (`.jsonl.reset.<iso>` / `.jsonl.deleted.<iso>`) searchable end-to-end by indexing their real content in `buildSessionEntry` instead of short-circuiting to empty entries, and by mapping archive hit paths back to their live transcript stem during `memory_search` visibility filtering so hits are no longer dropped at the guard. `.jsonl.bak.<iso>` backups and compaction checkpoints remain opaque. Refs #56131. Thanks @buyitsydney.
|
||||
- Memory/sessions: emit a `sessionTranscriptUpdate` event when `archiveFileOnDisk` rotates a live session transcript into `.jsonl.reset.<iso>` / `.jsonl.deleted.<iso>` / `.jsonl.bak.<iso>`, and bypass the delta-bytes / delta-messages threshold gate in `processSessionDeltaBatch` for usage-counted archive paths (`.jsonl.reset.<iso>` and `.jsonl.deleted.<iso>`). Without the bypass the archive event was forwarded to the listener but dropped at the threshold check, because an archive is a one-shot file-rename mutation rather than an incremental append and would typically land below the default `deltaBytes: 100000` / `deltaMessages: 50` reindex thresholds. Archives now feed the memory sync incremental path the same way `appendMessage` / compaction / tool-result rewrite / chat inject / command execution events already do. Refs #56131. Thanks @buyitsydney.
|
||||
- Memory/search: keep sqlite-vec optional in packaged installs and point missing-extension recovery at the valid `agents.defaults.memorySearch.store.vector.extensionPath` setting. Thanks @willemsej and @vincentkoc.
|
||||
- Gateway: keep directly requested plugin tools invokable under restrictive tool profiles while preserving explicit deny lists and the HTTP safety deny list, preventing catalog/invoke mismatches that surface as "Tool not available". Thanks @BunsDev.
|
||||
- Gateway/update: allow beta binaries to refresh gateway services when the config was last written by the matching stable release version, avoiding false newer-config downgrade blocks during beta channel updates.
|
||||
|
||||
@@ -0,0 +1,171 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import type {
|
||||
OpenClawConfig,
|
||||
ResolvedMemorySearchConfig,
|
||||
} from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
|
||||
import type {
|
||||
MemorySource,
|
||||
MemorySyncProgressUpdate,
|
||||
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import { MemoryManagerSyncOps } from "./manager-sync-ops.js";
|
||||
|
||||
// Shape of one entry in the memory file index, matching what the
// overridden `indexFile` stub below receives. Declared locally so the
// test does not have to import internal engine types.
type MemoryIndexEntry = {
  // Index-relative path of the file.
  path: string;
  // Absolute on-disk path.
  absPath: string;
  // Last-modification timestamp in milliseconds.
  mtimeMs: number;
  // File size in bytes.
  size: number;
  // Content hash used for change detection — exact algorithm is engine-internal.
  hash: string;
  // Optional pre-loaded file content; when absent the engine presumably
  // reads from disk — TODO confirm against indexFile's real contract.
  content?: string;
};
|
||||
|
||||
// Parameter object recorded for each intercepted `sync()` call; mirrors
// the argument shape the harness's `sync` override accepts so tests can
// assert on exactly what triggered a sync.
type SyncParams = {
  // Human-readable trigger tag, e.g. "session-delta".
  reason?: string;
  // Force a full reindex regardless of deltas.
  force?: boolean;
  // Force reindexing of session transcripts specifically.
  forceSessions?: boolean;
  // Restrict the sync to a single session file.
  sessionFile?: string;
  // Optional progress callback invoked with incremental updates.
  progress?: (update: MemorySyncProgressUpdate) => void;
};
|
||||
|
||||
// Minimal concrete MemoryManagerSyncOps used to exercise the session
// delta batch in isolation: db/vector/cache collaborators are stubbed
// inert and `sync()` is intercepted, so tests can assert how
// processSessionDeltaBatch marks files dirty and requests a sync.
class SessionDeltaHarness extends MemoryManagerSyncOps {
  // Required base-class state, stubbed with inert values.
  protected readonly cfg = {} as OpenClawConfig;
  protected readonly agentId = "main";
  protected readonly workspaceDir = "/tmp/openclaw-test-workspace";
  // Thresholds match the default delta gate (deltaBytes 100000 /
  // deltaMessages 50), so a one-line session file always sits below them.
  protected readonly settings = {
    sync: {
      sessions: {
        deltaBytes: 100_000,
        deltaMessages: 50,
        postCompactionForce: true,
      },
    },
  } as ResolvedMemorySearchConfig;
  // Batch embedding disabled — tests cover only the incremental path.
  protected readonly batch = {
    enabled: false,
    wait: false,
    concurrency: 1,
    pollIntervalMs: 0,
    timeoutMs: 0,
  };
  protected readonly vector = { enabled: false, available: false };
  protected readonly cache = { enabled: false };
  // Never touched by the code paths under test — safe to leave unset.
  protected db = null as unknown as DatabaseSync;

  // Every params object passed to the intercepted sync(); tests assert on it.
  readonly syncCalls: SyncParams[] = [];

  // Seed the inherited pending-files set, as the transcript-update
  // listener would after a session-file mutation event.
  addPendingSessionFile(sessionFile: string) {
    this.sessionPendingFiles.add(sessionFile);
  }

  // Expose the inherited dirty-file set for assertions.
  getDirtySessionFiles(): string[] {
    return Array.from(this.sessionsDirtyFiles);
  }

  // Expose the inherited dirty flag for assertions.
  isSessionsDirty(): boolean {
    return this.sessionsDirty;
  }

  // Invoke the non-public processSessionDeltaBatch via a structural
  // cast: it is the unit under test but not part of the public surface.
  async processPendingSessionDeltas(): Promise<void> {
    await (
      this as unknown as {
        processSessionDeltaBatch: () => Promise<void>;
      }
    ).processSessionDeltaBatch();
  }

  // Stable provider key so no real provider resolution happens.
  protected computeProviderKey(): string {
    return "test";
  }

  // Record the call instead of performing a real sync so tests can
  // assert the exact trigger reason without indexing side effects.
  protected async sync(params?: SyncParams): Promise<void> {
    this.syncCalls.push(params ?? {});
  }

  // Pass-through timeout wrapper: tests never want artificial deadlines.
  protected async withTimeout<T>(
    promise: Promise<T>,
    _timeoutMs: number,
    _message: string,
  ): Promise<T> {
    return await promise;
  }

  protected getIndexConcurrency(): number {
    return 1;
  }

  // Embedding-cache pruning is irrelevant here — no-op.
  protected pruneEmbeddingCacheIfNeeded(): void {}

  // Indexing itself is out of scope for these tests — swallow the call.
  protected async indexFile(
    _entry: MemoryIndexEntry,
    _options: { source: MemorySource; content?: string },
  ): Promise<void> {}
}
|
||||
|
||||
describe("session archive delta bypass", () => {
|
||||
let tmpDir = "";
|
||||
|
||||
beforeEach(async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-archive-delta-"));
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
async function writeSessionFile(name: string): Promise<string> {
|
||||
const filePath = path.join(tmpDir, name);
|
||||
await fs.writeFile(
|
||||
filePath,
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: { role: "user", content: "short archived session" },
|
||||
}) + "\n",
|
||||
"utf-8",
|
||||
);
|
||||
return filePath;
|
||||
}
|
||||
|
||||
it.each(["reset", "deleted"] as const)(
|
||||
"marks below-threshold %s archives dirty immediately",
|
||||
async (reason) => {
|
||||
const archivePath = await writeSessionFile(
|
||||
`session-a.jsonl.${reason}.2026-05-03T05-38-59.000Z`,
|
||||
);
|
||||
const harness = new SessionDeltaHarness();
|
||||
harness.addPendingSessionFile(archivePath);
|
||||
|
||||
await harness.processPendingSessionDeltas();
|
||||
|
||||
expect(harness.getDirtySessionFiles()).toEqual([archivePath]);
|
||||
expect(harness.isSessionsDirty()).toBe(true);
|
||||
expect(harness.syncCalls).toEqual([{ reason: "session-delta" }]);
|
||||
},
|
||||
);
|
||||
|
||||
it("keeps .jsonl.bak archives on the normal below-threshold delta path", async () => {
|
||||
const bakPath = await writeSessionFile("session-a.jsonl.bak.2026-05-03T05-38-59.000Z");
|
||||
const harness = new SessionDeltaHarness();
|
||||
harness.addPendingSessionFile(bakPath);
|
||||
|
||||
await harness.processPendingSessionDeltas();
|
||||
|
||||
expect(harness.getDirtySessionFiles()).toEqual([]);
|
||||
expect(harness.isSessionsDirty()).toBe(false);
|
||||
expect(harness.syncCalls).toEqual([]);
|
||||
});
|
||||
|
||||
it("keeps live transcripts below the configured thresholds", async () => {
|
||||
const livePath = await writeSessionFile("session-a.jsonl");
|
||||
const harness = new SessionDeltaHarness();
|
||||
harness.addPendingSessionFile(livePath);
|
||||
|
||||
await harness.processPendingSessionDeltas();
|
||||
|
||||
expect(harness.getDirtySessionFiles()).toEqual([]);
|
||||
expect(harness.isSessionsDirty()).toBe(false);
|
||||
expect(harness.syncCalls).toEqual([]);
|
||||
});
|
||||
});
|
||||
@@ -17,6 +17,8 @@ import {
|
||||
} from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
|
||||
import {
|
||||
buildSessionEntry,
|
||||
isSessionArchiveArtifactName,
|
||||
isUsageCountedSessionTranscriptFileName,
|
||||
listSessionFilesForAgent,
|
||||
sessionPathForFile,
|
||||
} from "openclaw/plugin-sdk/memory-core-host-engine-qmd";
|
||||
@@ -491,6 +493,24 @@ export abstract class MemoryManagerSyncOps {
|
||||
this.sessionPendingFiles.clear();
|
||||
let shouldSync = false;
|
||||
for (const sessionFile of pending) {
|
||||
// Usage-counted session archives (`.jsonl.reset.<iso>` and
|
||||
// `.jsonl.deleted.<iso>`) are one-shot mutation events: the file is
|
||||
// written once by the archive rotation and then never touched again.
|
||||
// They carry no incremental `append` semantics, so the delta-bytes /
|
||||
// delta-messages thresholds (designed for live transcripts accumulating
|
||||
// appended messages) cannot gate them correctly — a short archive
|
||||
// below the threshold would simply never reindex. Mark them dirty
|
||||
// directly and skip the delta accounting.
|
||||
const baseName = path.basename(sessionFile);
|
||||
if (
|
||||
isSessionArchiveArtifactName(baseName) &&
|
||||
isUsageCountedSessionTranscriptFileName(baseName)
|
||||
) {
|
||||
this.sessionsDirtyFiles.add(sessionFile);
|
||||
this.sessionsDirty = true;
|
||||
shouldSync = true;
|
||||
continue;
|
||||
}
|
||||
const delta = await this.updateSessionDelta(sessionFile);
|
||||
if (!delta) {
|
||||
continue;
|
||||
|
||||
@@ -12,7 +12,11 @@ export {
|
||||
type SessionFileEntry,
|
||||
type SessionTranscriptClassification,
|
||||
} from "./host/session-files.js";
|
||||
export { parseUsageCountedSessionIdFromFileName } from "./host/openclaw-runtime-session.js";
|
||||
export {
|
||||
isSessionArchiveArtifactName,
|
||||
isUsageCountedSessionTranscriptFileName,
|
||||
parseUsageCountedSessionIdFromFileName,
|
||||
} from "./host/openclaw-runtime-session.js";
|
||||
export { parseQmdQueryJson, type QmdQueryResult } from "./host/qmd-query-parser.js";
|
||||
export {
|
||||
deriveQmdScopeChannel,
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
onSessionTranscriptUpdate,
|
||||
type SessionTranscriptUpdate,
|
||||
} from "../sessions/transcript-events.js";
|
||||
import { archiveFileOnDisk } from "./session-transcript-files.fs.js";
|
||||
|
||||
const subscriptions: Array<() => void> = [];
|
||||
|
||||
afterEach(() => {
|
||||
while (subscriptions.length > 0) {
|
||||
subscriptions.pop()?.();
|
||||
}
|
||||
});
|
||||
|
||||
describe("archiveFileOnDisk transcript updates", () => {
|
||||
it("emits a session transcript update for the archived path on reset", () => {
|
||||
const updates: SessionTranscriptUpdate[] = [];
|
||||
subscriptions.push(onSessionTranscriptUpdate((update) => updates.push(update)));
|
||||
|
||||
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "oc-archive-events-reset-"));
|
||||
try {
|
||||
const sessionFile = path.join(tmpDir, "live.jsonl");
|
||||
fs.writeFileSync(sessionFile, '{"type":"session-meta","agentId":"main"}\n');
|
||||
|
||||
const archived = archiveFileOnDisk(sessionFile, "reset");
|
||||
|
||||
expect(fs.existsSync(archived)).toBe(true);
|
||||
expect(fs.existsSync(sessionFile)).toBe(false);
|
||||
expect(archived).toContain(".jsonl.reset.");
|
||||
expect(updates).toHaveLength(1);
|
||||
expect(updates[0].sessionFile).toBe(archived);
|
||||
// Archive does not carry a messageId/message payload — this is a
|
||||
// pure-path mutation notification, matching how compaction-only
|
||||
// emits (sessionFile + sessionKey-only) behave.
|
||||
expect(updates[0].message).toBeUndefined();
|
||||
expect(updates[0].messageId).toBeUndefined();
|
||||
} finally {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("also emits for deleted and bak archive reasons", () => {
|
||||
const updates: SessionTranscriptUpdate[] = [];
|
||||
subscriptions.push(onSessionTranscriptUpdate((update) => updates.push(update)));
|
||||
|
||||
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "oc-archive-events-mixed-"));
|
||||
try {
|
||||
const deletedSource = path.join(tmpDir, "deleted.jsonl");
|
||||
fs.writeFileSync(deletedSource, "{}\n");
|
||||
const deletedArchived = archiveFileOnDisk(deletedSource, "deleted");
|
||||
|
||||
const bakSource = path.join(tmpDir, "bak.jsonl");
|
||||
fs.writeFileSync(bakSource, "{}\n");
|
||||
const bakArchived = archiveFileOnDisk(bakSource, "bak");
|
||||
|
||||
expect(deletedArchived).toContain(".jsonl.deleted.");
|
||||
expect(bakArchived).toContain(".jsonl.bak.");
|
||||
expect(updates.map((update) => update.sessionFile)).toEqual([deletedArchived, bakArchived]);
|
||||
} finally {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -12,6 +12,7 @@ import {
|
||||
resolveSessionTranscriptPathInDir,
|
||||
} from "../config/sessions/paths.js";
|
||||
import { resolveRequiredHomeDir } from "../infra/home-dir.js";
|
||||
import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
|
||||
type ArchiveFileReason = SessionArchiveReason;
|
||||
export type ArchivedSessionTranscript = {
|
||||
@@ -127,6 +128,16 @@ export function archiveFileOnDisk(filePath: string, reason: ArchiveFileReason):
|
||||
const ts = formatSessionArchiveTimestamp();
|
||||
const archived = `${filePath}.${reason}.${ts}`;
|
||||
fs.renameSync(filePath, archived);
|
||||
// Notify the session transcript subscribers (memory index, sessions-history
|
||||
// HTTP, etc.) that a mutation landed on a session-owned path. Without this
|
||||
// emit the memory sync's incremental path never learns the new archive
|
||||
// exists: chokidar does not watch the sessions directory, and the event bus
|
||||
// is the only channel gateway code uses to signal session-file mutations.
|
||||
// All other in-process mutations (append, compaction, tool-result rewrite,
|
||||
// chat inject, command execution) already emit here; archive was the sole
|
||||
// remaining gap, which is why `.jsonl.reset.<iso>` / `.jsonl.deleted.<iso>`
|
||||
// files only surfaced in the index after a full reindex.
|
||||
emitSessionTranscriptUpdate({ sessionFile: archived });
|
||||
return archived;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user