mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-05 13:30:22 +00:00
fix: dedupe delivered subagent completion announces (#61525) (thanks @100yenadmin)
* fix(subagents): dedupe delivered completion announces * refactor(subagents): distill cleanup delivery status writes * fix: dedupe delivered subagent completion announces (#61525) (thanks @100yenadmin) --------- Co-authored-by: Eva <eva@100yen.org> Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
@@ -289,6 +289,186 @@ describe("subagent registry lifecycle hardening", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("does not re-run announce flow after completion was already delivered", async () => {
|
||||
const entry = createRunEntry({
|
||||
completionAnnouncedAt: 3_500,
|
||||
endedAt: 4_000,
|
||||
});
|
||||
const persist = vi.fn();
|
||||
const runSubagentAnnounceFlow = vi.fn(async () => true);
|
||||
const notifyContextEngineSubagentEnded = vi.fn(async () => {});
|
||||
|
||||
const controller = createSubagentRegistryLifecycleController({
|
||||
runs: new Map([[entry.runId, entry]]),
|
||||
resumedRuns: new Set(),
|
||||
subagentAnnounceTimeoutMs: 1_000,
|
||||
persist,
|
||||
clearPendingLifecycleError: vi.fn(),
|
||||
countPendingDescendantRuns: () => 0,
|
||||
suppressAnnounceForSteerRestart: () => false,
|
||||
shouldEmitEndedHookForRun: () => false,
|
||||
emitSubagentEndedHookForRun: vi.fn(async () => {}),
|
||||
notifyContextEngineSubagentEnded,
|
||||
resumeSubagentRun: vi.fn(),
|
||||
captureSubagentCompletionReply: vi.fn(async () => "final completion reply"),
|
||||
runSubagentAnnounceFlow,
|
||||
warn: vi.fn(),
|
||||
});
|
||||
|
||||
await expect(
|
||||
controller.completeSubagentRun({
|
||||
runId: entry.runId,
|
||||
endedAt: 4_000,
|
||||
outcome: { status: "ok" },
|
||||
reason: SUBAGENT_ENDED_REASON_COMPLETE,
|
||||
triggerCleanup: true,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
|
||||
expect(typeof entry.cleanupCompletedAt).toBe("number");
|
||||
expect(entry.cleanupCompletedAt).toBeGreaterThan(0);
|
||||
expect(notifyContextEngineSubagentEnded).toHaveBeenCalledWith({
|
||||
childSessionKey: entry.childSessionKey,
|
||||
reason: "completed",
|
||||
workspaceDir: entry.workspaceDir,
|
||||
});
|
||||
expect(persist).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("emits ended hook while retrying cleanup after completion was already delivered", async () => {
|
||||
const entry = createRunEntry({
|
||||
completionAnnouncedAt: 3_500,
|
||||
endedAt: 4_000,
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
const emitSubagentEndedHookForRun = vi.fn(async () => {});
|
||||
|
||||
const controller = createSubagentRegistryLifecycleController({
|
||||
runs: new Map([[entry.runId, entry]]),
|
||||
resumedRuns: new Set(),
|
||||
subagentAnnounceTimeoutMs: 1_000,
|
||||
persist: vi.fn(),
|
||||
clearPendingLifecycleError: vi.fn(),
|
||||
countPendingDescendantRuns: () => 0,
|
||||
suppressAnnounceForSteerRestart: () => false,
|
||||
shouldEmitEndedHookForRun: () => true,
|
||||
emitSubagentEndedHookForRun,
|
||||
notifyContextEngineSubagentEnded: vi.fn(async () => {}),
|
||||
resumeSubagentRun: vi.fn(),
|
||||
captureSubagentCompletionReply: vi.fn(async () => "final completion reply"),
|
||||
runSubagentAnnounceFlow: vi.fn(async () => true),
|
||||
warn: vi.fn(),
|
||||
});
|
||||
|
||||
await expect(
|
||||
controller.completeSubagentRun({
|
||||
runId: entry.runId,
|
||||
endedAt: 4_000,
|
||||
outcome: { status: "ok" },
|
||||
reason: SUBAGENT_ENDED_REASON_COMPLETE,
|
||||
triggerCleanup: true,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(emitSubagentEndedHookForRun).toHaveBeenCalledTimes(1);
|
||||
expect(emitSubagentEndedHookForRun).toHaveBeenCalledWith({
|
||||
entry,
|
||||
reason: SUBAGENT_ENDED_REASON_COMPLETE,
|
||||
sendFarewell: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("produces valid cleanupCompletedAt on give-up path when completionAnnouncedAt is undefined", async () => {
|
||||
const persist = vi.fn();
|
||||
const entry = createRunEntry({
|
||||
endedAt: 4_000,
|
||||
expectsCompletionMessage: false,
|
||||
retainAttachmentsOnKeep: true,
|
||||
});
|
||||
|
||||
const controller = createSubagentRegistryLifecycleController({
|
||||
runs: new Map([[entry.runId, entry]]),
|
||||
resumedRuns: new Set(),
|
||||
subagentAnnounceTimeoutMs: 1_000,
|
||||
persist,
|
||||
clearPendingLifecycleError: vi.fn(),
|
||||
countPendingDescendantRuns: () => 0,
|
||||
suppressAnnounceForSteerRestart: () => false,
|
||||
shouldEmitEndedHookForRun: () => false,
|
||||
emitSubagentEndedHookForRun: vi.fn(async () => {}),
|
||||
notifyContextEngineSubagentEnded: vi.fn(async () => {}),
|
||||
resumeSubagentRun: vi.fn(),
|
||||
captureSubagentCompletionReply: vi.fn(async () => undefined),
|
||||
runSubagentAnnounceFlow: vi.fn(async () => true),
|
||||
warn: vi.fn(),
|
||||
});
|
||||
|
||||
expect(entry.completionAnnouncedAt).toBeUndefined();
|
||||
|
||||
await controller.finalizeResumedAnnounceGiveUp({
|
||||
runId: entry.runId,
|
||||
entry,
|
||||
reason: "retry-limit",
|
||||
});
|
||||
|
||||
expect(entry.cleanupCompletedAt).toBeTypeOf("number");
|
||||
expect(Number.isNaN(entry.cleanupCompletedAt)).toBe(false);
|
||||
});
|
||||
|
||||
it("continues cleanup when delivery-status persistence throws after announce delivery", async () => {
|
||||
const persist = vi.fn();
|
||||
const warn = vi.fn();
|
||||
const emitSubagentEndedHookForRun = vi.fn(async () => {});
|
||||
const entry = createRunEntry({
|
||||
endedAt: 4_000,
|
||||
expectsCompletionMessage: false,
|
||||
retainAttachmentsOnKeep: false,
|
||||
});
|
||||
taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId.mockImplementation(() => {
|
||||
throw new Error("delivery status boom");
|
||||
});
|
||||
|
||||
const controller = createSubagentRegistryLifecycleController({
|
||||
runs: new Map([[entry.runId, entry]]),
|
||||
resumedRuns: new Set(),
|
||||
subagentAnnounceTimeoutMs: 1_000,
|
||||
persist,
|
||||
clearPendingLifecycleError: vi.fn(),
|
||||
countPendingDescendantRuns: () => 0,
|
||||
suppressAnnounceForSteerRestart: () => false,
|
||||
shouldEmitEndedHookForRun: () => true,
|
||||
emitSubagentEndedHookForRun,
|
||||
notifyContextEngineSubagentEnded: vi.fn(async () => {}),
|
||||
resumeSubagentRun: vi.fn(),
|
||||
captureSubagentCompletionReply: vi.fn(async () => "final completion reply"),
|
||||
runSubagentAnnounceFlow: vi.fn(async () => true),
|
||||
warn,
|
||||
});
|
||||
|
||||
await expect(
|
||||
controller.completeSubagentRun({
|
||||
runId: entry.runId,
|
||||
endedAt: 4_000,
|
||||
outcome: { status: "ok" },
|
||||
reason: SUBAGENT_ENDED_REASON_COMPLETE,
|
||||
triggerCleanup: true,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(warn).toHaveBeenCalledWith(
|
||||
"failed to update subagent background task delivery state",
|
||||
expect.objectContaining({
|
||||
error: { name: "Error", message: "delivery status boom" },
|
||||
deliveryStatus: "delivered",
|
||||
}),
|
||||
);
|
||||
expect(emitSubagentEndedHookForRun).toHaveBeenCalledTimes(1);
|
||||
expect(helperMocks.safeRemoveAttachmentsDir).toHaveBeenCalledTimes(1);
|
||||
expect(entry.cleanupCompletedAt).toBeTypeOf("number");
|
||||
expect(persist).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("skips browser cleanup when steer restart suppresses cleanup flow", async () => {
|
||||
const entry = createRunEntry({
|
||||
expectsCompletionMessage: false,
|
||||
|
||||
@@ -94,7 +94,7 @@ export function createSubagentRegistryLifecycleController(params: {
|
||||
const safeSetSubagentTaskDeliveryStatus = (args: {
|
||||
runId: string;
|
||||
childSessionKey: string;
|
||||
deliveryStatus: "failed";
|
||||
deliveryStatus: "delivered" | "failed";
|
||||
}) => {
|
||||
try {
|
||||
setDetachedTaskDeliveryStatusByRunId({
|
||||
@@ -362,16 +362,22 @@ export function createSubagentRegistryLifecycleController(params: {
|
||||
runId: string,
|
||||
cleanup: "delete" | "keep",
|
||||
didAnnounce: boolean,
|
||||
options?: {
|
||||
skipAnnounce?: boolean;
|
||||
},
|
||||
) => {
|
||||
const entry = params.runs.get(runId);
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
if (didAnnounce) {
|
||||
setDetachedTaskDeliveryStatusByRunId({
|
||||
if (!options?.skipAnnounce) {
|
||||
entry.completionAnnouncedAt = Date.now();
|
||||
params.persist();
|
||||
}
|
||||
safeSetSubagentTaskDeliveryStatus({
|
||||
runId,
|
||||
runtime: "subagent",
|
||||
sessionKey: entry.childSessionKey,
|
||||
childSessionKey: entry.childSessionKey,
|
||||
deliveryStatus: "delivered",
|
||||
});
|
||||
entry.wakeOnDescendantSettle = undefined;
|
||||
@@ -426,10 +432,9 @@ export function createSubagentRegistryLifecycleController(params: {
|
||||
}
|
||||
|
||||
if (deferredDecision.kind === "give-up") {
|
||||
setDetachedTaskDeliveryStatusByRunId({
|
||||
safeSetSubagentTaskDeliveryStatus({
|
||||
runId,
|
||||
runtime: "subagent",
|
||||
sessionKey: entry.childSessionKey,
|
||||
childSessionKey: entry.childSessionKey,
|
||||
deliveryStatus: "failed",
|
||||
});
|
||||
entry.wakeOnDescendantSettle = undefined;
|
||||
@@ -463,6 +468,23 @@ export function createSubagentRegistryLifecycleController(params: {
|
||||
};
|
||||
|
||||
const startSubagentAnnounceCleanupFlow = (runId: string, entry: SubagentRunRecord): boolean => {
|
||||
if (typeof entry.completionAnnouncedAt === "number") {
|
||||
if (!beginSubagentCleanup(runId)) {
|
||||
return false;
|
||||
}
|
||||
void finalizeSubagentCleanup(runId, entry.cleanup, true, {
|
||||
skipAnnounce: true,
|
||||
}).catch((err) => {
|
||||
defaultRuntime.log(`[warn] subagent cleanup finalize failed (${runId}): ${String(err)}`);
|
||||
const current = params.runs.get(runId);
|
||||
if (!current || current.cleanupCompletedAt) {
|
||||
return;
|
||||
}
|
||||
current.cleanupHandled = false;
|
||||
params.persist();
|
||||
});
|
||||
return true;
|
||||
}
|
||||
if (!beginSubagentCleanup(runId)) {
|
||||
return false;
|
||||
}
|
||||
@@ -536,6 +558,7 @@ export function createSubagentRegistryLifecycleController(params: {
|
||||
entry.suppressAnnounceReason = undefined;
|
||||
entry.cleanupHandled = false;
|
||||
entry.cleanupCompletedAt = undefined;
|
||||
entry.completionAnnouncedAt = undefined;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
|
||||
@@ -237,6 +237,7 @@ export function createSubagentRunManager(params: {
|
||||
: undefined,
|
||||
cleanupCompletedAt: undefined,
|
||||
cleanupHandled: false,
|
||||
completionAnnouncedAt: undefined,
|
||||
suppressAnnounceReason: undefined,
|
||||
announceRetryCount: undefined,
|
||||
lastAnnounceRetryAt: undefined,
|
||||
@@ -308,6 +309,7 @@ export function createSubagentRunManager(params: {
|
||||
accumulatedRuntimeMs: 0,
|
||||
archiveAtMs,
|
||||
cleanupHandled: false,
|
||||
completionAnnouncedAt: undefined,
|
||||
wakeOnDescendantSettle: undefined,
|
||||
attachmentsDir: registerParams.attachmentsDir,
|
||||
attachmentsRootDir: registerParams.attachmentsRootDir,
|
||||
|
||||
@@ -18,11 +18,8 @@ export type SubagentRunRecord = {
|
||||
runTimeoutSeconds?: number;
|
||||
spawnMode?: SpawnSubagentMode;
|
||||
createdAt: number;
|
||||
/** Start time of the current run attempt. */
|
||||
startedAt?: number;
|
||||
/** Stable start time for the child session across follow-up runs. */
|
||||
sessionStartedAt?: number;
|
||||
/** Accumulated runtime from prior completed runs for this child session. */
|
||||
accumulatedRuntimeMs?: number;
|
||||
endedAt?: number;
|
||||
outcome?: SubagentRunOutcome;
|
||||
@@ -31,32 +28,16 @@ export type SubagentRunRecord = {
|
||||
cleanupHandled?: boolean;
|
||||
suppressAnnounceReason?: "steer-restart" | "killed";
|
||||
expectsCompletionMessage?: boolean;
|
||||
/** Number of announce delivery attempts that returned false (deferred). */
|
||||
announceRetryCount?: number;
|
||||
/** Timestamp of the last announce retry attempt (for backoff). */
|
||||
lastAnnounceRetryAt?: number;
|
||||
/** Terminal lifecycle reason recorded when the run finishes. */
|
||||
endedReason?: SubagentLifecycleEndedReason;
|
||||
/** Run ended while descendants were still pending and should be re-invoked once they settle. */
|
||||
wakeOnDescendantSettle?: boolean;
|
||||
/**
|
||||
* Latest frozen completion output captured for announce delivery.
|
||||
* Seeded at first end transition and refreshed by later assistant turns
|
||||
* while completion delivery is still pending for this session.
|
||||
*/
|
||||
frozenResultText?: string | null;
|
||||
/** Timestamp when frozenResultText was last captured. */
|
||||
frozenResultCapturedAt?: number;
|
||||
/**
|
||||
* Fallback completion output preserved across wake continuation restarts.
|
||||
* Used when a late wake run replies with NO_REPLY after the real final
|
||||
* summary was already produced by the prior run.
|
||||
*/
|
||||
fallbackFrozenResultText?: string | null;
|
||||
/** Timestamp when fallbackFrozenResultText was preserved. */
|
||||
fallbackFrozenResultCapturedAt?: number;
|
||||
/** Set after the subagent_ended hook has been emitted successfully once. */
|
||||
endedHookEmittedAt?: number;
|
||||
completionAnnouncedAt?: number;
|
||||
attachmentsDir?: string;
|
||||
attachmentsRootDir?: string;
|
||||
retainAttachmentsOnKeep?: boolean;
|
||||
|
||||
Reference in New Issue
Block a user