From 07edaffb0448d140803485ac431bcae4b87d8b36 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 11 Apr 2026 02:50:45 +0100 Subject: [PATCH] fix: finalize OpenAI replay liveness landing --- CHANGELOG.md | 1 + ...edded-subscribe.handlers.lifecycle.test.ts | 20 ++++++++++++ ...i-embedded-subscribe.handlers.lifecycle.ts | 7 +++- .../pi-embedded-subscribe.handlers.types.ts | 1 + ...session.subscribeembeddedpisession.test.ts | 32 +++++++++++++++++++ src/agents/pi-embedded-subscribe.ts | 6 ++++ 6 files changed, 66 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6dc25b373c..973edfc2438 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai - Matrix/partial streaming: add MSC4357 live markers to draft preview sends and edits so supporting Matrix clients can render a live/typewriter animation and stop it when the final edit lands. (#63513) Thanks @TigerInYourDream. - Control UI/dreaming: simplify the Scene and Diary surfaces, preserve unknown phase state for partial status payloads, and stabilize waiting-entry recency ordering so Dreaming status and review lists stay clear and deterministic. (#64035) Thanks @davemorin. - Agents: add an opt-in strict-agentic embedded Pi execution contract for GPT-5-family runs so plan-only or filler turns keep acting until they hit a real blocker. (#64241) Thanks @100yenadmin. +- Agents/OpenAI: add provider-owned OpenAI/Codex tool schema compatibility and surface embedded-run replay/liveness state for long-running runs. (#64300) Thanks @100yenadmin. - Docs i18n: chunk raw doc translation, reject truncated tagged outputs, avoid ambiguous body-only wrapper unwrapping, and recover from terminated Pi translation sessions without changing the default `openai/gpt-5.4` path. (#62969, #63808) Thanks @hxy91819. ### Fixes diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts index c4fd69d18dd..4d62425d629 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts @@ -259,6 +259,26 @@ describe("handleAgentEnd", () => { }); }); + it("keeps accumulated deterministic side effects from being marked abandoned", async () => { + const onAgentEvent = vi.fn(); + const ctx = createContext(undefined, { onAgentEvent }); + ctx.state.replayState = { ...ctx.state.replayState, replayInvalid: true }; + ctx.state.livenessState = "working"; + ctx.state.assistantTexts = []; + ctx.state.hadDeterministicSideEffect = true; + + await handleAgentEnd(ctx); + + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "lifecycle", + data: { + phase: "end", + livenessState: "working", + replayInvalid: true, + }, + }); + }); + it("flushes orphaned tool media as a media-only block reply", async () => { const ctx = createContext(undefined); ctx.state.pendingToolMediaUrls = ["/tmp/reply.opus"]; diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index c8cd7bf5a70..58275c0c051 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -43,6 +43,11 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise< const hasAssistantVisibleText = Array.isArray(ctx.state.assistantTexts) && ctx.state.assistantTexts.some((text) => hasAssistantVisibleReply({ text })); + const hadDeterministicSideEffect = + ctx.state.hadDeterministicSideEffect === true || + (ctx.state.messagingToolSentTexts?.length ?? 0) > 0 || + (ctx.state.messagingToolSentMediaUrls?.length ?? 0) > 0 || + (ctx.state.successfulCronAdds ?? 0) > 0; const incompleteTerminalAssistant = isIncompleteTerminalAssistantTurn({ hasAssistantVisibleText, lastAssistant: isAssistantMessage(lastAssistant) ? lastAssistant : null, @@ -51,7 +56,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise< ctx.state.replayState.replayInvalid || incompleteTerminalAssistant ? true : undefined; const derivedWorkingTerminalState = isError ? "blocked" - : replayInvalid && !hasAssistantVisibleText + : replayInvalid && !hasAssistantVisibleText && !hadDeterministicSideEffect ? "abandoned" : ctx.state.livenessState; const livenessState = diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 4ae7064b185..f81d4163669 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -68,6 +68,7 @@ export type EmbeddedPiSubscribeState = { unsubscribed: boolean; replayState: EmbeddedRunReplayState; livenessState?: EmbeddedRunLivenessState; + hadDeterministicSideEffect?: boolean; messagingToolSentTexts: string[]; messagingToolSentTextsNormalized: string[]; diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts index 81197911748..38e9d99f6ec 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts @@ -589,4 +589,36 @@ describe("subscribeEmbeddedPiSession", () => { }), ); }); + + it("preserves deterministic side-effect liveness across compaction retries", () => { + const { session, emit } = createStubSessionHarness(); + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session, + runId: "run-cron-side-effect-compaction", + onAgentEvent, + sessionKey: "test-session", + }); + + emitToolRun({ + emit, + toolName: "cron", + toolCallId: "cron-1", + args: { action: "add", job: { name: "reminder" } }, + isError: false, + result: { details: { status: "ok" } }, + }); + emit({ type: "auto_compaction_end", willRetry: true, result: { summary: "compacted" } }); + emit({ type: "agent_end" }); + + const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls); + expect(payloads).toContainEqual( + expect.objectContaining({ + phase: "end", + livenessState: "working", + replayInvalid: true, + }), + ); + }); }); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 7a9451907d8..0d61a8e0446 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -111,6 +111,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar unsubscribed: false, replayState: createEmbeddedRunReplayState(params.initialReplayState), livenessState: "working", + hadDeterministicSideEffect: false, messagingToolSentTexts: [], messagingToolSentTextsNormalized: [], messagingToolSentTargets: [], @@ -678,6 +679,11 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar }; const resetForCompactionRetry = () => { + state.hadDeterministicSideEffect = + state.hadDeterministicSideEffect === true || + messagingToolSentTexts.length > 0 || + messagingToolSentMediaUrls.length > 0 || + state.successfulCronAdds > 0; assistantTexts.length = 0; toolMetas.length = 0; toolMetaById.clear();