feat(gateway): surface spawnedBy in chat and agent broadcast payloads

Subagent sessions emitted chat and agent broadcast events with only
sessionKey. Clients had to follow up with sessions.subscribe to learn
the parent, creating a timing race.

Inject spawnedBy at the six previously uncovered broadcast sites
(emitChatDelta, flushBufferedChatDeltaIfNeeded, emitChatFinal done +
error, seq gap error, non-tool agent event). resolveSpawnedBy reads
from the session store directly. It short-circuits for session keys
that cannot carry lineage (mirrors supportsSpawnLineage in
sessions-patch.ts), so the hot chat delta path does not touch the
store for normal sessions.

No cache: spawnedBy is immutable once set, and the only frequent
caller is the subagent/acp hot path which is already filtered by the
lineage key check.

Signed-off-by: samzong <samzong.lu@gmail.com>
This commit is contained in:
samzong
2026-04-17 00:31:10 +08:00
committed by Frank Yang
parent 390a7598c9
commit c3d49d70f9
6 changed files with 374 additions and 1 deletions

View File

@@ -385,6 +385,7 @@ public struct AgentEvent: Codable, Sendable {
public let seq: Int
public let stream: String
public let ts: Int
public let spawnedby: String?
public let data: [String: AnyCodable]
public init(
@@ -392,12 +393,14 @@ public struct AgentEvent: Codable, Sendable {
seq: Int,
stream: String,
ts: Int,
spawnedby: String?,
data: [String: AnyCodable])
{
self.runid = runid
self.seq = seq
self.stream = stream
self.ts = ts
self.spawnedby = spawnedby
self.data = data
}
@@ -406,6 +409,7 @@ public struct AgentEvent: Codable, Sendable {
case seq
case stream
case ts
case spawnedby = "spawnedBy"
case data
}
}
@@ -4753,6 +4757,7 @@ public struct ChatInjectParams: Codable, Sendable {
public struct ChatEvent: Codable, Sendable {
public let runid: String
public let sessionkey: String
public let spawnedby: String?
public let seq: Int
public let state: AnyCodable
public let message: AnyCodable?
@@ -4764,6 +4769,7 @@ public struct ChatEvent: Codable, Sendable {
public init(
runid: String,
sessionkey: String,
spawnedby: String?,
seq: Int,
state: AnyCodable,
message: AnyCodable?,
@@ -4774,6 +4780,7 @@ public struct ChatEvent: Codable, Sendable {
{
self.runid = runid
self.sessionkey = sessionkey
self.spawnedby = spawnedby
self.seq = seq
self.state = state
self.message = message
@@ -4786,6 +4793,7 @@ public struct ChatEvent: Codable, Sendable {
private enum CodingKeys: String, CodingKey {
case runid = "runId"
case sessionkey = "sessionKey"
case spawnedby = "spawnedBy"
case seq
case state
case message

View File

@@ -385,6 +385,7 @@ public struct AgentEvent: Codable, Sendable {
public let seq: Int
public let stream: String
public let ts: Int
public let spawnedby: String?
public let data: [String: AnyCodable]
public init(
@@ -392,12 +393,14 @@ public struct AgentEvent: Codable, Sendable {
seq: Int,
stream: String,
ts: Int,
spawnedby: String?,
data: [String: AnyCodable])
{
self.runid = runid
self.seq = seq
self.stream = stream
self.ts = ts
self.spawnedby = spawnedby
self.data = data
}
@@ -406,6 +409,7 @@ public struct AgentEvent: Codable, Sendable {
case seq
case stream
case ts
case spawnedby = "spawnedBy"
case data
}
}
@@ -4753,6 +4757,7 @@ public struct ChatInjectParams: Codable, Sendable {
public struct ChatEvent: Codable, Sendable {
public let runid: String
public let sessionkey: String
public let spawnedby: String?
public let seq: Int
public let state: AnyCodable
public let message: AnyCodable?
@@ -4764,6 +4769,7 @@ public struct ChatEvent: Codable, Sendable {
public init(
runid: String,
sessionkey: String,
spawnedby: String?,
seq: Int,
state: AnyCodable,
message: AnyCodable?,
@@ -4774,6 +4780,7 @@ public struct ChatEvent: Codable, Sendable {
{
self.runid = runid
self.sessionkey = sessionkey
self.spawnedby = spawnedby
self.seq = seq
self.state = state
self.message = message
@@ -4786,6 +4793,7 @@ public struct ChatEvent: Codable, Sendable {
private enum CodingKeys: String, CodingKey {
case runid = "runId"
case sessionkey = "sessionKey"
case spawnedby = "spawnedBy"
case seq
case state
case message

View File

@@ -30,6 +30,7 @@ export const AgentEventSchema = Type.Object(
seq: Type.Integer({ minimum: 0 }),
stream: NonEmptyString,
ts: Type.Integer({ minimum: 0 }),
spawnedBy: Type.Optional(NonEmptyString),
data: Type.Record(Type.String(), Type.Unknown()),
},
{ additionalProperties: false },

View File

@@ -72,6 +72,7 @@ export const ChatEventSchema = Type.Object(
{
runId: NonEmptyString,
sessionKey: NonEmptyString,
spawnedBy: Type.Optional(NonEmptyString),
seq: Type.Integer({ minimum: 0 }),
state: Type.Union([
Type.Literal("delta"),

View File

@@ -1538,4 +1538,333 @@ describe("agent event handler", () => {
"Disk usage crossed 95 percent on /data and needs cleanup now.",
);
});
describe("spawnedBy enrichment in chat and agent broadcasts", () => {
it("includes spawnedBy in chat delta broadcasts for subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:abc",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-1",
});
const { broadcast, nodeSendToSession, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:abc",
});
chatRunState.registry.add("run-sub-1", {
sessionKey: "agent:coder:subagent:abc",
clientRunId: "client-sub-1",
});
handler({
runId: "run-sub-1",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "hello from subagent" },
});
const chatCalls = chatBroadcastCalls(broadcast);
expect(chatCalls.length).toBeGreaterThanOrEqual(1);
const [, payload] = chatCalls[0];
expect(payload).toMatchObject({
sessionKey: "agent:coder:subagent:abc",
spawnedBy: "agent:conductor:task:parent-1",
state: "delta",
});
const nodeCalls = sessionChatCalls(nodeSendToSession);
expect(nodeCalls.length).toBeGreaterThanOrEqual(1);
expect(nodeCalls[0][2]).toMatchObject({
spawnedBy: "agent:conductor:task:parent-1",
});
});
it("includes spawnedBy in chat final broadcasts for subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:abc",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-1",
});
const { broadcast, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:abc",
});
chatRunState.registry.add("run-sub-final", {
sessionKey: "agent:coder:subagent:abc",
clientRunId: "client-sub-final",
});
handler({
runId: "run-sub-final",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "done" },
});
handler({
runId: "run-sub-final",
seq: 2,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "end" },
});
const chatCalls = chatBroadcastCalls(broadcast);
const finalCall = chatCalls.find(([, p]) => p.state === "final");
expect(finalCall).toBeDefined();
expect(finalCall![1]).toMatchObject({
sessionKey: "agent:coder:subagent:abc",
spawnedBy: "agent:conductor:task:parent-1",
state: "final",
});
});
it("omits spawnedBy from chat broadcasts for non-subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:main:main",
kind: "direct",
updatedAt: null,
});
const { broadcast, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:main:main",
});
chatRunState.registry.add("run-main", {
sessionKey: "agent:main:main",
clientRunId: "client-main",
});
handler({
runId: "run-main",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "hello from main" },
});
const chatCalls = chatBroadcastCalls(broadcast);
expect(chatCalls.length).toBeGreaterThanOrEqual(1);
expect(chatCalls[0][1]).not.toHaveProperty("spawnedBy");
});
it("skips session row load entirely for session keys that cannot carry lineage", () => {
const { broadcast, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:main:main",
});
chatRunState.registry.add("run-no-lineage", {
sessionKey: "agent:main:main",
clientRunId: "client-no-lineage",
});
for (let seq = 1; seq <= 5; seq++) {
handler({
runId: "run-no-lineage",
seq,
stream: "assistant",
ts: Date.now() + seq * 200,
data: { text: `message ${seq}` },
});
}
// The chat delta path invokes resolveSpawnedBy only. Non-subagent,
// non-acp keys cannot carry spawnedBy (see supportsSpawnLineage in
// sessions-patch.ts), so resolveSpawnedBy must short-circuit without
// ever calling loadGatewaySessionRow on this hot path.
expect(loadGatewaySessionRow).not.toHaveBeenCalled();
const chatCalls = chatBroadcastCalls(broadcast);
expect(chatCalls.length).toBeGreaterThanOrEqual(1);
expect(chatCalls[0][1]).not.toHaveProperty("spawnedBy");
});
it("includes spawnedBy in non-tool agent event broadcasts for subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:xyz",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-2",
});
const { broadcast, handler } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:xyz",
});
registerAgentRunContext("run-agent-sub", { sessionKey: "agent:coder:subagent:xyz" });
handler({
runId: "run-agent-sub",
seq: 1,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "start" },
});
const agentCalls = broadcast.mock.calls.filter(([event]) => event === "agent");
expect(agentCalls.length).toBeGreaterThanOrEqual(1);
expect(agentCalls[0][1]).toMatchObject({
sessionKey: "agent:coder:subagent:xyz",
spawnedBy: "agent:conductor:task:parent-2",
});
resetAgentRunContextForTest();
});
it("includes spawnedBy in chat error final broadcasts for subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:err",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-err",
});
const { broadcast, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:err",
lifecycleErrorRetryGraceMs: 0,
});
chatRunState.registry.add("run-sub-err", {
sessionKey: "agent:coder:subagent:err",
clientRunId: "client-sub-err",
});
handler({
runId: "run-sub-err",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "partial" },
});
handler({
runId: "run-sub-err",
seq: 2,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "error", error: "provider failed" },
});
const chatCalls = chatBroadcastCalls(broadcast);
const errorCall = chatCalls.find(([, p]) => p.state === "error");
expect(errorCall).toBeDefined();
expect(errorCall![1]).toMatchObject({
sessionKey: "agent:coder:subagent:err",
spawnedBy: "agent:conductor:task:parent-err",
state: "error",
});
});
it("includes spawnedBy in flushed chat delta for subagent sessions", () => {
let now = 20_000;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:flush",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-flush",
});
const { broadcast, chatRunState, toolEventRecipients, handler } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:flush",
});
chatRunState.registry.add("run-sub-flush", {
sessionKey: "agent:coder:subagent:flush",
clientRunId: "client-sub-flush",
});
registerAgentRunContext("run-sub-flush", {
sessionKey: "agent:coder:subagent:flush",
verboseLevel: "off",
});
toolEventRecipients.add("run-sub-flush", "conn-flush");
handler({
runId: "run-sub-flush",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "before tool" },
});
now = 20_050;
handler({
runId: "run-sub-flush",
seq: 2,
stream: "assistant",
ts: Date.now(),
data: { text: "before tool expanded" },
});
handler({
runId: "run-sub-flush",
seq: 3,
stream: "tool",
ts: Date.now(),
data: { phase: "start", name: "exec", toolCallId: "tool-flush-sub" },
});
const chatCalls = chatBroadcastCalls(broadcast);
const flushedDelta = chatCalls.find(
([, p]) => p.state === "delta" && p.message?.content?.[0]?.text === "before tool expanded",
);
expect(flushedDelta).toBeDefined();
expect(flushedDelta![1]).toMatchObject({
spawnedBy: "agent:conductor:task:parent-flush",
});
nowSpy.mockRestore();
resetAgentRunContextForTest();
});
it("includes spawnedBy in seq gap error broadcasts for subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:gap",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-gap",
});
const { broadcast, handler } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:gap",
});
registerAgentRunContext("run-sub-gap", { sessionKey: "agent:coder:subagent:gap" });
handler({
runId: "run-sub-gap",
seq: 1,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "start" },
});
handler({
runId: "run-sub-gap",
seq: 5,
stream: "assistant",
ts: Date.now(),
data: { text: "skipped seq" },
});
const agentCalls = broadcast.mock.calls.filter(([event]) => event === "agent");
const gapError = agentCalls.find(
([, p]) => p.stream === "error" && p.data?.reason === "seq gap",
);
expect(gapError).toBeDefined();
expect(gapError![1]).toMatchObject({
sessionKey: "agent:coder:subagent:gap",
spawnedBy: "agent:conductor:task:parent-gap",
data: { reason: "seq gap", expected: 2, received: 5 },
});
resetAgentRunContextForTest();
});
});
});

