mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 13:50:49 +00:00
fix: scope nested agent lanes per target session (#67785) (thanks @stainlu)
* fix(agents): scope nested lane per target session to stop cross-agent blocking * docs(agents): note per-session nested-lane lifecycle parity with session:* lanes * refactor(agents): distill nested lane helpers * fix: scope nested agent lanes per target session (#67785) (thanks @stainlu) --------- Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
@@ -9,6 +9,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
### Fixes
|
### Fixes
|
||||||
|
|
||||||
- Agents/openai-completions: always send `stream_options.include_usage` on streaming requests, so local and custom OpenAI-compatible backends report real context usage instead of showing 0%. (#68746) Thanks @kagura-agent.
|
- Agents/openai-completions: always send `stream_options.include_usage` on streaming requests, so local and custom OpenAI-compatible backends report real context usage instead of showing 0%. (#68746) Thanks @kagura-agent.
|
||||||
|
- Agents/nested lanes: scope nested agent work per target session so a long-running nested run on one session no longer head-of-line blocks unrelated sessions across the gateway. (#67785) Thanks @stainlu.
|
||||||
|
|
||||||
## 2026.4.19-beta.1
|
## 2026.4.19-beta.1
|
||||||
|
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ import {
|
|||||||
import type { OutboundSessionContext } from "../../infra/outbound/session-context.js";
|
import type { OutboundSessionContext } from "../../infra/outbound/session-context.js";
|
||||||
import type { RuntimeEnv } from "../../runtime.js";
|
import type { RuntimeEnv } from "../../runtime.js";
|
||||||
import { isInternalMessageChannel } from "../../utils/message-channel.js";
|
import { isInternalMessageChannel } from "../../utils/message-channel.js";
|
||||||
import { AGENT_LANE_NESTED } from "../lanes.js";
|
import { isNestedAgentLane } from "../lanes.js";
|
||||||
import type { AgentCommandOpts } from "./types.js";
|
import type { AgentCommandOpts } from "./types.js";
|
||||||
|
|
||||||
type RunResult = Awaited<ReturnType<(typeof import("../pi-embedded.js"))["runEmbeddedPiAgent"]>>;
|
type RunResult = Awaited<ReturnType<(typeof import("../pi-embedded.js"))["runEmbeddedPiAgent"]>>;
|
||||||
@@ -351,7 +351,7 @@ export async function deliverAgentCommandResult(params: {
|
|||||||
if (!output) {
|
if (!output) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (opts.lane === AGENT_LANE_NESTED) {
|
if (isNestedAgentLane(opts.lane)) {
|
||||||
logNestedOutput(runtime, opts, output, effectiveSessionKey);
|
logNestedOutput(runtime, opts, output, effectiveSessionKey);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,10 @@
|
|||||||
import { describe, expect, it } from "vitest";
|
import { describe, expect, it } from "vitest";
|
||||||
import { AGENT_LANE_NESTED, resolveNestedAgentLane } from "./lanes.js";
|
import {
|
||||||
|
AGENT_LANE_NESTED,
|
||||||
|
isNestedAgentLane,
|
||||||
|
resolveNestedAgentLane,
|
||||||
|
resolveNestedAgentLaneForSession,
|
||||||
|
} from "./lanes.js";
|
||||||
|
|
||||||
describe("resolveNestedAgentLane", () => {
|
describe("resolveNestedAgentLane", () => {
|
||||||
it("defaults to the nested lane when no lane is provided", () => {
|
it("defaults to the nested lane when no lane is provided", () => {
|
||||||
@@ -16,3 +21,63 @@ describe("resolveNestedAgentLane", () => {
|
|||||||
expect(resolveNestedAgentLane(" custom-lane ")).toBe("custom-lane");
|
expect(resolveNestedAgentLane(" custom-lane ")).toBe("custom-lane");
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("resolveNestedAgentLaneForSession (#67502)", () => {
|
||||||
|
it("falls back to the unscoped nested lane when no session key is provided", () => {
|
||||||
|
expect(resolveNestedAgentLaneForSession(undefined)).toBe(AGENT_LANE_NESTED);
|
||||||
|
expect(resolveNestedAgentLaneForSession("")).toBe(AGENT_LANE_NESTED);
|
||||||
|
expect(resolveNestedAgentLaneForSession(" ")).toBe(AGENT_LANE_NESTED);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("scopes the nested lane per target session key", () => {
|
||||||
|
expect(resolveNestedAgentLaneForSession("agent:ebao-next:discord:channel:1")).toBe(
|
||||||
|
`${AGENT_LANE_NESTED}:agent:ebao-next:discord:channel:1`,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("produces distinct lanes for distinct target sessions", () => {
|
||||||
|
const laneA = resolveNestedAgentLaneForSession("agent:ebao-next:discord:channel:1");
|
||||||
|
const laneB = resolveNestedAgentLaneForSession("agent:ebao-vue:discord:channel:2");
|
||||||
|
expect(laneA).not.toBe(laneB);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("is deterministic for the same session key across calls", () => {
|
||||||
|
const key = "agent:ebao:discord:channel:1";
|
||||||
|
expect(resolveNestedAgentLaneForSession(key)).toBe(resolveNestedAgentLaneForSession(key));
|
||||||
|
});
|
||||||
|
|
||||||
|
it("trims whitespace around the session key before scoping", () => {
|
||||||
|
expect(resolveNestedAgentLaneForSession(" agent:ebao:main ")).toBe(
|
||||||
|
`${AGENT_LANE_NESTED}:agent:ebao:main`,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("isNestedAgentLane", () => {
|
||||||
|
it("returns true for the unscoped nested lane", () => {
|
||||||
|
expect(isNestedAgentLane(AGENT_LANE_NESTED)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns true for per-session nested lanes", () => {
|
||||||
|
expect(isNestedAgentLane(resolveNestedAgentLaneForSession("agent:a:main"))).toBe(true);
|
||||||
|
expect(isNestedAgentLane(`${AGENT_LANE_NESTED}:agent:a:main`)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns false for unrelated lanes", () => {
|
||||||
|
expect(isNestedAgentLane("main")).toBe(false);
|
||||||
|
expect(isNestedAgentLane("cron")).toBe(false);
|
||||||
|
expect(isNestedAgentLane("subagent")).toBe(false);
|
||||||
|
expect(isNestedAgentLane("session:agent:a:main")).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns false for lanes that merely contain 'nested' as a substring", () => {
|
||||||
|
expect(isNestedAgentLane("deeply-nested-lane")).toBe(false);
|
||||||
|
expect(isNestedAgentLane("session:nested")).toBe(false);
|
||||||
|
expect(isNestedAgentLane("nestedfoo")).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns false for empty or missing lane names", () => {
|
||||||
|
expect(isNestedAgentLane(undefined)).toBe(false);
|
||||||
|
expect(isNestedAgentLane("")).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ import { CommandLane } from "../process/lanes.js";
|
|||||||
|
|
||||||
export const AGENT_LANE_NESTED = CommandLane.Nested;
|
export const AGENT_LANE_NESTED = CommandLane.Nested;
|
||||||
export const AGENT_LANE_SUBAGENT = CommandLane.Subagent;
|
export const AGENT_LANE_SUBAGENT = CommandLane.Subagent;
|
||||||
|
const NESTED_LANE = "nested";
|
||||||
|
const NESTED_LANE_PREFIX = `${NESTED_LANE}:`;
|
||||||
|
|
||||||
export function resolveNestedAgentLane(lane?: string): string {
|
export function resolveNestedAgentLane(lane?: string): string {
|
||||||
const trimmed = lane?.trim();
|
const trimmed = lane?.trim();
|
||||||
@@ -12,3 +14,18 @@ export function resolveNestedAgentLane(lane?: string): string {
|
|||||||
}
|
}
|
||||||
return trimmed;
|
return trimmed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function resolveNestedAgentLaneForSession(sessionKey: string | undefined): string {
|
||||||
|
const trimmed = sessionKey?.trim();
|
||||||
|
if (!trimmed) {
|
||||||
|
return AGENT_LANE_NESTED;
|
||||||
|
}
|
||||||
|
return `${NESTED_LANE_PREFIX}${trimmed}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function isNestedAgentLane(lane: string | undefined): boolean {
|
||||||
|
if (!lane) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return lane === NESTED_LANE || lane.startsWith(NESTED_LANE_PREFIX);
|
||||||
|
}
|
||||||
|
|||||||
@@ -758,7 +758,7 @@ describe("sessions tools", () => {
|
|||||||
expect(agentCalls).toHaveLength(8);
|
expect(agentCalls).toHaveLength(8);
|
||||||
for (const call of agentCalls) {
|
for (const call of agentCalls) {
|
||||||
expect(call.params).toMatchObject({
|
expect(call.params).toMatchObject({
|
||||||
lane: "nested",
|
lane: expect.stringMatching(/^nested(?::|$)/),
|
||||||
channel: "webchat",
|
channel: "webchat",
|
||||||
inputProvenance: { kind: "inter_session" },
|
inputProvenance: { kind: "inter_session" },
|
||||||
});
|
});
|
||||||
@@ -938,7 +938,7 @@ describe("sessions tools", () => {
|
|||||||
expect(agentCalls).toHaveLength(4);
|
expect(agentCalls).toHaveLength(4);
|
||||||
for (const call of agentCalls) {
|
for (const call of agentCalls) {
|
||||||
expect(call.params).toMatchObject({
|
expect(call.params).toMatchObject({
|
||||||
lane: "nested",
|
lane: expect.stringMatching(/^nested(?::|$)/),
|
||||||
channel: "webchat",
|
channel: "webchat",
|
||||||
inputProvenance: { kind: "inter_session" },
|
inputProvenance: { kind: "inter_session" },
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import crypto from "node:crypto";
|
import crypto from "node:crypto";
|
||||||
import { callGateway } from "../../gateway/call.js";
|
import { callGateway } from "../../gateway/call.js";
|
||||||
import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";
|
import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";
|
||||||
import { AGENT_LANE_NESTED } from "../lanes.js";
|
import { resolveNestedAgentLaneForSession } from "../lanes.js";
|
||||||
import { waitForAgentRunAndReadUpdatedAssistantReply } from "../run-wait.js";
|
import { waitForAgentRunAndReadUpdatedAssistantReply } from "../run-wait.js";
|
||||||
|
|
||||||
export { readLatestAssistantReply } from "../run-wait.js";
|
export { readLatestAssistantReply } from "../run-wait.js";
|
||||||
@@ -36,7 +36,7 @@ export async function runAgentStep(params: {
|
|||||||
idempotencyKey: stepIdem,
|
idempotencyKey: stepIdem,
|
||||||
deliver: false,
|
deliver: false,
|
||||||
channel: params.channel ?? INTERNAL_MESSAGE_CHANNEL,
|
channel: params.channel ?? INTERNAL_MESSAGE_CHANNEL,
|
||||||
lane: params.lane ?? AGENT_LANE_NESTED,
|
lane: params.lane ?? resolveNestedAgentLaneForSession(params.sessionKey),
|
||||||
extraSystemPrompt: params.extraSystemPrompt,
|
extraSystemPrompt: params.extraSystemPrompt,
|
||||||
inputProvenance: {
|
inputProvenance: {
|
||||||
kind: "inter_session",
|
kind: "inter_session",
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import type { CallGatewayOptions } from "../../gateway/call.js";
|
|||||||
import { formatErrorMessage } from "../../infra/errors.js";
|
import { formatErrorMessage } from "../../infra/errors.js";
|
||||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||||
import type { GatewayMessageChannel } from "../../utils/message-channel.js";
|
import type { GatewayMessageChannel } from "../../utils/message-channel.js";
|
||||||
import { AGENT_LANE_NESTED } from "../lanes.js";
|
import { resolveNestedAgentLaneForSession } from "../lanes.js";
|
||||||
import { readLatestAssistantReply, waitForAgentRun } from "../run-wait.js";
|
import { readLatestAssistantReply, waitForAgentRun } from "../run-wait.js";
|
||||||
import { runAgentStep } from "./agent-step.js";
|
import { runAgentStep } from "./agent-step.js";
|
||||||
import { resolveAnnounceTarget } from "./sessions-announce-target.js";
|
import { resolveAnnounceTarget } from "./sessions-announce-target.js";
|
||||||
@@ -92,7 +92,7 @@ export async function runSessionsSendA2AFlow(params: {
|
|||||||
message: incomingMessage,
|
message: incomingMessage,
|
||||||
extraSystemPrompt: replyPrompt,
|
extraSystemPrompt: replyPrompt,
|
||||||
timeoutMs: params.announceTimeoutMs,
|
timeoutMs: params.announceTimeoutMs,
|
||||||
lane: AGENT_LANE_NESTED,
|
lane: resolveNestedAgentLaneForSession(currentSessionKey),
|
||||||
sourceSessionKey: nextSessionKey,
|
sourceSessionKey: nextSessionKey,
|
||||||
sourceChannel:
|
sourceChannel:
|
||||||
nextSessionKey === params.requesterSessionKey ? params.requesterChannel : targetChannel,
|
nextSessionKey === params.requesterSessionKey ? params.requesterChannel : targetChannel,
|
||||||
@@ -123,7 +123,7 @@ export async function runSessionsSendA2AFlow(params: {
|
|||||||
message: "Agent-to-agent announce step.",
|
message: "Agent-to-agent announce step.",
|
||||||
extraSystemPrompt: announcePrompt,
|
extraSystemPrompt: announcePrompt,
|
||||||
timeoutMs: params.announceTimeoutMs,
|
timeoutMs: params.announceTimeoutMs,
|
||||||
lane: AGENT_LANE_NESTED,
|
lane: resolveNestedAgentLaneForSession(params.targetSessionKey),
|
||||||
sourceSessionKey: params.requesterSessionKey,
|
sourceSessionKey: params.requesterSessionKey,
|
||||||
sourceChannel: params.requesterChannel,
|
sourceChannel: params.requesterChannel,
|
||||||
sourceTool: "sessions_send",
|
sourceTool: "sessions_send",
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import {
|
|||||||
type GatewayMessageChannel,
|
type GatewayMessageChannel,
|
||||||
INTERNAL_MESSAGE_CHANNEL,
|
INTERNAL_MESSAGE_CHANNEL,
|
||||||
} from "../../utils/message-channel.js";
|
} from "../../utils/message-channel.js";
|
||||||
import { AGENT_LANE_NESTED } from "../lanes.js";
|
import { resolveNestedAgentLaneForSession } from "../lanes.js";
|
||||||
import {
|
import {
|
||||||
readLatestAssistantReplySnapshot,
|
readLatestAssistantReplySnapshot,
|
||||||
waitForAgentRunAndReadUpdatedAssistantReply,
|
waitForAgentRunAndReadUpdatedAssistantReply,
|
||||||
@@ -276,7 +276,7 @@ export function createSessionsSendTool(opts?: {
|
|||||||
idempotencyKey,
|
idempotencyKey,
|
||||||
deliver: false,
|
deliver: false,
|
||||||
channel: INTERNAL_MESSAGE_CHANNEL,
|
channel: INTERNAL_MESSAGE_CHANNEL,
|
||||||
lane: AGENT_LANE_NESTED,
|
lane: resolveNestedAgentLaneForSession(resolvedKey),
|
||||||
extraSystemPrompt: agentMessageContext,
|
extraSystemPrompt: agentMessageContext,
|
||||||
inputProvenance: {
|
inputProvenance: {
|
||||||
kind: "inter_session",
|
kind: "inter_session",
|
||||||
|
|||||||
@@ -292,6 +292,29 @@ describe("deliverAgentCommandResult", () => {
|
|||||||
expect(line).toContain("ANNOUNCE_SKIP");
|
expect(line).toContain("ANNOUNCE_SKIP");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("prefixes per-session nested lanes with the same nested log context (#67502)", async () => {
|
||||||
|
const runtime = createRuntime();
|
||||||
|
await runDelivery({
|
||||||
|
runtime,
|
||||||
|
resultText: "ANNOUNCE_SKIP",
|
||||||
|
opts: {
|
||||||
|
message: "hello",
|
||||||
|
deliver: false,
|
||||||
|
lane: "nested:agent:ebao-next:discord:channel:1",
|
||||||
|
sessionKey: "agent:ebao-next:discord:channel:1",
|
||||||
|
runId: "run-announce",
|
||||||
|
messageChannel: "webchat",
|
||||||
|
},
|
||||||
|
sessionEntry: undefined,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(runtime.log).toHaveBeenCalledTimes(1);
|
||||||
|
const line = String((runtime.log as ReturnType<typeof vi.fn>).mock.calls[0]?.[0]);
|
||||||
|
expect(line).toContain("[agent:nested]");
|
||||||
|
expect(line).toContain("session=agent:ebao-next:discord:channel:1");
|
||||||
|
expect(line).toContain("ANNOUNCE_SKIP");
|
||||||
|
});
|
||||||
|
|
||||||
it("preserves audioAsVoice in JSON output envelopes", async () => {
|
it("preserves audioAsVoice in JSON output envelopes", async () => {
|
||||||
const runtime = createRuntime();
|
const runtime = createRuntime();
|
||||||
await runDelivery({
|
await runDelivery({
|
||||||
|
|||||||
@@ -150,7 +150,7 @@ describe("sessions_send gateway loopback", () => {
|
|||||||
const firstCall = spy.mock.calls[0]?.[0] as
|
const firstCall = spy.mock.calls[0]?.[0] as
|
||||||
| { lane?: string; inputProvenance?: { kind?: string; sourceTool?: string } }
|
| { lane?: string; inputProvenance?: { kind?: string; sourceTool?: string } }
|
||||||
| undefined;
|
| undefined;
|
||||||
expect(firstCall?.lane).toBe("nested");
|
expect(firstCall?.lane).toMatch(/^nested(?::|$)/);
|
||||||
expect(firstCall?.inputProvenance).toMatchObject({
|
expect(firstCall?.inputProvenance).toMatchObject({
|
||||||
kind: "inter_session",
|
kind: "inter_session",
|
||||||
sourceTool: "sessions_send",
|
sourceTool: "sessions_send",
|
||||||
|
|||||||
Reference in New Issue
Block a user