[Feat] surface spawnedBy in chat and agent broadcast payloads (#63244)

Merged via squash.

Prepared head SHA: ff0fe5db38
Co-authored-by: samzong <13782141+samzong@users.noreply.github.com>
Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com>
Reviewed-by: @frankekn
This commit is contained in:
samzong
2026-04-29 20:48:59 +08:00
committed by GitHub
parent 390a7598c9
commit 443ca4865d
7 changed files with 482 additions and 1 deletions

View File

@@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai
- Active Memory: return bounded partial recall summaries when the hidden memory sub-agent times out, including the default temporary-transcript path, so useful recovered context is not discarded. (#73219) Thanks @joeykrug.
- Docker setup: add `OPENCLAW_SKIP_ONBOARDING` so automated Docker installs can skip the interactive onboarding step while still applying gateway defaults. (#55518) Thanks @jinjimz.
- Gateway/memory: add a read-only `doctor.memory.remHarness` RPC so operator clients can preview bounded REM dreaming output without running mutation paths. (#66673) Thanks @samzong.
- Gateway/events: surface `spawnedBy` on subagent chat and agent broadcast payloads so clients can route child session events without an extra session lookup. (#63244) Thanks @samzong.
### Fixes

View File

@@ -385,6 +385,7 @@ public struct AgentEvent: Codable, Sendable {
public let seq: Int
public let stream: String
public let ts: Int
public let spawnedby: String?
public let data: [String: AnyCodable]
public init(
@@ -392,12 +393,14 @@ public struct AgentEvent: Codable, Sendable {
seq: Int,
stream: String,
ts: Int,
spawnedby: String?,
data: [String: AnyCodable])
{
self.runid = runid
self.seq = seq
self.stream = stream
self.ts = ts
self.spawnedby = spawnedby
self.data = data
}
@@ -406,6 +409,7 @@ public struct AgentEvent: Codable, Sendable {
case seq
case stream
case ts
case spawnedby = "spawnedBy"
case data
}
}
@@ -4753,6 +4757,7 @@ public struct ChatInjectParams: Codable, Sendable {
public struct ChatEvent: Codable, Sendable {
public let runid: String
public let sessionkey: String
public let spawnedby: String?
public let seq: Int
public let state: AnyCodable
public let message: AnyCodable?
@@ -4764,6 +4769,7 @@ public struct ChatEvent: Codable, Sendable {
public init(
runid: String,
sessionkey: String,
spawnedby: String?,
seq: Int,
state: AnyCodable,
message: AnyCodable?,
@@ -4774,6 +4780,7 @@ public struct ChatEvent: Codable, Sendable {
{
self.runid = runid
self.sessionkey = sessionkey
self.spawnedby = spawnedby
self.seq = seq
self.state = state
self.message = message
@@ -4786,6 +4793,7 @@ public struct ChatEvent: Codable, Sendable {
private enum CodingKeys: String, CodingKey {
case runid = "runId"
case sessionkey = "sessionKey"
case spawnedby = "spawnedBy"
case seq
case state
case message

View File

@@ -385,6 +385,7 @@ public struct AgentEvent: Codable, Sendable {
public let seq: Int
public let stream: String
public let ts: Int
public let spawnedby: String?
public let data: [String: AnyCodable]
public init(
@@ -392,12 +393,14 @@ public struct AgentEvent: Codable, Sendable {
seq: Int,
stream: String,
ts: Int,
spawnedby: String?,
data: [String: AnyCodable])
{
self.runid = runid
self.seq = seq
self.stream = stream
self.ts = ts
self.spawnedby = spawnedby
self.data = data
}
@@ -406,6 +409,7 @@ public struct AgentEvent: Codable, Sendable {
case seq
case stream
case ts
case spawnedby = "spawnedBy"
case data
}
}
@@ -4753,6 +4757,7 @@ public struct ChatInjectParams: Codable, Sendable {
public struct ChatEvent: Codable, Sendable {
public let runid: String
public let sessionkey: String
public let spawnedby: String?
public let seq: Int
public let state: AnyCodable
public let message: AnyCodable?
@@ -4764,6 +4769,7 @@ public struct ChatEvent: Codable, Sendable {
public init(
runid: String,
sessionkey: String,
spawnedby: String?,
seq: Int,
state: AnyCodable,
message: AnyCodable?,
@@ -4774,6 +4780,7 @@ public struct ChatEvent: Codable, Sendable {
{
self.runid = runid
self.sessionkey = sessionkey
self.spawnedby = spawnedby
self.seq = seq
self.state = state
self.message = message
@@ -4786,6 +4793,7 @@ public struct ChatEvent: Codable, Sendable {
private enum CodingKeys: String, CodingKey {
case runid = "runId"
case sessionkey = "sessionKey"
case spawnedby = "spawnedBy"
case seq
case state
case message

View File

@@ -30,6 +30,7 @@ export const AgentEventSchema = Type.Object(
seq: Type.Integer({ minimum: 0 }),
stream: NonEmptyString,
ts: Type.Integer({ minimum: 0 }),
spawnedBy: Type.Optional(NonEmptyString),
data: Type.Record(Type.String(), Type.Unknown()),
},
{ additionalProperties: false },

View File

@@ -72,6 +72,7 @@ export const ChatEventSchema = Type.Object(
{
runId: NonEmptyString,
sessionKey: NonEmptyString,
spawnedBy: Type.Optional(NonEmptyString),
seq: Type.Integer({ minimum: 0 }),
state: Type.Union([
Type.Literal("delta"),

View File

@@ -1538,4 +1538,429 @@ describe("agent event handler", () => {
"Disk usage crossed 95 percent on /data and needs cleanup now.",
);
});
describe("spawnedBy enrichment in chat and agent broadcasts", () => {
it("includes spawnedBy in chat delta broadcasts for subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:abc",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-1",
});
const { broadcast, nodeSendToSession, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:abc",
});
chatRunState.registry.add("run-sub-1", {
sessionKey: "agent:coder:subagent:abc",
clientRunId: "client-sub-1",
});
handler({
runId: "run-sub-1",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "hello from subagent" },
});
const chatCalls = chatBroadcastCalls(broadcast);
expect(chatCalls.length).toBeGreaterThanOrEqual(1);
const [, payload] = chatCalls[0];
expect(payload).toMatchObject({
sessionKey: "agent:coder:subagent:abc",
spawnedBy: "agent:conductor:task:parent-1",
state: "delta",
});
const nodeCalls = sessionChatCalls(nodeSendToSession);
expect(nodeCalls.length).toBeGreaterThanOrEqual(1);
expect(nodeCalls[0][2]).toMatchObject({
spawnedBy: "agent:conductor:task:parent-1",
});
});
it("includes spawnedBy in chat final broadcasts for subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:abc",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-1",
});
const { broadcast, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:abc",
});
chatRunState.registry.add("run-sub-final", {
sessionKey: "agent:coder:subagent:abc",
clientRunId: "client-sub-final",
});
handler({
runId: "run-sub-final",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "done" },
});
handler({
runId: "run-sub-final",
seq: 2,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "end" },
});
const chatCalls = chatBroadcastCalls(broadcast);
const finalCall = chatCalls.find(([, p]) => p.state === "final");
expect(finalCall).toBeDefined();
expect(finalCall![1]).toMatchObject({
sessionKey: "agent:coder:subagent:abc",
spawnedBy: "agent:conductor:task:parent-1",
state: "final",
});
});
it("omits spawnedBy from chat broadcasts for non-subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:main:main",
kind: "direct",
updatedAt: null,
});
const { broadcast, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:main:main",
});
chatRunState.registry.add("run-main", {
sessionKey: "agent:main:main",
clientRunId: "client-main",
});
handler({
runId: "run-main",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "hello from main" },
});
const chatCalls = chatBroadcastCalls(broadcast);
expect(chatCalls.length).toBeGreaterThanOrEqual(1);
expect(chatCalls[0][1]).not.toHaveProperty("spawnedBy");
});
it("skips session row load entirely for session keys that cannot carry lineage", () => {
const { broadcast, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:main:main",
});
chatRunState.registry.add("run-no-lineage", {
sessionKey: "agent:main:main",
clientRunId: "client-no-lineage",
});
for (let seq = 1; seq <= 5; seq++) {
handler({
runId: "run-no-lineage",
seq,
stream: "assistant",
ts: Date.now() + seq * 200,
data: { text: `message ${seq}` },
});
}
// The chat delta path invokes resolveSpawnedBy only. Non-subagent,
// non-acp keys cannot carry spawnedBy (see supportsSpawnLineage in
// sessions-patch.ts), so resolveSpawnedBy must short-circuit without
// ever calling loadGatewaySessionRow on this hot path.
expect(loadGatewaySessionRow).not.toHaveBeenCalled();
const chatCalls = chatBroadcastCalls(broadcast);
expect(chatCalls.length).toBeGreaterThanOrEqual(1);
expect(chatCalls[0][1]).not.toHaveProperty("spawnedBy");
});
it("includes spawnedBy in non-tool agent event broadcasts for subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:xyz",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-2",
});
const { broadcast, handler } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:xyz",
});
registerAgentRunContext("run-agent-sub", { sessionKey: "agent:coder:subagent:xyz" });
handler({
runId: "run-agent-sub",
seq: 1,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "start" },
});
const agentCalls = broadcast.mock.calls.filter(([event]) => event === "agent");
expect(agentCalls.length).toBeGreaterThanOrEqual(1);
expect(agentCalls[0][1]).toMatchObject({
sessionKey: "agent:coder:subagent:xyz",
spawnedBy: "agent:conductor:task:parent-2",
});
resetAgentRunContextForTest();
});
it("includes spawnedBy in chat error final broadcasts for subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:err",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-err",
});
const { broadcast, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:err",
lifecycleErrorRetryGraceMs: 0,
});
chatRunState.registry.add("run-sub-err", {
sessionKey: "agent:coder:subagent:err",
clientRunId: "client-sub-err",
});
handler({
runId: "run-sub-err",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "partial" },
});
handler({
runId: "run-sub-err",
seq: 2,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "error", error: "provider failed" },
});
const chatCalls = chatBroadcastCalls(broadcast);
const errorCall = chatCalls.find(([, p]) => p.state === "error");
expect(errorCall).toBeDefined();
expect(errorCall![1]).toMatchObject({
sessionKey: "agent:coder:subagent:err",
spawnedBy: "agent:conductor:task:parent-err",
state: "error",
});
});
it("includes spawnedBy in flushed chat delta for subagent sessions", () => {
let now = 20_000;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:flush",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-flush",
});
const { broadcast, chatRunState, toolEventRecipients, handler } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:flush",
});
chatRunState.registry.add("run-sub-flush", {
sessionKey: "agent:coder:subagent:flush",
clientRunId: "client-sub-flush",
});
registerAgentRunContext("run-sub-flush", {
sessionKey: "agent:coder:subagent:flush",
verboseLevel: "off",
});
toolEventRecipients.add("run-sub-flush", "conn-flush");
handler({
runId: "run-sub-flush",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "before tool" },
});
now = 20_050;
handler({
runId: "run-sub-flush",
seq: 2,
stream: "assistant",
ts: Date.now(),
data: { text: "before tool expanded" },
});
handler({
runId: "run-sub-flush",
seq: 3,
stream: "tool",
ts: Date.now(),
data: { phase: "start", name: "exec", toolCallId: "tool-flush-sub" },
});
const chatCalls = chatBroadcastCalls(broadcast);
const flushedDelta = chatCalls.find(
([, p]) => p.state === "delta" && p.message?.content?.[0]?.text === "before tool expanded",
);
expect(flushedDelta).toBeDefined();
expect(flushedDelta![1]).toMatchObject({
spawnedBy: "agent:conductor:task:parent-flush",
});
nowSpy.mockRestore();
resetAgentRunContextForTest();
});
it("includes spawnedBy in seq gap error broadcasts for subagent sessions", () => {
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:gap",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-gap",
});
const { broadcast, handler } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:gap",
});
registerAgentRunContext("run-sub-gap", { sessionKey: "agent:coder:subagent:gap" });
handler({
runId: "run-sub-gap",
seq: 1,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "start" },
});
handler({
runId: "run-sub-gap",
seq: 5,
stream: "assistant",
ts: Date.now(),
data: { text: "skipped seq" },
});
const agentCalls = broadcast.mock.calls.filter(([event]) => event === "agent");
const gapError = agentCalls.find(
([, p]) => p.stream === "error" && p.data?.reason === "seq gap",
);
expect(gapError).toBeDefined();
expect(gapError![1]).toMatchObject({
sessionKey: "agent:coder:subagent:gap",
spawnedBy: "agent:conductor:task:parent-gap",
data: { reason: "seq gap", expected: 2, received: 5 },
});
resetAgentRunContextForTest();
});
it("caches spawnedBy lookup so repeated events for the same subagent session only load the row once", () => {
vi.mocked(loadGatewaySessionRow).mockClear();
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:cache-test",
kind: "direct",
updatedAt: null,
spawnedBy: "agent:conductor:task:parent-cache",
});
const { broadcast, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:cache-test",
});
chatRunState.registry.add("run-cache", {
sessionKey: "agent:coder:subagent:cache-test",
clientRunId: "client-cache",
});
// Fire multiple events for the same session
handler({
runId: "run-cache",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "chunk 1" },
});
handler({
runId: "run-cache",
seq: 2,
stream: "assistant",
ts: Date.now(),
data: { text: "chunk 2" },
});
handler({
runId: "run-cache",
seq: 3,
stream: "lifecycle",
ts: Date.now(),
data: { phase: "end" },
});
// Key assertion: loadGatewaySessionRow called exactly once despite 3 events
expect(loadGatewaySessionRow).toHaveBeenCalledTimes(1);
expect(loadGatewaySessionRow).toHaveBeenCalledWith("agent:coder:subagent:cache-test");
// All broadcasts still have correct spawnedBy
const chatCalls = chatBroadcastCalls(broadcast);
for (const [, payload] of chatCalls) {
expect(payload).toMatchObject({
spawnedBy: "agent:conductor:task:parent-cache",
});
}
});
it("caches null spawnedBy for eligible subagent sessions that lack a spawnedBy value", () => {
vi.mocked(loadGatewaySessionRow).mockClear();
vi.mocked(loadGatewaySessionRow).mockReturnValue({
key: "agent:coder:subagent:no-lineage",
kind: "direct",
updatedAt: null,
// no spawnedBy field
});
const { broadcast, handler, chatRunState } = createHarness({
resolveSessionKeyForRun: () => "agent:coder:subagent:no-lineage",
});
chatRunState.registry.add("run-null", {
sessionKey: "agent:coder:subagent:no-lineage",
clientRunId: "client-null",
});
handler({
runId: "run-null",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "chunk 1" },
});
handler({
runId: "run-null",
seq: 2,
stream: "assistant",
ts: Date.now(),
data: { text: "chunk 2" },
});
// null result is cached — only one DB call despite two events
expect(loadGatewaySessionRow).toHaveBeenCalledTimes(1);
const chatCalls = chatBroadcastCalls(broadcast);
for (const [, payload] of chatCalls) {
expect(payload).not.toHaveProperty("spawnedBy");
}
});
});
});

