mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:50:43 +00:00
fix(release): harden subagent completion delivery
This commit is contained in:
@@ -207,7 +207,7 @@ describe("telegram inbound media", () => {
|
||||
},
|
||||
},
|
||||
assert: (payload: Record<string, unknown>) => {
|
||||
expect(payload.Body).toContain("Eiffel Tower");
|
||||
expect(payload.Body).toContain("48.858844");
|
||||
expect(payload.LocationName).toBe("Eiffel Tower");
|
||||
expect(payload.LocationAddress).toBe("Champ de Mars, Paris");
|
||||
expect(payload.LocationSource).toBe("place");
|
||||
|
||||
@@ -75,6 +75,65 @@ const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
|
||||
let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps =
|
||||
defaultSubagentAnnounceDeliveryDeps;
|
||||
|
||||
function resolveBoundConversationOrigin(params: {
|
||||
bindingConversation: ConversationRef & { parentConversationId?: string };
|
||||
requesterConversation?: ConversationRef;
|
||||
requesterOrigin?: DeliveryContext;
|
||||
}): DeliveryContext {
|
||||
const conversation = params.bindingConversation;
|
||||
const conversationId = conversation.conversationId?.trim() ?? "";
|
||||
const parentConversationId = conversation.parentConversationId?.trim() ?? "";
|
||||
const requesterConversationId = params.requesterConversation?.conversationId?.trim() ?? "";
|
||||
const requesterTo = params.requesterOrigin?.to?.trim();
|
||||
if (
|
||||
conversation.channel === "matrix" &&
|
||||
parentConversationId &&
|
||||
requesterConversationId &&
|
||||
parentConversationId === requesterConversationId &&
|
||||
requesterTo
|
||||
) {
|
||||
return {
|
||||
channel: conversation.channel,
|
||||
accountId: conversation.accountId,
|
||||
to: requesterTo,
|
||||
...(conversationId ? { threadId: conversationId } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
const boundTarget = resolveConversationDeliveryTarget({
|
||||
channel: conversation.channel,
|
||||
conversationId,
|
||||
parentConversationId,
|
||||
});
|
||||
if (
|
||||
requesterTo &&
|
||||
conversationId &&
|
||||
requesterConversationId &&
|
||||
conversationId.toLowerCase() === requesterConversationId.toLowerCase()
|
||||
) {
|
||||
return {
|
||||
channel: conversation.channel,
|
||||
accountId: conversation.accountId,
|
||||
to: requesterTo,
|
||||
threadId:
|
||||
boundTarget.threadId ??
|
||||
(params.requesterOrigin?.threadId != null && params.requesterOrigin.threadId !== ""
|
||||
? String(params.requesterOrigin.threadId)
|
||||
: undefined),
|
||||
};
|
||||
}
|
||||
return {
|
||||
channel: conversation.channel,
|
||||
accountId: conversation.accountId,
|
||||
to: boundTarget.to,
|
||||
threadId:
|
||||
boundTarget.threadId ??
|
||||
(params.requesterOrigin?.threadId != null && params.requesterOrigin.threadId !== ""
|
||||
? String(params.requesterOrigin.threadId)
|
||||
: undefined),
|
||||
};
|
||||
}
|
||||
|
||||
function resolveRequesterSessionActivity(requesterSessionKey: string) {
|
||||
const activity = subagentAnnounceDeliveryDeps.getRequesterSessionActivity(requesterSessionKey);
|
||||
if (activity.sessionId || activity.isActive) {
|
||||
@@ -243,22 +302,12 @@ export async function resolveSubagentCompletionOrigin(params: {
|
||||
failClosed: false,
|
||||
});
|
||||
if (route.mode === "bound" && route.binding) {
|
||||
const boundTarget = resolveConversationDeliveryTarget({
|
||||
channel: route.binding.conversation.channel,
|
||||
conversationId: route.binding.conversation.conversationId,
|
||||
parentConversationId: route.binding.conversation.parentConversationId,
|
||||
});
|
||||
return mergeDeliveryContext(
|
||||
{
|
||||
channel: route.binding.conversation.channel,
|
||||
accountId: route.binding.conversation.accountId,
|
||||
to: boundTarget.to,
|
||||
threadId:
|
||||
boundTarget.threadId ??
|
||||
(requesterOrigin?.threadId != null && requesterOrigin.threadId !== ""
|
||||
? String(requesterOrigin.threadId)
|
||||
: undefined),
|
||||
},
|
||||
resolveBoundConversationOrigin({
|
||||
bindingConversation: route.binding.conversation,
|
||||
requesterConversation,
|
||||
requesterOrigin,
|
||||
}),
|
||||
requesterOrigin,
|
||||
);
|
||||
}
|
||||
@@ -489,7 +538,7 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
? normalizedSessionOnlyOriginChannel
|
||||
: undefined;
|
||||
const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey);
|
||||
if (params.expectsCompletionMessage && requesterActivity.isActive) {
|
||||
if (params.expectsCompletionMessage && requesterActivity.sessionId) {
|
||||
const woke = requesterActivity.sessionId
|
||||
? subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage(
|
||||
requesterActivity.sessionId,
|
||||
@@ -502,11 +551,13 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
path: "steered",
|
||||
};
|
||||
}
|
||||
return {
|
||||
delivered: false,
|
||||
path: "direct",
|
||||
error: "active requester session could not be woken",
|
||||
};
|
||||
if (requesterActivity.isActive) {
|
||||
return {
|
||||
delivered: false,
|
||||
path: "direct",
|
||||
error: "active requester session could not be woken",
|
||||
};
|
||||
}
|
||||
}
|
||||
if (params.signal?.aborted) {
|
||||
return {
|
||||
|
||||
@@ -63,10 +63,6 @@ export type SubagentRunOutcome = {
|
||||
elapsedMs?: number;
|
||||
};
|
||||
|
||||
function isFailedOutcome(outcome?: SubagentRunOutcome): boolean {
|
||||
return outcome?.status === "error";
|
||||
}
|
||||
|
||||
function readFiniteNumber(value: number | undefined): number | undefined {
|
||||
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
|
||||
}
|
||||
@@ -156,6 +152,9 @@ function extractSubagentOutputText(message: unknown): string {
|
||||
const role = (message as { role?: unknown }).role;
|
||||
const content = (message as { content?: unknown }).content;
|
||||
if (role === "assistant") {
|
||||
if (typeof content === "string") {
|
||||
return sanitizeTextContent(content);
|
||||
}
|
||||
return extractAssistantText(message) ?? "";
|
||||
}
|
||||
if (role === "toolResult" || role === "tool") {
|
||||
@@ -257,9 +256,6 @@ function selectSubagentOutputText(
|
||||
snapshot: SubagentOutputSnapshot,
|
||||
outcome?: SubagentRunOutcome,
|
||||
): string | undefined {
|
||||
if (isFailedOutcome(outcome)) {
|
||||
return undefined;
|
||||
}
|
||||
if (snapshot.latestSilentText) {
|
||||
return snapshot.latestSilentText;
|
||||
}
|
||||
@@ -277,9 +273,6 @@ export async function readSubagentOutput(
|
||||
sessionKey: string,
|
||||
outcome?: SubagentRunOutcome,
|
||||
): Promise<string | undefined> {
|
||||
if (isFailedOutcome(outcome)) {
|
||||
return undefined;
|
||||
}
|
||||
const history = await subagentAnnounceOutputDeps.callGateway({
|
||||
method: "chat.history",
|
||||
params: { sessionKey, limit: 100 },
|
||||
@@ -359,9 +352,6 @@ export async function captureSubagentCompletionReply(
|
||||
sessionKey: string,
|
||||
options?: { waitForReply?: boolean; outcome?: SubagentRunOutcome },
|
||||
): Promise<string | undefined> {
|
||||
if (isFailedOutcome(options?.outcome)) {
|
||||
return undefined;
|
||||
}
|
||||
return await captureSubagentCompletionReplyUsing({
|
||||
sessionKey,
|
||||
waitForReply: options?.waitForReply,
|
||||
|
||||
@@ -249,6 +249,17 @@ describe("subagent announce formatting", () => {
|
||||
callGateway: async <T = Record<string, unknown>>(
|
||||
req: Parameters<typeof gatewayCall.callGateway>[0],
|
||||
) => (await callGatewaySpy(req)) as T,
|
||||
loadConfig: () => configOverride,
|
||||
getRequesterSessionActivity: (requesterSessionKey: string) => {
|
||||
const entry = loadSessionStoreFixture()[requesterSessionKey];
|
||||
const sessionId = entry?.sessionId;
|
||||
return {
|
||||
sessionId,
|
||||
isActive: Boolean(sessionId && embeddedRunMock.isEmbeddedPiRunActive(sessionId)),
|
||||
};
|
||||
},
|
||||
queueEmbeddedPiMessage: (sessionId: string, text: string) =>
|
||||
embeddedRunMock.queueEmbeddedPiMessage(sessionId, text),
|
||||
});
|
||||
loadSessionStoreSpy.mockReset().mockImplementation(() => loadSessionStoreFixture());
|
||||
resolveAgentIdFromSessionKeySpy.mockReset().mockImplementation(() => "main");
|
||||
|
||||
@@ -286,10 +286,11 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
outcome = { status: "unknown" };
|
||||
}
|
||||
const failedTerminalOutcome = outcome.status === "error";
|
||||
const allowFailedOutputCapture =
|
||||
!failedTerminalOutcome || (!params.roundOneReply && !params.fallbackReply);
|
||||
if (failedTerminalOutcome) {
|
||||
reply = undefined;
|
||||
}
|
||||
|
||||
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
|
||||
const requesterIsInternalSession = () =>
|
||||
requesterDepth >= 1 || isCronSessionKey(targetRequesterSessionKey);
|
||||
@@ -370,17 +371,19 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
}
|
||||
}
|
||||
|
||||
if (!childCompletionFindings && !failedTerminalOutcome) {
|
||||
const fallbackReply = normalizeOptionalString(params.fallbackReply);
|
||||
if (!childCompletionFindings) {
|
||||
const fallbackReply = failedTerminalOutcome
|
||||
? undefined
|
||||
: normalizeOptionalString(params.fallbackReply);
|
||||
const fallbackIsSilent =
|
||||
Boolean(fallbackReply) &&
|
||||
(isAnnounceSkip(fallbackReply) || isSilentReplyText(fallbackReply, SILENT_REPLY_TOKEN));
|
||||
|
||||
if (!reply) {
|
||||
if (!reply && allowFailedOutputCapture) {
|
||||
reply = await readSubagentOutput(params.childSessionKey, outcome);
|
||||
}
|
||||
|
||||
if (!reply?.trim()) {
|
||||
if (!reply?.trim() && allowFailedOutputCapture) {
|
||||
reply = await readLatestSubagentOutputWithRetry({
|
||||
sessionKey: params.childSessionKey,
|
||||
maxWaitMs: params.timeoutMs,
|
||||
|
||||
@@ -104,6 +104,10 @@ vi.mock("../plugins/hook-runner-global.js", () => ({
|
||||
getGlobalHookRunner: vi.fn(() => null),
|
||||
}));
|
||||
|
||||
vi.mock("../browser-lifecycle-cleanup.js", () => ({
|
||||
cleanupBrowserSessionsForLifecycleEnd: vi.fn(async () => {}),
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-depth.js", () => ({
|
||||
getSubagentDepthFromSessionStore: () => 0,
|
||||
}));
|
||||
@@ -178,6 +182,13 @@ describe("subagent registry lifecycle error grace", () => {
|
||||
subagentAnnounceDeliveryTesting.setDepsForTest({
|
||||
callGateway: callGatewayMock as typeof import("../gateway/call.js").callGateway,
|
||||
loadConfig: loadConfigMock as typeof import("../config/config.js").loadConfig,
|
||||
getRequesterSessionActivity: (requesterSessionKey: string) => {
|
||||
const entry = sessionStore[requesterSessionKey];
|
||||
return {
|
||||
sessionId: entry?.sessionId,
|
||||
isActive: false,
|
||||
};
|
||||
},
|
||||
});
|
||||
subagentAnnounceOutputTesting.setDepsForTest({
|
||||
callGateway: callGatewayMock as typeof import("../gateway/call.js").callGateway,
|
||||
@@ -457,6 +468,7 @@ describe("subagent registry lifecycle error grace", () => {
|
||||
emitLifecycleEvent("run-refresh-silent", { phase: "end", endedAt });
|
||||
await flushAsync();
|
||||
await waitForCleanupHandledFalse("run-refresh-silent");
|
||||
await waitForFrozenResultText("run-refresh-silent", "All work complete, final summary");
|
||||
|
||||
setAssistantOutput("agent:main:subagent:refresh-silent", "NO_REPLY");
|
||||
emitLifecycleEvent(
|
||||
|
||||
@@ -35,19 +35,26 @@ export async function prepareGatewayPluginBootstrap(params: {
|
||||
}
|
||||
: params.cfgAtStart;
|
||||
|
||||
if (!params.minimalTestGateway) {
|
||||
await Promise.all([
|
||||
const shouldRunStartupMaintenance =
|
||||
!params.minimalTestGateway || startupMaintenanceConfig.channels !== undefined;
|
||||
if (shouldRunStartupMaintenance) {
|
||||
const startupTasks = [
|
||||
runChannelPluginStartupMaintenance({
|
||||
cfg: startupMaintenanceConfig,
|
||||
env: process.env,
|
||||
log: params.log,
|
||||
}),
|
||||
runStartupSessionMigration({
|
||||
cfg: params.cfgAtStart,
|
||||
env: process.env,
|
||||
log: params.log,
|
||||
}),
|
||||
]);
|
||||
];
|
||||
if (!params.minimalTestGateway) {
|
||||
startupTasks.push(
|
||||
runStartupSessionMigration({
|
||||
cfg: params.cfgAtStart,
|
||||
env: process.env,
|
||||
log: params.log,
|
||||
}),
|
||||
);
|
||||
}
|
||||
await Promise.all(startupTasks);
|
||||
}
|
||||
|
||||
initSubagentRegistry();
|
||||
|
||||
Reference in New Issue
Block a user