diff --git a/src/agents/pi-embedded-runner/replay-state.ts b/src/agents/pi-embedded-runner/replay-state.ts new file mode 100644 index 00000000000..19e68d11821 --- /dev/null +++ b/src/agents/pi-embedded-runner/replay-state.ts @@ -0,0 +1,49 @@ +export type EmbeddedRunReplayState = { + replayInvalid: boolean; + hadPotentialSideEffects: boolean; +}; + +export type EmbeddedRunReplayMetadata = { + hadPotentialSideEffects: boolean; + replaySafe: boolean; +}; + +export function createEmbeddedRunReplayState( + state?: Partial, +): EmbeddedRunReplayState { + return { + replayInvalid: state?.replayInvalid === true, + hadPotentialSideEffects: state?.hadPotentialSideEffects === true, + }; +} + +export function mergeEmbeddedRunReplayState( + current: EmbeddedRunReplayState, + next?: Partial, +): EmbeddedRunReplayState { + if (!next) { + return current; + } + return { + replayInvalid: current.replayInvalid || next.replayInvalid === true, + hadPotentialSideEffects: + current.hadPotentialSideEffects || next.hadPotentialSideEffects === true, + }; +} + +export function observeReplayMetadata( + current: EmbeddedRunReplayState, + metadata: EmbeddedRunReplayMetadata, +): EmbeddedRunReplayState { + return mergeEmbeddedRunReplayState(current, { + replayInvalid: !metadata.replaySafe, + hadPotentialSideEffects: metadata.hadPotentialSideEffects, + }); +} + +export function replayMetadataFromState(state: EmbeddedRunReplayState): EmbeddedRunReplayMetadata { + return { + hadPotentialSideEffects: state.hadPotentialSideEffects, + replaySafe: !state.replayInvalid && !state.hadPotentialSideEffects, + }; +} diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index b4408221a4e..840ac3d18d4 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -76,6 +76,7 @@ import { runContextEngineMaintenance } from "./context-engine-maintenance.js"; import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; import { resolveModelAsync } from "./model.js"; +import { createEmbeddedRunReplayState, observeReplayMetadata } from "./replay-state.js"; import { handleAssistantFailover } from "./run/assistant-failover.js"; import { createEmbeddedRunAuthController } from "./run/auth-controller.js"; import { runEmbeddedAttemptWithBackend } from "./run/backend.js"; @@ -564,8 +565,7 @@ export async function runEmbeddedPiAgent( } }; let authRetryPending = false; - let accumulatedReplayInvalid = false; - let accumulatedHadPotentialSideEffects = false; + let accumulatedReplayState = createEmbeddedRunReplayState(); // Hoisted so the retry-limit error path can use the most recent API total. let lastTurnTotal: number | undefined; while (true) { @@ -598,7 +598,7 @@ export async function runEmbeddedPiAgent( lastRunPromptUsage, lastTurnTotal, }), - replayInvalid: accumulatedReplayInvalid ? true : undefined, + replayInvalid: accumulatedReplayState.replayInvalid ? true : undefined, livenessState: "blocked", }); } @@ -676,8 +676,7 @@ export async function runEmbeddedPiAgent( resolvedApiKey: resolvedStreamApiKey, authProfileId: lastProfileId, authProfileIdSource: lockedProfileId ? "user" : "auto", - initialReplayInvalid: accumulatedReplayInvalid, - initialHadPotentialSideEffects: accumulatedHadPotentialSideEffects, + initialReplayState: accumulatedReplayState, authStorage, modelRegistry, agentId: workspaceResolution.agentId, @@ -754,17 +753,18 @@ export async function runEmbeddedPiAgent( model: modelId, }); const resolveReplayInvalidForAttempt = (incompleteTurnText?: string | null) => - accumulatedReplayInvalid || + accumulatedReplayState.replayInvalid || resolveReplayInvalidFlag({ attempt, incompleteTurnText, }); if (resolveReplayInvalidForAttempt(null)) { - accumulatedReplayInvalid = true; - } - if (attempt.replayMetadata.hadPotentialSideEffects) { - accumulatedHadPotentialSideEffects = true; + accumulatedReplayState = { ...accumulatedReplayState, replayInvalid: true }; } + accumulatedReplayState = observeReplayMetadata( + accumulatedReplayState, + attempt.replayMetadata, + ); const formattedAssistantErrorText = lastAssistant ? formatAssistantErrorText(lastAssistant, { cfg: params.config, diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts index a343ad5ad62..537e15f505d 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts @@ -87,8 +87,10 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => { getMessagingToolSentMediaUrls: () => [] as string[], getMessagingToolSentTargets: () => [] as MessagingToolSend[], getSuccessfulCronAdds: () => 0, - getReplayInvalid: () => false, - getHadPotentialSideEffects: () => false, + getReplayState: () => ({ + replayInvalid: false, + hadPotentialSideEffects: false, + }), didSendViaMessagingTool: () => false, didSendDeterministicApprovalPrompt: () => false, getLastToolError: () => undefined, @@ -629,8 +631,10 @@ export function createSubscriptionMock(): SubscriptionMock { getMessagingToolSentMediaUrls: () => [] as string[], getMessagingToolSentTargets: () => [] as MessagingToolSend[], getSuccessfulCronAdds: () => 0, - getReplayInvalid: () => false, - getHadPotentialSideEffects: () => false, + getReplayState: () => ({ + replayInvalid: false, + hadPotentialSideEffects: false, + }), didSendViaMessagingTool: () => false, didSendDeterministicApprovalPrompt: () => false, getLastToolError: () => undefined, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 41f9e020685..d6669819815 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -125,6 +125,7 @@ import { } from "../prompt-cache-observability.js"; import { resolveCacheRetention } from "../prompt-cache-retention.js"; import { sanitizeSessionHistory, validateReplayTurns } from "../replay-history.js"; +import { observeReplayMetadata, replayMetadataFromState } from "../replay-state.js"; import { clearActiveEmbeddedRun, type EmbeddedPiQueueHandle, @@ -1408,8 +1409,7 @@ export async function runEmbeddedAttempt( buildEmbeddedSubscriptionParams({ session: activeSession, runId: params.runId, - initialReplayInvalid: params.initialReplayInvalid, - initialHadPotentialSideEffects: params.initialHadPotentialSideEffects, + initialReplayState: params.initialReplayState, hookRunner: getGlobalHookRunner() ?? undefined, verboseLevel: params.verboseLevel, reasoningMode: params.reasoningLevel ?? "off", @@ -1447,8 +1447,7 @@ export async function runEmbeddedAttempt( getMessagingToolSentMediaUrls, getMessagingToolSentTargets, getSuccessfulCronAdds, - getReplayInvalid, - getHadPotentialSideEffects, + getReplayState, didSendViaMessagingTool, getLastToolError, setTerminalLifecycleMeta, @@ -2280,11 +2279,9 @@ export async function runEmbeddedAttempt( didSendViaMessagingTool: didSendViaMessagingTool(), successfulCronAdds: getSuccessfulCronAdds(), }); - const replayMetadata = { - hadPotentialSideEffects: - observedReplayMetadata.hadPotentialSideEffects || getHadPotentialSideEffects(), - replaySafe: observedReplayMetadata.replaySafe && !getReplayInvalid(), - }; + const replayMetadata = replayMetadataFromState( + observeReplayMetadata(getReplayState(), observedReplayMetadata), + ); return { replayMetadata, diff --git a/src/agents/pi-embedded-runner/run/incomplete-turn.ts b/src/agents/pi-embedded-runner/run/incomplete-turn.ts index dceb721120e..1ab460b8305 100644 --- a/src/agents/pi-embedded-runner/run/incomplete-turn.ts +++ b/src/agents/pi-embedded-runner/run/incomplete-turn.ts @@ -40,6 +40,13 @@ type RunLivenessAttempt = Pick< "lastAssistant" | "promptErrorSource" | "replayMetadata" | "timedOutDuringCompaction" >; +export function isIncompleteTerminalAssistantTurn(params: { + hasAssistantVisibleText: boolean; + lastAssistant?: { stopReason?: string } | null; +}): boolean { + return !params.hasAssistantVisibleText && params.lastAssistant?.stopReason === "toolUse"; +} + const PLANNING_ONLY_PROMISE_RE = /\b(?:i(?:'ll| will)|let me|going to|first[, ]+i(?:'ll| will)|next[, ]+i(?:'ll| will)|i can do that)\b/i; const PLANNING_ONLY_COMPLETION_RE = @@ -133,7 +140,11 @@ export function resolveIncompleteTurnPayloadText(params: { } const stopReason = params.attempt.lastAssistant?.stopReason; - if (stopReason !== "toolUse" && stopReason !== "error") { + const incompleteTerminalAssistant = isIncompleteTerminalAssistantTurn({ + hasAssistantVisibleText: params.payloadCount > 0, + lastAssistant: params.attempt.lastAssistant, + }); + if (!incompleteTerminalAssistant && stopReason !== "error") { return null; } diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 4b0c0e4ea72..a377aa5f493 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -8,6 +8,7 @@ import type { PluginHookBeforeAgentStartResult } from "../../../plugins/types.js import type { MessagingToolSend } from "../../pi-embedded-messaging.js"; import type { ToolErrorSummary } from "../../tool-error-summary.js"; import type { NormalizedUsage } from "../../usage.js"; +import type { EmbeddedRunReplayMetadata, EmbeddedRunReplayState } from "../replay-state.js"; import type { EmbeddedRunLivenessState } from "../types.js"; import type { RunEmbeddedPiAgentParams } from "./params.js"; import type { PreemptiveCompactionRoute } from "./preemptive-compaction.js"; @@ -18,8 +19,7 @@ type EmbeddedRunAttemptBase = Omit< >; export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & { - initialReplayInvalid?: boolean; - initialHadPotentialSideEffects?: boolean; + initialReplayState?: EmbeddedRunReplayState; /** Pluggable context engine for ingest/assemble/compact lifecycle. */ contextEngine?: ContextEngine; /** Resolved model context window in tokens for assemble/compact budgeting. */ @@ -92,10 +92,7 @@ export type EmbeddedRunAttemptResult = { clientToolCall?: { name: string; params: Record }; /** True when sessions_yield tool was called during this attempt. */ yieldDetected?: boolean; - replayMetadata: { - hadPotentialSideEffects: boolean; - replaySafe: boolean; - }; + replayMetadata: EmbeddedRunReplayMetadata; itemLifecycle: { startedCount: number; completedCount: number; diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts index 3e5c495af25..c4fd69d18dd 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts @@ -29,6 +29,7 @@ function createContext( pendingCompactionRetry: 0, pendingToolMediaUrls: [], pendingToolAudioAsVoice: false, + replayState: { replayInvalid: false, hadPotentialSideEffects: false }, blockState: { thinking: true, final: true, @@ -196,7 +197,7 @@ describe("handleAgentEnd", () => { it("surfaces replay-invalid paused lifecycle end state when present", async () => { const onAgentEvent = vi.fn(); const ctx = createContext(undefined, { onAgentEvent }); - ctx.state.replayInvalid = true; + ctx.state.replayState = { ...ctx.state.replayState, replayInvalid: true }; ctx.state.livenessState = "paused"; await handleAgentEnd(ctx); @@ -214,7 +215,7 @@ describe("handleAgentEnd", () => { it("derives abandoned lifecycle end state when replay-invalid work finished without a reply", async () => { const onAgentEvent = vi.fn(); const ctx = createContext(undefined, { onAgentEvent }); - ctx.state.replayInvalid = true; + ctx.state.replayState = { ...ctx.state.replayState, replayInvalid: true }; ctx.state.livenessState = "working"; ctx.state.assistantTexts = []; ctx.state.messagingToolSentTexts = []; diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index 8d33bf3a843..c8cd7bf5a70 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -6,6 +6,7 @@ import { sanitizeForConsole, } from "./pi-embedded-error-observation.js"; import { classifyFailoverReason, formatAssistantErrorText } from "./pi-embedded-helpers.js"; +import { isIncompleteTerminalAssistantTurn } from "./pi-embedded-runner/run/incomplete-turn.js"; import { consumePendingToolMediaReply, hasAssistantVisibleReply, @@ -42,12 +43,12 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise< const hasAssistantVisibleText = Array.isArray(ctx.state.assistantTexts) && ctx.state.assistantTexts.some((text) => hasAssistantVisibleReply({ text })); - const incompleteTerminalAssistant = - !hasAssistantVisibleText && - isAssistantMessage(lastAssistant) && - lastAssistant.stopReason === "toolUse"; + const incompleteTerminalAssistant = isIncompleteTerminalAssistantTurn({ + hasAssistantVisibleText, + lastAssistant: isAssistantMessage(lastAssistant) ? lastAssistant : null, + }); const replayInvalid = - ctx.state.replayInvalid === true || incompleteTerminalAssistant ? true : undefined; + ctx.state.replayState.replayInvalid || incompleteTerminalAssistant ? true : undefined; const derivedWorkingTerminalState = isError ? "blocked" : replayInvalid && !hasAssistantVisibleText diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.test.ts b/src/agents/pi-embedded-subscribe.handlers.tools.test.ts index 31be03ed8f2..38612bb7015 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.test.ts @@ -48,8 +48,7 @@ function createTestContext(): { pendingToolMediaUrls: [], pendingToolAudioAsVoice: false, deterministicApprovalPromptPending: false, - replayInvalid: false, - hadPotentialSideEffects: false, + replayState: { replayInvalid: false, hadPotentialSideEffects: false }, messagingToolSentTexts: [], messagingToolSentTextsNormalized: [], messagingToolSentMediaUrls: [], @@ -278,8 +277,10 @@ describe("handleToolExecutionEnd mutating failure recovery", () => { } as never, ); - expect(ctx.state.replayInvalid).toBe(true); - expect(ctx.state.hadPotentialSideEffects).toBe(true); + expect(ctx.state.replayState).toEqual({ + replayInvalid: true, + hadPotentialSideEffects: true, + }); }); it("keeps successful mutating retries replay-invalid after an earlier tool failure", async () => { @@ -336,8 +337,10 @@ describe("handleToolExecutionEnd mutating failure recovery", () => { ); expect(ctx.state.lastToolError).toBeUndefined(); - expect(ctx.state.replayInvalid).toBe(true); - expect(ctx.state.hadPotentialSideEffects).toBe(true); + expect(ctx.state.replayState).toEqual({ + replayInvalid: true, + hadPotentialSideEffects: true, + }); }); }); diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index 34c7d306a9c..598727d55ad 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -26,6 +26,7 @@ import type { ExecToolDetails } from "./bash-tools.exec-types.js"; import { parseExecApprovalResultText } from "./exec-approval-result.js"; import { normalizeTextForComparison } from "./pi-embedded-helpers.js"; import { isMessagingTool, isMessagingToolSendAction } from "./pi-embedded-messaging.js"; +import { mergeEmbeddedRunReplayState } from "./pi-embedded-runner/replay-state.js"; import type { ToolCallSummary, ToolHandlerContext, @@ -795,8 +796,10 @@ export async function handleToolExecutionEnd( } } if (completedMutatingAction) { - ctx.state.replayInvalid = true; - ctx.state.hadPotentialSideEffects = true; + ctx.state.replayState = mergeEmbeddedRunReplayState(ctx.state.replayState, { + replayInvalid: true, + hadPotentialSideEffects: true, + }); } // Commit messaging tool text on success, discard on error. diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 49d2413c0a4..4ae7064b185 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -6,6 +6,7 @@ import type { HookRunner } from "../plugins/hooks.js"; import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js"; import type { MessagingToolSend } from "./pi-embedded-messaging.js"; import type { BlockReplyPayload } from "./pi-embedded-payloads.js"; +import type { EmbeddedRunReplayState } from "./pi-embedded-runner/replay-state.js"; import type { EmbeddedRunLivenessState } from "./pi-embedded-runner/types.js"; import type { BlockReplyChunking, @@ -65,8 +66,7 @@ export type EmbeddedPiSubscribeState = { compactionRetryReject?: (reason?: unknown) => void; compactionRetryPromise: Promise | null; unsubscribed: boolean; - replayInvalid?: boolean; - hadPotentialSideEffects?: boolean; + replayState: EmbeddedRunReplayState; livenessState?: EmbeddedRunLivenessState; messagingToolSentTexts: string[]; @@ -163,8 +163,7 @@ export type ToolHandlerState = Pick< | "pendingToolMediaUrls" | "pendingToolAudioAsVoice" | "deterministicApprovalPromptPending" - | "replayInvalid" - | "hadPotentialSideEffects" + | "replayState" | "messagingToolSentTexts" | "messagingToolSentTextsNormalized" | "messagingToolSentMediaUrls" diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts index 46857293bf1..81197911748 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts @@ -576,8 +576,10 @@ describe("subscribeEmbeddedPiSession", () => { emit({ type: "auto_compaction_end", willRetry: true, result: { summary: "compacted" } }); emit({ type: "agent_end" }); - expect(subscription.getReplayInvalid()).toBe(true); - expect(subscription.getHadPotentialSideEffects()).toBe(true); + expect(subscription.getReplayState()).toEqual({ + replayInvalid: true, + hadPotentialSideEffects: true, + }); const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls); expect(payloads).toContainEqual( expect.objectContaining({ diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index f4a65d0f7fd..7a9451907d8 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -15,6 +15,10 @@ import { normalizeTextForComparison, } from "./pi-embedded-helpers.js"; import type { BlockReplyPayload } from "./pi-embedded-payloads.js"; +import { + createEmbeddedRunReplayState, + mergeEmbeddedRunReplayState, +} from "./pi-embedded-runner/replay-state.js"; import type { EmbeddedRunLivenessState } from "./pi-embedded-runner/types.js"; import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js"; import { consumePendingToolMediaIntoReply } from "./pi-embedded-subscribe.handlers.messages.js"; @@ -105,8 +109,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar compactionRetryReject: undefined, compactionRetryPromise: null, unsubscribed: false, - replayInvalid: params.initialReplayInvalid === true, - hadPotentialSideEffects: params.initialHadPotentialSideEffects === true, + replayState: createEmbeddedRunReplayState(params.initialReplayState), livenessState: "working", messagingToolSentTexts: [], messagingToolSentTextsNormalized: [], @@ -695,9 +698,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.pendingToolAudioAsVoice = false; state.deterministicApprovalPromptPending = false; state.deterministicApprovalPromptSent = false; - state.replayInvalid = state.replayInvalid || params.initialReplayInvalid === true; - state.hadPotentialSideEffects = - state.hadPotentialSideEffects || params.initialHadPotentialSideEffects === true; + state.replayState = mergeEmbeddedRunReplayState(state.replayState, params.initialReplayState); state.livenessState = "working"; resetAssistantMessageState(0); }; @@ -785,7 +786,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar livenessState?: EmbeddedRunLivenessState; }) => { if (typeof meta.replayInvalid === "boolean") { - state.replayInvalid = meta.replayInvalid; + state.replayState = { ...state.replayState, replayInvalid: meta.replayInvalid }; } if (meta.livenessState) { state.livenessState = meta.livenessState; @@ -797,8 +798,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar getMessagingToolSentMediaUrls: () => messagingToolSentMediaUrls.slice(), getMessagingToolSentTargets: () => messagingToolSentTargets.slice(), getSuccessfulCronAdds: () => state.successfulCronAdds, - getReplayInvalid: () => state.replayInvalid === true, - getHadPotentialSideEffects: () => state.hadPotentialSideEffects === true, + getReplayState: () => ({ ...state.replayState }), // Returns true if any messaging tool successfully sent a message. // Used to suppress agent's confirmation text (e.g., "Respondi no Telegram!") // which is generated AFTER the tool sends the actual answer. diff --git a/src/agents/pi-embedded-subscribe.types.ts b/src/agents/pi-embedded-subscribe.types.ts index 5a1714e0dfb..9984f451f8c 100644 --- a/src/agents/pi-embedded-subscribe.types.ts +++ b/src/agents/pi-embedded-subscribe.types.ts @@ -6,14 +6,14 @@ import type { HookRunner } from "../plugins/hooks.js"; import type { AgentInternalEvent } from "./internal-events.js"; import type { BlockReplyChunking } from "./pi-embedded-block-chunker.js"; import type { BlockReplyPayload } from "./pi-embedded-payloads.js"; +import type { EmbeddedRunReplayState } from "./pi-embedded-runner/replay-state.js"; export type ToolResultFormat = "markdown" | "plain"; export type SubscribeEmbeddedPiSessionParams = { session: AgentSession; runId: string; - initialReplayInvalid?: boolean; - initialHadPotentialSideEffects?: boolean; + initialReplayState?: EmbeddedRunReplayState; hookRunner?: HookRunner; verboseLevel?: VerboseLevel; reasoningMode?: ReasoningLevel; diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index b602a8802d5..797d2d18999 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -346,6 +346,32 @@ export async function dispatchReplyFromConfig(params: { const originatingTo = ctx.OriginatingTo; const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface; + const routeReplyToOriginating = async ( + payload: ReplyPayload, + options?: { abortSignal?: AbortSignal; mirror?: boolean }, + ) => { + if (!shouldRouteToOriginating || !originatingChannel || !originatingTo || !routeReplyRuntime) { + return null; + } + return await routeReplyRuntime.routeReply({ + payload, + channel: originatingChannel, + to: originatingTo, + sessionKey: ctx.SessionKey, + accountId: ctx.AccountId, + requesterSenderId: ctx.SenderId, + requesterSenderName: ctx.SenderName, + requesterSenderUsername: ctx.SenderUsername, + requesterSenderE164: ctx.SenderE164, + threadId: routeThreadId, + cfg, + abortSignal: options?.abortSignal, + mirror: options?.mirror, + isGroup, + groupId, + }); + }; + /** * Helper to send a payload via route-reply (async). * Only used when actually routing to a different provider. @@ -365,24 +391,11 @@ export async function dispatchReplyFromConfig(params: { if (abortSignal?.aborted) { return; } - const result = await routeReplyRuntime.routeReply({ - payload, - channel: originatingChannel, - to: originatingTo, - sessionKey: ctx.SessionKey, - accountId: ctx.AccountId, - requesterSenderId: ctx.SenderId, - requesterSenderName: ctx.SenderName, - requesterSenderUsername: ctx.SenderUsername, - requesterSenderE164: ctx.SenderE164, - threadId: routeThreadId, - cfg, + const result = await routeReplyToOriginating(payload, { abortSignal, mirror, - isGroup, - groupId, }); - if (!result.ok) { + if (result && !result.ok) { logVerbose(`dispatch-from-config: route-reply failed: ${result.error ?? "unknown error"}`); } }; @@ -391,22 +404,8 @@ export async function dispatchReplyFromConfig(params: { payload: ReplyPayload, mode: "additive" | "terminal", ): Promise => { - if (shouldRouteToOriginating && routeReplyRuntime && originatingChannel && originatingTo) { - const result = await routeReplyRuntime.routeReply({ - payload, - channel: originatingChannel, - to: originatingTo, - sessionKey: ctx.SessionKey, - accountId: ctx.AccountId, - requesterSenderId: ctx.SenderId, - requesterSenderName: ctx.SenderName, - requesterSenderUsername: ctx.SenderUsername, - requesterSenderE164: ctx.SenderE164, - threadId: routeThreadId, - cfg, - isGroup, - groupId, - }); + const result = await routeReplyToOriginating(payload); + if (result) { if (!result.ok) { logVerbose( `dispatch-from-config: route-reply (plugin binding notice) failed: ${result.error ?? "unknown error"}`, @@ -551,22 +550,8 @@ export async function dispatchReplyFromConfig(params: { } satisfies ReplyPayload; let queuedFinal = false; let routedFinalCount = 0; - if (shouldRouteToOriginating && routeReplyRuntime && originatingChannel && originatingTo) { - const result = await routeReplyRuntime.routeReply({ - payload, - channel: originatingChannel, - to: originatingTo, - sessionKey: ctx.SessionKey, - accountId: ctx.AccountId, - requesterSenderId: ctx.SenderId, - requesterSenderName: ctx.SenderName, - requesterSenderUsername: ctx.SenderUsername, - requesterSenderE164: ctx.SenderE164, - threadId: routeThreadId, - cfg, - isGroup, - groupId, - }); + const result = await routeReplyToOriginating(payload); + if (result) { queuedFinal = result.ok; if (result.ok) { routedFinalCount += 1; @@ -612,22 +597,8 @@ export async function dispatchReplyFromConfig(params: { inboundAudio, ttsAuto: sessionTtsAuto, }); - if (shouldRouteToOriginating && routeReplyRuntime && originatingChannel && originatingTo) { - const result = await routeReplyRuntime.routeReply({ - payload: ttsPayload, - channel: originatingChannel, - to: originatingTo, - sessionKey: ctx.SessionKey, - accountId: ctx.AccountId, - requesterSenderId: ctx.SenderId, - requesterSenderName: ctx.SenderName, - requesterSenderUsername: ctx.SenderUsername, - requesterSenderE164: ctx.SenderE164, - threadId: routeThreadId, - cfg, - isGroup, - groupId, - }); + const result = await routeReplyToOriginating(ttsPayload); + if (result) { if (!result.ok) { logVerbose( `dispatch-from-config: route-reply (final) failed: ${result.error ?? "unknown error"}`, @@ -1046,27 +1017,8 @@ export async function dispatchReplyFromConfig(params: { mediaUrl: ttsSyntheticReply.mediaUrl, audioAsVoice: ttsSyntheticReply.audioAsVoice, }; - if ( - shouldRouteToOriginating && - routeReplyRuntime && - originatingChannel && - originatingTo - ) { - const result = await routeReplyRuntime.routeReply({ - payload: ttsOnlyPayload, - channel: originatingChannel, - to: originatingTo, - sessionKey: ctx.SessionKey, - accountId: ctx.AccountId, - requesterSenderId: ctx.SenderId, - requesterSenderName: ctx.SenderName, - requesterSenderUsername: ctx.SenderUsername, - requesterSenderE164: ctx.SenderE164, - threadId: routeThreadId, - cfg, - isGroup, - groupId, - }); + const result = await routeReplyToOriginating(ttsOnlyPayload); + if (result) { queuedFinal = result.ok || queuedFinal; if (result.ok) { routedFinalCount += 1; diff --git a/src/plugin-sdk/provider-tools.test.ts b/src/plugin-sdk/provider-tools.test.ts index 3686f240030..388ab4f61e9 100644 --- a/src/plugin-sdk/provider-tools.test.ts +++ b/src/plugin-sdk/provider-tools.test.ts @@ -10,6 +10,24 @@ import { } from "./provider-tools.js"; describe("buildProviderToolCompatFamilyHooks", () => { + function normalizeOpenAIParameters(parameters: unknown): unknown { + const hooks = buildProviderToolCompatFamilyHooks("openai"); + const tools = [{ name: "demo", description: "", parameters }] as never; + const normalized = hooks.normalizeToolSchemas({ + provider: "openai", + modelId: "gpt-5.4", + modelApi: "openai-responses", + model: { + provider: "openai", + api: "openai-responses", + baseUrl: "https://api.openai.com/v1", + id: "gpt-5.4", + } as never, + tools, + }); + return normalized[0]?.parameters; + } + it("covers the tool compat family matrix", () => { const cases = [ { @@ -101,37 +119,51 @@ describe("buildProviderToolCompatFamilyHooks", () => { }); }); - it("preserves nested empty property schemas and object annotations", () => { - const hooks = buildProviderToolCompatFamilyHooks("openai"); - const parameters = { - type: "object", - properties: { - payload: {}, - mode: { - type: "string", - default: {}, - const: {}, + it("preserves nested schemas and annotation objects while normalizing strict openai schemas", () => { + const cases = [ + { + name: "property schema", + parameters: { + type: "object", + properties: { payload: {} }, + required: ["payload"], + additionalProperties: false, }, }, - required: ["payload", "mode"], - additionalProperties: false, - }; - const tools = [{ name: "demo", description: "", parameters }] as never; + { + name: "schema maps", + parameters: { + type: "object", + properties: { mode: { $defs: { nested: {} }, dependentSchemas: { flag: {} } } }, + required: ["mode"], + additionalProperties: false, + }, + }, + { + name: "nested schema arrays", + parameters: { + type: "object", + properties: { mode: { anyOf: [{}], prefixItems: [{}] } }, + required: ["mode"], + additionalProperties: false, + }, + }, + { + name: "annotation objects", + parameters: { + type: "object", + properties: { mode: { type: "string", default: {}, const: {}, examples: [{}] } }, + required: ["mode"], + additionalProperties: false, + }, + }, + ]; - const normalized = hooks.normalizeToolSchemas({ - provider: "openai", - modelId: "gpt-5.4", - modelApi: "openai-responses", - model: { - provider: "openai", - api: "openai-responses", - baseUrl: "https://api.openai.com/v1", - id: "gpt-5.4", - } as never, - tools, - }); - - expect(normalized[0]?.parameters).toEqual(parameters); + for (const testCase of cases) { + expect(normalizeOpenAIParameters(testCase.parameters), testCase.name).toEqual( + testCase.parameters, + ); + } }); it("does not tighten permissive object schemas just to satisfy strict mode", () => {