mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-25 01:09:36 +00:00
fix(copilot): retain completed compaction sessions
This commit is contained in:
@@ -185,6 +185,17 @@ field; OpenClaw does not infer it from assistant prose. The helper intentionally
|
||||
leaves prompt errors, in-flight turns, and intentional silent replies such as
|
||||
`NO_REPLY` unclassified.
|
||||
|
||||
### Agent-end side effects
|
||||
|
||||
Native harnesses must call `runAgentEndSideEffects(...)` from
|
||||
`openclaw/plugin-sdk/agent-harness-runtime` after they finalize an attempt. It
|
||||
dispatches the portable `agent_end` hook and OpenClaw's research capture without
|
||||
delaying interactive replies. Use `awaitAgentEndSideEffects(...)` for local,
|
||||
non-interactive runs where the attempt must not resolve until those side effects
|
||||
finish. Both helpers accept the same `{ event, ctx }` payload as
|
||||
`runAgentHarnessAgentEndHook(...)`; their failures do not alter the completed
|
||||
attempt result.
|
||||
|
||||
### Native Codex harness mode
|
||||
|
||||
The bundled `codex` harness is the native Codex mode for embedded OpenClaw
|
||||
|
||||
@@ -529,8 +529,8 @@ describe("createCopilotAgentHarness", () => {
|
||||
});
|
||||
|
||||
it("aborts deferred compaction cleanup before disposal", async () => {
|
||||
const cleanup = createDeferred<void>();
|
||||
const abort = vi.fn(() => cleanup.resolve());
|
||||
const cleanup = createDeferred<"aborted" | "completed" | "deadline">();
|
||||
const abort = vi.fn(() => cleanup.resolve("aborted"));
|
||||
mocks.runCopilotAttempt.mockImplementation(async (_params, deps) => {
|
||||
deps.onSessionEstablished?.({
|
||||
sdkSessionId: "sdk-sess-pending-cleanup",
|
||||
@@ -553,8 +553,8 @@ describe("createCopilotAgentHarness", () => {
|
||||
});
|
||||
|
||||
it("aborts deferred compaction cleanup when the OpenClaw session resets", async () => {
|
||||
const cleanup = createDeferred<void>();
|
||||
const abort = vi.fn(() => cleanup.resolve());
|
||||
const cleanup = createDeferred<"aborted" | "completed" | "deadline">();
|
||||
const abort = vi.fn(() => cleanup.resolve("aborted"));
|
||||
mocks.runCopilotAttempt.mockImplementation(async (_params, deps) => {
|
||||
deps.onSessionEstablished?.({
|
||||
sdkSessionId: "sdk-sess-reset-cleanup",
|
||||
@@ -623,9 +623,10 @@ describe("createCopilotAgentHarness", () => {
|
||||
expect(secondCallParams.initialReplayState?.replayInvalid).toBeUndefined();
|
||||
});
|
||||
|
||||
it("starts fresh while timed-out compaction retains the prior SDK session", async () => {
|
||||
it("blocks reuse while timed-out compaction is pending, then resumes after completion", async () => {
|
||||
const pool = makePoolMock();
|
||||
const sessionStore = makeSessionStoreMock();
|
||||
const cleanup = createDeferred<"aborted" | "completed" | "deadline">();
|
||||
let attempt = 0;
|
||||
mocks.runCopilotAttempt.mockImplementation(async (_params, deps) => {
|
||||
attempt += 1;
|
||||
@@ -637,7 +638,7 @@ describe("createCopilotAgentHarness", () => {
|
||||
});
|
||||
deps.onTimedOutCompaction?.({
|
||||
abort: () => undefined,
|
||||
cleanup: Promise.resolve(),
|
||||
cleanup: cleanup.promise,
|
||||
sdkSessionId: "sdk-sess-compacting",
|
||||
});
|
||||
}
|
||||
@@ -648,6 +649,50 @@ describe("createCopilotAgentHarness", () => {
|
||||
await harness.runAttempt(makeAttemptParams({ runId: "t1" }));
|
||||
await harness.runAttempt(makeAttemptParams({ runId: "t2" }));
|
||||
|
||||
const secondCallParams = mocks.runCopilotAttempt.mock.calls[1]?.[0] as {
|
||||
initialReplayState?: { sdkSessionId?: string };
|
||||
};
|
||||
expect(secondCallParams.initialReplayState?.sdkSessionId).toBeUndefined();
|
||||
|
||||
cleanup.resolve("completed");
|
||||
await flushAsyncWork();
|
||||
|
||||
await harness.runAttempt(makeAttemptParams({ runId: "t3" }));
|
||||
const thirdCallParams = mocks.runCopilotAttempt.mock.calls[2]?.[0] as {
|
||||
initialReplayState?: { sdkSessionId?: string };
|
||||
};
|
||||
expect(thirdCallParams.initialReplayState?.sdkSessionId).toBe("sdk-sess-compacting");
|
||||
expect(sessionStore.store.delete).not.toHaveBeenCalledWith("oc-sess-reuse");
|
||||
});
|
||||
|
||||
it("invalidates the retained SDK binding when deferred compaction is cancelled", async () => {
|
||||
const pool = makePoolMock();
|
||||
const sessionStore = makeSessionStoreMock();
|
||||
const cleanup = createDeferred<"aborted" | "completed" | "deadline">();
|
||||
let attempt = 0;
|
||||
mocks.runCopilotAttempt.mockImplementation(async (_params, deps) => {
|
||||
attempt += 1;
|
||||
if (attempt === 1) {
|
||||
deps.onSessionEstablished?.({
|
||||
sdkSessionId: "sdk-sess-cancelled",
|
||||
pooledClient: { key: {} as any, client: {} as any },
|
||||
sessionConfig: TEST_SESSION_CONFIG,
|
||||
});
|
||||
deps.onTimedOutCompaction?.({
|
||||
abort: () => undefined,
|
||||
cleanup: cleanup.promise,
|
||||
sdkSessionId: "sdk-sess-cancelled",
|
||||
});
|
||||
}
|
||||
return ATTEMPT_RESULT;
|
||||
});
|
||||
const harness = createCopilotAgentHarness({ pool, sessionStore: sessionStore.store });
|
||||
|
||||
await harness.runAttempt(makeAttemptParams({ runId: "t1" }));
|
||||
cleanup.resolve("aborted");
|
||||
await flushAsyncWork();
|
||||
|
||||
await harness.runAttempt(makeAttemptParams({ runId: "t2" }));
|
||||
const secondCallParams = mocks.runCopilotAttempt.mock.calls[1]?.[0] as {
|
||||
initialReplayState?: { sdkSessionId?: string };
|
||||
};
|
||||
|
||||
@@ -94,6 +94,7 @@ type LegacyCopilotSessionBinding = {
|
||||
};
|
||||
|
||||
type CopilotAttemptSessionBinding = Pick<CopilotSessionBinding, "compatKey" | "sdkSessionId">;
|
||||
type TimedOutCompactionCleanupOutcome = "aborted" | "completed" | "deadline";
|
||||
|
||||
type CopilotSessionBindingStore = Pick<
|
||||
PluginStateSyncKeyedStore<CopilotSessionBinding>,
|
||||
@@ -424,7 +425,10 @@ export function createCopilotAgentHarness(
|
||||
let disposed = false;
|
||||
let disposePromise: Promise<void> | undefined;
|
||||
const inFlight = new Set<Promise<unknown>>();
|
||||
const timedOutCompactionCleanups = new Map<string, Map<Promise<void>, () => void>>();
|
||||
const timedOutCompactionCleanups = new Map<
|
||||
string,
|
||||
Map<Promise<TimedOutCompactionCleanupOutcome>, () => void>
|
||||
>();
|
||||
// Maps OpenClaw session id (from AgentHarnessAttemptParams.sessionId) to
|
||||
// the SDK session id + client that owns it. Populated by
|
||||
// runCopilotAttempt via the onSessionEstablished callback so that
|
||||
@@ -448,11 +452,12 @@ export function createCopilotAgentHarness(
|
||||
|
||||
function trackTimedOutCompactionCleanup(params: {
|
||||
abort: () => void;
|
||||
cleanup: Promise<void>;
|
||||
cleanup: Promise<TimedOutCompactionCleanupOutcome>;
|
||||
sessionId: string;
|
||||
}): void {
|
||||
const cleanups =
|
||||
timedOutCompactionCleanups.get(params.sessionId) ?? new Map<Promise<void>, () => void>();
|
||||
timedOutCompactionCleanups.get(params.sessionId) ??
|
||||
new Map<Promise<TimedOutCompactionCleanupOutcome>, () => void>();
|
||||
cleanups.set(params.cleanup, params.abort);
|
||||
timedOutCompactionCleanups.set(params.sessionId, cleanups);
|
||||
void params.cleanup.then(
|
||||
@@ -461,7 +466,10 @@ export function createCopilotAgentHarness(
|
||||
);
|
||||
}
|
||||
|
||||
function removeTimedOutCompactionCleanup(sessionId: string, cleanup: Promise<void>): void {
|
||||
function removeTimedOutCompactionCleanup(
|
||||
sessionId: string,
|
||||
cleanup: Promise<TimedOutCompactionCleanupOutcome>,
|
||||
): void {
|
||||
const cleanups = timedOutCompactionCleanups.get(sessionId);
|
||||
if (!cleanups) {
|
||||
return;
|
||||
@@ -544,9 +552,14 @@ export function createCopilotAgentHarness(
|
||||
// surfaces as a prompt error.
|
||||
const currentCompatKey = computeSessionCompatKey(params);
|
||||
const currentCompactKey = computeSessionCompactKey(params);
|
||||
const tracked = openclawSessionId ? trackedSessions.get(openclawSessionId) : undefined;
|
||||
const compactionCleanupPending =
|
||||
openclawSessionId !== undefined && timedOutCompactionCleanups.has(openclawSessionId);
|
||||
const tracked =
|
||||
openclawSessionId && !compactionCleanupPending
|
||||
? trackedSessions.get(openclawSessionId)
|
||||
: undefined;
|
||||
const stored = openclawSessionId
|
||||
? resetBlockedStoredSessions.has(openclawSessionId)
|
||||
? compactionCleanupPending || resetBlockedStoredSessions.has(openclawSessionId)
|
||||
? undefined
|
||||
: lookupStoredBinding(options?.sessionStore, openclawSessionId)
|
||||
: undefined;
|
||||
@@ -608,13 +621,11 @@ export function createCopilotAgentHarness(
|
||||
sdkSessionId,
|
||||
}: {
|
||||
abort: () => void;
|
||||
cleanup: Promise<void>;
|
||||
cleanup: Promise<TimedOutCompactionCleanupOutcome>;
|
||||
sdkSessionId: string;
|
||||
}) => {
|
||||
const tracked = trackedSessions.get(openclawSessionId);
|
||||
const stored = resetBlockedStoredSessions.has(openclawSessionId)
|
||||
? undefined
|
||||
: lookupStoredBinding(options?.sessionStore, openclawSessionId);
|
||||
const stored = lookupStoredBinding(options?.sessionStore, openclawSessionId);
|
||||
const ownsTrackedSession = tracked?.sdkSessionId === sdkSessionId;
|
||||
const ownsStoredSession = stored?.sdkSessionId === sdkSessionId;
|
||||
trackTimedOutCompactionCleanup({
|
||||
@@ -626,14 +637,33 @@ export function createCopilotAgentHarness(
|
||||
return;
|
||||
}
|
||||
// The timed-out attempt retains this SDK session until its
|
||||
// background compaction resolves. Do not let a new turn resume it.
|
||||
if (ownsTrackedSession) {
|
||||
trackedSessions.delete(openclawSessionId);
|
||||
}
|
||||
if (ownsStoredSession) {
|
||||
deleteStoredBinding(options?.sessionStore, openclawSessionId);
|
||||
}
|
||||
// background compaction resolves. Preserve its binding for a
|
||||
// successful completion, but do not let a new turn resume it yet.
|
||||
resetBlockedStoredSessions.add(openclawSessionId);
|
||||
void cleanup.then((outcome) => {
|
||||
const currentTracked = trackedSessions.get(openclawSessionId);
|
||||
const currentStored = lookupStoredBinding(
|
||||
options?.sessionStore,
|
||||
openclawSessionId,
|
||||
);
|
||||
const stillOwnsTrackedSession = currentTracked?.sdkSessionId === sdkSessionId;
|
||||
const stillOwnsStoredSession = currentStored?.sdkSessionId === sdkSessionId;
|
||||
if (outcome === "completed") {
|
||||
if (stillOwnsTrackedSession || stillOwnsStoredSession) {
|
||||
resetBlockedStoredSessions.delete(openclawSessionId);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (stillOwnsTrackedSession) {
|
||||
trackedSessions.delete(openclawSessionId);
|
||||
}
|
||||
if (stillOwnsStoredSession) {
|
||||
deleteStoredBinding(options?.sessionStore, openclawSessionId);
|
||||
}
|
||||
if (stillOwnsTrackedSession || stillOwnsStoredSession) {
|
||||
resetBlockedStoredSessions.add(openclawSessionId);
|
||||
}
|
||||
});
|
||||
}
|
||||
: undefined,
|
||||
});
|
||||
|
||||
@@ -1738,7 +1738,7 @@ describe("runCopilotAttempt", () => {
|
||||
expect(sdk.sessions[0]?.disconnect).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
expect(sdk.client.deleteSession).toHaveBeenCalledWith("sess-1");
|
||||
expect(sdk.client.deleteSession).not.toHaveBeenCalled();
|
||||
expect(afterCompaction).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ compactedCount: 3, sessionFile: "session.json" }),
|
||||
expect.objectContaining({ runId: "run-1", sessionId: "session-1" }),
|
||||
@@ -1794,6 +1794,7 @@ describe("runCopilotAttempt", () => {
|
||||
});
|
||||
|
||||
expect(sdk.sessions[0]?.rpc.history.cancelBackgroundCompaction).toHaveBeenCalledTimes(1);
|
||||
expect(sdk.client.deleteSession).toHaveBeenCalledWith("sess-1");
|
||||
});
|
||||
|
||||
it("keeps the compaction timeout classification after deferred completion", async () => {
|
||||
|
||||
@@ -146,7 +146,7 @@ export interface CopilotAttemptDeps {
|
||||
*/
|
||||
onTimedOutCompaction?: (info: {
|
||||
abort: () => void;
|
||||
cleanup: Promise<void>;
|
||||
cleanup: Promise<"aborted" | "completed" | "deadline">;
|
||||
sdkSessionId: string;
|
||||
}) => void;
|
||||
}
|
||||
@@ -219,12 +219,13 @@ function deferTimedOutCompactionCleanup(params: {
|
||||
sdkSessionId?: string;
|
||||
session: SessionLike;
|
||||
timeoutMs: number;
|
||||
}): Promise<void> {
|
||||
}): Promise<"aborted" | "completed" | "deadline"> {
|
||||
// sendAndWait can time out while the SDK continues background compaction.
|
||||
// Keep its bridge attached so after_compaction uses the originating run context.
|
||||
return (async () => {
|
||||
let outcome: "aborted" | "completed" | "deadline" = "deadline";
|
||||
try {
|
||||
const outcome = await awaitCompactionCompletionBeforeDeadline({
|
||||
outcome = await awaitCompactionCompletionBeforeDeadline({
|
||||
abortSignal: params.abortSignal,
|
||||
bridge: params.bridge,
|
||||
timeoutMs: params.timeoutMs,
|
||||
@@ -241,7 +242,7 @@ function deferTimedOutCompactionCleanup(params: {
|
||||
} catch {
|
||||
// The attempt has already returned its timeout result.
|
||||
}
|
||||
if (params.sdkSessionId) {
|
||||
if (outcome !== "completed" && params.sdkSessionId) {
|
||||
try {
|
||||
await params.handle.client.deleteSession(params.sdkSessionId);
|
||||
} catch {
|
||||
@@ -254,6 +255,7 @@ function deferTimedOutCompactionCleanup(params: {
|
||||
// The pool will dispose this client later if its release cannot complete.
|
||||
}
|
||||
}
|
||||
return outcome;
|
||||
})();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user