fix(gateway): track plugin subagent runs in agent handler

Plugin SDK subagent runs now register at the Gateway agent acceptance boundary so subagent_ended hooks fire without creating duplicate CLI task rows.

The registration stays best-effort: if the subagent registry cannot persist tracking state, the run still dispatches and falls back to the existing CLI task tracking path.

Closes #59164

Co-authored-by: Cornna <96944678+ymylive@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-05-31 17:56:40 +01:00
parent 21dcf2dd99
commit 6f2fbaaaf8
5 changed files with 499 additions and 5 deletions

View File

@@ -5,6 +5,11 @@ import {
registerExecApprovalFollowupRuntimeHandoff,
resetExecApprovalFollowupRuntimeHandoffsForTests,
} from "../../agents/bash-tools.exec-approval-followup-state.js";
import {
getSubagentRunByChildSessionKey,
resetSubagentRegistryForTests,
testing as subagentRegistryTesting,
} from "../../agents/subagent-registry.js";
import {
getDetachedTaskLifecycleRuntime,
resetDetachedTaskLifecycleRuntimeForTests,
@@ -12,6 +17,7 @@ import {
} from "../../tasks/detached-task-runtime.js";
import {
findTaskByRunId,
listTaskRecords,
markTaskTerminalById,
resetTaskRegistryForTests,
} from "../../tasks/task-registry.js";
@@ -508,6 +514,8 @@ describe("gateway agent handler", () => {
}
resetDetachedTaskLifecycleRuntimeForTests();
resetTaskRegistryForTests();
resetSubagentRegistryForTests({ persist: false });
subagentRegistryTesting.setDepsForTest();
mocks.loadConfigReturn = {};
mocks.emitGatewaySessionEndPluginHook.mockReset();
mocks.emitGatewaySessionStartPluginHook.mockReset();
@@ -2801,6 +2809,200 @@ describe("gateway agent handler", () => {
});
});
it("tracks plugin SDK subagent agent runs through the subagent registry only", async () => {
await withTempDir({ prefix: "openclaw-gateway-plugin-subagent-task-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetSubagentRegistryForTests({ persist: false });
const runId = "plugin-subagent-task-run";
const childSessionKey = "agent:work:subagent:plugin-helper";
const cfg = {
session: { mainKey: "main", scope: "per-sender" },
agents: { list: [{ id: "main", default: true }, { id: "work" }] },
};
mocks.listAgentIds.mockReturnValue(["main", "work"]);
mocks.loadConfigReturn = cfg;
mocks.loadSessionEntry.mockReturnValue({
cfg,
storePath: "/tmp/sessions.json",
entry: {
sessionId: "plugin-subagent-session",
updatedAt: Date.now(),
},
canonicalKey: childSessionKey,
});
mocks.updateSessionStore.mockImplementation(async (_path, updater) => {
const store: Record<string, unknown> = {
[childSessionKey]: {
sessionId: "plugin-subagent-session",
updatedAt: Date.now(),
},
};
return await updater(store);
});
mocks.agentCommand.mockResolvedValue({
payloads: [{ text: "ok" }],
meta: { durationMs: 100 },
});
const context = makeContext();
const baseClient = requireValue(backendGatewayClient(), "expected backend client");
const pluginClient: AgentHandlerArgs["client"] = {
connect: baseClient.connect,
internal: {
...baseClient.internal,
agentRunTracking: "plugin_subagent",
pluginRuntimeOwnerId: "memory-core",
},
};
await invokeAgent(
{
message: "background plugin subagent task",
sessionKey: childSessionKey,
idempotencyKey: runId,
},
{
context,
reqId: runId,
client: pluginClient,
},
);
await waitForAssertion(() => {
const tasks = listTaskRecords().filter((task) => task.runId === runId);
expect(tasks).toHaveLength(1);
const task = requireValue(tasks[0], "expected one plugin subagent task");
expectRecordFields(task, {
runtime: "subagent",
childSessionKey,
ownerKey: "agent:work:main",
label: "plugin:memory-core",
task: "background plugin subagent task",
deliveryStatus: "not_applicable",
});
expect(task.runtime).not.toBe("cli");
});
const run = requireValue(
getSubagentRunByChildSessionKey(childSessionKey),
"expected subagent registry run",
);
expectRecordFields(run, {
runId,
childSessionKey,
controllerSessionKey: "agent:work:main",
requesterSessionKey: "agent:work:main",
requesterDisplayKey: "main",
cleanup: "keep",
spawnMode: "run",
label: "plugin:memory-core",
});
expectRecordFields(run.completion, { required: false });
expectRecordFields(run.delivery, { status: "not_required" });
const commandCallCount = mocks.agentCommand.mock.calls.length;
const createdAt = run.createdAt;
await invokeAgent(
{
message: "background plugin subagent task",
sessionKey: childSessionKey,
idempotencyKey: runId,
},
{
context,
reqId: `${runId}-retry`,
client: pluginClient,
},
);
expect(mocks.agentCommand).toHaveBeenCalledTimes(commandCallCount);
const retryTasks = listTaskRecords().filter((task) => task.runId === runId);
expect(retryTasks).toHaveLength(1);
expect(getSubagentRunByChildSessionKey(childSessionKey)?.createdAt).toBe(createdAt);
});
});
it("keeps plugin SDK subagent runs best-effort when registry persistence fails", async () => {
await withTempDir(
{ prefix: "openclaw-gateway-plugin-subagent-registry-fail-" },
async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetSubagentRegistryForTests({ persist: false });
subagentRegistryTesting.setDepsForTest({
persistSubagentRunsToDiskOrThrow: () => {
throw new Error("disk full");
},
});
const runId = "plugin-subagent-registry-fail";
const childSessionKey = "agent:main:subagent:registry-fail";
const cfg = {
session: { mainKey: "main", scope: "per-sender" },
};
mocks.loadConfigReturn = cfg;
mocks.loadSessionEntry.mockReturnValue({
cfg,
storePath: "/tmp/sessions.json",
entry: {
sessionId: "plugin-subagent-registry-fail-session",
updatedAt: Date.now(),
},
canonicalKey: childSessionKey,
});
mocks.updateSessionStore.mockImplementation(async (_path, updater) => {
const store: Record<string, unknown> = {
[childSessionKey]: {
sessionId: "plugin-subagent-registry-fail-session",
updatedAt: Date.now(),
},
};
return await updater(store);
});
mocks.agentCommand.mockResolvedValue({
payloads: [{ text: "ok" }],
meta: { durationMs: 100 },
});
const context = makeContext();
const baseClient = requireValue(backendGatewayClient(), "expected backend client");
const commandCallCount = mocks.agentCommand.mock.calls.length;
await invokeAgent(
{
message: "background plugin subagent task",
sessionKey: childSessionKey,
idempotencyKey: runId,
},
{
context,
reqId: runId,
client: {
connect: baseClient.connect,
internal: {
...baseClient.internal,
agentRunTracking: "plugin_subagent",
pluginRuntimeOwnerId: "memory-core",
},
},
},
);
expect(mocks.agentCommand).toHaveBeenCalledTimes(commandCallCount + 1);
await waitForAssertion(() => {
const task = requireValue(findTaskByRunId(runId), "expected fallback cli task");
expectRecordFields(task, {
runtime: "cli",
childSessionKey,
status: "succeeded",
terminalSummary: "completed",
});
});
expect(context.logGateway.warn).toHaveBeenCalledWith(
expect.stringContaining("falling back to cli task tracking"),
);
},
);
});
it("terminalizes failed async gateway agent runs in the shared task registry", async () => {
await withTempDir({ prefix: "openclaw-gateway-agent-task-error-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;

View File

@@ -102,6 +102,7 @@ import {
mergeDeliveryContext,
normalizeDeliveryContext,
normalizeSessionDeliveryFields,
type DeliveryContext,
} from "../../utils/delivery-context.shared.js";
import {
INTERNAL_MESSAGE_CHANNEL,
@@ -619,6 +620,52 @@ type GatewayAgentTaskTerminalStatus = Extract<
TaskStatus,
"succeeded" | "failed" | "timed_out" | "cancelled"
>;
type GatewayAgentTaskTrackingMode = "cli" | "plugin_subagent" | "none";
function resolveGatewayAgentTaskTrackingMode(params: {
client: GatewayRequestHandlerOptions["client"];
sessionKey?: string;
inputProvenance?: InputProvenance;
}): GatewayAgentTaskTrackingMode {
if (!params.sessionKey?.trim() || params.inputProvenance?.kind === "inter_session") {
return "none";
}
return params.client?.internal?.agentRunTracking === "plugin_subagent"
? "plugin_subagent"
: "cli";
}
async function registerPluginSubagentRunFromGateway(params: {
cfg: OpenClawConfig;
runId: string;
childSessionKey: string;
task: string;
requesterOrigin?: DeliveryContext;
pluginId?: string;
}): Promise<void> {
const childSessionKey = params.childSessionKey.trim();
if (!childSessionKey) {
return;
}
const ownerSessionKey = resolveAgentMainSessionKey({
cfg: params.cfg,
agentId: resolveAgentIdFromSessionKey(childSessionKey),
});
const { registerSubagentRun } = await import("../../agents/subagent-registry.js");
registerSubagentRun({
runId: params.runId,
childSessionKey,
controllerSessionKey: ownerSessionKey,
requesterSessionKey: ownerSessionKey,
requesterOrigin: params.requesterOrigin,
requesterDisplayKey: "main",
task: params.task,
cleanup: "keep",
...(params.pluginId ? { label: `plugin:${params.pluginId}` } : {}),
expectsCompletionMessage: false,
spawnMode: "run",
});
}
function resolveFailedTrackedAgentTaskStatus(error: unknown): GatewayAgentTaskTerminalStatus {
return isAbortError(error) || isTimeoutError(error) ? "timed_out" : "failed";
@@ -830,10 +877,9 @@ function dispatchAgentRunFromGateway(params: {
abortController: AbortController;
respond: GatewayRequestHandlerOptions["respond"];
context: GatewayRequestHandlerOptions["context"];
taskTrackingMode: Exclude<GatewayAgentTaskTrackingMode, "plugin_subagent">;
}) {
const inputProvenance = normalizeInputProvenance(params.ingressOpts.inputProvenance);
const shouldTrackTask =
params.ingressOpts.sessionKey?.trim() && inputProvenance?.kind !== "inter_session";
const shouldTrackTask = params.taskTrackingMode === "cli";
if (shouldTrackTask) {
try {
createRunningTaskRun({
@@ -2161,6 +2207,39 @@ export const agentHandlers: GatewayRequestHandlers = {
return;
}
const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId;
const taskTrackingMode = resolveGatewayAgentTaskTrackingMode({
client,
sessionKey: resolvedSessionKey,
inputProvenance,
});
let dispatchTaskTrackingMode: Exclude<GatewayAgentTaskTrackingMode, "plugin_subagent"> =
taskTrackingMode === "cli" ? "cli" : "none";
if (taskTrackingMode === "plugin_subagent" && resolvedSessionKey) {
try {
await registerPluginSubagentRunFromGateway({
cfg,
runId,
childSessionKey: resolvedSessionKey,
task: request.message.trim(),
requesterOrigin: normalizeDeliveryContext({
channel: resolvedChannel,
to: resolvedTo,
accountId: resolvedAccountId,
threadId: resolvedThreadId,
}),
pluginId: normalizeOptionalString(client?.internal?.pluginRuntimeOwnerId),
});
} catch (err) {
context.logGateway.warn(
`failed to register plugin subagent run ${runId}; falling back to cli task tracking: ${formatForLog(
err,
)}`,
);
dispatchTaskTrackingMode = "cli";
}
}
const accepted = {
runId,
sessionKey: resolvedSessionKey,
@@ -2246,7 +2325,6 @@ export const agentHandlers: GatewayRequestHandlers = {
message = annotateInterSessionPromptText(message, inputProvenance);
}
const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId;
const ingressAgentId =
resolvedSessionKey === "global"
? activeSessionAgentId
@@ -2364,6 +2442,7 @@ export const agentHandlers: GatewayRequestHandlers = {
abortController: activeRunAbort.controller,
respond,
context,
taskTrackingMode: dispatchTaskTrackingMode,
});
dispatched = true;
} catch (err) {

View File

@@ -36,6 +36,7 @@ export type GatewayClient = {
allowModelOverride?: boolean;
approvalRuntime?: boolean;
pluginRuntimeOwnerId?: string;
agentRunTracking?: "plugin_subagent";
};
};

View File

@@ -0,0 +1,202 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { PluginRuntimeGatewayRequestScope } from "../plugins/runtime/gateway-request-scope.js";
import type { GatewayRequestContext, GatewayRequestOptions } from "./server-methods/types.js";
type HandleGatewayRequestOptions = GatewayRequestOptions & {
extraHandlers?: Record<string, unknown>;
};
const handleGatewayRequest = vi.hoisted(() =>
vi.fn(async (_opts: HandleGatewayRequestOptions) => {}),
);
vi.mock("./server-methods.js", () => ({
handleGatewayRequest,
}));
type ServerPluginsModule = typeof import("./server-plugins.js");
type GatewayRequestScopeModule = typeof import("../plugins/runtime/gateway-request-scope.js");
function createTestCfg(): OpenClawConfig {
return {
session: { mainKey: "agent:main:main", scope: "per-sender" },
} as unknown as OpenClawConfig;
}
function createTestContext(label: string, cfg: OpenClawConfig): GatewayRequestContext {
return {
label,
getRuntimeConfig: () => cfg,
} as unknown as GatewayRequestContext;
}
async function loadServerPlugins(): Promise<ServerPluginsModule> {
return await import("./server-plugins.js");
}
async function loadGatewayScope(): Promise<GatewayRequestScopeModule> {
return await import("../plugins/runtime/gateway-request-scope.js");
}
function lastGatewayRequest(): HandleGatewayRequestOptions {
const call = handleGatewayRequest.mock.calls.at(-1)?.[0];
if (!call) {
throw new Error("expected handleGatewayRequest call");
}
return call;
}
beforeEach(() => {
handleGatewayRequest.mockReset();
handleGatewayRequest.mockImplementation(async (opts: HandleGatewayRequestOptions) => {
switch (opts.req.method) {
case "agent":
opts.respond(true, { runId: "plugin-run-1" });
return;
case "agent.wait":
opts.respond(true, { status: "ok" });
return;
default:
opts.respond(true, {});
}
});
});
afterEach(async () => {
const serverPlugins = await loadServerPlugins();
serverPlugins.clearFallbackGatewayContext();
});
describe("createGatewaySubagentRuntime.run subagent_ended tracking (#59164)", () => {
test("marks plugin SDK subagent runs for Gateway-owned subagent tracking", async () => {
const serverPlugins = await loadServerPlugins();
const runtime = serverPlugins.createGatewaySubagentRuntime();
serverPlugins.setFallbackGatewayContext(
createTestContext("plugin-sdk-subagent", createTestCfg()),
);
const result = await runtime.run({
sessionKey: "agent:main:subagent:plugin-helper",
message: "summarize this transcript",
deliver: false,
});
expect(result.runId).toBe("plugin-run-1");
const request = lastGatewayRequest();
expect(request.req.method).toBe("agent");
expect(request.client?.internal?.agentRunTracking).toBe("plugin_subagent");
expect(request.client?.internal?.pluginRuntimeOwnerId).toBeUndefined();
});
test("preserves plugin identity on the tracked Gateway agent request", async () => {
const serverPlugins = await loadServerPlugins();
const gatewayScope = await loadGatewayScope();
const runtime = serverPlugins.createGatewaySubagentRuntime();
const scope = {
context: createTestContext("plugin-scope", createTestCfg()),
pluginId: "memory-core",
isWebchatConnect: () => false,
} satisfies PluginRuntimeGatewayRequestScope;
await gatewayScope.withPluginRuntimeGatewayRequestScope(scope, () =>
runtime.run({
sessionKey: "agent:main:subagent:dreaming-narrative",
message: "dream task",
deliver: false,
}),
);
const request = lastGatewayRequest();
expect(request.req.method).toBe("agent");
expect(request.client?.internal?.agentRunTracking).toBe("plugin_subagent");
expect(request.client?.internal?.pluginRuntimeOwnerId).toBe("memory-core");
});
test("does not dispatch when no runtime config is available", async () => {
const serverPlugins = await loadServerPlugins();
const runtime = serverPlugins.createGatewaySubagentRuntime();
await expect(
runtime.run({
sessionKey: "agent:main:subagent:orphan",
message: "no cfg available",
deliver: false,
}),
).rejects.toThrow(/gateway request scope/);
expect(handleGatewayRequest).not.toHaveBeenCalled();
});
test("preserves the child session so the transcript stays readable until the plugin deletes it", async () => {
const serverPlugins = await loadServerPlugins();
const runtime = serverPlugins.createGatewaySubagentRuntime();
serverPlugins.setFallbackGatewayContext(createTestContext("plugin-readback", createTestCfg()));
const transcript = [
{ role: "user", content: "summarize this transcript" },
{ role: "assistant", content: "summary text" },
];
const sessionStore = new Map<string, { messages: unknown[] }>([
["agent:main:subagent:plugin-readback", { messages: transcript }],
]);
handleGatewayRequest.mockImplementation(async (opts: HandleGatewayRequestOptions) => {
const req = opts.req as { method: string; params?: { key?: string } };
switch (req.method) {
case "agent":
opts.respond(true, { runId: "plugin-run-readback" });
return;
case "agent.wait":
opts.respond(true, { status: "ok" });
return;
case "sessions.get": {
const key = req.params?.key ?? "";
const stored = sessionStore.get(key);
if (!stored) {
opts.respond(false, undefined, {
code: "not_found",
message: `session ${key} not found`,
});
return;
}
opts.respond(true, stored);
return;
}
case "sessions.delete": {
const key = req.params?.key ?? "";
sessionStore.delete(key);
opts.respond(true, {});
return;
}
default:
opts.respond(true, {});
}
});
const runResult = await runtime.run({
sessionKey: "agent:main:subagent:plugin-readback",
message: "summarize this transcript",
deliver: false,
});
expect(runResult.runId).toBe("plugin-run-readback");
const waitResult = await runtime.waitForRun({ runId: runResult.runId });
expect(waitResult.status).toBe("ok");
const sessionView = await runtime.getSessionMessages({
sessionKey: "agent:main:subagent:plugin-readback",
});
expect(sessionView.messages).toEqual(transcript);
await runtime.deleteSession({
sessionKey: "agent:main:subagent:plugin-readback",
});
await expect(
runtime.getSessionMessages({
sessionKey: "agent:main:subagent:plugin-readback",
}),
).rejects.toThrow(/not found/);
});
});

View File

@@ -248,6 +248,7 @@ function resolveRequestedFallbackModelRef(params: {
function createSyntheticOperatorClient(params?: {
allowModelOverride?: boolean;
agentRunTracking?: "plugin_subagent";
pluginRuntimeOwnerId?: string;
scopes?: string[];
}): GatewayRequestOptions["client"] {
@@ -270,6 +271,7 @@ function createSyntheticOperatorClient(params?: {
},
internal: {
allowModelOverride: params?.allowModelOverride === true,
...(params?.agentRunTracking ? { agentRunTracking: params.agentRunTracking } : {}),
...(params?.scopes?.includes(APPROVALS_SCOPE) ? { approvalRuntime: true } : {}),
...(pluginRuntimeOwnerId ? { pluginRuntimeOwnerId } : {}),
},
@@ -303,6 +305,7 @@ function mergeGatewayClientInternal(
type DispatchGatewayMethodInProcessOptions = {
allowSyntheticModelOverride?: boolean;
agentRunTracking?: "plugin_subagent";
disableSyntheticClient?: boolean;
expectFinal?: boolean;
forceSyntheticClient?: boolean;
@@ -358,12 +361,18 @@ export async function dispatchGatewayMethodInProcessRaw(
: undefined;
const syntheticClient = createSyntheticOperatorClient({
allowModelOverride: options?.allowSyntheticModelOverride === true,
agentRunTracking: options?.agentRunTracking,
...(pluginRuntimeOwnerId ? { pluginRuntimeOwnerId } : {}),
scopes: options?.syntheticScopes,
});
const scopedClient = mergeGatewayClientInternal(
scope?.client,
pluginRuntimeOwnerId ? { pluginRuntimeOwnerId } : undefined,
pluginRuntimeOwnerId || options?.agentRunTracking
? {
...(options?.agentRunTracking ? { agentRunTracking: options.agentRunTracking } : {}),
...(pluginRuntimeOwnerId ? { pluginRuntimeOwnerId } : {}),
}
: undefined,
);
if (options?.disableSyntheticClient === true && !scopedClient) {
throw new Error(`In-process gateway dispatch requires a scoped client (method: ${method}).`);
@@ -503,6 +512,7 @@ export function createGatewaySubagentRuntime(): PluginRuntime["subagent"] {
},
{
allowSyntheticModelOverride,
agentRunTracking: "plugin_subagent",
...(pluginId ? { pluginRuntimeOwnerId: pluginId } : {}),
},
);