From 6f2fbaaaf86d4f8ab14cd7cc8d2af3f4588467aa Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 31 May 2026 17:56:40 +0100 Subject: [PATCH] fix(gateway): track plugin subagent runs in agent handler Plugin SDK subagent runs now register at the Gateway agent acceptance boundary so subagent_ended hooks fire without creating duplicate CLI task rows. The registration stays best-effort: if the subagent registry cannot persist tracking state, the run still dispatches and falls back to the existing CLI task tracking path. Closes #59164 Co-authored-by: Cornna <96944678+ymylive@users.noreply.github.com> --- src/gateway/server-methods/agent.test.ts | 202 ++++++++++++++++++ src/gateway/server-methods/agent.ts | 87 +++++++- src/gateway/server-methods/shared-types.ts | 1 + ...server-plugins.subagent-ended-hook.test.ts | 202 ++++++++++++++++++ src/gateway/server-plugins.ts | 12 +- 5 files changed, 499 insertions(+), 5 deletions(-) create mode 100644 src/gateway/server-plugins.subagent-ended-hook.test.ts diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 0680fb1f858..baa55331ad7 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -5,6 +5,11 @@ import { registerExecApprovalFollowupRuntimeHandoff, resetExecApprovalFollowupRuntimeHandoffsForTests, } from "../../agents/bash-tools.exec-approval-followup-state.js"; +import { + getSubagentRunByChildSessionKey, + resetSubagentRegistryForTests, + testing as subagentRegistryTesting, +} from "../../agents/subagent-registry.js"; import { getDetachedTaskLifecycleRuntime, resetDetachedTaskLifecycleRuntimeForTests, @@ -12,6 +17,7 @@ import { } from "../../tasks/detached-task-runtime.js"; import { findTaskByRunId, + listTaskRecords, markTaskTerminalById, resetTaskRegistryForTests, } from "../../tasks/task-registry.js"; @@ -508,6 +514,8 @@ describe("gateway agent handler", () => { } resetDetachedTaskLifecycleRuntimeForTests(); resetTaskRegistryForTests(); + resetSubagentRegistryForTests({ persist: false }); + subagentRegistryTesting.setDepsForTest(); mocks.loadConfigReturn = {}; mocks.emitGatewaySessionEndPluginHook.mockReset(); mocks.emitGatewaySessionStartPluginHook.mockReset(); @@ -2801,6 +2809,200 @@ describe("gateway agent handler", () => { }); }); + it("tracks plugin SDK subagent agent runs through the subagent registry only", async () => { + await withTempDir({ prefix: "openclaw-gateway-plugin-subagent-task-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetSubagentRegistryForTests({ persist: false }); + const runId = "plugin-subagent-task-run"; + const childSessionKey = "agent:work:subagent:plugin-helper"; + const cfg = { + session: { mainKey: "main", scope: "per-sender" }, + agents: { list: [{ id: "main", default: true }, { id: "work" }] }, + }; + mocks.listAgentIds.mockReturnValue(["main", "work"]); + mocks.loadConfigReturn = cfg; + mocks.loadSessionEntry.mockReturnValue({ + cfg, + storePath: "/tmp/sessions.json", + entry: { + sessionId: "plugin-subagent-session", + updatedAt: Date.now(), + }, + canonicalKey: childSessionKey, + }); + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + const store: Record = { + [childSessionKey]: { + sessionId: "plugin-subagent-session", + updatedAt: Date.now(), + }, + }; + return await updater(store); + }); + mocks.agentCommand.mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { durationMs: 100 }, + }); + const context = makeContext(); + const baseClient = requireValue(backendGatewayClient(), "expected backend client"); + const pluginClient: AgentHandlerArgs["client"] = { + connect: baseClient.connect, + internal: { + ...baseClient.internal, + agentRunTracking: "plugin_subagent", + pluginRuntimeOwnerId: "memory-core", + }, + }; + + await invokeAgent( + { + message: "background plugin subagent task", + sessionKey: childSessionKey, + idempotencyKey: runId, + }, + { + context, + reqId: runId, + client: pluginClient, + }, + ); + + await waitForAssertion(() => { + const tasks = listTaskRecords().filter((task) => task.runId === runId); + expect(tasks).toHaveLength(1); + const task = requireValue(tasks[0], "expected one plugin subagent task"); + expectRecordFields(task, { + runtime: "subagent", + childSessionKey, + ownerKey: "agent:work:main", + label: "plugin:memory-core", + task: "background plugin subagent task", + deliveryStatus: "not_applicable", + }); + expect(task.runtime).not.toBe("cli"); + }); + + const run = requireValue( + getSubagentRunByChildSessionKey(childSessionKey), + "expected subagent registry run", + ); + expectRecordFields(run, { + runId, + childSessionKey, + controllerSessionKey: "agent:work:main", + requesterSessionKey: "agent:work:main", + requesterDisplayKey: "main", + cleanup: "keep", + spawnMode: "run", + label: "plugin:memory-core", + }); + expectRecordFields(run.completion, { required: false }); + expectRecordFields(run.delivery, { status: "not_required" }); + + const commandCallCount = mocks.agentCommand.mock.calls.length; + const createdAt = run.createdAt; + await invokeAgent( + { + message: "background plugin subagent task", + sessionKey: childSessionKey, + idempotencyKey: runId, + }, + { + context, + reqId: `${runId}-retry`, + client: pluginClient, + }, + ); + + expect(mocks.agentCommand).toHaveBeenCalledTimes(commandCallCount); + const retryTasks = listTaskRecords().filter((task) => task.runId === runId); + expect(retryTasks).toHaveLength(1); + expect(getSubagentRunByChildSessionKey(childSessionKey)?.createdAt).toBe(createdAt); + }); + }); + + it("keeps plugin SDK subagent runs best-effort when registry persistence fails", async () => { + await withTempDir( + { prefix: "openclaw-gateway-plugin-subagent-registry-fail-" }, + async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetSubagentRegistryForTests({ persist: false }); + subagentRegistryTesting.setDepsForTest({ + persistSubagentRunsToDiskOrThrow: () => { + throw new Error("disk full"); + }, + }); + const runId = "plugin-subagent-registry-fail"; + const childSessionKey = "agent:main:subagent:registry-fail"; + const cfg = { + session: { mainKey: "main", scope: "per-sender" }, + }; + mocks.loadConfigReturn = cfg; + mocks.loadSessionEntry.mockReturnValue({ + cfg, + storePath: "/tmp/sessions.json", + entry: { + sessionId: "plugin-subagent-registry-fail-session", + updatedAt: Date.now(), + }, + canonicalKey: childSessionKey, + }); + mocks.updateSessionStore.mockImplementation(async (_path, updater) => { + const store: Record = { + [childSessionKey]: { + sessionId: "plugin-subagent-registry-fail-session", + updatedAt: Date.now(), + }, + }; + return await updater(store); + }); + mocks.agentCommand.mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { durationMs: 100 }, + }); + const context = makeContext(); + const baseClient = requireValue(backendGatewayClient(), "expected backend client"); + const commandCallCount = mocks.agentCommand.mock.calls.length; + + await invokeAgent( + { + message: "background plugin subagent task", + sessionKey: childSessionKey, + idempotencyKey: runId, + }, + { + context, + reqId: runId, + client: { + connect: baseClient.connect, + internal: { + ...baseClient.internal, + agentRunTracking: "plugin_subagent", + pluginRuntimeOwnerId: "memory-core", + }, + }, + }, + ); + + expect(mocks.agentCommand).toHaveBeenCalledTimes(commandCallCount + 1); + await waitForAssertion(() => { + const task = requireValue(findTaskByRunId(runId), "expected fallback cli task"); + expectRecordFields(task, { + runtime: "cli", + childSessionKey, + status: "succeeded", + terminalSummary: "completed", + }); + }); + expect(context.logGateway.warn).toHaveBeenCalledWith( + expect.stringContaining("falling back to cli task tracking"), + ); + }, + ); + }); + it("terminalizes failed async gateway agent runs in the shared task registry", async () => { await withTempDir({ prefix: "openclaw-gateway-agent-task-error-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 22253d14203..cf1bbdc0857 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -102,6 +102,7 @@ import { mergeDeliveryContext, normalizeDeliveryContext, normalizeSessionDeliveryFields, + type DeliveryContext, } from "../../utils/delivery-context.shared.js"; import { INTERNAL_MESSAGE_CHANNEL, @@ -619,6 +620,52 @@ type GatewayAgentTaskTerminalStatus = Extract< TaskStatus, "succeeded" | "failed" | "timed_out" | "cancelled" >; +type GatewayAgentTaskTrackingMode = "cli" | "plugin_subagent" | "none"; + +function resolveGatewayAgentTaskTrackingMode(params: { + client: GatewayRequestHandlerOptions["client"]; + sessionKey?: string; + inputProvenance?: InputProvenance; +}): GatewayAgentTaskTrackingMode { + if (!params.sessionKey?.trim() || params.inputProvenance?.kind === "inter_session") { + return "none"; + } + return params.client?.internal?.agentRunTracking === "plugin_subagent" + ? "plugin_subagent" + : "cli"; +} + +async function registerPluginSubagentRunFromGateway(params: { + cfg: OpenClawConfig; + runId: string; + childSessionKey: string; + task: string; + requesterOrigin?: DeliveryContext; + pluginId?: string; +}): Promise { + const childSessionKey = params.childSessionKey.trim(); + if (!childSessionKey) { + return; + } + const ownerSessionKey = resolveAgentMainSessionKey({ + cfg: params.cfg, + agentId: resolveAgentIdFromSessionKey(childSessionKey), + }); + const { registerSubagentRun } = await import("../../agents/subagent-registry.js"); + registerSubagentRun({ + runId: params.runId, + childSessionKey, + controllerSessionKey: ownerSessionKey, + requesterSessionKey: ownerSessionKey, + requesterOrigin: params.requesterOrigin, + requesterDisplayKey: "main", + task: params.task, + cleanup: "keep", + ...(params.pluginId ? { label: `plugin:${params.pluginId}` } : {}), + expectsCompletionMessage: false, + spawnMode: "run", + }); +} function resolveFailedTrackedAgentTaskStatus(error: unknown): GatewayAgentTaskTerminalStatus { return isAbortError(error) || isTimeoutError(error) ? "timed_out" : "failed"; @@ -830,10 +877,9 @@ function dispatchAgentRunFromGateway(params: { abortController: AbortController; respond: GatewayRequestHandlerOptions["respond"]; context: GatewayRequestHandlerOptions["context"]; + taskTrackingMode: Exclude; }) { - const inputProvenance = normalizeInputProvenance(params.ingressOpts.inputProvenance); - const shouldTrackTask = - params.ingressOpts.sessionKey?.trim() && inputProvenance?.kind !== "inter_session"; + const shouldTrackTask = params.taskTrackingMode === "cli"; if (shouldTrackTask) { try { createRunningTaskRun({ @@ -2161,6 +2207,39 @@ export const agentHandlers: GatewayRequestHandlers = { return; } + const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId; + const taskTrackingMode = resolveGatewayAgentTaskTrackingMode({ + client, + sessionKey: resolvedSessionKey, + inputProvenance, + }); + let dispatchTaskTrackingMode: Exclude = + taskTrackingMode === "cli" ? "cli" : "none"; + if (taskTrackingMode === "plugin_subagent" && resolvedSessionKey) { + try { + await registerPluginSubagentRunFromGateway({ + cfg, + runId, + childSessionKey: resolvedSessionKey, + task: request.message.trim(), + requesterOrigin: normalizeDeliveryContext({ + channel: resolvedChannel, + to: resolvedTo, + accountId: resolvedAccountId, + threadId: resolvedThreadId, + }), + pluginId: normalizeOptionalString(client?.internal?.pluginRuntimeOwnerId), + }); + } catch (err) { + context.logGateway.warn( + `failed to register plugin subagent run ${runId}; falling back to cli task tracking: ${formatForLog( + err, + )}`, + ); + dispatchTaskTrackingMode = "cli"; + } + } + const accepted = { runId, sessionKey: resolvedSessionKey, @@ -2246,7 +2325,6 @@ export const agentHandlers: GatewayRequestHandlers = { message = annotateInterSessionPromptText(message, inputProvenance); } - const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId; const ingressAgentId = resolvedSessionKey === "global" ? activeSessionAgentId @@ -2364,6 +2442,7 @@ export const agentHandlers: GatewayRequestHandlers = { abortController: activeRunAbort.controller, respond, context, + taskTrackingMode: dispatchTaskTrackingMode, }); dispatched = true; } catch (err) { diff --git a/src/gateway/server-methods/shared-types.ts b/src/gateway/server-methods/shared-types.ts index cd69aa0117b..786e67c0f00 100644 --- a/src/gateway/server-methods/shared-types.ts +++ b/src/gateway/server-methods/shared-types.ts @@ -36,6 +36,7 @@ export type GatewayClient = { allowModelOverride?: boolean; approvalRuntime?: boolean; pluginRuntimeOwnerId?: string; + agentRunTracking?: "plugin_subagent"; }; }; diff --git a/src/gateway/server-plugins.subagent-ended-hook.test.ts b/src/gateway/server-plugins.subagent-ended-hook.test.ts new file mode 100644 index 00000000000..2b50f5db630 --- /dev/null +++ b/src/gateway/server-plugins.subagent-ended-hook.test.ts @@ -0,0 +1,202 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import type { PluginRuntimeGatewayRequestScope } from "../plugins/runtime/gateway-request-scope.js"; +import type { GatewayRequestContext, GatewayRequestOptions } from "./server-methods/types.js"; + +type HandleGatewayRequestOptions = GatewayRequestOptions & { + extraHandlers?: Record; +}; +const handleGatewayRequest = vi.hoisted(() => + vi.fn(async (_opts: HandleGatewayRequestOptions) => {}), +); + +vi.mock("./server-methods.js", () => ({ + handleGatewayRequest, +})); + +type ServerPluginsModule = typeof import("./server-plugins.js"); +type GatewayRequestScopeModule = typeof import("../plugins/runtime/gateway-request-scope.js"); + +function createTestCfg(): OpenClawConfig { + return { + session: { mainKey: "agent:main:main", scope: "per-sender" }, + } as unknown as OpenClawConfig; +} + +function createTestContext(label: string, cfg: OpenClawConfig): GatewayRequestContext { + return { + label, + getRuntimeConfig: () => cfg, + } as unknown as GatewayRequestContext; +} + +async function loadServerPlugins(): Promise { + return await import("./server-plugins.js"); +} + +async function loadGatewayScope(): Promise { + return await import("../plugins/runtime/gateway-request-scope.js"); +} + +function lastGatewayRequest(): HandleGatewayRequestOptions { + const call = handleGatewayRequest.mock.calls.at(-1)?.[0]; + if (!call) { + throw new Error("expected handleGatewayRequest call"); + } + return call; +} + +beforeEach(() => { + handleGatewayRequest.mockReset(); + handleGatewayRequest.mockImplementation(async (opts: HandleGatewayRequestOptions) => { + switch (opts.req.method) { + case "agent": + opts.respond(true, { runId: "plugin-run-1" }); + return; + case "agent.wait": + opts.respond(true, { status: "ok" }); + return; + default: + opts.respond(true, {}); + } + }); +}); + +afterEach(async () => { + const serverPlugins = await loadServerPlugins(); + serverPlugins.clearFallbackGatewayContext(); +}); + +describe("createGatewaySubagentRuntime.run subagent_ended tracking (#59164)", () => { + test("marks plugin SDK subagent runs for Gateway-owned subagent tracking", async () => { + const serverPlugins = await loadServerPlugins(); + const runtime = serverPlugins.createGatewaySubagentRuntime(); + serverPlugins.setFallbackGatewayContext( + createTestContext("plugin-sdk-subagent", createTestCfg()), + ); + + const result = await runtime.run({ + sessionKey: "agent:main:subagent:plugin-helper", + message: "summarize this transcript", + deliver: false, + }); + + expect(result.runId).toBe("plugin-run-1"); + const request = lastGatewayRequest(); + expect(request.req.method).toBe("agent"); + expect(request.client?.internal?.agentRunTracking).toBe("plugin_subagent"); + expect(request.client?.internal?.pluginRuntimeOwnerId).toBeUndefined(); + }); + + test("preserves plugin identity on the tracked Gateway agent request", async () => { + const serverPlugins = await loadServerPlugins(); + const gatewayScope = await loadGatewayScope(); + const runtime = serverPlugins.createGatewaySubagentRuntime(); + + const scope = { + context: createTestContext("plugin-scope", createTestCfg()), + pluginId: "memory-core", + isWebchatConnect: () => false, + } satisfies PluginRuntimeGatewayRequestScope; + + await gatewayScope.withPluginRuntimeGatewayRequestScope(scope, () => + runtime.run({ + sessionKey: "agent:main:subagent:dreaming-narrative", + message: "dream task", + deliver: false, + }), + ); + + const request = lastGatewayRequest(); + expect(request.req.method).toBe("agent"); + expect(request.client?.internal?.agentRunTracking).toBe("plugin_subagent"); + expect(request.client?.internal?.pluginRuntimeOwnerId).toBe("memory-core"); + }); + + test("does not dispatch when no runtime config is available", async () => { + const serverPlugins = await loadServerPlugins(); + const runtime = serverPlugins.createGatewaySubagentRuntime(); + + await expect( + runtime.run({ + sessionKey: "agent:main:subagent:orphan", + message: "no cfg available", + deliver: false, + }), + ).rejects.toThrow(/gateway request scope/); + + expect(handleGatewayRequest).not.toHaveBeenCalled(); + }); + + test("preserves the child session so the transcript stays readable until the plugin deletes it", async () => { + const serverPlugins = await loadServerPlugins(); + const runtime = serverPlugins.createGatewaySubagentRuntime(); + serverPlugins.setFallbackGatewayContext(createTestContext("plugin-readback", createTestCfg())); + + const transcript = [ + { role: "user", content: "summarize this transcript" }, + { role: "assistant", content: "summary text" }, + ]; + const sessionStore = new Map([ + ["agent:main:subagent:plugin-readback", { messages: transcript }], + ]); + + handleGatewayRequest.mockImplementation(async (opts: HandleGatewayRequestOptions) => { + const req = opts.req as { method: string; params?: { key?: string } }; + switch (req.method) { + case "agent": + opts.respond(true, { runId: "plugin-run-readback" }); + return; + case "agent.wait": + opts.respond(true, { status: "ok" }); + return; + case "sessions.get": { + const key = req.params?.key ?? ""; + const stored = sessionStore.get(key); + if (!stored) { + opts.respond(false, undefined, { + code: "not_found", + message: `session ${key} not found`, + }); + return; + } + opts.respond(true, stored); + return; + } + case "sessions.delete": { + const key = req.params?.key ?? ""; + sessionStore.delete(key); + opts.respond(true, {}); + return; + } + default: + opts.respond(true, {}); + } + }); + + const runResult = await runtime.run({ + sessionKey: "agent:main:subagent:plugin-readback", + message: "summarize this transcript", + deliver: false, + }); + expect(runResult.runId).toBe("plugin-run-readback"); + + const waitResult = await runtime.waitForRun({ runId: runResult.runId }); + expect(waitResult.status).toBe("ok"); + + const sessionView = await runtime.getSessionMessages({ + sessionKey: "agent:main:subagent:plugin-readback", + }); + expect(sessionView.messages).toEqual(transcript); + + await runtime.deleteSession({ + sessionKey: "agent:main:subagent:plugin-readback", + }); + + await expect( + runtime.getSessionMessages({ + sessionKey: "agent:main:subagent:plugin-readback", + }), + ).rejects.toThrow(/not found/); + }); +}); diff --git a/src/gateway/server-plugins.ts b/src/gateway/server-plugins.ts index 4918b481cbf..25ca46efd3b 100644 --- a/src/gateway/server-plugins.ts +++ b/src/gateway/server-plugins.ts @@ -248,6 +248,7 @@ function resolveRequestedFallbackModelRef(params: { function createSyntheticOperatorClient(params?: { allowModelOverride?: boolean; + agentRunTracking?: "plugin_subagent"; pluginRuntimeOwnerId?: string; scopes?: string[]; }): GatewayRequestOptions["client"] { @@ -270,6 +271,7 @@ function createSyntheticOperatorClient(params?: { }, internal: { allowModelOverride: params?.allowModelOverride === true, + ...(params?.agentRunTracking ? { agentRunTracking: params.agentRunTracking } : {}), ...(params?.scopes?.includes(APPROVALS_SCOPE) ? { approvalRuntime: true } : {}), ...(pluginRuntimeOwnerId ? { pluginRuntimeOwnerId } : {}), }, @@ -303,6 +305,7 @@ function mergeGatewayClientInternal( type DispatchGatewayMethodInProcessOptions = { allowSyntheticModelOverride?: boolean; + agentRunTracking?: "plugin_subagent"; disableSyntheticClient?: boolean; expectFinal?: boolean; forceSyntheticClient?: boolean; @@ -358,12 +361,18 @@ export async function dispatchGatewayMethodInProcessRaw( : undefined; const syntheticClient = createSyntheticOperatorClient({ allowModelOverride: options?.allowSyntheticModelOverride === true, + agentRunTracking: options?.agentRunTracking, ...(pluginRuntimeOwnerId ? { pluginRuntimeOwnerId } : {}), scopes: options?.syntheticScopes, }); const scopedClient = mergeGatewayClientInternal( scope?.client, - pluginRuntimeOwnerId ? { pluginRuntimeOwnerId } : undefined, + pluginRuntimeOwnerId || options?.agentRunTracking + ? { + ...(options?.agentRunTracking ? { agentRunTracking: options.agentRunTracking } : {}), + ...(pluginRuntimeOwnerId ? { pluginRuntimeOwnerId } : {}), + } + : undefined, ); if (options?.disableSyntheticClient === true && !scopedClient) { throw new Error(`In-process gateway dispatch requires a scoped client (method: ${method}).`); @@ -503,6 +512,7 @@ export function createGatewaySubagentRuntime(): PluginRuntime["subagent"] { }, { allowSyntheticModelOverride, + agentRunTracking: "plugin_subagent", ...(pluginId ? { pluginRuntimeOwnerId: pluginId } : {}), }, );