diff --git a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift index 0a7b7a243af..f427c918b1b 100644 --- a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift @@ -385,6 +385,7 @@ public struct AgentEvent: Codable, Sendable { public let seq: Int public let stream: String public let ts: Int + public let spawnedby: String? public let data: [String: AnyCodable] public init( @@ -392,12 +393,14 @@ public struct AgentEvent: Codable, Sendable { seq: Int, stream: String, ts: Int, + spawnedby: String?, data: [String: AnyCodable]) { self.runid = runid self.seq = seq self.stream = stream self.ts = ts + self.spawnedby = spawnedby self.data = data } @@ -406,6 +409,7 @@ public struct AgentEvent: Codable, Sendable { case seq case stream case ts + case spawnedby = "spawnedBy" case data } } @@ -4753,6 +4757,7 @@ public struct ChatInjectParams: Codable, Sendable { public struct ChatEvent: Codable, Sendable { public let runid: String public let sessionkey: String + public let spawnedby: String? public let seq: Int public let state: AnyCodable public let message: AnyCodable? @@ -4764,6 +4769,7 @@ public struct ChatEvent: Codable, Sendable { public init( runid: String, sessionkey: String, + spawnedby: String?, seq: Int, state: AnyCodable, message: AnyCodable?, @@ -4774,6 +4780,7 @@ public struct ChatEvent: Codable, Sendable { { self.runid = runid self.sessionkey = sessionkey + self.spawnedby = spawnedby self.seq = seq self.state = state self.message = message @@ -4786,6 +4793,7 @@ public struct ChatEvent: Codable, Sendable { private enum CodingKeys: String, CodingKey { case runid = "runId" case sessionkey = "sessionKey" + case spawnedby = "spawnedBy" case seq case state case message diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index 0a7b7a243af..f427c918b1b 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -385,6 +385,7 @@ public struct AgentEvent: Codable, Sendable { public let seq: Int public let stream: String public let ts: Int + public let spawnedby: String? public let data: [String: AnyCodable] public init( @@ -392,12 +393,14 @@ public struct AgentEvent: Codable, Sendable { seq: Int, stream: String, ts: Int, + spawnedby: String?, data: [String: AnyCodable]) { self.runid = runid self.seq = seq self.stream = stream self.ts = ts + self.spawnedby = spawnedby self.data = data } @@ -406,6 +409,7 @@ public struct AgentEvent: Codable, Sendable { case seq case stream case ts + case spawnedby = "spawnedBy" case data } } @@ -4753,6 +4757,7 @@ public struct ChatInjectParams: Codable, Sendable { public struct ChatEvent: Codable, Sendable { public let runid: String public let sessionkey: String + public let spawnedby: String? public let seq: Int public let state: AnyCodable public let message: AnyCodable? @@ -4764,6 +4769,7 @@ public struct ChatEvent: Codable, Sendable { public init( runid: String, sessionkey: String, + spawnedby: String?, seq: Int, state: AnyCodable, message: AnyCodable?, @@ -4774,6 +4780,7 @@ public struct ChatEvent: Codable, Sendable { { self.runid = runid self.sessionkey = sessionkey + self.spawnedby = spawnedby self.seq = seq self.state = state self.message = message @@ -4786,6 +4793,7 @@ public struct ChatEvent: Codable, Sendable { private enum CodingKeys: String, CodingKey { case runid = "runId" case sessionkey = "sessionKey" + case spawnedby = "spawnedBy" case seq case state case message diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index 70e86f84300..f76babb0bf2 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -30,6 +30,7 @@ export const AgentEventSchema = Type.Object( seq: Type.Integer({ minimum: 0 }), stream: NonEmptyString, ts: Type.Integer({ minimum: 0 }), + spawnedBy: Type.Optional(NonEmptyString), data: Type.Record(Type.String(), Type.Unknown()), }, { additionalProperties: false }, diff --git a/src/gateway/protocol/schema/logs-chat.ts b/src/gateway/protocol/schema/logs-chat.ts index 642517baeca..ffba8ce86ef 100644 --- a/src/gateway/protocol/schema/logs-chat.ts +++ b/src/gateway/protocol/schema/logs-chat.ts @@ -72,6 +72,7 @@ export const ChatEventSchema = Type.Object( { runId: NonEmptyString, sessionKey: NonEmptyString, + spawnedBy: Type.Optional(NonEmptyString), seq: Type.Integer({ minimum: 0 }), state: Type.Union([ Type.Literal("delta"), diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index 8828a2246ab..a9eae4a2f48 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -1538,4 +1538,333 @@ describe("agent event handler", () => { "Disk usage crossed 95 percent on /data and needs cleanup now.", ); }); + + describe("spawnedBy enrichment in chat and agent broadcasts", () => { + it("includes spawnedBy in chat delta broadcasts for subagent sessions", () => { + vi.mocked(loadGatewaySessionRow).mockReturnValue({ + key: "agent:coder:subagent:abc", + kind: "direct", + updatedAt: null, + spawnedBy: "agent:conductor:task:parent-1", + }); + + const { broadcast, nodeSendToSession, handler, chatRunState } = createHarness({ + resolveSessionKeyForRun: () => "agent:coder:subagent:abc", + }); + + chatRunState.registry.add("run-sub-1", { + sessionKey: "agent:coder:subagent:abc", + clientRunId: "client-sub-1", + }); + + handler({ + runId: "run-sub-1", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "hello from subagent" }, + }); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls.length).toBeGreaterThanOrEqual(1); + const [, payload] = chatCalls[0]; + expect(payload).toMatchObject({ + sessionKey: "agent:coder:subagent:abc", + spawnedBy: "agent:conductor:task:parent-1", + state: "delta", + }); + + const nodeCalls = sessionChatCalls(nodeSendToSession); + expect(nodeCalls.length).toBeGreaterThanOrEqual(1); + expect(nodeCalls[0][2]).toMatchObject({ + spawnedBy: "agent:conductor:task:parent-1", + }); + }); + + it("includes spawnedBy in chat final broadcasts for subagent sessions", () => { + vi.mocked(loadGatewaySessionRow).mockReturnValue({ + key: "agent:coder:subagent:abc", + kind: "direct", + updatedAt: null, + spawnedBy: "agent:conductor:task:parent-1", + }); + + const { broadcast, handler, chatRunState } = createHarness({ + resolveSessionKeyForRun: () => "agent:coder:subagent:abc", + }); + + chatRunState.registry.add("run-sub-final", { + sessionKey: "agent:coder:subagent:abc", + clientRunId: "client-sub-final", + }); + + handler({ + runId: "run-sub-final", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "done" }, + }); + + handler({ + runId: "run-sub-final", + seq: 2, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "end" }, + }); + + const chatCalls = chatBroadcastCalls(broadcast); + const finalCall = chatCalls.find(([, p]) => p.state === "final"); + expect(finalCall).toBeDefined(); + expect(finalCall![1]).toMatchObject({ + sessionKey: "agent:coder:subagent:abc", + spawnedBy: "agent:conductor:task:parent-1", + state: "final", + }); + }); + + it("omits spawnedBy from chat broadcasts for non-subagent sessions", () => { + vi.mocked(loadGatewaySessionRow).mockReturnValue({ + key: "agent:main:main", + kind: "direct", + updatedAt: null, + }); + + const { broadcast, handler, chatRunState } = createHarness({ + resolveSessionKeyForRun: () => "agent:main:main", + }); + + chatRunState.registry.add("run-main", { + sessionKey: "agent:main:main", + clientRunId: "client-main", + }); + + handler({ + runId: "run-main", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "hello from main" }, + }); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls.length).toBeGreaterThanOrEqual(1); + expect(chatCalls[0][1]).not.toHaveProperty("spawnedBy"); + }); + + it("skips session row load entirely for session keys that cannot carry lineage", () => { + const { broadcast, handler, chatRunState } = createHarness({ + resolveSessionKeyForRun: () => "agent:main:main", + }); + + chatRunState.registry.add("run-no-lineage", { + sessionKey: "agent:main:main", + clientRunId: "client-no-lineage", + }); + + for (let seq = 1; seq <= 5; seq++) { + handler({ + runId: "run-no-lineage", + seq, + stream: "assistant", + ts: Date.now() + seq * 200, + data: { text: `message ${seq}` }, + }); + } + + // The chat delta path invokes resolveSpawnedBy only. Non-subagent, + // non-acp keys cannot carry spawnedBy (see supportsSpawnLineage in + // sessions-patch.ts), so resolveSpawnedBy must short-circuit without + // ever calling loadGatewaySessionRow on this hot path. + expect(loadGatewaySessionRow).not.toHaveBeenCalled(); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls.length).toBeGreaterThanOrEqual(1); + expect(chatCalls[0][1]).not.toHaveProperty("spawnedBy"); + }); + + it("includes spawnedBy in non-tool agent event broadcasts for subagent sessions", () => { + vi.mocked(loadGatewaySessionRow).mockReturnValue({ + key: "agent:coder:subagent:xyz", + kind: "direct", + updatedAt: null, + spawnedBy: "agent:conductor:task:parent-2", + }); + + const { broadcast, handler } = createHarness({ + resolveSessionKeyForRun: () => "agent:coder:subagent:xyz", + }); + + registerAgentRunContext("run-agent-sub", { sessionKey: "agent:coder:subagent:xyz" }); + + handler({ + runId: "run-agent-sub", + seq: 1, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "start" }, + }); + + const agentCalls = broadcast.mock.calls.filter(([event]) => event === "agent"); + expect(agentCalls.length).toBeGreaterThanOrEqual(1); + expect(agentCalls[0][1]).toMatchObject({ + sessionKey: "agent:coder:subagent:xyz", + spawnedBy: "agent:conductor:task:parent-2", + }); + + resetAgentRunContextForTest(); + }); + + it("includes spawnedBy in chat error final broadcasts for subagent sessions", () => { + vi.mocked(loadGatewaySessionRow).mockReturnValue({ + key: "agent:coder:subagent:err", + kind: "direct", + updatedAt: null, + spawnedBy: "agent:conductor:task:parent-err", + }); + + const { broadcast, handler, chatRunState } = createHarness({ + resolveSessionKeyForRun: () => "agent:coder:subagent:err", + lifecycleErrorRetryGraceMs: 0, + }); + + chatRunState.registry.add("run-sub-err", { + sessionKey: "agent:coder:subagent:err", + clientRunId: "client-sub-err", + }); + + handler({ + runId: "run-sub-err", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "partial" }, + }); + + handler({ + runId: "run-sub-err", + seq: 2, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "error", error: "provider failed" }, + }); + + const chatCalls = chatBroadcastCalls(broadcast); + const errorCall = chatCalls.find(([, p]) => p.state === "error"); + expect(errorCall).toBeDefined(); + expect(errorCall![1]).toMatchObject({ + sessionKey: "agent:coder:subagent:err", + spawnedBy: "agent:conductor:task:parent-err", + state: "error", + }); + }); + + it("includes spawnedBy in flushed chat delta for subagent sessions", () => { + let now = 20_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + + vi.mocked(loadGatewaySessionRow).mockReturnValue({ + key: "agent:coder:subagent:flush", + kind: "direct", + updatedAt: null, + spawnedBy: "agent:conductor:task:parent-flush", + }); + + const { broadcast, chatRunState, toolEventRecipients, handler } = createHarness({ + resolveSessionKeyForRun: () => "agent:coder:subagent:flush", + }); + + chatRunState.registry.add("run-sub-flush", { + sessionKey: "agent:coder:subagent:flush", + clientRunId: "client-sub-flush", + }); + registerAgentRunContext("run-sub-flush", { + sessionKey: "agent:coder:subagent:flush", + verboseLevel: "off", + }); + toolEventRecipients.add("run-sub-flush", "conn-flush"); + + handler({ + runId: "run-sub-flush", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "before tool" }, + }); + + now = 20_050; + handler({ + runId: "run-sub-flush", + seq: 2, + stream: "assistant", + ts: Date.now(), + data: { text: "before tool expanded" }, + }); + + handler({ + runId: "run-sub-flush", + seq: 3, + stream: "tool", + ts: Date.now(), + data: { phase: "start", name: "exec", toolCallId: "tool-flush-sub" }, + }); + + const chatCalls = chatBroadcastCalls(broadcast); + const flushedDelta = chatCalls.find( + ([, p]) => p.state === "delta" && p.message?.content?.[0]?.text === "before tool expanded", + ); + expect(flushedDelta).toBeDefined(); + expect(flushedDelta![1]).toMatchObject({ + spawnedBy: "agent:conductor:task:parent-flush", + }); + + nowSpy.mockRestore(); + resetAgentRunContextForTest(); + }); + + it("includes spawnedBy in seq gap error broadcasts for subagent sessions", () => { + vi.mocked(loadGatewaySessionRow).mockReturnValue({ + key: "agent:coder:subagent:gap", + kind: "direct", + updatedAt: null, + spawnedBy: "agent:conductor:task:parent-gap", + }); + + const { broadcast, handler } = createHarness({ + resolveSessionKeyForRun: () => "agent:coder:subagent:gap", + }); + + registerAgentRunContext("run-sub-gap", { sessionKey: "agent:coder:subagent:gap" }); + + handler({ + runId: "run-sub-gap", + seq: 1, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "start" }, + }); + + handler({ + runId: "run-sub-gap", + seq: 5, + stream: "assistant", + ts: Date.now(), + data: { text: "skipped seq" }, + }); + + const agentCalls = broadcast.mock.calls.filter(([event]) => event === "agent"); + const gapError = agentCalls.find( + ([, p]) => p.stream === "error" && p.data?.reason === "seq gap", + ); + expect(gapError).toBeDefined(); + expect(gapError![1]).toMatchObject({ + sessionKey: "agent:coder:subagent:gap", + spawnedBy: "agent:conductor:task:parent-gap", + data: { reason: "seq gap", expected: 2, received: 5 }, + }); + + resetAgentRunContextForTest(); + }); + }); }); diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index baca779f231..0d8d826201b 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -4,6 +4,7 @@ import { getRuntimeConfig } from "../config/io.js"; import { type AgentEventPayload, getAgentRunContext } from "../infra/agent-events.js"; import { detectErrorKind, type ErrorKind } from "../infra/errors.js"; import { resolveHeartbeatVisibility } from "../infra/heartbeat-visibility.js"; +import { isAcpSessionKey, isSubagentSessionKey } from "../sessions/session-key-utils.js"; import { setSafeTimeout } from "../utils/timer-delay.js"; import { normalizeLiveAssistantEventText, @@ -185,6 +186,20 @@ export function createAgentEventHandler({ pendingTerminalLifecycleErrors.delete(runId); }; + // Only subagent/acp keys can carry spawnedBy (mirrors supportsSpawnLineage in + // sessions-patch.ts). Short-circuit everyone else so high-volume chat streams + // do not touch the session store. + const resolveSpawnedBy = (sessionKey: string): string | null => { + if (!isSubagentSessionKey(sessionKey) && !isAcpSessionKey(sessionKey)) { + return null; + } + try { + return loadGatewaySessionRow(sessionKey)?.spawnedBy ?? null; + } catch { + return null; + } + }; + const buildSessionEventSnapshot = (sessionKey: string, evt?: AgentEventPayload) => { const row = loadGatewaySessionRowForSnapshot(sessionKey); const lifecyclePatch = evt @@ -397,9 +412,11 @@ export function createAgentEventHandler({ } chatRunState.deltaSentAt.set(clientRunId, now); chatRunState.deltaLastBroadcastLen.set(clientRunId, mergedText.length); + const spawnedBy = resolveSpawnedBy(sessionKey); const payload = { runId: clientRunId, sessionKey, + ...(spawnedBy && { spawnedBy }), seq, state: "delta" as const, message: { @@ -457,9 +474,11 @@ export function createAgentEventHandler({ } const now = Date.now(); + const spawnedBy = resolveSpawnedBy(sessionKey); const flushPayload = { runId: clientRunId, sessionKey, + ...(spawnedBy && { spawnedBy }), seq, state: "delta" as const, message: { @@ -496,10 +515,12 @@ export function createAgentEventHandler({ chatRunState.rawBuffers.delete(clientRunId); chatRunState.buffers.delete(clientRunId); chatRunState.deltaSentAt.delete(clientRunId); + const spawnedBy = resolveSpawnedBy(sessionKey); if (jobState === "done") { const payload = { runId: clientRunId, sessionKey, + ...(spawnedBy && { spawnedBy }), seq, state: "final" as const, ...(stopReason && { stopReason }), @@ -519,6 +540,7 @@ export function createAgentEventHandler({ const payload = { runId: clientRunId, sessionKey, + ...(spawnedBy && { spawnedBy }), seq, state: "error" as const, errorMessage: error ? formatForLog(error) : undefined, @@ -569,7 +591,10 @@ export function createAgentEventHandler({ const isAborted = chatRunState.abortedRuns.has(clientRunId) || chatRunState.abortedRuns.has(evt.runId); // Include sessionKey so Control UI can filter tool streams per session. - const agentPayload = sessionKey ? { ...eventForClients, sessionKey } : eventForClients; + const spawnedBy = sessionKey ? resolveSpawnedBy(sessionKey) : null; + const agentPayload = sessionKey + ? { ...eventForClients, sessionKey, ...(spawnedBy && { spawnedBy }) } + : eventForClients; const last = agentRunSeq.get(evt.runId) ?? 0; const isToolEvent = evt.stream === "tool"; const isItemEvent = evt.stream === "item"; @@ -592,6 +617,7 @@ export function createAgentEventHandler({ stream: "error", ts: Date.now(), sessionKey, + ...(spawnedBy && { spawnedBy }), data: { reason: "seq gap", expected: last + 1,