mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 09:41:11 +00:00
refactor: consolidate embedded replay state
This commit is contained in:
49
src/agents/pi-embedded-runner/replay-state.ts
Normal file
49
src/agents/pi-embedded-runner/replay-state.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
export type EmbeddedRunReplayState = {
|
||||
replayInvalid: boolean;
|
||||
hadPotentialSideEffects: boolean;
|
||||
};
|
||||
|
||||
export type EmbeddedRunReplayMetadata = {
|
||||
hadPotentialSideEffects: boolean;
|
||||
replaySafe: boolean;
|
||||
};
|
||||
|
||||
export function createEmbeddedRunReplayState(
|
||||
state?: Partial<EmbeddedRunReplayState>,
|
||||
): EmbeddedRunReplayState {
|
||||
return {
|
||||
replayInvalid: state?.replayInvalid === true,
|
||||
hadPotentialSideEffects: state?.hadPotentialSideEffects === true,
|
||||
};
|
||||
}
|
||||
|
||||
export function mergeEmbeddedRunReplayState(
|
||||
current: EmbeddedRunReplayState,
|
||||
next?: Partial<EmbeddedRunReplayState>,
|
||||
): EmbeddedRunReplayState {
|
||||
if (!next) {
|
||||
return current;
|
||||
}
|
||||
return {
|
||||
replayInvalid: current.replayInvalid || next.replayInvalid === true,
|
||||
hadPotentialSideEffects:
|
||||
current.hadPotentialSideEffects || next.hadPotentialSideEffects === true,
|
||||
};
|
||||
}
|
||||
|
||||
export function observeReplayMetadata(
|
||||
current: EmbeddedRunReplayState,
|
||||
metadata: EmbeddedRunReplayMetadata,
|
||||
): EmbeddedRunReplayState {
|
||||
return mergeEmbeddedRunReplayState(current, {
|
||||
replayInvalid: !metadata.replaySafe,
|
||||
hadPotentialSideEffects: metadata.hadPotentialSideEffects,
|
||||
});
|
||||
}
|
||||
|
||||
export function replayMetadataFromState(state: EmbeddedRunReplayState): EmbeddedRunReplayMetadata {
|
||||
return {
|
||||
hadPotentialSideEffects: state.hadPotentialSideEffects,
|
||||
replaySafe: !state.replayInvalid && !state.hadPotentialSideEffects,
|
||||
};
|
||||
}
|
||||
@@ -76,6 +76,7 @@ import { runContextEngineMaintenance } from "./context-engine-maintenance.js";
|
||||
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
|
||||
import { log } from "./logger.js";
|
||||
import { resolveModelAsync } from "./model.js";
|
||||
import { createEmbeddedRunReplayState, observeReplayMetadata } from "./replay-state.js";
|
||||
import { handleAssistantFailover } from "./run/assistant-failover.js";
|
||||
import { createEmbeddedRunAuthController } from "./run/auth-controller.js";
|
||||
import { runEmbeddedAttemptWithBackend } from "./run/backend.js";
|
||||
@@ -564,8 +565,7 @@ export async function runEmbeddedPiAgent(
|
||||
}
|
||||
};
|
||||
let authRetryPending = false;
|
||||
let accumulatedReplayInvalid = false;
|
||||
let accumulatedHadPotentialSideEffects = false;
|
||||
let accumulatedReplayState = createEmbeddedRunReplayState();
|
||||
// Hoisted so the retry-limit error path can use the most recent API total.
|
||||
let lastTurnTotal: number | undefined;
|
||||
while (true) {
|
||||
@@ -598,7 +598,7 @@ export async function runEmbeddedPiAgent(
|
||||
lastRunPromptUsage,
|
||||
lastTurnTotal,
|
||||
}),
|
||||
replayInvalid: accumulatedReplayInvalid ? true : undefined,
|
||||
replayInvalid: accumulatedReplayState.replayInvalid ? true : undefined,
|
||||
livenessState: "blocked",
|
||||
});
|
||||
}
|
||||
@@ -676,8 +676,7 @@ export async function runEmbeddedPiAgent(
|
||||
resolvedApiKey: resolvedStreamApiKey,
|
||||
authProfileId: lastProfileId,
|
||||
authProfileIdSource: lockedProfileId ? "user" : "auto",
|
||||
initialReplayInvalid: accumulatedReplayInvalid,
|
||||
initialHadPotentialSideEffects: accumulatedHadPotentialSideEffects,
|
||||
initialReplayState: accumulatedReplayState,
|
||||
authStorage,
|
||||
modelRegistry,
|
||||
agentId: workspaceResolution.agentId,
|
||||
@@ -754,17 +753,18 @@ export async function runEmbeddedPiAgent(
|
||||
model: modelId,
|
||||
});
|
||||
const resolveReplayInvalidForAttempt = (incompleteTurnText?: string | null) =>
|
||||
accumulatedReplayInvalid ||
|
||||
accumulatedReplayState.replayInvalid ||
|
||||
resolveReplayInvalidFlag({
|
||||
attempt,
|
||||
incompleteTurnText,
|
||||
});
|
||||
if (resolveReplayInvalidForAttempt(null)) {
|
||||
accumulatedReplayInvalid = true;
|
||||
}
|
||||
if (attempt.replayMetadata.hadPotentialSideEffects) {
|
||||
accumulatedHadPotentialSideEffects = true;
|
||||
accumulatedReplayState = { ...accumulatedReplayState, replayInvalid: true };
|
||||
}
|
||||
accumulatedReplayState = observeReplayMetadata(
|
||||
accumulatedReplayState,
|
||||
attempt.replayMetadata,
|
||||
);
|
||||
const formattedAssistantErrorText = lastAssistant
|
||||
? formatAssistantErrorText(lastAssistant, {
|
||||
cfg: params.config,
|
||||
|
||||
@@ -87,8 +87,10 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => {
|
||||
getMessagingToolSentMediaUrls: () => [] as string[],
|
||||
getMessagingToolSentTargets: () => [] as MessagingToolSend[],
|
||||
getSuccessfulCronAdds: () => 0,
|
||||
getReplayInvalid: () => false,
|
||||
getHadPotentialSideEffects: () => false,
|
||||
getReplayState: () => ({
|
||||
replayInvalid: false,
|
||||
hadPotentialSideEffects: false,
|
||||
}),
|
||||
didSendViaMessagingTool: () => false,
|
||||
didSendDeterministicApprovalPrompt: () => false,
|
||||
getLastToolError: () => undefined,
|
||||
@@ -629,8 +631,10 @@ export function createSubscriptionMock(): SubscriptionMock {
|
||||
getMessagingToolSentMediaUrls: () => [] as string[],
|
||||
getMessagingToolSentTargets: () => [] as MessagingToolSend[],
|
||||
getSuccessfulCronAdds: () => 0,
|
||||
getReplayInvalid: () => false,
|
||||
getHadPotentialSideEffects: () => false,
|
||||
getReplayState: () => ({
|
||||
replayInvalid: false,
|
||||
hadPotentialSideEffects: false,
|
||||
}),
|
||||
didSendViaMessagingTool: () => false,
|
||||
didSendDeterministicApprovalPrompt: () => false,
|
||||
getLastToolError: () => undefined,
|
||||
|
||||
@@ -125,6 +125,7 @@ import {
|
||||
} from "../prompt-cache-observability.js";
|
||||
import { resolveCacheRetention } from "../prompt-cache-retention.js";
|
||||
import { sanitizeSessionHistory, validateReplayTurns } from "../replay-history.js";
|
||||
import { observeReplayMetadata, replayMetadataFromState } from "../replay-state.js";
|
||||
import {
|
||||
clearActiveEmbeddedRun,
|
||||
type EmbeddedPiQueueHandle,
|
||||
@@ -1408,8 +1409,7 @@ export async function runEmbeddedAttempt(
|
||||
buildEmbeddedSubscriptionParams({
|
||||
session: activeSession,
|
||||
runId: params.runId,
|
||||
initialReplayInvalid: params.initialReplayInvalid,
|
||||
initialHadPotentialSideEffects: params.initialHadPotentialSideEffects,
|
||||
initialReplayState: params.initialReplayState,
|
||||
hookRunner: getGlobalHookRunner() ?? undefined,
|
||||
verboseLevel: params.verboseLevel,
|
||||
reasoningMode: params.reasoningLevel ?? "off",
|
||||
@@ -1447,8 +1447,7 @@ export async function runEmbeddedAttempt(
|
||||
getMessagingToolSentMediaUrls,
|
||||
getMessagingToolSentTargets,
|
||||
getSuccessfulCronAdds,
|
||||
getReplayInvalid,
|
||||
getHadPotentialSideEffects,
|
||||
getReplayState,
|
||||
didSendViaMessagingTool,
|
||||
getLastToolError,
|
||||
setTerminalLifecycleMeta,
|
||||
@@ -2280,11 +2279,9 @@ export async function runEmbeddedAttempt(
|
||||
didSendViaMessagingTool: didSendViaMessagingTool(),
|
||||
successfulCronAdds: getSuccessfulCronAdds(),
|
||||
});
|
||||
const replayMetadata = {
|
||||
hadPotentialSideEffects:
|
||||
observedReplayMetadata.hadPotentialSideEffects || getHadPotentialSideEffects(),
|
||||
replaySafe: observedReplayMetadata.replaySafe && !getReplayInvalid(),
|
||||
};
|
||||
const replayMetadata = replayMetadataFromState(
|
||||
observeReplayMetadata(getReplayState(), observedReplayMetadata),
|
||||
);
|
||||
|
||||
return {
|
||||
replayMetadata,
|
||||
|
||||
@@ -40,6 +40,13 @@ type RunLivenessAttempt = Pick<
|
||||
"lastAssistant" | "promptErrorSource" | "replayMetadata" | "timedOutDuringCompaction"
|
||||
>;
|
||||
|
||||
export function isIncompleteTerminalAssistantTurn(params: {
|
||||
hasAssistantVisibleText: boolean;
|
||||
lastAssistant?: { stopReason?: string } | null;
|
||||
}): boolean {
|
||||
return !params.hasAssistantVisibleText && params.lastAssistant?.stopReason === "toolUse";
|
||||
}
|
||||
|
||||
const PLANNING_ONLY_PROMISE_RE =
|
||||
/\b(?:i(?:'ll| will)|let me|going to|first[, ]+i(?:'ll| will)|next[, ]+i(?:'ll| will)|i can do that)\b/i;
|
||||
const PLANNING_ONLY_COMPLETION_RE =
|
||||
@@ -133,7 +140,11 @@ export function resolveIncompleteTurnPayloadText(params: {
|
||||
}
|
||||
|
||||
const stopReason = params.attempt.lastAssistant?.stopReason;
|
||||
if (stopReason !== "toolUse" && stopReason !== "error") {
|
||||
const incompleteTerminalAssistant = isIncompleteTerminalAssistantTurn({
|
||||
hasAssistantVisibleText: params.payloadCount > 0,
|
||||
lastAssistant: params.attempt.lastAssistant,
|
||||
});
|
||||
if (!incompleteTerminalAssistant && stopReason !== "error") {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import type { PluginHookBeforeAgentStartResult } from "../../../plugins/types.js
|
||||
import type { MessagingToolSend } from "../../pi-embedded-messaging.js";
|
||||
import type { ToolErrorSummary } from "../../tool-error-summary.js";
|
||||
import type { NormalizedUsage } from "../../usage.js";
|
||||
import type { EmbeddedRunReplayMetadata, EmbeddedRunReplayState } from "../replay-state.js";
|
||||
import type { EmbeddedRunLivenessState } from "../types.js";
|
||||
import type { RunEmbeddedPiAgentParams } from "./params.js";
|
||||
import type { PreemptiveCompactionRoute } from "./preemptive-compaction.js";
|
||||
@@ -18,8 +19,7 @@ type EmbeddedRunAttemptBase = Omit<
|
||||
>;
|
||||
|
||||
export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & {
|
||||
initialReplayInvalid?: boolean;
|
||||
initialHadPotentialSideEffects?: boolean;
|
||||
initialReplayState?: EmbeddedRunReplayState;
|
||||
/** Pluggable context engine for ingest/assemble/compact lifecycle. */
|
||||
contextEngine?: ContextEngine;
|
||||
/** Resolved model context window in tokens for assemble/compact budgeting. */
|
||||
@@ -92,10 +92,7 @@ export type EmbeddedRunAttemptResult = {
|
||||
clientToolCall?: { name: string; params: Record<string, unknown> };
|
||||
/** True when sessions_yield tool was called during this attempt. */
|
||||
yieldDetected?: boolean;
|
||||
replayMetadata: {
|
||||
hadPotentialSideEffects: boolean;
|
||||
replaySafe: boolean;
|
||||
};
|
||||
replayMetadata: EmbeddedRunReplayMetadata;
|
||||
itemLifecycle: {
|
||||
startedCount: number;
|
||||
completedCount: number;
|
||||
|
||||
@@ -29,6 +29,7 @@ function createContext(
|
||||
pendingCompactionRetry: 0,
|
||||
pendingToolMediaUrls: [],
|
||||
pendingToolAudioAsVoice: false,
|
||||
replayState: { replayInvalid: false, hadPotentialSideEffects: false },
|
||||
blockState: {
|
||||
thinking: true,
|
||||
final: true,
|
||||
@@ -196,7 +197,7 @@ describe("handleAgentEnd", () => {
|
||||
it("surfaces replay-invalid paused lifecycle end state when present", async () => {
|
||||
const onAgentEvent = vi.fn();
|
||||
const ctx = createContext(undefined, { onAgentEvent });
|
||||
ctx.state.replayInvalid = true;
|
||||
ctx.state.replayState = { ...ctx.state.replayState, replayInvalid: true };
|
||||
ctx.state.livenessState = "paused";
|
||||
|
||||
await handleAgentEnd(ctx);
|
||||
@@ -214,7 +215,7 @@ describe("handleAgentEnd", () => {
|
||||
it("derives abandoned lifecycle end state when replay-invalid work finished without a reply", async () => {
|
||||
const onAgentEvent = vi.fn();
|
||||
const ctx = createContext(undefined, { onAgentEvent });
|
||||
ctx.state.replayInvalid = true;
|
||||
ctx.state.replayState = { ...ctx.state.replayState, replayInvalid: true };
|
||||
ctx.state.livenessState = "working";
|
||||
ctx.state.assistantTexts = [];
|
||||
ctx.state.messagingToolSentTexts = [];
|
||||
|
||||
@@ -6,6 +6,7 @@ import {
|
||||
sanitizeForConsole,
|
||||
} from "./pi-embedded-error-observation.js";
|
||||
import { classifyFailoverReason, formatAssistantErrorText } from "./pi-embedded-helpers.js";
|
||||
import { isIncompleteTerminalAssistantTurn } from "./pi-embedded-runner/run/incomplete-turn.js";
|
||||
import {
|
||||
consumePendingToolMediaReply,
|
||||
hasAssistantVisibleReply,
|
||||
@@ -42,12 +43,12 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
|
||||
const hasAssistantVisibleText =
|
||||
Array.isArray(ctx.state.assistantTexts) &&
|
||||
ctx.state.assistantTexts.some((text) => hasAssistantVisibleReply({ text }));
|
||||
const incompleteTerminalAssistant =
|
||||
!hasAssistantVisibleText &&
|
||||
isAssistantMessage(lastAssistant) &&
|
||||
lastAssistant.stopReason === "toolUse";
|
||||
const incompleteTerminalAssistant = isIncompleteTerminalAssistantTurn({
|
||||
hasAssistantVisibleText,
|
||||
lastAssistant: isAssistantMessage(lastAssistant) ? lastAssistant : null,
|
||||
});
|
||||
const replayInvalid =
|
||||
ctx.state.replayInvalid === true || incompleteTerminalAssistant ? true : undefined;
|
||||
ctx.state.replayState.replayInvalid || incompleteTerminalAssistant ? true : undefined;
|
||||
const derivedWorkingTerminalState = isError
|
||||
? "blocked"
|
||||
: replayInvalid && !hasAssistantVisibleText
|
||||
|
||||
@@ -48,8 +48,7 @@ function createTestContext(): {
|
||||
pendingToolMediaUrls: [],
|
||||
pendingToolAudioAsVoice: false,
|
||||
deterministicApprovalPromptPending: false,
|
||||
replayInvalid: false,
|
||||
hadPotentialSideEffects: false,
|
||||
replayState: { replayInvalid: false, hadPotentialSideEffects: false },
|
||||
messagingToolSentTexts: [],
|
||||
messagingToolSentTextsNormalized: [],
|
||||
messagingToolSentMediaUrls: [],
|
||||
@@ -278,8 +277,10 @@ describe("handleToolExecutionEnd mutating failure recovery", () => {
|
||||
} as never,
|
||||
);
|
||||
|
||||
expect(ctx.state.replayInvalid).toBe(true);
|
||||
expect(ctx.state.hadPotentialSideEffects).toBe(true);
|
||||
expect(ctx.state.replayState).toEqual({
|
||||
replayInvalid: true,
|
||||
hadPotentialSideEffects: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps successful mutating retries replay-invalid after an earlier tool failure", async () => {
|
||||
@@ -336,8 +337,10 @@ describe("handleToolExecutionEnd mutating failure recovery", () => {
|
||||
);
|
||||
|
||||
expect(ctx.state.lastToolError).toBeUndefined();
|
||||
expect(ctx.state.replayInvalid).toBe(true);
|
||||
expect(ctx.state.hadPotentialSideEffects).toBe(true);
|
||||
expect(ctx.state.replayState).toEqual({
|
||||
replayInvalid: true,
|
||||
hadPotentialSideEffects: true,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ import type { ExecToolDetails } from "./bash-tools.exec-types.js";
|
||||
import { parseExecApprovalResultText } from "./exec-approval-result.js";
|
||||
import { normalizeTextForComparison } from "./pi-embedded-helpers.js";
|
||||
import { isMessagingTool, isMessagingToolSendAction } from "./pi-embedded-messaging.js";
|
||||
import { mergeEmbeddedRunReplayState } from "./pi-embedded-runner/replay-state.js";
|
||||
import type {
|
||||
ToolCallSummary,
|
||||
ToolHandlerContext,
|
||||
@@ -795,8 +796,10 @@ export async function handleToolExecutionEnd(
|
||||
}
|
||||
}
|
||||
if (completedMutatingAction) {
|
||||
ctx.state.replayInvalid = true;
|
||||
ctx.state.hadPotentialSideEffects = true;
|
||||
ctx.state.replayState = mergeEmbeddedRunReplayState(ctx.state.replayState, {
|
||||
replayInvalid: true,
|
||||
hadPotentialSideEffects: true,
|
||||
});
|
||||
}
|
||||
|
||||
// Commit messaging tool text on success, discard on error.
|
||||
|
||||
@@ -6,6 +6,7 @@ import type { HookRunner } from "../plugins/hooks.js";
|
||||
import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
|
||||
import type { MessagingToolSend } from "./pi-embedded-messaging.js";
|
||||
import type { BlockReplyPayload } from "./pi-embedded-payloads.js";
|
||||
import type { EmbeddedRunReplayState } from "./pi-embedded-runner/replay-state.js";
|
||||
import type { EmbeddedRunLivenessState } from "./pi-embedded-runner/types.js";
|
||||
import type {
|
||||
BlockReplyChunking,
|
||||
@@ -65,8 +66,7 @@ export type EmbeddedPiSubscribeState = {
|
||||
compactionRetryReject?: (reason?: unknown) => void;
|
||||
compactionRetryPromise: Promise<void> | null;
|
||||
unsubscribed: boolean;
|
||||
replayInvalid?: boolean;
|
||||
hadPotentialSideEffects?: boolean;
|
||||
replayState: EmbeddedRunReplayState;
|
||||
livenessState?: EmbeddedRunLivenessState;
|
||||
|
||||
messagingToolSentTexts: string[];
|
||||
@@ -163,8 +163,7 @@ export type ToolHandlerState = Pick<
|
||||
| "pendingToolMediaUrls"
|
||||
| "pendingToolAudioAsVoice"
|
||||
| "deterministicApprovalPromptPending"
|
||||
| "replayInvalid"
|
||||
| "hadPotentialSideEffects"
|
||||
| "replayState"
|
||||
| "messagingToolSentTexts"
|
||||
| "messagingToolSentTextsNormalized"
|
||||
| "messagingToolSentMediaUrls"
|
||||
|
||||
@@ -576,8 +576,10 @@ describe("subscribeEmbeddedPiSession", () => {
|
||||
emit({ type: "auto_compaction_end", willRetry: true, result: { summary: "compacted" } });
|
||||
emit({ type: "agent_end" });
|
||||
|
||||
expect(subscription.getReplayInvalid()).toBe(true);
|
||||
expect(subscription.getHadPotentialSideEffects()).toBe(true);
|
||||
expect(subscription.getReplayState()).toEqual({
|
||||
replayInvalid: true,
|
||||
hadPotentialSideEffects: true,
|
||||
});
|
||||
const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls);
|
||||
expect(payloads).toContainEqual(
|
||||
expect.objectContaining({
|
||||
|
||||
@@ -15,6 +15,10 @@ import {
|
||||
normalizeTextForComparison,
|
||||
} from "./pi-embedded-helpers.js";
|
||||
import type { BlockReplyPayload } from "./pi-embedded-payloads.js";
|
||||
import {
|
||||
createEmbeddedRunReplayState,
|
||||
mergeEmbeddedRunReplayState,
|
||||
} from "./pi-embedded-runner/replay-state.js";
|
||||
import type { EmbeddedRunLivenessState } from "./pi-embedded-runner/types.js";
|
||||
import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js";
|
||||
import { consumePendingToolMediaIntoReply } from "./pi-embedded-subscribe.handlers.messages.js";
|
||||
@@ -105,8 +109,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
||||
compactionRetryReject: undefined,
|
||||
compactionRetryPromise: null,
|
||||
unsubscribed: false,
|
||||
replayInvalid: params.initialReplayInvalid === true,
|
||||
hadPotentialSideEffects: params.initialHadPotentialSideEffects === true,
|
||||
replayState: createEmbeddedRunReplayState(params.initialReplayState),
|
||||
livenessState: "working",
|
||||
messagingToolSentTexts: [],
|
||||
messagingToolSentTextsNormalized: [],
|
||||
@@ -695,9 +698,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
||||
state.pendingToolAudioAsVoice = false;
|
||||
state.deterministicApprovalPromptPending = false;
|
||||
state.deterministicApprovalPromptSent = false;
|
||||
state.replayInvalid = state.replayInvalid || params.initialReplayInvalid === true;
|
||||
state.hadPotentialSideEffects =
|
||||
state.hadPotentialSideEffects || params.initialHadPotentialSideEffects === true;
|
||||
state.replayState = mergeEmbeddedRunReplayState(state.replayState, params.initialReplayState);
|
||||
state.livenessState = "working";
|
||||
resetAssistantMessageState(0);
|
||||
};
|
||||
@@ -785,7 +786,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
||||
livenessState?: EmbeddedRunLivenessState;
|
||||
}) => {
|
||||
if (typeof meta.replayInvalid === "boolean") {
|
||||
state.replayInvalid = meta.replayInvalid;
|
||||
state.replayState = { ...state.replayState, replayInvalid: meta.replayInvalid };
|
||||
}
|
||||
if (meta.livenessState) {
|
||||
state.livenessState = meta.livenessState;
|
||||
@@ -797,8 +798,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
||||
getMessagingToolSentMediaUrls: () => messagingToolSentMediaUrls.slice(),
|
||||
getMessagingToolSentTargets: () => messagingToolSentTargets.slice(),
|
||||
getSuccessfulCronAdds: () => state.successfulCronAdds,
|
||||
getReplayInvalid: () => state.replayInvalid === true,
|
||||
getHadPotentialSideEffects: () => state.hadPotentialSideEffects === true,
|
||||
getReplayState: () => ({ ...state.replayState }),
|
||||
// Returns true if any messaging tool successfully sent a message.
|
||||
// Used to suppress agent's confirmation text (e.g., "Respondi no Telegram!")
|
||||
// which is generated AFTER the tool sends the actual answer.
|
||||
|
||||
@@ -6,14 +6,14 @@ import type { HookRunner } from "../plugins/hooks.js";
|
||||
import type { AgentInternalEvent } from "./internal-events.js";
|
||||
import type { BlockReplyChunking } from "./pi-embedded-block-chunker.js";
|
||||
import type { BlockReplyPayload } from "./pi-embedded-payloads.js";
|
||||
import type { EmbeddedRunReplayState } from "./pi-embedded-runner/replay-state.js";
|
||||
|
||||
export type ToolResultFormat = "markdown" | "plain";
|
||||
|
||||
export type SubscribeEmbeddedPiSessionParams = {
|
||||
session: AgentSession;
|
||||
runId: string;
|
||||
initialReplayInvalid?: boolean;
|
||||
initialHadPotentialSideEffects?: boolean;
|
||||
initialReplayState?: EmbeddedRunReplayState;
|
||||
hookRunner?: HookRunner;
|
||||
verboseLevel?: VerboseLevel;
|
||||
reasoningMode?: ReasoningLevel;
|
||||
|
||||
@@ -346,6 +346,32 @@ export async function dispatchReplyFromConfig(params: {
|
||||
const originatingTo = ctx.OriginatingTo;
|
||||
const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface;
|
||||
|
||||
const routeReplyToOriginating = async (
|
||||
payload: ReplyPayload,
|
||||
options?: { abortSignal?: AbortSignal; mirror?: boolean },
|
||||
) => {
|
||||
if (!shouldRouteToOriginating || !originatingChannel || !originatingTo || !routeReplyRuntime) {
|
||||
return null;
|
||||
}
|
||||
return await routeReplyRuntime.routeReply({
|
||||
payload,
|
||||
channel: originatingChannel,
|
||||
to: originatingTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
accountId: ctx.AccountId,
|
||||
requesterSenderId: ctx.SenderId,
|
||||
requesterSenderName: ctx.SenderName,
|
||||
requesterSenderUsername: ctx.SenderUsername,
|
||||
requesterSenderE164: ctx.SenderE164,
|
||||
threadId: routeThreadId,
|
||||
cfg,
|
||||
abortSignal: options?.abortSignal,
|
||||
mirror: options?.mirror,
|
||||
isGroup,
|
||||
groupId,
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* Helper to send a payload via route-reply (async).
|
||||
* Only used when actually routing to a different provider.
|
||||
@@ -365,24 +391,11 @@ export async function dispatchReplyFromConfig(params: {
|
||||
if (abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
const result = await routeReplyRuntime.routeReply({
|
||||
payload,
|
||||
channel: originatingChannel,
|
||||
to: originatingTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
accountId: ctx.AccountId,
|
||||
requesterSenderId: ctx.SenderId,
|
||||
requesterSenderName: ctx.SenderName,
|
||||
requesterSenderUsername: ctx.SenderUsername,
|
||||
requesterSenderE164: ctx.SenderE164,
|
||||
threadId: routeThreadId,
|
||||
cfg,
|
||||
const result = await routeReplyToOriginating(payload, {
|
||||
abortSignal,
|
||||
mirror,
|
||||
isGroup,
|
||||
groupId,
|
||||
});
|
||||
if (!result.ok) {
|
||||
if (result && !result.ok) {
|
||||
logVerbose(`dispatch-from-config: route-reply failed: ${result.error ?? "unknown error"}`);
|
||||
}
|
||||
};
|
||||
@@ -391,22 +404,8 @@ export async function dispatchReplyFromConfig(params: {
|
||||
payload: ReplyPayload,
|
||||
mode: "additive" | "terminal",
|
||||
): Promise<boolean> => {
|
||||
if (shouldRouteToOriginating && routeReplyRuntime && originatingChannel && originatingTo) {
|
||||
const result = await routeReplyRuntime.routeReply({
|
||||
payload,
|
||||
channel: originatingChannel,
|
||||
to: originatingTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
accountId: ctx.AccountId,
|
||||
requesterSenderId: ctx.SenderId,
|
||||
requesterSenderName: ctx.SenderName,
|
||||
requesterSenderUsername: ctx.SenderUsername,
|
||||
requesterSenderE164: ctx.SenderE164,
|
||||
threadId: routeThreadId,
|
||||
cfg,
|
||||
isGroup,
|
||||
groupId,
|
||||
});
|
||||
const result = await routeReplyToOriginating(payload);
|
||||
if (result) {
|
||||
if (!result.ok) {
|
||||
logVerbose(
|
||||
`dispatch-from-config: route-reply (plugin binding notice) failed: ${result.error ?? "unknown error"}`,
|
||||
@@ -551,22 +550,8 @@ export async function dispatchReplyFromConfig(params: {
|
||||
} satisfies ReplyPayload;
|
||||
let queuedFinal = false;
|
||||
let routedFinalCount = 0;
|
||||
if (shouldRouteToOriginating && routeReplyRuntime && originatingChannel && originatingTo) {
|
||||
const result = await routeReplyRuntime.routeReply({
|
||||
payload,
|
||||
channel: originatingChannel,
|
||||
to: originatingTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
accountId: ctx.AccountId,
|
||||
requesterSenderId: ctx.SenderId,
|
||||
requesterSenderName: ctx.SenderName,
|
||||
requesterSenderUsername: ctx.SenderUsername,
|
||||
requesterSenderE164: ctx.SenderE164,
|
||||
threadId: routeThreadId,
|
||||
cfg,
|
||||
isGroup,
|
||||
groupId,
|
||||
});
|
||||
const result = await routeReplyToOriginating(payload);
|
||||
if (result) {
|
||||
queuedFinal = result.ok;
|
||||
if (result.ok) {
|
||||
routedFinalCount += 1;
|
||||
@@ -612,22 +597,8 @@ export async function dispatchReplyFromConfig(params: {
|
||||
inboundAudio,
|
||||
ttsAuto: sessionTtsAuto,
|
||||
});
|
||||
if (shouldRouteToOriginating && routeReplyRuntime && originatingChannel && originatingTo) {
|
||||
const result = await routeReplyRuntime.routeReply({
|
||||
payload: ttsPayload,
|
||||
channel: originatingChannel,
|
||||
to: originatingTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
accountId: ctx.AccountId,
|
||||
requesterSenderId: ctx.SenderId,
|
||||
requesterSenderName: ctx.SenderName,
|
||||
requesterSenderUsername: ctx.SenderUsername,
|
||||
requesterSenderE164: ctx.SenderE164,
|
||||
threadId: routeThreadId,
|
||||
cfg,
|
||||
isGroup,
|
||||
groupId,
|
||||
});
|
||||
const result = await routeReplyToOriginating(ttsPayload);
|
||||
if (result) {
|
||||
if (!result.ok) {
|
||||
logVerbose(
|
||||
`dispatch-from-config: route-reply (final) failed: ${result.error ?? "unknown error"}`,
|
||||
@@ -1046,27 +1017,8 @@ export async function dispatchReplyFromConfig(params: {
|
||||
mediaUrl: ttsSyntheticReply.mediaUrl,
|
||||
audioAsVoice: ttsSyntheticReply.audioAsVoice,
|
||||
};
|
||||
if (
|
||||
shouldRouteToOriginating &&
|
||||
routeReplyRuntime &&
|
||||
originatingChannel &&
|
||||
originatingTo
|
||||
) {
|
||||
const result = await routeReplyRuntime.routeReply({
|
||||
payload: ttsOnlyPayload,
|
||||
channel: originatingChannel,
|
||||
to: originatingTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
accountId: ctx.AccountId,
|
||||
requesterSenderId: ctx.SenderId,
|
||||
requesterSenderName: ctx.SenderName,
|
||||
requesterSenderUsername: ctx.SenderUsername,
|
||||
requesterSenderE164: ctx.SenderE164,
|
||||
threadId: routeThreadId,
|
||||
cfg,
|
||||
isGroup,
|
||||
groupId,
|
||||
});
|
||||
const result = await routeReplyToOriginating(ttsOnlyPayload);
|
||||
if (result) {
|
||||
queuedFinal = result.ok || queuedFinal;
|
||||
if (result.ok) {
|
||||
routedFinalCount += 1;
|
||||
|
||||
@@ -10,6 +10,24 @@ import {
|
||||
} from "./provider-tools.js";
|
||||
|
||||
describe("buildProviderToolCompatFamilyHooks", () => {
|
||||
function normalizeOpenAIParameters(parameters: unknown): unknown {
|
||||
const hooks = buildProviderToolCompatFamilyHooks("openai");
|
||||
const tools = [{ name: "demo", description: "", parameters }] as never;
|
||||
const normalized = hooks.normalizeToolSchemas({
|
||||
provider: "openai",
|
||||
modelId: "gpt-5.4",
|
||||
modelApi: "openai-responses",
|
||||
model: {
|
||||
provider: "openai",
|
||||
api: "openai-responses",
|
||||
baseUrl: "https://api.openai.com/v1",
|
||||
id: "gpt-5.4",
|
||||
} as never,
|
||||
tools,
|
||||
});
|
||||
return normalized[0]?.parameters;
|
||||
}
|
||||
|
||||
it("covers the tool compat family matrix", () => {
|
||||
const cases = [
|
||||
{
|
||||
@@ -101,37 +119,51 @@ describe("buildProviderToolCompatFamilyHooks", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves nested empty property schemas and object annotations", () => {
|
||||
const hooks = buildProviderToolCompatFamilyHooks("openai");
|
||||
const parameters = {
|
||||
type: "object",
|
||||
properties: {
|
||||
payload: {},
|
||||
mode: {
|
||||
type: "string",
|
||||
default: {},
|
||||
const: {},
|
||||
it("preserves nested schemas and annotation objects while normalizing strict openai schemas", () => {
|
||||
const cases = [
|
||||
{
|
||||
name: "property schema",
|
||||
parameters: {
|
||||
type: "object",
|
||||
properties: { payload: {} },
|
||||
required: ["payload"],
|
||||
additionalProperties: false,
|
||||
},
|
||||
},
|
||||
required: ["payload", "mode"],
|
||||
additionalProperties: false,
|
||||
};
|
||||
const tools = [{ name: "demo", description: "", parameters }] as never;
|
||||
{
|
||||
name: "schema maps",
|
||||
parameters: {
|
||||
type: "object",
|
||||
properties: { mode: { $defs: { nested: {} }, dependentSchemas: { flag: {} } } },
|
||||
required: ["mode"],
|
||||
additionalProperties: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "nested schema arrays",
|
||||
parameters: {
|
||||
type: "object",
|
||||
properties: { mode: { anyOf: [{}], prefixItems: [{}] } },
|
||||
required: ["mode"],
|
||||
additionalProperties: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "annotation objects",
|
||||
parameters: {
|
||||
type: "object",
|
||||
properties: { mode: { type: "string", default: {}, const: {}, examples: [{}] } },
|
||||
required: ["mode"],
|
||||
additionalProperties: false,
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
const normalized = hooks.normalizeToolSchemas({
|
||||
provider: "openai",
|
||||
modelId: "gpt-5.4",
|
||||
modelApi: "openai-responses",
|
||||
model: {
|
||||
provider: "openai",
|
||||
api: "openai-responses",
|
||||
baseUrl: "https://api.openai.com/v1",
|
||||
id: "gpt-5.4",
|
||||
} as never,
|
||||
tools,
|
||||
});
|
||||
|
||||
expect(normalized[0]?.parameters).toEqual(parameters);
|
||||
for (const testCase of cases) {
|
||||
expect(normalizeOpenAIParameters(testCase.parameters), testCase.name).toEqual(
|
||||
testCase.parameters,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
it("does not tighten permissive object schemas just to satisfy strict mode", () => {
|
||||
|
||||
Reference in New Issue
Block a user