mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 08:10:44 +00:00
fix(gateway): bound async session list transcript reads
Keep async sessions.list title/preview hydration on bounded transcript head/tail reads instead of full transcript index builds. Validation: - pnpm test:serial src/gateway/session-utils.fs.test.ts - pnpm test:serial src/gateway/server.sessions.list-changed.test.ts - pnpm exec oxfmt --check --threads=1 src/gateway/session-utils.fs.ts src/gateway/session-utils.fs.test.ts CHANGELOG.md - OPENCLAW_TESTBOX=1 pnpm check:changed on tbx_01kqnw1j8japk3d8z24s6cv141
This commit is contained in:
@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Gateway/sessions: keep async `sessions.list` title and preview hydration bounded to transcript head/tail reads so Control UI polling cannot full-scan large session transcripts every refresh. Thanks @vincentkoc.
|
||||
- Gateway: preserve stack diagnostics when `chat.send` or agent attachment parsing/staging fails, improving image-send failure triage. Refs #63432. (#75135) Thanks @keen0206.
|
||||
- Maintainer workflow: push prepared PR heads through GitHub's verified commit API by default and require an explicit override before git-protocol pushes can publish unsigned commits. Thanks @BunsDev.
|
||||
- Feishu: resolve setup/status probes through the selected/default account so multi-account configs with account-scoped app credentials show as configured and probeable. Fixes #72930. Thanks @brokemac79.
|
||||
|
||||
@@ -24,6 +24,7 @@ import {
|
||||
readSessionMessagesAsync,
|
||||
readSessionMessages,
|
||||
readSessionTitleFieldsFromTranscript,
|
||||
readSessionTitleFieldsFromTranscriptAsync,
|
||||
readSessionPreviewItemsFromTranscript,
|
||||
resolveSessionTranscriptCandidates,
|
||||
} from "./session-utils.fs.js";
|
||||
@@ -471,6 +472,23 @@ describe("readSessionTitleFieldsFromTranscript cache", () => {
|
||||
expect(readSpy.mock.calls.length).toBeGreaterThan(readsAfterFirst);
|
||||
readSpy.mockRestore();
|
||||
});
|
||||
|
||||
test("keeps async title extraction bounded like the sync path", async () => {
|
||||
const sessionId = "test-cache-async-bounded";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
{ type: "session", version: 1, id: sessionId },
|
||||
...Array.from({ length: 30 }, (_, index) => ({
|
||||
message: { role: "assistant", content: `filler ${index} ${"x".repeat(512)}` },
|
||||
})),
|
||||
{ message: { role: "user", content: "late title should not require a full scan" } },
|
||||
{ message: { role: "assistant", content: "tail preview" } },
|
||||
]);
|
||||
|
||||
await expect(readSessionTitleFieldsFromTranscriptAsync(sessionId, storePath)).resolves.toEqual({
|
||||
firstUserMessage: null,
|
||||
lastMessagePreview: "tail preview",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("readSessionMessages", () => {
|
||||
|
||||
@@ -42,6 +42,7 @@ const transcriptMessageCountCache = new Map<
|
||||
>();
|
||||
const MAX_TRANSCRIPT_MESSAGE_COUNT_CACHE_ENTRIES = 5000;
|
||||
const TRANSCRIPT_ASYNC_READ_CHUNK_BYTES = 64 * 1024;
|
||||
type TranscriptFileHandle = Awaited<ReturnType<typeof fs.promises.open>>;
|
||||
|
||||
function readSessionTitleFieldsCacheKey(
|
||||
filePath: string,
|
||||
@@ -813,43 +814,47 @@ export async function readSessionTitleFieldsFromTranscriptAsync(
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
const index = await readSessionTranscriptIndex(filePath);
|
||||
if (!index) {
|
||||
|
||||
if (stat.size === 0) {
|
||||
const empty = { firstUserMessage: null, lastMessagePreview: null };
|
||||
setCachedSessionTitleFields(cacheKey, stat, empty);
|
||||
return empty;
|
||||
}
|
||||
|
||||
let handle: TranscriptFileHandle | null = null;
|
||||
try {
|
||||
handle = await fs.promises.open(filePath, "r");
|
||||
|
||||
let firstUserMessage: string | null = null;
|
||||
try {
|
||||
const chunk = await readTranscriptHeadChunkAsync(handle);
|
||||
if (chunk) {
|
||||
firstUserMessage = extractFirstUserMessageFromTranscriptChunk(chunk, opts);
|
||||
}
|
||||
} catch {
|
||||
// ignore head read errors
|
||||
}
|
||||
|
||||
let lastMessagePreview: string | null = null;
|
||||
try {
|
||||
lastMessagePreview = await readLastMessagePreviewFromOpenTranscriptAsync({
|
||||
handle,
|
||||
size: stat.size,
|
||||
});
|
||||
} catch {
|
||||
// ignore tail read errors
|
||||
}
|
||||
|
||||
const result = { firstUserMessage, lastMessagePreview };
|
||||
setCachedSessionTitleFields(cacheKey, stat, result);
|
||||
return result;
|
||||
} catch {
|
||||
return { firstUserMessage: null, lastMessagePreview: null };
|
||||
}
|
||||
|
||||
let firstUserMessage: string | null = null;
|
||||
for (const entry of index.entries) {
|
||||
const msg = entry.record.message as TranscriptMessage | undefined;
|
||||
if (msg?.role !== "user") {
|
||||
continue;
|
||||
}
|
||||
if (opts?.includeInterSession !== true && hasInterSessionUserProvenance(msg)) {
|
||||
continue;
|
||||
}
|
||||
const text = extractTextFromContent(msg.content);
|
||||
if (text) {
|
||||
firstUserMessage = text;
|
||||
break;
|
||||
} finally {
|
||||
if (handle) {
|
||||
await handle.close().catch(() => undefined);
|
||||
}
|
||||
}
|
||||
|
||||
let lastMessagePreview: string | null = null;
|
||||
for (const entry of index.entries.toReversed()) {
|
||||
const msg = entry.record.message as TranscriptMessage | undefined;
|
||||
if (!msg || (msg.role !== "user" && msg.role !== "assistant")) {
|
||||
continue;
|
||||
}
|
||||
const text = extractTextFromContent(msg.content);
|
||||
if (text) {
|
||||
lastMessagePreview = text;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const result = { firstUserMessage, lastMessagePreview };
|
||||
setCachedSessionTitleFields(cacheKey, stat, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
function extractTextFromContent(content: TranscriptMessage["content"]): string | null {
|
||||
@@ -883,6 +888,18 @@ function readTranscriptHeadChunk(fd: number, maxBytes = 8192): string | null {
|
||||
return buf.toString("utf-8", 0, bytesRead);
|
||||
}
|
||||
|
||||
async function readTranscriptHeadChunkAsync(
|
||||
handle: TranscriptFileHandle,
|
||||
maxBytes = 8192,
|
||||
): Promise<string | null> {
|
||||
const buffer = Buffer.alloc(maxBytes);
|
||||
const { bytesRead } = await handle.read(buffer, 0, buffer.length, 0);
|
||||
if (bytesRead <= 0) {
|
||||
return null;
|
||||
}
|
||||
return buffer.toString("utf-8", 0, bytesRead);
|
||||
}
|
||||
|
||||
function extractFirstUserMessageFromTranscriptChunk(
|
||||
chunk: string,
|
||||
opts?: { includeInterSession?: boolean },
|
||||
@@ -993,6 +1010,41 @@ function readLastMessagePreviewFromOpenTranscript(params: {
|
||||
return null;
|
||||
}
|
||||
|
||||
async function readLastMessagePreviewFromOpenTranscriptAsync(params: {
|
||||
handle: TranscriptFileHandle;
|
||||
size: number;
|
||||
}): Promise<string | null> {
|
||||
const readStart = Math.max(0, params.size - LAST_MSG_MAX_BYTES);
|
||||
const readLen = Math.min(params.size, LAST_MSG_MAX_BYTES);
|
||||
const buffer = Buffer.alloc(readLen);
|
||||
const { bytesRead } = await params.handle.read(buffer, 0, readLen, readStart);
|
||||
if (bytesRead <= 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const chunk = buffer.toString("utf-8", 0, bytesRead);
|
||||
const lines = chunk.split(/\r?\n/).filter((line) => line.trim());
|
||||
const tailLines = lines.slice(-LAST_MSG_MAX_LINES);
|
||||
|
||||
for (let i = tailLines.length - 1; i >= 0; i--) {
|
||||
const line = tailLines[i];
|
||||
try {
|
||||
const parsed = JSON.parse(line);
|
||||
const msg = parsed?.message as TranscriptMessage | undefined;
|
||||
if (msg?.role !== "user" && msg?.role !== "assistant") {
|
||||
continue;
|
||||
}
|
||||
const text = extractTextFromContent(msg.content);
|
||||
if (text) {
|
||||
return text;
|
||||
}
|
||||
} catch {
|
||||
// skip malformed
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
export function readLastMessagePreviewFromTranscript(
|
||||
sessionId: string,
|
||||
storePath: string | undefined,
|
||||
|
||||
Reference in New Issue
Block a user