mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 20:51:10 +00:00
fix(reply): use runtime snapshot for queued reply runs (#62693)
Merged via squash.
Prepared head SHA: 2a3e4e5c60
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
@@ -125,6 +125,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Agents/model resolution: let explicit `openai-codex/gpt-5.4` selection prefer provider runtime metadata when it reports a larger context window, keeping configured Codex runs aligned with the live provider limits. (#62694) Thanks @ruclaw7.
|
||||
- Agents/model resolution: keep explicit-model runtime comparisons on the configured workspace plugin registry, so workspace-installed providers do not silently fall back to stale explicit metadata during runtime model lookup.
|
||||
- Providers/Z.AI: default onboarding and endpoint detection to GLM-5.1 instead of GLM-5. (#61998) Thanks @serg0x.
|
||||
- Reply execution: prefer the active runtime snapshot over stale queued reply config during embedded reply and follow-up execution so SecretRef-backed reply turns stop crashing after secrets have already resolved. (#62693) Thanks @mbelinky.
|
||||
|
||||
## 2026.4.5
|
||||
|
||||
|
||||
@@ -103,6 +103,7 @@ vi.mock("./agent-runner-utils.js", () => ({
|
||||
params.provider === params.run.provider ? params.run.authProfileIdSource : undefined,
|
||||
},
|
||||
}),
|
||||
resolveQueuedReplyRuntimeConfig: <T>(config: T) => config,
|
||||
resolveModelFallbackOptions: vi.fn(() => ({})),
|
||||
}));
|
||||
|
||||
|
||||
@@ -59,6 +59,7 @@ import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
import { resolveRunAuthProfile } from "./agent-runner-auth-profile.js";
|
||||
import {
|
||||
buildEmbeddedRunExecutionParams,
|
||||
resolveQueuedReplyRuntimeConfig,
|
||||
resolveModelFallbackOptions,
|
||||
} from "./agent-runner-utils.js";
|
||||
import { type BlockReplyPipeline } from "./block-reply-pipeline.js";
|
||||
@@ -523,10 +524,18 @@ export async function runAgentTurnWithFallback(params: {
|
||||
let autoCompactionCount = 0;
|
||||
// Track payloads sent directly (not via pipeline) during tool flush to avoid duplicates.
|
||||
const directlySentBlockKeys = new Set<string>();
|
||||
const runtimeConfig = resolveQueuedReplyRuntimeConfig(params.followupRun.run.config);
|
||||
const effectiveRun =
|
||||
runtimeConfig === params.followupRun.run.config
|
||||
? params.followupRun.run
|
||||
: {
|
||||
...params.followupRun.run,
|
||||
config: runtimeConfig,
|
||||
};
|
||||
|
||||
const runId = params.opts?.runId ?? crypto.randomUUID();
|
||||
const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({
|
||||
cfg: params.followupRun.run.config,
|
||||
cfg: runtimeConfig,
|
||||
sessionKey: params.sessionKey,
|
||||
workspaceDir: params.followupRun.run.workspaceDir,
|
||||
});
|
||||
@@ -728,7 +737,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
);
|
||||
}
|
||||
|
||||
if (isCliProvider(provider, params.followupRun.run.config)) {
|
||||
if (isCliProvider(provider, runtimeConfig)) {
|
||||
const startedAt = Date.now();
|
||||
notifyAgentRunStart();
|
||||
emitAgentEvent({
|
||||
@@ -756,7 +765,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
agentId: params.followupRun.run.agentId,
|
||||
sessionFile: params.followupRun.run.sessionFile,
|
||||
workspaceDir: params.followupRun.run.workspaceDir,
|
||||
config: params.followupRun.run.config,
|
||||
config: runtimeConfig,
|
||||
prompt: params.commandBody,
|
||||
provider,
|
||||
model,
|
||||
@@ -850,7 +859,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
}
|
||||
const { embeddedContext, senderContext, runBaseParams } = buildEmbeddedRunExecutionParams(
|
||||
{
|
||||
run: params.followupRun.run,
|
||||
run: effectiveRun,
|
||||
sessionCtx: params.sessionCtx,
|
||||
hasRepliedRef: params.opts?.hasRepliedRef,
|
||||
provider,
|
||||
@@ -1030,8 +1039,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
// Keep custom compaction callbacks active, but gate the
|
||||
// fallback user-facing notice behind explicit opt-in.
|
||||
const notifyUser =
|
||||
params.followupRun.run.config.agents?.defaults?.compaction?.notifyUser ===
|
||||
true;
|
||||
runtimeConfig?.agents?.defaults?.compaction?.notifyUser === true;
|
||||
if (params.opts?.onCompactionStart) {
|
||||
await params.opts.onCompactionStart();
|
||||
} else if (notifyUser && params.opts?.onBlockReply) {
|
||||
|
||||
83
src/auto-reply/reply/agent-runner-runtime-config.test.ts
Normal file
83
src/auto-reply/reply/agent-runner-runtime-config.test.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
clearRuntimeConfigSnapshot,
|
||||
setRuntimeConfigSnapshot,
|
||||
type OpenClawConfig,
|
||||
} from "../../config/config.js";
|
||||
import {
|
||||
buildEmbeddedRunBaseParams,
|
||||
resolveProviderScopedAuthProfile,
|
||||
} from "./agent-runner-utils.js";
|
||||
import type { FollowupRun } from "./queue.js";
|
||||
|
||||
function makeRun(config: OpenClawConfig): FollowupRun["run"] {
|
||||
return {
|
||||
sessionId: "session-1",
|
||||
agentId: "agent-1",
|
||||
config,
|
||||
provider: "openai",
|
||||
model: "gpt-4.1",
|
||||
agentDir: "/tmp/agent",
|
||||
sessionKey: "agent:test:session",
|
||||
sessionFile: "/tmp/session.json",
|
||||
workspaceDir: "/tmp/workspace",
|
||||
skillsSnapshot: [],
|
||||
ownerNumbers: ["+15550001"],
|
||||
enforceFinalTag: false,
|
||||
thinkLevel: "medium",
|
||||
verboseLevel: "off",
|
||||
reasoningLevel: "none",
|
||||
execOverrides: {},
|
||||
bashElevated: false,
|
||||
timeoutMs: 60_000,
|
||||
} as unknown as FollowupRun["run"];
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
clearRuntimeConfigSnapshot();
|
||||
});
|
||||
|
||||
describe("buildEmbeddedRunBaseParams runtime config", () => {
|
||||
it("prefers the active runtime snapshot when queued reply config still contains SecretRefs", () => {
|
||||
const sourceConfig: OpenClawConfig = {
|
||||
models: {
|
||||
providers: {
|
||||
openai: {
|
||||
baseUrl: "https://api.openai.com/v1",
|
||||
apiKey: {
|
||||
source: "env",
|
||||
provider: "default",
|
||||
id: "OPENAI_API_KEY",
|
||||
},
|
||||
models: [],
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
const runtimeConfig: OpenClawConfig = {
|
||||
models: {
|
||||
providers: {
|
||||
openai: {
|
||||
baseUrl: "https://api.openai.com/v1",
|
||||
apiKey: "resolved-runtime-key",
|
||||
models: [],
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
setRuntimeConfigSnapshot(runtimeConfig, sourceConfig);
|
||||
|
||||
const resolved = buildEmbeddedRunBaseParams({
|
||||
run: makeRun(sourceConfig),
|
||||
provider: "openai",
|
||||
model: "gpt-4.1-mini",
|
||||
runId: "run-1",
|
||||
authProfile: resolveProviderScopedAuthProfile({
|
||||
provider: "openai",
|
||||
primaryProvider: "openai",
|
||||
}),
|
||||
});
|
||||
|
||||
expect(resolved.config).toBe(runtimeConfig);
|
||||
});
|
||||
});
|
||||
@@ -2,7 +2,7 @@ import { resolveRunModelFallbacksOverride } from "../../agents/agent-scope.js";
|
||||
import { getChannelPlugin } from "../../channels/plugins/index.js";
|
||||
import type { ChannelId, ChannelThreadingToolContext } from "../../channels/plugins/types.js";
|
||||
import { normalizeAnyChannelId, normalizeChannelId } from "../../channels/registry.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { getRuntimeConfigSnapshot, type OpenClawConfig } from "../../config/config.js";
|
||||
import {
|
||||
normalizeOptionalLowercaseString,
|
||||
normalizeOptionalString,
|
||||
@@ -19,6 +19,12 @@ import type { FollowupRun } from "./queue.js";
|
||||
|
||||
const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i;
|
||||
|
||||
export function resolveQueuedReplyRuntimeConfig(config: OpenClawConfig): OpenClawConfig {
|
||||
return (
|
||||
(typeof getRuntimeConfigSnapshot === "function" ? getRuntimeConfigSnapshot() : null) ?? config
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build provider-specific threading context for tool auto-injection.
|
||||
*/
|
||||
@@ -105,20 +111,21 @@ export const resolveEnforceFinalTag = (
|
||||
(run.skipProviderRuntimeHints ? false : undefined) ??
|
||||
(run.enforceFinalTag ||
|
||||
isReasoningTagProvider(provider, {
|
||||
config: run.config,
|
||||
config: resolveQueuedReplyRuntimeConfig(run.config),
|
||||
workspaceDir: run.workspaceDir,
|
||||
modelId: model,
|
||||
})),
|
||||
);
|
||||
|
||||
export function resolveModelFallbackOptions(run: FollowupRun["run"]) {
|
||||
const config = resolveQueuedReplyRuntimeConfig(run.config);
|
||||
return {
|
||||
cfg: run.config,
|
||||
cfg: config,
|
||||
provider: run.provider,
|
||||
model: run.model,
|
||||
agentDir: run.agentDir,
|
||||
fallbacksOverride: resolveRunModelFallbacksOverride({
|
||||
cfg: run.config,
|
||||
cfg: config,
|
||||
agentId: run.agentId,
|
||||
sessionKey: run.sessionKey,
|
||||
}),
|
||||
@@ -133,11 +140,12 @@ export function buildEmbeddedRunBaseParams(params: {
|
||||
authProfile: ReturnType<typeof resolveProviderScopedAuthProfile>;
|
||||
allowTransientCooldownProbe?: boolean;
|
||||
}) {
|
||||
const config = resolveQueuedReplyRuntimeConfig(params.run.config);
|
||||
return {
|
||||
sessionFile: params.run.sessionFile,
|
||||
workspaceDir: params.run.workspaceDir,
|
||||
agentDir: params.run.agentDir,
|
||||
config: params.run.config,
|
||||
config,
|
||||
skillsSnapshot: params.run.skillsSnapshot,
|
||||
ownerNumbers: params.run.ownerNumbers,
|
||||
inputProvenance: params.run.inputProvenance,
|
||||
@@ -163,6 +171,7 @@ export function buildEmbeddedContextFromTemplate(params: {
|
||||
sessionCtx: TemplateContext;
|
||||
hasRepliedRef: { value: boolean } | undefined;
|
||||
}) {
|
||||
const config = resolveQueuedReplyRuntimeConfig(params.run.config);
|
||||
return {
|
||||
sessionId: params.run.sessionId,
|
||||
sessionKey: params.run.sessionKey,
|
||||
@@ -180,7 +189,7 @@ export function buildEmbeddedContextFromTemplate(params: {
|
||||
// Provider threading context for tool auto-injection
|
||||
...buildThreadingToolContext({
|
||||
sessionCtx: params.sessionCtx,
|
||||
config: params.run.config,
|
||||
config,
|
||||
hasRepliedRef: params.hasRepliedRef,
|
||||
}),
|
||||
};
|
||||
|
||||
@@ -2,6 +2,7 @@ import fs from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import type { SessionEntry } from "../../config/sessions/types.js";
|
||||
import type { FollowupRun, QueueSettings } from "./queue.js";
|
||||
|
||||
@@ -10,6 +11,16 @@ const compactEmbeddedPiSessionMock = vi.fn();
|
||||
const routeReplyMock = vi.fn();
|
||||
const isRoutableChannelMock = vi.fn();
|
||||
const runPreflightCompactionIfNeededMock = vi.fn();
|
||||
let createFollowupRunner: typeof import("./followup-runner.js").createFollowupRunner;
|
||||
let clearRuntimeConfigSnapshot: typeof import("../../config/config.js").clearRuntimeConfigSnapshot;
|
||||
let loadSessionStore: typeof import("../../config/sessions/store.js").loadSessionStore;
|
||||
let saveSessionStore: typeof import("../../config/sessions/store.js").saveSessionStore;
|
||||
let clearFollowupQueue: typeof import("./queue.js").clearFollowupQueue;
|
||||
let enqueueFollowupRun: typeof import("./queue.js").enqueueFollowupRun;
|
||||
let sessionRunAccounting: typeof import("./session-run-accounting.js");
|
||||
let setRuntimeConfigSnapshot: typeof import("../../config/config.js").setRuntimeConfigSnapshot;
|
||||
let createMockFollowupRun: typeof import("./test-helpers.js").createMockFollowupRun;
|
||||
let createMockTypingController: typeof import("./test-helpers.js").createMockTypingController;
|
||||
const FOLLOWUP_DEBUG = process.env.OPENCLAW_DEBUG_FOLLOWUP_RUNNER_TEST === "1";
|
||||
const FOLLOWUP_TEST_QUEUES = new Map<
|
||||
string,
|
||||
@@ -223,50 +234,54 @@ async function persistRunSessionUsageForFollowupTest(
|
||||
await saveSessionStore(storePath, store);
|
||||
}
|
||||
|
||||
vi.mock(
|
||||
"../../agents/model-fallback.js",
|
||||
async () => await import("../../test-utils/model-fallback.mock.js"),
|
||||
);
|
||||
vi.mock("../../agents/session-write-lock.js", () => ({
|
||||
acquireSessionWriteLock: vi.fn(async () => ({
|
||||
release: async () => {},
|
||||
})),
|
||||
resolveSessionLockMaxHoldFromTimeout: vi.fn(() => 1),
|
||||
}));
|
||||
vi.mock("../../agents/pi-embedded.js", () => ({
|
||||
abortEmbeddedPiRun: vi.fn(async () => false),
|
||||
compactEmbeddedPiSession: (params: unknown) => compactEmbeddedPiSessionMock(params),
|
||||
isEmbeddedPiRunActive: vi.fn(() => false),
|
||||
isEmbeddedPiRunStreaming: vi.fn(() => false),
|
||||
queueEmbeddedPiMessage: vi.fn(async () => undefined),
|
||||
resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
|
||||
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
|
||||
waitForEmbeddedPiRunEnd: vi.fn(async () => undefined),
|
||||
}));
|
||||
vi.mock("./queue.js", () => ({
|
||||
clearFollowupQueue: clearFollowupQueueForFollowupTest,
|
||||
enqueueFollowupRun: enqueueFollowupRunForFollowupTest,
|
||||
refreshQueuedFollowupSession: refreshQueuedFollowupSessionForFollowupTest,
|
||||
}));
|
||||
vi.mock("./session-run-accounting.js", () => ({
|
||||
persistRunSessionUsage: persistRunSessionUsageForFollowupTest,
|
||||
incrementRunCompactionCount: incrementRunCompactionCountForFollowupTest,
|
||||
}));
|
||||
vi.mock("./agent-runner-memory.js", () => ({
|
||||
runPreflightCompactionIfNeeded: (...args: unknown[]) =>
|
||||
runPreflightCompactionIfNeededMock(...args),
|
||||
}));
|
||||
vi.mock("./route-reply.js", () => ({
|
||||
isRoutableChannel: (...args: unknown[]) => isRoutableChannelMock(...args),
|
||||
routeReply: (...args: unknown[]) => routeReplyMock(...args),
|
||||
}));
|
||||
|
||||
const { createFollowupRunner } = await import("./followup-runner.js");
|
||||
const { loadSessionStore, saveSessionStore, clearSessionStoreCacheForTest } =
|
||||
await import("../../config/sessions/store.js");
|
||||
const { clearFollowupQueue, enqueueFollowupRun } = await import("./queue.js");
|
||||
const sessionRunAccounting = await import("./session-run-accounting.js");
|
||||
const { createMockFollowupRun, createMockTypingController } = await import("./test-helpers.js");
|
||||
async function loadFreshFollowupRunnerModuleForTest() {
|
||||
vi.resetModules();
|
||||
vi.doUnmock("../../config/config.js");
|
||||
vi.doMock(
|
||||
"../../agents/model-fallback.js",
|
||||
async () => await import("../../test-utils/model-fallback.mock.js"),
|
||||
);
|
||||
vi.doMock("../../agents/session-write-lock.js", () => ({
|
||||
acquireSessionWriteLock: vi.fn(async () => ({
|
||||
release: async () => {},
|
||||
})),
|
||||
resolveSessionLockMaxHoldFromTimeout: vi.fn(() => 1),
|
||||
}));
|
||||
vi.doMock("../../agents/pi-embedded.js", () => ({
|
||||
abortEmbeddedPiRun: vi.fn(async () => false),
|
||||
compactEmbeddedPiSession: (params: unknown) => compactEmbeddedPiSessionMock(params),
|
||||
isEmbeddedPiRunActive: vi.fn(() => false),
|
||||
isEmbeddedPiRunStreaming: vi.fn(() => false),
|
||||
queueEmbeddedPiMessage: vi.fn(async () => undefined),
|
||||
resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
|
||||
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
|
||||
waitForEmbeddedPiRunEnd: vi.fn(async () => undefined),
|
||||
}));
|
||||
vi.doMock("./queue.js", () => ({
|
||||
clearFollowupQueue: clearFollowupQueueForFollowupTest,
|
||||
enqueueFollowupRun: enqueueFollowupRunForFollowupTest,
|
||||
refreshQueuedFollowupSession: refreshQueuedFollowupSessionForFollowupTest,
|
||||
}));
|
||||
vi.doMock("./session-run-accounting.js", () => ({
|
||||
persistRunSessionUsage: persistRunSessionUsageForFollowupTest,
|
||||
incrementRunCompactionCount: incrementRunCompactionCountForFollowupTest,
|
||||
}));
|
||||
vi.doMock("./agent-runner-memory.js", () => ({
|
||||
runPreflightCompactionIfNeeded: (...args: unknown[]) =>
|
||||
runPreflightCompactionIfNeededMock(...args),
|
||||
}));
|
||||
vi.doMock("./route-reply.js", () => ({
|
||||
isRoutableChannel: (...args: unknown[]) => isRoutableChannelMock(...args),
|
||||
routeReply: (...args: unknown[]) => routeReplyMock(...args),
|
||||
}));
|
||||
({ createFollowupRunner } = await import("./followup-runner.js"));
|
||||
({ clearRuntimeConfigSnapshot, setRuntimeConfigSnapshot } =
|
||||
await import("../../config/config.js"));
|
||||
({ loadSessionStore, saveSessionStore } = await import("../../config/sessions/store.js"));
|
||||
({ clearFollowupQueue, enqueueFollowupRun } = await import("./queue.js"));
|
||||
sessionRunAccounting = await import("./session-run-accounting.js");
|
||||
({ createMockFollowupRun, createMockTypingController } = await import("./test-helpers.js"));
|
||||
}
|
||||
|
||||
const ROUTABLE_TEST_CHANNELS = new Set([
|
||||
"telegram",
|
||||
@@ -279,6 +294,9 @@ const ROUTABLE_TEST_CHANNELS = new Set([
|
||||
]);
|
||||
|
||||
beforeEach(async () => {
|
||||
await loadFreshFollowupRunnerModuleForTest();
|
||||
await loadFreshFollowupRunnerModuleForTest();
|
||||
clearRuntimeConfigSnapshot?.();
|
||||
runEmbeddedPiAgentMock.mockReset();
|
||||
compactEmbeddedPiSessionMock.mockReset();
|
||||
runPreflightCompactionIfNeededMock.mockReset();
|
||||
@@ -297,11 +315,13 @@ beforeEach(async () => {
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
clearRuntimeConfigSnapshot?.();
|
||||
clearFollowupQueue("main");
|
||||
FOLLOWUP_TEST_QUEUES.clear();
|
||||
FOLLOWUP_TEST_SESSION_STORES.clear();
|
||||
vi.clearAllTimers();
|
||||
vi.useRealTimers();
|
||||
const { clearSessionStoreCacheForTest } = await import("../../config/sessions/store.js");
|
||||
clearSessionStoreCacheForTest();
|
||||
if (!FOLLOWUP_DEBUG) {
|
||||
return;
|
||||
@@ -354,6 +374,65 @@ function createAsyncReplySpy() {
|
||||
return vi.fn(async () => {});
|
||||
}
|
||||
|
||||
describe("createFollowupRunner runtime config", () => {
|
||||
it("uses the active runtime snapshot for queued embedded followup runs", async () => {
|
||||
const sourceConfig: OpenClawConfig = {
|
||||
models: {
|
||||
providers: {
|
||||
openai: {
|
||||
baseUrl: "https://api.openai.com/v1",
|
||||
apiKey: {
|
||||
source: "env",
|
||||
provider: "default",
|
||||
id: "OPENAI_API_KEY",
|
||||
},
|
||||
models: [],
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
const runtimeConfig: OpenClawConfig = {
|
||||
models: {
|
||||
providers: {
|
||||
openai: {
|
||||
baseUrl: "https://api.openai.com/v1",
|
||||
apiKey: "resolved-runtime-key",
|
||||
models: [],
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
setRuntimeConfigSnapshot(runtimeConfig, sourceConfig);
|
||||
runEmbeddedPiAgentMock.mockResolvedValueOnce({
|
||||
payloads: [],
|
||||
meta: {},
|
||||
});
|
||||
|
||||
const runner = createFollowupRunner({
|
||||
typing: createMockTypingController(),
|
||||
typingMode: "instant",
|
||||
defaultModel: "openai/gpt-5.4",
|
||||
});
|
||||
|
||||
await runner(
|
||||
createQueuedRun({
|
||||
run: {
|
||||
config: sourceConfig,
|
||||
provider: "openai",
|
||||
model: "gpt-5.4",
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
const call = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as
|
||||
| {
|
||||
config?: unknown;
|
||||
}
|
||||
| undefined;
|
||||
expect(call?.config).toBe(runtimeConfig);
|
||||
});
|
||||
});
|
||||
|
||||
describe("createFollowupRunner compaction", () => {
|
||||
it("adds verbose auto-compaction notice and tracks count", async () => {
|
||||
const storePath = path.join(
|
||||
|
||||
@@ -21,7 +21,7 @@ import { stripHeartbeatToken } from "../heartbeat.js";
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
import { runPreflightCompactionIfNeeded } from "./agent-runner-memory.js";
|
||||
import { resolveRunAuthProfile } from "./agent-runner-utils.js";
|
||||
import { resolveQueuedReplyRuntimeConfig, resolveRunAuthProfile } from "./agent-runner-utils.js";
|
||||
import { resolveFollowupDeliveryPayloads } from "./followup-delivery.js";
|
||||
import { resolveOriginMessageProvider } from "./origin-routing.js";
|
||||
import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js";
|
||||
@@ -70,6 +70,7 @@ export function createFollowupRunner(params: {
|
||||
const sendFollowupPayloads = async (payloads: ReplyPayload[], queued: FollowupRun) => {
|
||||
// Check if we should route to originating channel.
|
||||
const { originatingChannel, originatingTo } = queued;
|
||||
const runtimeConfig = resolveQueuedReplyRuntimeConfig(queued.run.config);
|
||||
const shouldRouteToOriginating = isRoutableChannel(originatingChannel) && originatingTo;
|
||||
|
||||
if (!shouldRouteToOriginating && !opts?.onBlockReply) {
|
||||
@@ -98,7 +99,7 @@ export function createFollowupRunner(params: {
|
||||
sessionKey: queued.run.sessionKey,
|
||||
accountId: queued.originatingAccountId,
|
||||
threadId: queued.originatingThreadId,
|
||||
cfg: queued.run.config,
|
||||
cfg: runtimeConfig,
|
||||
});
|
||||
if (!result.ok) {
|
||||
const errorMsg = result.error ?? "unknown error";
|
||||
@@ -127,8 +128,14 @@ export function createFollowupRunner(params: {
|
||||
|
||||
return async (queued: FollowupRun) => {
|
||||
const replySessionKey = queued.run.sessionKey ?? sessionKey;
|
||||
const runtimeConfig = resolveQueuedReplyRuntimeConfig(queued.run.config);
|
||||
const effectiveQueued =
|
||||
runtimeConfig === queued.run.config
|
||||
? queued
|
||||
: { ...queued, run: { ...queued.run, config: runtimeConfig } };
|
||||
const run = effectiveQueued.run;
|
||||
const replyOperation = createReplyOperation({
|
||||
sessionId: queued.run.sessionId,
|
||||
sessionId: run.sessionId,
|
||||
sessionKey: replySessionKey ?? "",
|
||||
resetTriggered: false,
|
||||
upstreamAbortSignal: opts?.abortSignal,
|
||||
@@ -138,25 +145,25 @@ export function createFollowupRunner(params: {
|
||||
const shouldSurfaceToControlUi = isInternalMessageChannel(
|
||||
resolveOriginMessageProvider({
|
||||
originatingChannel: queued.originatingChannel,
|
||||
provider: queued.run.messageProvider,
|
||||
provider: run.messageProvider,
|
||||
}),
|
||||
);
|
||||
if (queued.run.sessionKey) {
|
||||
if (run.sessionKey) {
|
||||
registerAgentRunContext(runId, {
|
||||
sessionKey: queued.run.sessionKey,
|
||||
verboseLevel: queued.run.verboseLevel,
|
||||
sessionKey: run.sessionKey,
|
||||
verboseLevel: run.verboseLevel,
|
||||
isControlUiVisible: shouldSurfaceToControlUi,
|
||||
});
|
||||
}
|
||||
let autoCompactionCount = 0;
|
||||
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
|
||||
let fallbackProvider = queued.run.provider;
|
||||
let fallbackModel = queued.run.model;
|
||||
let fallbackProvider = run.provider;
|
||||
let fallbackModel = run.model;
|
||||
let activeSessionEntry =
|
||||
(sessionKey ? sessionStore?.[sessionKey] : undefined) ?? sessionEntry;
|
||||
activeSessionEntry = await runPreflightCompactionIfNeeded({
|
||||
cfg: queued.run.config,
|
||||
followupRun: queued,
|
||||
cfg: runtimeConfig,
|
||||
followupRun: effectiveQueued,
|
||||
promptForEstimate: queued.prompt,
|
||||
defaultModel,
|
||||
agentCfgContextTokens,
|
||||
@@ -173,30 +180,30 @@ export function createFollowupRunner(params: {
|
||||
replyOperation.setPhase("running");
|
||||
try {
|
||||
const fallbackResult = await runWithModelFallback({
|
||||
cfg: queued.run.config,
|
||||
provider: queued.run.provider,
|
||||
model: queued.run.model,
|
||||
cfg: runtimeConfig,
|
||||
provider: run.provider,
|
||||
model: run.model,
|
||||
runId,
|
||||
agentDir: queued.run.agentDir,
|
||||
agentDir: run.agentDir,
|
||||
fallbacksOverride: resolveRunModelFallbacksOverride({
|
||||
cfg: queued.run.config,
|
||||
agentId: queued.run.agentId,
|
||||
sessionKey: queued.run.sessionKey,
|
||||
cfg: runtimeConfig,
|
||||
agentId: run.agentId,
|
||||
sessionKey: run.sessionKey,
|
||||
}),
|
||||
run: async (provider, model, runOptions) => {
|
||||
const authProfile = resolveRunAuthProfile(queued.run, provider);
|
||||
const authProfile = resolveRunAuthProfile(run, provider);
|
||||
let attemptCompactionCount = 0;
|
||||
try {
|
||||
const result = await runEmbeddedPiAgent({
|
||||
allowGatewaySubagentBinding: true,
|
||||
replyOperation,
|
||||
sessionId: queued.run.sessionId,
|
||||
sessionKey: queued.run.sessionKey,
|
||||
agentId: queued.run.agentId,
|
||||
sessionId: run.sessionId,
|
||||
sessionKey: run.sessionKey,
|
||||
agentId: run.agentId,
|
||||
trigger: "user",
|
||||
messageChannel: queued.originatingChannel ?? undefined,
|
||||
messageProvider: queued.run.messageProvider,
|
||||
agentAccountId: queued.run.agentAccountId,
|
||||
messageProvider: run.messageProvider,
|
||||
agentAccountId: run.agentAccountId,
|
||||
messageTo: queued.originatingTo,
|
||||
messageThreadId: queued.originatingThreadId,
|
||||
currentChannelId: queued.originatingTo,
|
||||
@@ -204,36 +211,36 @@ export function createFollowupRunner(params: {
|
||||
queued.originatingThreadId != null
|
||||
? String(queued.originatingThreadId)
|
||||
: undefined,
|
||||
groupId: queued.run.groupId,
|
||||
groupChannel: queued.run.groupChannel,
|
||||
groupSpace: queued.run.groupSpace,
|
||||
senderId: queued.run.senderId,
|
||||
senderName: queued.run.senderName,
|
||||
senderUsername: queued.run.senderUsername,
|
||||
senderE164: queued.run.senderE164,
|
||||
senderIsOwner: queued.run.senderIsOwner,
|
||||
sessionFile: queued.run.sessionFile,
|
||||
agentDir: queued.run.agentDir,
|
||||
workspaceDir: queued.run.workspaceDir,
|
||||
config: queued.run.config,
|
||||
skillsSnapshot: queued.run.skillsSnapshot,
|
||||
groupId: run.groupId,
|
||||
groupChannel: run.groupChannel,
|
||||
groupSpace: run.groupSpace,
|
||||
senderId: run.senderId,
|
||||
senderName: run.senderName,
|
||||
senderUsername: run.senderUsername,
|
||||
senderE164: run.senderE164,
|
||||
senderIsOwner: run.senderIsOwner,
|
||||
sessionFile: run.sessionFile,
|
||||
agentDir: run.agentDir,
|
||||
workspaceDir: run.workspaceDir,
|
||||
config: runtimeConfig,
|
||||
skillsSnapshot: run.skillsSnapshot,
|
||||
prompt: queued.prompt,
|
||||
extraSystemPrompt: queued.run.extraSystemPrompt,
|
||||
ownerNumbers: queued.run.ownerNumbers,
|
||||
enforceFinalTag: queued.run.enforceFinalTag,
|
||||
extraSystemPrompt: run.extraSystemPrompt,
|
||||
ownerNumbers: run.ownerNumbers,
|
||||
enforceFinalTag: run.enforceFinalTag,
|
||||
provider,
|
||||
model,
|
||||
...authProfile,
|
||||
thinkLevel: queued.run.thinkLevel,
|
||||
verboseLevel: queued.run.verboseLevel,
|
||||
reasoningLevel: queued.run.reasoningLevel,
|
||||
thinkLevel: run.thinkLevel,
|
||||
verboseLevel: run.verboseLevel,
|
||||
reasoningLevel: run.reasoningLevel,
|
||||
suppressToolErrorWarnings: opts?.suppressToolErrorWarnings,
|
||||
execOverrides: queued.run.execOverrides,
|
||||
bashElevated: queued.run.bashElevated,
|
||||
timeoutMs: queued.run.timeoutMs,
|
||||
execOverrides: run.execOverrides,
|
||||
bashElevated: run.bashElevated,
|
||||
timeoutMs: run.timeoutMs,
|
||||
runId,
|
||||
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
|
||||
blockReplyBreak: queued.run.blockReplyBreak,
|
||||
blockReplyBreak: run.blockReplyBreak,
|
||||
bootstrapPromptWarningSignaturesSeen,
|
||||
bootstrapPromptWarningSignature:
|
||||
bootstrapPromptWarningSignaturesSeen[
|
||||
@@ -287,7 +294,7 @@ export function createFollowupRunner(params: {
|
||||
await persistRunSessionUsage({
|
||||
storePath,
|
||||
sessionKey,
|
||||
cfg: queued.run.config,
|
||||
cfg: runtimeConfig,
|
||||
usage,
|
||||
lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
|
||||
promptTokens,
|
||||
@@ -296,10 +303,7 @@ export function createFollowupRunner(params: {
|
||||
contextTokensUsed,
|
||||
systemPromptReport: runResult.meta?.systemPromptReport,
|
||||
cliSessionBinding: runResult.meta?.agentMeta?.cliSessionBinding,
|
||||
usageIsContextSnapshot: isCliProvider(
|
||||
fallbackProvider ?? queued.run.provider,
|
||||
queued.run.config,
|
||||
),
|
||||
usageIsContextSnapshot: isCliProvider(fallbackProvider ?? run.provider, runtimeConfig),
|
||||
logLabel: "followup",
|
||||
});
|
||||
}
|
||||
@@ -321,10 +325,10 @@ export function createFollowupRunner(params: {
|
||||
return [{ ...payload, text: stripped.text }];
|
||||
});
|
||||
const finalPayloads = resolveFollowupDeliveryPayloads({
|
||||
cfg: queued.run.config,
|
||||
cfg: runtimeConfig,
|
||||
payloads: sanitizedPayloads,
|
||||
messageProvider: queued.run.messageProvider,
|
||||
originatingAccountId: queued.originatingAccountId ?? queued.run.agentAccountId,
|
||||
messageProvider: run.messageProvider,
|
||||
originatingAccountId: queued.originatingAccountId ?? run.agentAccountId,
|
||||
originatingChannel: queued.originatingChannel,
|
||||
originatingChatType: queued.originatingChatType,
|
||||
originatingTo: queued.originatingTo,
|
||||
@@ -338,9 +342,9 @@ export function createFollowupRunner(params: {
|
||||
}
|
||||
|
||||
if (autoCompactionCount > 0) {
|
||||
const previousSessionId = queued.run.sessionId;
|
||||
const previousSessionId = run.sessionId;
|
||||
const count = await incrementRunCompactionCount({
|
||||
cfg: queued.run.config,
|
||||
cfg: runtimeConfig,
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey,
|
||||
@@ -353,7 +357,7 @@ export function createFollowupRunner(params: {
|
||||
const refreshedSessionEntry =
|
||||
sessionKey && sessionStore ? sessionStore[sessionKey] : undefined;
|
||||
if (refreshedSessionEntry) {
|
||||
const queueKey = queued.run.sessionKey ?? sessionKey;
|
||||
const queueKey = run.sessionKey ?? sessionKey;
|
||||
if (queueKey) {
|
||||
refreshQueuedFollowupSession({
|
||||
key: queueKey,
|
||||
@@ -363,7 +367,7 @@ export function createFollowupRunner(params: {
|
||||
});
|
||||
}
|
||||
}
|
||||
if (queued.run.verboseLevel && queued.run.verboseLevel !== "off") {
|
||||
if (run.verboseLevel && run.verboseLevel !== "off") {
|
||||
const suffix = typeof count === "number" ? ` (count ${count})` : "";
|
||||
finalPayloads.unshift({
|
||||
text: `🧹 Auto-compaction complete${suffix}.`,
|
||||
@@ -371,7 +375,7 @@ export function createFollowupRunner(params: {
|
||||
}
|
||||
}
|
||||
|
||||
await sendFollowupPayloads(finalPayloads, queued);
|
||||
await sendFollowupPayloads(finalPayloads, effectiveQueued);
|
||||
} finally {
|
||||
replyOperation.complete();
|
||||
// Both signals are required for the typing controller to clean up.
|
||||
|
||||
Reference in New Issue
Block a user