mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-15 03:50:40 +00:00
173 lines
5.7 KiB
TypeScript
173 lines
5.7 KiB
TypeScript
import crypto from "node:crypto";
|
|
import { runWithModelFallback } from "../../agents/model-fallback.js";
|
|
import { isCliProvider } from "../../agents/model-selection.js";
|
|
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
|
|
import { resolveSandboxConfigForAgent, resolveSandboxRuntimeStatus } from "../../agents/sandbox.js";
|
|
import type { OpenClawConfig } from "../../config/config.js";
|
|
import { type SessionEntry, updateSessionStoreEntry } from "../../config/sessions.js";
|
|
import { logVerbose } from "../../globals.js";
|
|
import { registerAgentRunContext } from "../../infra/agent-events.js";
|
|
import type { TemplateContext } from "../templating.js";
|
|
import type { VerboseLevel } from "../thinking.js";
|
|
import type { GetReplyOptions } from "../types.js";
|
|
import {
|
|
buildEmbeddedRunBaseParams,
|
|
buildEmbeddedRunContexts,
|
|
resolveModelFallbackOptions,
|
|
} from "./agent-runner-utils.js";
|
|
import {
|
|
resolveMemoryFlushContextWindowTokens,
|
|
resolveMemoryFlushPromptForRun,
|
|
resolveMemoryFlushSettings,
|
|
shouldRunMemoryFlush,
|
|
} from "./memory-flush.js";
|
|
import type { FollowupRun } from "./queue.js";
|
|
import { incrementCompactionCount } from "./session-updates.js";
|
|
|
|
/**
 * Runs a memory-flush agent turn for the current session when one is due.
 *
 * The flush is skipped (and `params.sessionEntry` is returned unchanged) when
 * any of the following holds, checked in this order:
 * - no memory-flush settings resolve from `params.cfg`;
 * - the session is sandboxed and its sandbox workspace access is not `"rw"`;
 * - the current run is a heartbeat;
 * - the follow-up run's provider is a CLI provider;
 * - `shouldRunMemoryFlush` declines for the session entry and token budget.
 *
 * Otherwise an embedded agent run is started through the model-fallback
 * wrapper using the configured flush prompt. Afterwards the compaction count
 * is incremented if the run emitted a compaction "end" event, and the flush
 * timestamp/count are persisted when both `storePath` and `sessionKey` exist.
 *
 * Failures of the flush run or of the metadata persistence are reported via
 * `logVerbose` and are not rethrown.
 *
 * @param params - Configuration, run description and session state for the flush.
 * @returns The session entry returned by the metadata update when it yielded
 *   one; otherwise the `sessionEntry` that was passed in.
 */
