fix(agents): attribute embedded tool logs to channels

Fixes #50565.
This commit is contained in:
Fermin Quant
2026-05-28 16:29:26 -04:00
committed by GitHub
parent a661506b0f
commit 205d6b730f
5 changed files with 90 additions and 1 deletions

View File

@@ -2032,6 +2032,23 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
expect(params.sessionKey).toBe(sessionKey);
});
it("forwards the normalized message channel to the embedded subscription", async () => {
await createContextEngineAttemptRunner({
contextEngine: createContextEngineBootstrapAndAssemble(),
sessionKey,
tempPaths,
attemptOverrides: {
messageChannel: "TELEGRAM",
},
});
const subscriptionParams = requireRecord(
hoisted.subscribeEmbeddedAgentSessionMock.mock.calls[0]?.[0],
"subscription params",
);
expect(subscriptionParams.messageChannel).toBe("telegram");
});
it("skips maintenance when afterTurn fails", async () => {
const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble();
const afterTurn = vi.fn(async () => {

View File

@@ -2924,6 +2924,7 @@ export async function runEmbeddedAttempt(
buildEmbeddedSubscriptionParams({
session: activeSession,
runId: params.runId,
messageChannel: runtimeChannel,
initialReplayState: params.initialReplayState,
hookRunner: getGlobalHookRunner() ?? undefined,
verboseLevel: params.verboseLevel,

View File

@@ -1,7 +1,12 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { AssistantMessage } from "openclaw/plugin-sdk/llm";
import { describe, expect, it, vi } from "vitest";
import { HEARTBEAT_RESPONSE_TOOL_NAME } from "../auto-reply/heartbeat-tool-response.js";
import * as agentEvents from "../infra/agent-events.js";
import { resetLogger, setLoggerOverride } from "../logging/logger.js";
import { parseLogLine } from "../logging/parse-log-line.js";
import {
THINKING_TAG_CASES,
createSubscribedSessionHarness,
@@ -114,6 +119,45 @@ describe("subscribeEmbeddedAgentSession", () => {
});
}
async function captureToolLifecycleLogSubsystems(messageChannel?: string): Promise<string[]> {
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-tool-log-attribution-"));
const logFile = path.join(tempDir, "openclaw.log");
try {
setLoggerOverride({
level: "debug",
consoleLevel: "silent",
file: logFile,
});
const { emit } = createSubscribedHarness({
runId: "run-log-attribution",
messageChannel,
});
emitToolRun({
emit,
toolName: "exec",
toolCallId: "tool-log-attribution",
args: { command: "echo ok" },
isError: false,
result: { ok: true },
});
const logText = await fs.readFile(logFile, "utf8");
const subsystems: string[] = [];
for (const line of logText.trim().split(/\n+/)) {
const parsed = parseLogLine(line);
if (parsed?.message.includes("embedded run tool")) {
subsystems.push(parsed.subsystem ?? "");
}
}
return subsystems;
} finally {
resetLogger();
setLoggerOverride(null);
await fs.rm(tempDir, { recursive: true, force: true });
}
}
function findBlockReplyPayload(
onBlockReply: { mock: { calls: unknown[][] } },
text: string,
@@ -195,6 +239,21 @@ describe("subscribeEmbeddedAgentSession", () => {
});
});
it.each([
["telegram", "gateway/channels/telegram"],
[undefined, "agent/embedded"],
["openclaw", "agent/embedded"],
["not a channel", "agent/embedded"],
] as const)(
"attributes tool lifecycle logs for channel=%s",
async (messageChannel, subsystem) => {
await expect(captureToolLifecycleLogSubsystems(messageChannel)).resolves.toEqual([
subsystem,
subsystem,
]);
},
);
it("does not double-count usage when done and message_end carry the same snapshot", () => {
const { emit, subscription } = createSubscribedSessionHarness({ runId: "run" });
const usage = {

View File

@@ -10,6 +10,7 @@ import { buildCodeSpanIndex, createInlineCodeState } from "../markdown/code-span
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { findFinalTagMatches } from "../shared/text/final-tags.js";
import { hasOrphanReasoningCloseBoundary } from "../shared/text/reasoning-tags.js";
import { isDeliverableMessageChannel, normalizeMessageChannel } from "../utils/message-channel.js";
import { EmbeddedBlockChunker } from "./embedded-agent-block-chunker.js";
import {
isMessagingToolDuplicateNormalized,
@@ -59,7 +60,15 @@ const STREAM_STRIPPED_BLOCK_TAG_NAMES = [
"antml:thinking",
"antml:thought",
] as const;
const log = createSubsystemLogger("agent/embedded");
const embeddedLog = createSubsystemLogger("agent/embedded");
function resolveEmbeddedAgentSessionLogger(messageChannel?: string) {
const normalizedChannel = normalizeMessageChannel(messageChannel);
if (normalizedChannel && isDeliverableMessageChannel(normalizedChannel)) {
return createSubsystemLogger(`gateway/channels/${normalizedChannel}`);
}
return embeddedLog;
}
function isPotentialTrailingBlockTagFragment(fragment: string): boolean {
if (!fragment.startsWith("<") || fragment.includes(">")) {
@@ -124,6 +133,7 @@ function collectPendingMediaFromInternalEvents(
export type { SubscribeEmbeddedAgentSessionParams } from "./embedded-agent-subscribe.types.js";
export function subscribeEmbeddedAgentSession(params: SubscribeEmbeddedAgentSessionParams) {
const log = resolveEmbeddedAgentSessionLogger(params.messageChannel);
const reasoningMode = params.reasoningMode ?? "off";
const canShowReasoning = params.thinkingLevel !== "off";
const toolResultFormat = params.toolResultFormat ?? "markdown";

View File

@@ -25,6 +25,8 @@ export type {
export type SubscribeEmbeddedAgentSessionParams = {
session: AgentSession;
runId: string;
/** Originating message channel used for subsystem log attribution. */
messageChannel?: string;
initialReplayState?: EmbeddedRunReplayState;
hookRunner?: HookRunner;
verboseLevel?: VerboseLevel;