View File

@@ -4,6 +4,7 @@ import { getRuntimeConfig } from "../config/io.js";
import { type AgentEventPayload, getAgentRunContext } from "../infra/agent-events.js";
import { detectErrorKind, type ErrorKind } from "../infra/errors.js";
import { resolveHeartbeatVisibility } from "../infra/heartbeat-visibility.js";
import { isAcpSessionKey, isSubagentSessionKey } from "../sessions/session-key-utils.js";
import { setSafeTimeout } from "../utils/timer-delay.js";
import {
normalizeLiveAssistantEventText,
@@ -185,6 +186,20 @@ export function createAgentEventHandler({
pendingTerminalLifecycleErrors.delete(runId);
};
// Only subagent/acp keys can carry spawnedBy (mirrors supportsSpawnLineage in
// sessions-patch.ts). Short-circuit everyone else so high-volume chat streams
// do not touch the session store.
const resolveSpawnedBy = (sessionKey: string): string | null => {
if (!isSubagentSessionKey(sessionKey) && !isAcpSessionKey(sessionKey)) {
return null;
}
try {
return loadGatewaySessionRow(sessionKey)?.spawnedBy ?? null;
} catch {
return null;
}
};
const buildSessionEventSnapshot = (sessionKey: string, evt?: AgentEventPayload) => {
const row = loadGatewaySessionRowForSnapshot(sessionKey);
const lifecyclePatch = evt
@@ -397,9 +412,11 @@ export function createAgentEventHandler({
}
chatRunState.deltaSentAt.set(clientRunId, now);
chatRunState.deltaLastBroadcastLen.set(clientRunId, mergedText.length);
const spawnedBy = resolveSpawnedBy(sessionKey);
const payload = {
runId: clientRunId,
sessionKey,
...(spawnedBy && { spawnedBy }),
seq,
state: "delta" as const,
message: {
@@ -457,9 +474,11 @@ export function createAgentEventHandler({
}
const now = Date.now();
const spawnedBy = resolveSpawnedBy(sessionKey);
const flushPayload = {
runId: clientRunId,
sessionKey,
...(spawnedBy && { spawnedBy }),
seq,
state: "delta" as const,
message: {
@@ -496,10 +515,12 @@ export function createAgentEventHandler({
chatRunState.rawBuffers.delete(clientRunId);
chatRunState.buffers.delete(clientRunId);
chatRunState.deltaSentAt.delete(clientRunId);
const spawnedBy = resolveSpawnedBy(sessionKey);
if (jobState === "done") {
const payload = {
runId: clientRunId,
sessionKey,
...(spawnedBy && { spawnedBy }),
seq,
state: "final" as const,
...(stopReason && { stopReason }),
@@ -519,6 +540,7 @@ export function createAgentEventHandler({
const payload = {
runId: clientRunId,
sessionKey,
...(spawnedBy && { spawnedBy }),
seq,
state: "error" as const,
errorMessage: error ? formatForLog(error) : undefined,
@@ -569,7 +591,10 @@ export function createAgentEventHandler({
const isAborted =
chatRunState.abortedRuns.has(clientRunId) || chatRunState.abortedRuns.has(evt.runId);
// Include sessionKey so Control UI can filter tool streams per session.
const agentPayload = sessionKey ? { ...eventForClients, sessionKey } : eventForClients;
const spawnedBy = sessionKey ? resolveSpawnedBy(sessionKey) : null;
const agentPayload = sessionKey
? { ...eventForClients, sessionKey, ...(spawnedBy && { spawnedBy }) }
: eventForClients;
const last = agentRunSeq.get(evt.runId) ?? 0;
const isToolEvent = evt.stream === "tool";
const isItemEvent = evt.stream === "item";
@@ -592,6 +617,7 @@ export function createAgentEventHandler({
stream: "error",
ts: Date.now(),
sessionKey,
...(spawnedBy && { spawnedBy }),
data: {
reason: "seq gap",
expected: last + 1,