fix: address all review comments on PR #47719 + implement resume context and config idempotency guard

This commit is contained in:
Joey Krug
2026-03-15 23:05:08 -04:00
committed by Peter Steinberger
parent 7a5f2b128c
commit f109f3d4c2
3 changed files with 201 additions and 15 deletions

View File

@@ -19,6 +19,14 @@ vi.mock("../gateway/call.js", () => ({
callGateway: vi.fn(async () => ({ runId: "test-run-id" })),
}));
vi.mock("../gateway/session-utils.fs.js", () => ({
readSessionMessages: vi.fn(() => []),
}));
vi.mock("./subagent-registry.js", () => ({
replaceSubagentRunAfterSteer: vi.fn(() => true),
}));
function createTestRunRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
return {
runId: "run-1",
@@ -45,6 +53,7 @@ describe("subagent-orphan-recovery", () => {
it("recovers orphaned sessions with abortedLastRun=true", async () => {
const sessions = await import("../config/sessions.js");
const gateway = await import("../gateway/call.js");
const subagentRegistry = await import("./subagent-registry.js");
const sessionEntry = {
sessionId: "session-abc",
@@ -78,6 +87,10 @@ describe("subagent-orphan-recovery", () => {
expect(params.sessionKey).toBe("agent:main:subagent:test-session-1");
expect(params.message).toContain("gateway reload");
expect(params.message).toContain("Test task: implement feature X");
expect(subagentRegistry.replaceSubagentRunAfterSteer).toHaveBeenCalledWith({
previousRunId: "run-1",
nextRunId: "test-run-id",
});
});
it("skips sessions that are not aborted", async () => {
@@ -321,4 +334,100 @@ describe("subagent-orphan-recovery", () => {
expect(message.length).toBeLessThan(5000);
expect(message).toContain("...");
});
it("includes last human message in resume when available", async () => {
const sessions = await import("../config/sessions.js");
const gateway = await import("../gateway/call.js");
const sessionUtils = await import("../gateway/session-utils.fs.js");
vi.mocked(sessions.loadSessionStore).mockReturnValue({
"agent:main:subagent:test-session-1": {
sessionId: "session-abc",
updatedAt: Date.now(),
abortedLastRun: true,
sessionFile: "session-abc.jsonl",
},
});
vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
{ role: "user", content: [{ type: "text", text: "Please build feature Y" }] },
{ role: "assistant", content: [{ type: "text", text: "Working on it..." }] },
{ role: "user", content: [{ type: "text", text: "Also add tests for it" }] },
{ role: "assistant", content: [{ type: "text", text: "Sure, adding tests now." }] },
]);
const activeRuns = new Map<string, SubagentRunRecord>();
activeRuns.set("run-1", createTestRunRecord());
const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });
const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
const params = callArgs[0].params as Record<string, unknown>;
const message = params.message as string;
expect(message).toContain("Also add tests for it");
expect(message).toContain("last message from the user");
});
it("adds config change hint when assistant messages reference config modifications", async () => {
const sessions = await import("../config/sessions.js");
const gateway = await import("../gateway/call.js");
const sessionUtils = await import("../gateway/session-utils.fs.js");
vi.mocked(sessions.loadSessionStore).mockReturnValue({
"agent:main:subagent:test-session-1": {
sessionId: "session-abc",
updatedAt: Date.now(),
abortedLastRun: true,
},
});
vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
{ role: "user", content: "Update the config" },
{ role: "assistant", content: "I've modified openclaw.json to add the new setting." },
]);
const activeRuns = new Map<string, SubagentRunRecord>();
activeRuns.set("run-1", createTestRunRecord());
const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });
const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
const params = callArgs[0].params as Record<string, unknown>;
const message = params.message as string;
expect(message).toContain("config changes from your previous run were already applied");
});
it("prevents duplicate resume when updateSessionStore fails", async () => {
const sessions = await import("../config/sessions.js");
const gateway = await import("../gateway/call.js");
vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "new-run" } as never);
vi.mocked(sessions.updateSessionStore).mockRejectedValue(new Error("write failed"));
vi.mocked(sessions.loadSessionStore).mockReturnValue({
"agent:main:subagent:test-session-1": {
sessionId: "session-abc",
updatedAt: Date.now(),
abortedLastRun: true,
},
});
const activeRuns = new Map<string, SubagentRunRecord>();
activeRuns.set("run-1", createTestRunRecord());
activeRuns.set(
"run-2",
createTestRunRecord({
runId: "run-2",
}),
);
const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
const result = await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });
expect(result.recovered).toBe(1);
expect(result.skipped).toBe(1);
expect(gateway.callGateway).toHaveBeenCalledOnce();
});
});

View File

