fix(acpx): retry backend health probes after ensure (#58612)

* fix(acpx): retry backend health probes after ensure

* fix(acpx): keep doctor checks diagnostic-only
This commit is contained in:
zssggle-rgb
2026-04-01 09:10:09 +08:00
committed by GitHub
parent 8e0f495197
commit a37c66906c
2 changed files with 169 additions and 5 deletions

View File

@@ -23,6 +23,11 @@ vi.mock("./ensure.js", () => ({
/**
 * Shape of the runtime doubles used by these tests: the base `AcpRuntime`
 * extended with the health hooks that the stubs in this file implement.
 */
type RuntimeStub = AcpRuntime & {
// Availability probe; resolves with no value.
probeAvailability(): Promise<void>;
// Reports the current health state as a boolean.
isHealthy(): boolean;
// Optional diagnostic report; a stub may leave this hook out entirely.
doctor?(): Promise<{
ok: boolean;
message: string;
details?: string[];
}>;
};
function createRuntimeStub(healthy: boolean): {
@@ -56,6 +61,56 @@ function createRuntimeStub(healthy: boolean): {
};
}
/**
 * Builds a runtime stub whose reported health follows a scripted sequence.
 *
 * Every `probeAvailability()` call advances an internal probe counter.
 * `isHealthy()` returns the `healthSequence` entry that corresponds to the most
 * recent probe: the first entry before any probe has run, the last entry once
 * the sequence is exhausted, and `false` when the sequence is empty.
 * `doctor()` always resolves to `doctorReport`.
 *
 * @param healthSequence Health values to report, one per probe.
 * @param doctorReport Report returned by the stubbed `doctor()`; defaults to a failing report.
 * @returns The stub runtime together with the spies backing its probe, health and doctor hooks.
 */
function createRetryingRuntimeStub(
  healthSequence: boolean[],
  doctorReport: { ok: boolean; message: string; details?: string[] } = {
    ok: false,
    message: "acpx help check failed",
    details: ["stderr=temporary startup race"],
  },
): {
  runtime: RuntimeStub;
  probeAvailabilitySpy: ReturnType<typeof vi.fn>;
  isHealthySpy: ReturnType<typeof vi.fn>;
  doctorSpy: ReturnType<typeof vi.fn>;
} {
  // Number of probes completed so far; selects the scripted health value.
  let probesSoFar = 0;

  const probeAvailabilitySpy = vi.fn(async () => {
    probesSoFar += 1;
  });

  const isHealthySpy = vi.fn(() => {
    // Clamp into [0, last index]; an empty sequence yields undefined -> false.
    const lastSlot = healthSequence.length - 1;
    const slot = Math.min(Math.max(0, probesSoFar - 1), lastSlot);
    return healthSequence[slot] ?? false;
  });

  const doctorSpy = vi.fn(async () => doctorReport);

  const runtime: RuntimeStub = {
    ensureSession: vi.fn(async (input) => ({
      sessionKey: input.sessionKey,
      backend: "acpx",
      runtimeSessionName: input.sessionKey,
    })),
    runTurn: vi.fn(async function* () {
      yield { type: "done" as const };
    }),
    cancel: vi.fn(async () => {}),
    close: vi.fn(async () => {}),
    probeAvailability: async () => {
      await probeAvailabilitySpy();
    },
    isHealthy: () => isHealthySpy(),
    doctor: async () => await doctorSpy(),
  };

  return { runtime, probeAvailabilitySpy, isHealthySpy, doctorSpy };
}
function createServiceContext(
overrides: Partial<OpenClawPluginServiceContext> = {},
): OpenClawPluginServiceContext {
@@ -108,6 +163,7 @@ describe("createAcpxRuntimeService", () => {
const { runtime } = createRuntimeStub(false);
const service = createAcpxRuntimeService({
runtimeFactory: () => runtime,
healthProbeRetryDelaysMs: [],
});
const context = createServiceContext();
@@ -205,4 +261,54 @@ describe("createAcpxRuntimeService", () => {
fs.rmSync(tempRoot, { recursive: true, force: true });
}
});
// Scenario: the runtime reports unhealthy for the first two probes and healthy
// on the third, so the service has to use both configured retries.
it("retries health probes until the runtime becomes healthy", async () => {
const { runtime, probeAvailabilitySpy, doctorSpy } = createRetryingRuntimeStub([
false,
false,
true,
]);
const service = createAcpxRuntimeService({
runtimeFactory: () => runtime,
// Two zero-length delays allow up to three probe attempts without real waiting.
healthProbeRetryDelaysMs: [0, 0],
});
const context = createServiceContext();
await service.start(context);
// NOTE(review): probing presumably continues after start() resolves, hence the
// polling wait for the third probe — confirm against the service implementation.
await vi.waitFor(() => {
expect(probeAvailabilitySpy).toHaveBeenCalledTimes(3);
});
// doctor() is only consulted after an unhealthy probe: twice here, not after the healthy third.
expect(doctorSpy).toHaveBeenCalledTimes(2);
expect(context.logger.warn).toHaveBeenCalledWith(
expect.stringContaining("probe attempt 1 failed"),
);
expect(context.logger.info).toHaveBeenCalledWith(
"acpx runtime backend ready after 3 probe attempts",
);
});
// Scenario: doctor() reports ok while isHealthy() stays false. The doctor report
// must only feed the failure message and never mark the backend as ready.
it("does not treat doctor ok as healthy when the runtime still reports unhealthy", async () => {
const { runtime, probeAvailabilitySpy, doctorSpy } = createRetryingRuntimeStub([false], {
ok: true,
message: "acpx help check passed",
});
const service = createAcpxRuntimeService({
runtimeFactory: () => runtime,
// No retry delays: exactly one probe attempt is made.
healthProbeRetryDelaysMs: [],
});
const context = createServiceContext();
await service.start(context);
await vi.waitFor(() => {
expect(probeAvailabilitySpy).toHaveBeenCalledOnce();
expect(doctorSpy).toHaveBeenCalledOnce();
// The final warning carries the doctor message even though the report said ok.
expect(context.logger.warn).toHaveBeenCalledWith(
"acpx runtime backend probe failed: acpx help check passed",
);
});
expect(context.logger.info).not.toHaveBeenCalledWith("acpx runtime backend ready");
// Looking up the backend is expected to fail with AcpRuntimeError in this state.
expect(() => requireAcpRuntimeBackend("acpx")).toThrowError(AcpRuntimeError);
});
});

View File

@@ -13,6 +13,11 @@ import { ACPX_BACKEND_ID, AcpxRuntime } from "./runtime.js";
/**
 * Runtime contract the service relies on: the base `AcpRuntime` plus the
 * health hooks used while probing the backend during startup.
 */
type AcpxRuntimeLike = AcpRuntime & {
// Availability probe; the service awaits it before reading isHealthy().
probeAvailability(): Promise<void>;
// Health state read after each probe; this alone decides whether the backend is logged as ready.
isHealthy(): boolean;
// Optional diagnostics, called after an unhealthy probe; the report is only
// used to build the failure log message.
doctor?(): Promise<{
ok: boolean;
message: string;
details?: string[];
}>;
};
type AcpxRuntimeFactoryParams = {
@@ -24,8 +29,25 @@ type AcpxRuntimeFactoryParams = {
/** Options accepted by `createAcpxRuntimeService`. */
type CreateAcpxRuntimeServiceParams = {
pluginConfig?: unknown;
// Overrides how the runtime is constructed; `createDefaultRuntime` is used when omitted.
runtimeFactory?: (params: AcpxRuntimeFactoryParams) => AcpxRuntimeLike;
// Waits (ms) applied after each failed health probe before the next attempt.
// The number of entries is the number of retries, so an empty array means a
// single probe. Falls back to DEFAULT_HEALTH_PROBE_RETRY_DELAYS_MS when omitted.
healthProbeRetryDelaysMs?: number[];
};
// Default waits (ms) after the 1st, 2nd and 3rd failed health probes, which
// allows up to four probe attempts in total.
const DEFAULT_HEALTH_PROBE_RETRY_DELAYS_MS = [250, 1_000, 2_500];
/**
 * Returns a promise that settles after `ms` milliseconds.
 *
 * Zero and negative values skip the timer and yield an already-resolved promise.
 *
 * @param ms Time to wait, in milliseconds.
 * @returns A promise that resolves with no value once the wait is over.
 */
function delay(ms: number): Promise<void> {
  return ms <= 0
    ? Promise.resolve()
    : new Promise<void>((done) => {
        setTimeout(done, ms);
      });
}
/**
 * Renders a doctor report as a single log-friendly line.
 *
 * Each detail entry is trimmed individually and blank entries are dropped
 * before joining, so whitespace-only details cannot leave dangling "; "
 * separators in the output.
 *
 * @param report Doctor report; only `message` and `details` are read.
 * @returns `message (detail; detail…)` when at least one non-blank detail
 *   exists, otherwise just `message`.
 */
function formatDoctorFailureMessage(report: { message: string; details?: string[] }): string {
  const detailText = (report.details ?? [])
    // Tolerate non-string entries coming from a loosely typed runtime report.
    .map((detail) => (typeof detail === "string" ? detail.trim() : ""))
    .filter((detail) => detail !== "")
    .join("; ");
  return detailText ? `${report.message} (${detailText})` : report.message;
}
function createDefaultRuntime(params: AcpxRuntimeFactoryParams): AcpxRuntimeLike {
return new AcpxRuntime(params.pluginConfig, {
logger: params.logger,
@@ -49,6 +71,8 @@ export function createAcpxRuntimeService(
if (ctx.workspaceDir?.trim()) {
await fs.mkdir(ctx.workspaceDir, { recursive: true });
}
const healthProbeRetryDelaysMs =
params.healthProbeRetryDelaysMs ?? DEFAULT_HEALTH_PROBE_RETRY_DELAYS_MS;
const runtimeFactory = params.runtimeFactory ?? createDefaultRuntime;
runtime = runtimeFactory({
pluginConfig,
@@ -84,12 +108,46 @@ export function createAcpxRuntimeService(
if (currentRevision !== lifecycleRevision) {
return;
}
await runtime?.probeAvailability();
if (runtime?.isHealthy()) {
ctx.logger.info("acpx runtime backend ready");
} else {
ctx.logger.warn("acpx runtime backend probe failed after local install");
let lastFailureMessage: string | undefined;
for (let attempt = 0; attempt <= healthProbeRetryDelaysMs.length; attempt += 1) {
await runtime?.probeAvailability();
if (currentRevision !== lifecycleRevision) {
return;
}
if (runtime?.isHealthy()) {
ctx.logger.info(
attempt === 0
? "acpx runtime backend ready"
: `acpx runtime backend ready after ${attempt + 1} probe attempts`,
);
return;
}
const doctorReport = await runtime?.doctor?.();
if (currentRevision !== lifecycleRevision) {
return;
}
if (doctorReport) {
lastFailureMessage = formatDoctorFailureMessage(doctorReport);
} else {
lastFailureMessage = "acpx runtime backend remained unhealthy after probe";
}
const retryDelayMs = healthProbeRetryDelaysMs[attempt];
if (retryDelayMs == null) {
break;
}
ctx.logger.warn(
`acpx runtime backend probe attempt ${attempt + 1} failed: ${lastFailureMessage}; retrying in ${retryDelayMs}ms`,
);
await delay(retryDelayMs);
if (currentRevision !== lifecycleRevision) {
return;
}
}
ctx.logger.warn(
`acpx runtime backend probe failed: ${lastFailureMessage ?? "backend remained unhealthy after setup"}`,
);
} catch (err) {
if (currentRevision !== lifecycleRevision) {
return;