export async function runMemoryFlushIfNeeded(params: {
  cfg: OpenClawConfig;
  followupRun: FollowupRun;
  sessionCtx: TemplateContext;
  opts?: GetReplyOptions;
  defaultModel: string;
  agentCfgContextTokens?: number;
  resolvedVerboseLevel: VerboseLevel;
  sessionEntry?: SessionEntry;
  sessionStore?: Record<string, SessionEntry>;
  sessionKey?: string;
  storePath?: string;
  isHeartbeat: boolean;
}): Promise<SessionEntry | undefined> {
  const { cfg, followupRun, sessionKey, storePath } = params;

  const flushSettings = resolveMemoryFlushSettings(cfg);
  if (!flushSettings) {
    return params.sessionEntry;
  }

  // Without a session key, or outside a sandbox, the workspace is treated as
  // writable. A sandboxed session qualifies only with "rw" workspace access.
  let workspaceWritable = true;
  if (sessionKey) {
    const sandboxStatus = resolveSandboxRuntimeStatus({ cfg, sessionKey });
    if (sandboxStatus.sandboxed) {
      const sandboxCfg = resolveSandboxConfigForAgent(cfg, sandboxStatus.agentId);
      workspaceWritable = sandboxCfg.workspaceAccess === "rw";
    }
  }

  // Cheap gates first; the CLI-provider check only runs when the earlier ones pass.
  if (
    !workspaceWritable ||
    params.isHeartbeat ||
    isCliProvider(followupRun.run.provider, cfg)
  ) {
    return params.sessionEntry;
  }

  // Prefer the entry handed in by the caller, falling back to the store lookup.
  const candidateEntry =
    params.sessionEntry ?? (sessionKey ? params.sessionStore?.[sessionKey] : undefined);
  const contextWindowTokens = resolveMemoryFlushContextWindowTokens({
    modelId: followupRun.run.model ?? params.defaultModel,
    agentCfgContextTokens: params.agentCfgContextTokens,
  });
  const flushDue = shouldRunMemoryFlush({
    entry: candidateEntry,
    contextWindowTokens,
    reserveTokensFloor: flushSettings.reserveTokensFloor,
    softThresholdTokens: flushSettings.softThresholdTokens,
  });
  if (!flushDue) {
    return params.sessionEntry;
  }

  let currentEntry = params.sessionEntry;
  const store = params.sessionStore;
  const flushRunId = crypto.randomUUID();

  // Associate the flush run id with the session key and verbose level.
  if (sessionKey) {
    registerAgentRunContext(flushRunId, {
      sessionKey,
      verboseLevel: params.resolvedVerboseLevel,
    });
  }

  // Set by the agent-event callback once a compaction "end" event is seen.
  let compactionFinished = false;

  // Combine the run's extra system prompt with the flush system prompt,
  // dropping whichever parts are empty/undefined.
  const systemPromptParts = [followupRun.run.extraSystemPrompt, flushSettings.systemPrompt];
  const flushSystemPrompt = systemPromptParts.filter(Boolean).join("\n\n");

  try {
    await runWithModelFallback({
      ...resolveModelFallbackOptions(followupRun.run),
      run: (provider, model) => {
        const { authProfile, embeddedContext, senderContext } = buildEmbeddedRunContexts({
          run: followupRun.run,
          sessionCtx: params.sessionCtx,
          hasRepliedRef: params.opts?.hasRepliedRef,
          provider,
        });
        const runBaseParams = buildEmbeddedRunBaseParams({
          run: followupRun.run,
          provider,
          model,
          runId: flushRunId,
          authProfile,
        });
        const flushPrompt = resolveMemoryFlushPromptForRun({
          prompt: flushSettings.prompt,
          cfg,
        });
        return runEmbeddedPiAgent({
          ...embeddedContext,
          ...senderContext,
          ...runBaseParams,
          prompt: flushPrompt,
          extraSystemPrompt: flushSystemPrompt,
          onAgentEvent: (evt) => {
            if (evt.stream !== "compaction") {
              return;
            }
            // Only a string phase equal to "end" marks the compaction as done.
            if (evt.data.phase === "end") {
              compactionFinished = true;
            }
          },
        });
      },
    });

    // Seed the recorded count from the active entry, then the store, then zero.
    const storedEntry = sessionKey ? store?.[sessionKey] : undefined;
    let compactionCount = currentEntry?.compactionCount ?? storedEntry?.compactionCount ?? 0;

    if (compactionFinished) {
      const incremented = await incrementCompactionCount({
        sessionEntry: currentEntry,
        sessionStore: store,
        sessionKey,
        storePath,
      });
      if (typeof incremented === "number") {
        compactionCount = incremented;
      }
    }

    if (storePath && sessionKey) {
      // Persistence is best-effort: a failure here is logged and the flush
      // itself is still considered done.
      try {
        const persisted = await updateSessionStoreEntry({
          storePath,
          sessionKey,
          update: async () => ({
            memoryFlushAt: Date.now(),
            memoryFlushCompactionCount: compactionCount,
          }),
        });
        if (persisted) {
          currentEntry = persisted;
        }
      } catch (err) {
        logVerbose(`failed to persist memory flush metadata: ${String(err)}`);
      }
    }
  } catch (err) {
    logVerbose(`memory flush run failed: ${String(err)}`);
  }

  return currentEntry;
}
|