View File

@@ -4,6 +4,7 @@ import { getRuntimeConfig } from "../config/io.js";
import { type AgentEventPayload, getAgentRunContext } from "../infra/agent-events.js";
import { detectErrorKind, type ErrorKind } from "../infra/errors.js";
import { resolveHeartbeatVisibility } from "../infra/heartbeat-visibility.js";
import { isAcpSessionKey, isSubagentSessionKey } from "../sessions/session-key-utils.js";
import { setSafeTimeout } from "../utils/timer-delay.js";
import {
normalizeLiveAssistantEventText,
@@ -185,6 +186,31 @@ export function createAgentEventHandler({
pendingTerminalLifecycleErrors.delete(runId);
};
// Only subagent/acp keys can carry spawnedBy (mirrors supportsSpawnLineage in
// sessions-patch.ts). Short-circuit everyone else so high-volume chat streams
// do not touch the session store. Results are cached per sessionKey because
// spawnedBy is immutable once set and resolveSpawnedBy sits on the hot event
// path (delta, flush, final, agent, seq-gap).
const spawnedByCache = new Map<string, string | null>();
const resolveSpawnedBy = (sessionKey: string): string | null => {
if (spawnedByCache.has(sessionKey)) {
return spawnedByCache.get(sessionKey)!;
}
// Non-lineage keys return null without polluting the cache; only
// subagent/ACP results (positive or null) are worth memoising.
if (!isSubagentSessionKey(sessionKey) && !isAcpSessionKey(sessionKey)) {
return null;
}
let result: string | null = null;
try {
result = loadGatewaySessionRow(sessionKey)?.spawnedBy ?? null;
} catch {
// result stays null
}
spawnedByCache.set(sessionKey, result);
return result;
};
const buildSessionEventSnapshot = (sessionKey: string, evt?: AgentEventPayload) => {
const row = loadGatewaySessionRowForSnapshot(sessionKey);
const lifecyclePatch = evt
@@ -397,9 +423,11 @@ export function createAgentEventHandler({
}
chatRunState.deltaSentAt.set(clientRunId, now);
chatRunState.deltaLastBroadcastLen.set(clientRunId, mergedText.length);
const spawnedBy = resolveSpawnedBy(sessionKey);
const payload = {
runId: clientRunId,
sessionKey,
...(spawnedBy && { spawnedBy }),
seq,
state: "delta" as const,
message: {
@@ -457,9 +485,11 @@ export function createAgentEventHandler({
}
const now = Date.now();
const spawnedBy = resolveSpawnedBy(sessionKey);
const flushPayload = {
runId: clientRunId,
sessionKey,
...(spawnedBy && { spawnedBy }),
seq,
state: "delta" as const,
message: {
@@ -496,10 +526,12 @@ export function createAgentEventHandler({
chatRunState.rawBuffers.delete(clientRunId);
chatRunState.buffers.delete(clientRunId);
chatRunState.deltaSentAt.delete(clientRunId);
const spawnedBy = resolveSpawnedBy(sessionKey);
if (jobState === "done") {
const payload = {
runId: clientRunId,
sessionKey,
...(spawnedBy && { spawnedBy }),
seq,
state: "final" as const,
...(stopReason && { stopReason }),
@@ -519,6 +551,7 @@ export function createAgentEventHandler({
const payload = {
runId: clientRunId,
sessionKey,
...(spawnedBy && { spawnedBy }),
seq,
state: "error" as const,
errorMessage: error ? formatForLog(error) : undefined,
@@ -569,7 +602,10 @@ export function createAgentEventHandler({
const isAborted =
chatRunState.abortedRuns.has(clientRunId) || chatRunState.abortedRuns.has(evt.runId);
// Include sessionKey so Control UI can filter tool streams per session.
const agentPayload = sessionKey ? { ...eventForClients, sessionKey } : eventForClients;
const spawnedBy = sessionKey ? resolveSpawnedBy(sessionKey) : null;
const agentPayload = sessionKey
? { ...eventForClients, sessionKey, ...(spawnedBy && { spawnedBy }) }
: eventForClients;
const last = agentRunSeq.get(evt.runId) ?? 0;
const isToolEvent = evt.stream === "tool";
const isItemEvent = evt.stream === "item";
@@ -592,6 +628,7 @@ export function createAgentEventHandler({
stream: "error",
ts: Date.now(),
sessionKey,
...(spawnedBy && { spawnedBy }),
data: {
reason: "seq gap",
expected: last + 1,