From 205d6b730f4c7db0283f641c8b7dcad272ad725e Mon Sep 17 00:00:00 2001 From: Fermin Quant <14808645+ferminquant@users.noreply.github.com> Date: Thu, 28 May 2026 16:29:26 -0400 Subject: [PATCH] fix(agents): attribute embedded tool logs to channels Fixes #50565. --- ...mpt.spawn-workspace.context-engine.test.ts | 17 ++++++ .../embedded-agent-runner/run/attempt.ts | 1 + ...sion.subscribeembeddedagentsession.test.ts | 59 +++++++++++++++++++ src/agents/embedded-agent-subscribe.ts | 12 +++- src/agents/embedded-agent-subscribe.types.ts | 2 + 5 files changed, 90 insertions(+), 1 deletion(-) diff --git a/src/agents/embedded-agent-runner/run/attempt.spawn-workspace.context-engine.test.ts b/src/agents/embedded-agent-runner/run/attempt.spawn-workspace.context-engine.test.ts index ccaba4625f3..e6164541dcd 100644 --- a/src/agents/embedded-agent-runner/run/attempt.spawn-workspace.context-engine.test.ts +++ b/src/agents/embedded-agent-runner/run/attempt.spawn-workspace.context-engine.test.ts @@ -2032,6 +2032,23 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { expect(params.sessionKey).toBe(sessionKey); }); + it("forwards the normalized message channel to the embedded subscription", async () => { + await createContextEngineAttemptRunner({ + contextEngine: createContextEngineBootstrapAndAssemble(), + sessionKey, + tempPaths, + attemptOverrides: { + messageChannel: "TELEGRAM", + }, + }); + + const subscriptionParams = requireRecord( + hoisted.subscribeEmbeddedAgentSessionMock.mock.calls[0]?.[0], + "subscription params", + ); + expect(subscriptionParams.messageChannel).toBe("telegram"); + }); + it("skips maintenance when afterTurn fails", async () => { const { bootstrap, assemble } = createContextEngineBootstrapAndAssemble(); const afterTurn = vi.fn(async () => { diff --git a/src/agents/embedded-agent-runner/run/attempt.ts b/src/agents/embedded-agent-runner/run/attempt.ts index 1567afbaa1e..355af44ca76 100644 --- a/src/agents/embedded-agent-runner/run/attempt.ts +++ b/src/agents/embedded-agent-runner/run/attempt.ts @@ -2924,6 +2924,7 @@ export async function runEmbeddedAttempt( buildEmbeddedSubscriptionParams({ session: activeSession, runId: params.runId, + messageChannel: runtimeChannel, initialReplayState: params.initialReplayState, hookRunner: getGlobalHookRunner() ?? undefined, verboseLevel: params.verboseLevel, diff --git a/src/agents/embedded-agent-subscribe.subscribe-embedded-agent-session.subscribeembeddedagentsession.test.ts b/src/agents/embedded-agent-subscribe.subscribe-embedded-agent-session.subscribeembeddedagentsession.test.ts index fe8834667a5..b4d563c39e5 100644 --- a/src/agents/embedded-agent-subscribe.subscribe-embedded-agent-session.subscribeembeddedagentsession.test.ts +++ b/src/agents/embedded-agent-subscribe.subscribe-embedded-agent-session.subscribeembeddedagentsession.test.ts @@ -1,7 +1,12 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; import type { AssistantMessage } from "openclaw/plugin-sdk/llm"; import { describe, expect, it, vi } from "vitest"; import { HEARTBEAT_RESPONSE_TOOL_NAME } from "../auto-reply/heartbeat-tool-response.js"; import * as agentEvents from "../infra/agent-events.js"; +import { resetLogger, setLoggerOverride } from "../logging/logger.js"; +import { parseLogLine } from "../logging/parse-log-line.js"; import { THINKING_TAG_CASES, createSubscribedSessionHarness, @@ -114,6 +119,45 @@ describe("subscribeEmbeddedAgentSession", () => { }); } + async function captureToolLifecycleLogSubsystems(messageChannel?: string): Promise { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-tool-log-attribution-")); + const logFile = path.join(tempDir, "openclaw.log"); + try { + setLoggerOverride({ + level: "debug", + consoleLevel: "silent", + file: logFile, + }); + const { emit } = createSubscribedHarness({ + runId: "run-log-attribution", + messageChannel, + }); + + emitToolRun({ + emit, + toolName: "exec", + toolCallId: "tool-log-attribution", + args: { command: "echo ok" }, + isError: false, + result: { ok: true }, + }); + + const logText = await fs.readFile(logFile, "utf8"); + const subsystems: string[] = []; + for (const line of logText.trim().split(/\n+/)) { + const parsed = parseLogLine(line); + if (parsed?.message.includes("embedded run tool")) { + subsystems.push(parsed.subsystem ?? ""); + } + } + return subsystems; + } finally { + resetLogger(); + setLoggerOverride(null); + await fs.rm(tempDir, { recursive: true, force: true }); + } + } + function findBlockReplyPayload( onBlockReply: { mock: { calls: unknown[][] } }, text: string, @@ -195,6 +239,21 @@ describe("subscribeEmbeddedAgentSession", () => { }); }); + it.each([ + ["telegram", "gateway/channels/telegram"], + [undefined, "agent/embedded"], + ["openclaw", "agent/embedded"], + ["not a channel", "agent/embedded"], + ] as const)( + "attributes tool lifecycle logs for channel=%s", + async (messageChannel, subsystem) => { + await expect(captureToolLifecycleLogSubsystems(messageChannel)).resolves.toEqual([ + subsystem, + subsystem, + ]); + }, + ); + it("does not double-count usage when done and message_end carry the same snapshot", () => { const { emit, subscription } = createSubscribedSessionHarness({ runId: "run" }); const usage = { diff --git a/src/agents/embedded-agent-subscribe.ts b/src/agents/embedded-agent-subscribe.ts index b320451697f..6fdbde4f2bb 100644 --- a/src/agents/embedded-agent-subscribe.ts +++ b/src/agents/embedded-agent-subscribe.ts @@ -10,6 +10,7 @@ import { buildCodeSpanIndex, createInlineCodeState } from "../markdown/code-span import { normalizeOptionalString } from "../shared/string-coerce.js"; import { findFinalTagMatches } from "../shared/text/final-tags.js"; import { hasOrphanReasoningCloseBoundary } from "../shared/text/reasoning-tags.js"; +import { isDeliverableMessageChannel, normalizeMessageChannel } from "../utils/message-channel.js"; import { EmbeddedBlockChunker } from "./embedded-agent-block-chunker.js"; import { isMessagingToolDuplicateNormalized, @@ -59,7 +60,15 @@ const STREAM_STRIPPED_BLOCK_TAG_NAMES = [ "antml:thinking", "antml:thought", ] as const; -const log = createSubsystemLogger("agent/embedded"); +const embeddedLog = createSubsystemLogger("agent/embedded"); + +function resolveEmbeddedAgentSessionLogger(messageChannel?: string) { + const normalizedChannel = normalizeMessageChannel(messageChannel); + if (normalizedChannel && isDeliverableMessageChannel(normalizedChannel)) { + return createSubsystemLogger(`gateway/channels/${normalizedChannel}`); + } + return embeddedLog; +} function isPotentialTrailingBlockTagFragment(fragment: string): boolean { if (!fragment.startsWith("<") || fragment.includes(">")) { @@ -124,6 +133,7 @@ function collectPendingMediaFromInternalEvents( export type { SubscribeEmbeddedAgentSessionParams } from "./embedded-agent-subscribe.types.js"; export function subscribeEmbeddedAgentSession(params: SubscribeEmbeddedAgentSessionParams) { + const log = resolveEmbeddedAgentSessionLogger(params.messageChannel); const reasoningMode = params.reasoningMode ?? "off"; const canShowReasoning = params.thinkingLevel !== "off"; const toolResultFormat = params.toolResultFormat ?? "markdown"; diff --git a/src/agents/embedded-agent-subscribe.types.ts b/src/agents/embedded-agent-subscribe.types.ts index a67172c10f2..25850f70798 100644 --- a/src/agents/embedded-agent-subscribe.types.ts +++ b/src/agents/embedded-agent-subscribe.types.ts @@ -25,6 +25,8 @@ export type { export type SubscribeEmbeddedAgentSessionParams = { session: AgentSession; runId: string; + /** Originating message channel used for subsystem log attribution. */ + messageChannel?: string; initialReplayState?: EmbeddedRunReplayState; hookRunner?: HookRunner; verboseLevel?: VerboseLevel;