@@ -19,7 +19,9 @@ import {
type SessionEntry,
} from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { readSessionMessages } from "../gateway/session-utils.fs.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { replaceSubagentRunAfterSteer } from "./subagent-registry.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
const log = createSubsystemLogger("subagent-orphan-recovery");
@@ -30,14 +32,45 @@ const DEFAULT_RECOVERY_DELAY_MS = 5_000;
/**
* Build the resume message for an orphaned subagent.
*/
function buildResumeMessage(task: string): string {
function buildResumeMessage(task: string, lastHumanMessage?: string): string {
const maxTaskLen = 2000;
const truncatedTask = task.length > maxTaskLen ? `${task.slice(0, maxTaskLen)}...` : task;
return (
let message =
`[System] Your previous turn was interrupted by a gateway reload. ` +
`Your task was:\n\n${truncatedTask}\n\nPlease continue where you left off.`
);
`Your original task was:\n\n${truncatedTask}\n\n`;
if (lastHumanMessage) {
message += `The last message from the user before the interruption was:\n\n${lastHumanMessage}\n\n`;
}
message += `Please continue where you left off.`;
return message;
}
function extractMessageText(msg: unknown): string | undefined {
if (!msg || typeof msg !== "object") {
return undefined;
}
const m = msg as Record<string, unknown>;
if (typeof m.content === "string") {
return m.content;
}
if (Array.isArray(m.content)) {
const text = m.content
.filter(
(c: unknown) =>
typeof c === "object" &&
c !== null &&
(c as Record<string, unknown>).type === "text" &&
typeof (c as Record<string, unknown>).text === "string",
)
.map((c: unknown) => (c as Record<string, string>).text)
.filter(Boolean)
.join("\n");
return text || undefined;
}
return undefined;
}
/**
@@ -46,11 +79,17 @@ function buildResumeMessage(task: string): string {
async function resumeOrphanedSession(params: {
sessionKey: string;
task: string;
lastHumanMessage?: string;
configChangeHint?: string;
originalRunId: string;
}): Promise<boolean> {
const resumeMessage = buildResumeMessage(params.task);
let resumeMessage = buildResumeMessage(params.task, params.lastHumanMessage);
if (params.configChangeHint) {
resumeMessage += params.configChangeHint;
}
try {
await callGateway<{ runId: string }>({
const result = await callGateway<{ runId: string }>({
method: "agent",
params: {
message: resumeMessage,
@@ -61,6 +100,10 @@ async function resumeOrphanedSession(params: {
},
timeoutMs: 10_000,
});
replaceSubagentRunAfterSteer({
previousRunId: params.originalRunId,
nextRunId: result.runId,
});
log.info(`resumed orphaned session: ${params.sessionKey}`);
return true;
} catch (err) {
@@ -84,6 +127,8 @@ export async function recoverOrphanedSubagentSessions(params: {
getActiveRuns: () => Map<string, SubagentRunRecord>;
}): Promise<{ recovered: number; failed: number; skipped: number }> {
const result = { recovered: 0, failed: 0, skipped: 0 };
const resumedSessionKeys = new Set<string>();
const configChangePattern = /openclaw\.json|openclaw gateway restart|config\.patch/i;
try {
const activeRuns = params.getActiveRuns();
@@ -104,6 +149,10 @@ export async function recoverOrphanedSubagentSessions(params: {
if (!childSessionKey) {
continue;
}
if (resumedSessionKeys.has(childSessionKey)) {
result.skipped++;
continue;
}
try {
const agentId = resolveAgentIdFromSessionKey(childSessionKey);
@@ -129,6 +178,18 @@ export async function recoverOrphanedSubagentSessions(params: {
log.info(`found orphaned subagent session: ${childSessionKey} (run=${runId})`);
const messages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile);
const lastHumanMessage = [...messages]
.toReversed()
.find((msg) => (msg as { role?: unknown } | null)?.role === "user");
const configChangeDetected = messages.some((msg) => {
if ((msg as { role?: unknown } | null)?.role !== "assistant") {
return false;
}
const text = extractMessageText(msg);
return typeof text === "string" && configChangePattern.test(text);
});
// Resume the session with the original task context.
// We intentionally do NOT clear abortedLastRun before attempting
// the resume — if callGateway fails (e.g. gateway still booting),
@@ -136,18 +197,30 @@ export async function recoverOrphanedSubagentSessions(params: {
const resumed = await resumeOrphanedSession({
sessionKey: childSessionKey,
task: runRecord.task,
lastHumanMessage: extractMessageText(lastHumanMessage),
configChangeHint: configChangeDetected
? "\n\n[config changes from your previous run were already applied — do not re-modify openclaw.json or restart the gateway]"
: undefined,
originalRunId: runId,
});
if (resumed) {
resumedSessionKeys.add(childSessionKey);
// Only clear the aborted flag after confirmed successful resume.
await updateSessionStore(storePath, (currentStore) => {
const current = currentStore[childSessionKey];
if (current) {
current.abortedLastRun = false;
current.updatedAt = Date.now();
currentStore[childSessionKey] = current;
}
});
try {
await updateSessionStore(storePath, (currentStore) => {
const current = currentStore[childSessionKey];
if (current) {
current.abortedLastRun = false;
current.updatedAt = Date.now();
currentStore[childSessionKey] = current;
}
});
} catch (err) {
log.warn(
`resume succeeded but failed to update session store for ${childSessionKey}: ${String(err)}`,
);
}
result.recovered++;
} else {
// Flag stays as abortedLastRun=true so next restart can retry

View File

@@ -50,8 +50,12 @@ function resolveGatewayPortFallback(): Promise<number> {
}
async function assertUnmanagedGatewayRestartEnabled(port: number): Promise<void> {
const cfg = await readBestEffortConfig().catch(() => undefined);
const tlsEnabled = !!(cfg as { gateway?: { tls?: { enabled?: unknown } } } | undefined)?.gateway
?.tls?.enabled;
const scheme = tlsEnabled ? "wss" : "ws";
const probe = await probeGateway({
url: `ws://127.0.0.1:${port}`,
url: `${scheme}://127.0.0.1:${port}`,
auth: {
token: process.env.OPENCLAW_GATEWAY_TOKEN?.trim() || undefined,
password: process.env.OPENCLAW_GATEWAY_PASSWORD?.trim() || undefined,