fix: deliver compaction hook messages (#76651) (thanks @simplyclever914)

This commit is contained in:
Peter Steinberger
2026-05-03 14:34:32 +01:00
parent 9de06e3dee
commit 825ad57513
8 changed files with 221 additions and 3 deletions

View File

@@ -71,7 +71,7 @@ Docs: https://docs.openclaw.ai
- Channels/secrets: resolve SecretRef-backed channel credentials through external plugin secret contracts after the plugin split, covering runtime startup, target discovery, webhook auth, disabled-account enumeration, and late-bound web_search config. Fixes #76371. (#76449) Thanks @joshavant and @neeravmakwana.
- Docker/Gateway: pass Docker setup `.env` values into gateway and CLI containers and preserve exec SecretRef `passEnv` keys in managed service plans, so 1Password Connect-backed Discord tokens keep resolving after doctor or plugin repair. Thanks @vincentkoc.
- Control UI/WebChat: explain compaction boundaries in chat history and link directly to session checkpoint controls so pre-compaction turns no longer look silently lost after refresh. Fixes #76415. Thanks @BunsDev.
- Agents/compaction: add an optional bundled compaction notifier hook and retry once from the compacted transcript when automatic compaction leaves a turn without a final visible reply. Thanks @simplyclever914.
- Agents/compaction: add an optional bundled compaction notifier hook and retry once from the compacted transcript when automatic compaction leaves a turn without a final visible reply. (#76651) Thanks @simplyclever914.
- Agents/incomplete-turn: detect and surface a warning when the agent's final text after a tool-call chain is silently dropped because the post-tool assistant response was never produced, instead of completing the turn with only the pre-tool analysis text. Fixes #76477. Thanks @amknight.
- Channels/WhatsApp: attach native outbound mention metadata for group text and media captions by resolving `@+<digits>` and `@<digits>` tokens against WhatsApp participant data, including LID groups. Fixes #39879; carries forward #56863. Thanks @kengi1437, @joe2643, and @fridayck.
- Channels/WhatsApp: require outbound mention tokens to end at a word boundary so phone-number prefixes inside longer strings no longer trigger hidden native mentions.
@@ -80,6 +80,7 @@ Docs: https://docs.openclaw.ai
- Plugins/install: require OpenClaw-owned install provenance before granting official npm plugin scanner trust, so direct npm package names no longer bypass launch-code scanning while catalog, onboarding, and doctor installs stay trusted. Thanks @fede-kamel and @vincentkoc.
- Network proxy: preserve target TLS hostname validation for Node HTTPS requests routed through the managed HTTP proxy, so Discord-style CONNECT traffic no longer validates certificates against the local proxy host. Fixes #74809. (#76442) Thanks @jesse-merhi and @abnershang.
- Gateway/sessions: keep async `sessions.list` title and preview hydration bounded to transcript head/tail reads so Control UI polling cannot full-scan large session transcripts every refresh. Thanks @vincentkoc.
- Gateway/performance: cache per-run verbose-level session reads, skip a redundant `lsof` scan in `gateway --force` when no listener was killed, and make the Gateway startup benchmark print usage for `--help`.
- Gateway/sessions: keep agent runtime metadata on lightweight `sessions.list` rows so model-only session patches do not make Control UI lose runtime identity. Thanks @vincentkoc.
- Gateway/sessions: keep bulk `sessions.list` rows lightweight by skipping per-row transcript usage fallback, display model inference, and plugin projection, avoiding event-loop stalls in large session stores. Thanks @Marvinthebored and @vincentkoc.
- Gateway/models: keep read-only `models.list` fallbacks on persisted/current metadata and configured rows while using static auth checks, so missing `models.json` files no longer runtime-load provider discovery or stall gateway after restart. Fixes #76382; refs #76360 and #75707. Thanks @trojy13, @RayWoo, @AnathemaOfficial, and @vincentkoc.

View File

@@ -647,6 +647,55 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
tokenCount: 0,
});
});
it("forwards internal compaction hook messages to the caller", async () => {
const onHookMessages = vi.fn();
triggerInternalHook.mockImplementation(async (event: unknown) => {
const hookEvent = event as { action?: string; messages?: string[] };
hookEvent.messages?.push(`${hookEvent.action} notice`);
});
const beforeMetrics = compactTesting.buildBeforeCompactionHookMetrics({
originalMessages: sessionMessages.slice(1) as AgentMessage[],
currentMessages: sessionMessages.slice(1) as AgentMessage[],
estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number,
});
const hookState = await compactTesting.runBeforeCompactionHooks({
hookRunner,
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionAgentId: "main",
workspaceDir: "/tmp",
metrics: beforeMetrics,
onHookMessages,
});
await compactTesting.runAfterCompactionHooks({
hookRunner,
sessionId: "session-1",
sessionAgentId: "main",
hookSessionKey: hookState.hookSessionKey,
missingSessionKey: hookState.missingSessionKey,
workspaceDir: "/tmp",
messageCountAfter: 1,
tokensAfter: 10,
compactedCount: 1,
sessionFile: "/tmp/session.jsonl",
onHookMessages,
});
expect(onHookMessages).toHaveBeenNthCalledWith(1, {
phase: "before",
messages: ["compact:before notice"],
sessionId: "session-1",
sessionKey: "agent:main:session-1",
});
expect(onHookMessages).toHaveBeenNthCalledWith(2, {
phase: "after",
messages: ["compact:after notice"],
sessionId: "session-1",
sessionKey: "agent:main:session-1",
});
});
it("emits a transcript update after successful compaction", async () => {
const listener = vi.fn();
const cleanup = onSessionTranscriptUpdate(listener);

View File

@@ -1130,6 +1130,7 @@ async function compactEmbeddedPiSessionDirectOnce(
workspaceDir: effectiveWorkspace,
messageProvider: resolvedMessageProvider,
metrics: beforeHookMetrics,
onHookMessages: params.onCompactionHookMessages,
});
const { messageCountOriginal } = beforeHookMetrics;
const diagEnabled = log.isEnabled("debug");
@@ -1314,6 +1315,7 @@ async function compactEmbeddedPiSessionDirectOnce(
summaryLength: typeof result.summary === "string" ? result.summary.length : undefined,
tokensBefore: result.tokensBefore,
firstKeptEntryId: effectiveFirstKeptEntryId,
onHookMessages: params.onCompactionHookMessages,
});
return {
ok: true,

View File

@@ -72,6 +72,12 @@ export type CompactEmbeddedPiSessionParams = {
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
ownerNumbers?: string[];
abortSignal?: AbortSignal;
onCompactionHookMessages?: (payload: {
phase: "before" | "after";
messages: string[];
sessionId: string;
sessionKey: string;
}) => void | Promise<void>;
/** Allow runtime plugins for this compaction to late-bind the gateway subagent. */
allowGatewaySubagentBinding?: boolean;
};

View File

@@ -178,6 +178,12 @@ export async function runBeforeCompactionHooks(params: {
workspaceDir: string;
messageProvider?: string;
metrics: ReturnType<typeof buildBeforeCompactionHookMetrics>;
onHookMessages?: (payload: {
phase: "before";
messages: string[];
sessionId: string;
sessionKey: string;
}) => void | Promise<void>;
}) {
const missingSessionKey = !params.sessionKey || !params.sessionKey.trim();
const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
@@ -191,6 +197,14 @@ export async function runBeforeCompactionHooks(params: {
tokenCountOriginal: params.metrics.tokenCountOriginal,
});
await triggerInternalHook(hookEvent);
if (hookEvent.messages.length > 0) {
await params.onHookMessages?.({
phase: "before",
messages: hookEvent.messages.slice(),
sessionId: params.sessionId,
sessionKey: hookSessionKey,
});
}
} catch (err) {
log.warn("session:compact:before hook failed", {
errorMessage: formatErrorMessage(err),
@@ -261,6 +275,12 @@ export async function runAfterCompactionHooks(params: {
summaryLength?: number;
tokensBefore?: number;
firstKeptEntryId?: string;
onHookMessages?: (payload: {
phase: "after";
messages: string[];
sessionId: string;
sessionKey: string;
}) => void | Promise<void>;
}) {
try {
const hookEvent = createInternalHookEvent("session", "compact:after", params.hookSessionKey, {
@@ -275,6 +295,14 @@ export async function runAfterCompactionHooks(params: {
firstKeptEntryId: params.firstKeptEntryId,
});
await triggerInternalHook(hookEvent);
if (hookEvent.messages.length > 0) {
await params.onHookMessages?.({
phase: "after",
messages: hookEvent.messages.slice(),
sessionId: params.sessionId,
sessionKey: params.hookSessionKey,
});
}
} catch (err) {
log.warn("session:compact:after hook failed", {
errorMessage: formatErrorMessage(err),

View File

@@ -901,6 +901,24 @@ export async function runEmbeddedPiAgent(
activeSessionFile = nextSessionFile;
}
};
const onCompactionHookMessages = async (payload: {
phase: "before" | "after";
messages: string[];
}) => {
const messages = payload.messages.filter((message) => message.trim().length > 0);
if (messages.length === 0) {
return;
}
await params.onAgentEvent?.({
stream: "compaction",
data: {
phase: payload.phase === "before" ? "start" : "end",
...(payload.phase === "after" ? { completed: true } : {}),
messages,
},
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
});
};
// When the engine owns compaction, compactEmbeddedPiSessionDirect is
// bypassed. Fire lifecycle hooks here so recovery paths still notify
// subscribers like memory extensions and usage trackers.
@@ -1369,6 +1387,7 @@ export async function runEmbeddedPiAgent(
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
ownerNumbers: params.ownerNumbers,
}),
onCompactionHookMessages,
...(attempt.promptCache ? { promptCache: attempt.promptCache } : {}),
runId: params.runId,
trigger: "timeout_recovery",
@@ -1525,6 +1544,7 @@ export async function runEmbeddedPiAgent(
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
ownerNumbers: params.ownerNumbers,
}),
onCompactionHookMessages,
...(attempt.promptCache ? { promptCache: attempt.promptCache } : {}),
runId: params.runId,
trigger: "overflow",

View File

@@ -1745,6 +1745,78 @@ describe("runAgentTurnWithFallback", () => {
);
});
it("delivers compaction hook messages without duplicating notifyUser notices", async () => {
const onBlockReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({
stream: "compaction",
data: { phase: "start", messages: ["Hook before"] },
});
await params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", completed: true, messages: ["Hook after"] },
});
return { payloads: [{ text: "final" }], meta: {} };
});
const followupRun = createFollowupRun();
followupRun.run.config = {
agents: {
defaults: {
compaction: {
notifyUser: true,
},
},
},
};
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello",
followupRun,
sessionCtx: {
Provider: "whatsapp",
MessageSid: "msg",
} as unknown as TemplateContext,
opts: { onBlockReply },
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true,
shouldEmitToolOutput: () => false,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false,
resetSessionAfterRoleOrderingConflict: async () => false,
isHeartbeat: false,
sessionKey: "main",
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off",
});
expect(result.kind).toBe("success");
expect(onBlockReply).toHaveBeenCalledTimes(2);
expect(onBlockReply).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
text: "Hook before",
replyToId: "msg",
replyToCurrent: true,
isCompactionNotice: true,
}),
);
expect(onBlockReply).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
text: "Hook after",
replyToId: "msg",
replyToCurrent: true,
isCompactionNotice: true,
}),
);
});
it("prefers onCompactionEnd callback over default notice when notifyUser is enabled", async () => {
const onBlockReply = vi.fn();
const onCompactionEnd = vi.fn();

View File

@@ -967,6 +967,31 @@ export async function runAgentTurnWithFallback(params: {
logVerbose(`compaction ${phase} notice delivery failed (non-fatal): ${String(err)}`);
}
};
const readCompactionHookMessages = (value: unknown): string[] => {
if (!Array.isArray(value)) {
return [];
}
return value
.filter((entry): entry is string => typeof entry === "string")
.map((entry) => entry.trim())
.filter((entry) => entry.length > 0);
};
const sendCompactionHookMessages = async (messages: string[]) => {
if (!params.opts?.onBlockReply || messages.length === 0) {
return;
}
const noticePayload = params.applyReplyToMode({
text: messages.join("\n\n"),
replyToId: currentMessageId,
replyToCurrent: true,
isCompactionNotice: true,
});
try {
await params.opts.onBlockReply(noticePayload);
} catch (err) {
logVerbose(`compaction hook notice delivery failed (non-fatal): ${String(err)}`);
}
};
const shouldSurfaceToControlUi = isInternalMessageChannel(
params.followupRun.run.messageProvider ??
params.sessionCtx.Surface ??
@@ -1591,12 +1616,19 @@ export async function runAgentTurnWithFallback(params: {
// Track auto-compaction and notify higher layers.
if (evt.stream === "compaction") {
const phase = readStringValue(evt.data.phase) ?? "";
const hookMessages = readCompactionHookMessages(evt.data.messages);
if (phase === "start") {
// Keep custom compaction callbacks active, but gate the
// fallback user-facing notice behind explicit opt-in.
if (params.opts?.onCompactionStart) {
await params.opts.onCompactionStart();
} else if (shouldNotifyUserAboutCompaction) {
}
if (hookMessages.length > 0) {
await sendCompactionHookMessages(hookMessages);
} else if (
!params.opts?.onCompactionStart &&
shouldNotifyUserAboutCompaction
) {
// Send directly via opts.onBlockReply (bypassing the
// pipeline) so the notice does not cause final payloads
// to be discarded on non-streaming model paths.
@@ -1609,9 +1641,17 @@ export async function runAgentTurnWithFallback(params: {
attemptCompactionCount += 1;
if (params.opts?.onCompactionEnd) {
await params.opts.onCompactionEnd();
} else if (shouldNotifyUserAboutCompaction) {
}
if (hookMessages.length > 0) {
await sendCompactionHookMessages(hookMessages);
} else if (
!params.opts?.onCompactionEnd &&
shouldNotifyUserAboutCompaction
) {
await sendCompactionNotice("end");
}
} else if (hookMessages.length > 0) {
await sendCompactionHookMessages(hookMessages);
} else if (shouldNotifyUserAboutCompaction) {
await sendCompactionNotice("incomplete");
}