mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-03 21:34:06 +00:00
fix(agents): classify expired thinking signatures (#88340)
Summary: - The branch adds thinking-signature replay-invalid classification, retries matching terminal stream-error eve ... output, preserves static fallback model params, and updates related tests including a Copilot hook fixture. - PR surface: Source +57, Tests +177. Total +234 across 6 files. - Reproducibility: yes. for the classifier boundary: current main lacks a thinking-signature replay-invalid ma ... ort supplies the exact provider error payload. The time-dependent live expiry path was not reproduced here. Automerge notes: - PR branch already contained follow-up commit before automerge: fix(agents): classify expired thinking signatures - PR branch already contained follow-up commit before automerge: fix(agents): recover thinking signature stream errors - PR branch already contained follow-up commit before automerge: fix(agents): recover expired thinking signatures - PR branch already contained follow-up commit before automerge: fix(clawsweeper): address review for automerge-openclaw-openclaw-8807… Validation: - ClawSweeper review passed for headb65f2b8bda. - Required merge gates passed before the squash merge. Prepared head SHA:b65f2b8bdaReview: https://github.com/openclaw/openclaw/pull/88340#issuecomment-4582955790 Co-authored-by: Bryan Tegomoh <bryan.tegomoh@gmail.com> Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com> Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com> Approved-by: takhoffman Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -5,6 +5,7 @@ describe("createHooksBridge", () => {
|
||||
const hookBase = {
|
||||
sessionId: "runtime-session",
|
||||
timestamp: new Date(0),
|
||||
cwd: "/",
|
||||
workingDirectory: "/",
|
||||
};
|
||||
|
||||
@@ -40,6 +41,7 @@ describe("createHooksBridge", () => {
|
||||
const hooks = createHooksBridge({ onPreToolUse })!;
|
||||
const input = {
|
||||
...hookBase,
|
||||
cwd: "/tmp",
|
||||
workingDirectory: "/tmp",
|
||||
toolName: "bash",
|
||||
toolArgs: { cmd: "ls" },
|
||||
|
||||
@@ -1563,6 +1563,25 @@ describe("classifyProviderRuntimeFailureKind", () => {
|
||||
).toBe("replay_invalid");
|
||||
});
|
||||
|
||||
it("classifies expired Anthropic thinking signatures as replay invalid", () => {
|
||||
expect(
|
||||
classifyProviderRuntimeFailureKind(
|
||||
'{"type":"error","error":{"type":"invalid_request_error","message":"messages.1.content.440: Invalid `signature` in `thinking` block"}}',
|
||||
),
|
||||
).toBe("replay_invalid");
|
||||
expect(
|
||||
classifyProviderRuntimeFailureKind(
|
||||
"ValidationException: invalid signature on thinking block",
|
||||
),
|
||||
).toBe("replay_invalid");
|
||||
expect(
|
||||
classifyProviderRuntimeFailureKind(
|
||||
"ValidationException: signature present in thinking block",
|
||||
),
|
||||
).not.toBe("replay_invalid");
|
||||
expect(classifyProviderRuntimeFailureKind("Invalid signature")).not.toBe("replay_invalid");
|
||||
});
|
||||
|
||||
it("splits ambiguous provider runtime failures instead of collapsing to unknown", () => {
|
||||
expect(classifyProviderRuntimeFailureKind({})).toBe("empty_response");
|
||||
expect(classifyProviderRuntimeFailureKind("Unknown error (no error details in response)")).toBe(
|
||||
|
||||
@@ -356,6 +356,8 @@ const INTERRUPTED_NETWORK_ERROR_RE =
|
||||
/\beconnrefused\b|\beconnreset\b|\beconnaborted\b|\benetreset\b|\behostunreach\b|\behostdown\b|\benetunreach\b|\bepipe\b|\bsocket hang up\b|\bconnection refused\b|\bconnection reset\b|\bconnection aborted\b|\bnetwork is unreachable\b|\bhost is unreachable\b|\bfetch failed\b|\bconnection error\b|\bnetwork request failed\b/i;
|
||||
const REPLAY_INVALID_RE =
|
||||
/\bprevious_response_id\b.*\b(?:invalid|unknown|not found|does not exist|expired|mismatch)\b|\btool_(?:use|call)\.(?:input|arguments)\b.*\b(?:missing|required)\b|\bincorrect role information\b|\broles must alternate\b|\binput item id does not belong to this connection\b/i;
|
||||
const THINKING_SIGNATURE_ERROR_RE =
|
||||
/\b(?:invalid|expired)\b.*\bsignature\b|\bsignature\b.*\b(?:invalid|expired)\b/i;
|
||||
const SANDBOX_BLOCKED_RE =
|
||||
/\bapproval is required\b|\bapproval timed out\b|\bapproval was denied\b|\bblocked by sandbox\b|\bsandbox\b.*\b(?:blocked|denied|forbidden|disabled|not allowed)\b|\bexec denied\s*\(/i;
|
||||
const NO_BODY_HTTP_WRAPPER_RE =
|
||||
@@ -471,7 +473,11 @@ function isDnsTransportErrorMessage(raw: string): boolean {
|
||||
}
|
||||
|
||||
function isReplayInvalidErrorMessage(raw: string): boolean {
|
||||
return REPLAY_INVALID_RE.test(raw);
|
||||
return REPLAY_INVALID_RE.test(raw) || isThinkingSignatureReplayInvalidErrorMessage(raw);
|
||||
}
|
||||
|
||||
function isThinkingSignatureReplayInvalidErrorMessage(raw: string): boolean {
|
||||
return /\bthinking\b/i.test(raw) && THINKING_SIGNATURE_ERROR_RE.test(raw);
|
||||
}
|
||||
|
||||
function isSandboxBlockedErrorMessage(raw: string): boolean {
|
||||
|
||||
@@ -77,6 +77,7 @@ type ProviderRuntimeHooks = {
|
||||
type StaticCatalogFallbackModel = Model & {
|
||||
compat?: ModelCompatConfig;
|
||||
contextTokens?: number;
|
||||
params?: Record<string, unknown>;
|
||||
mediaInput?: ModelMediaInputConfig;
|
||||
};
|
||||
|
||||
@@ -1025,7 +1026,7 @@ function resolveConfiguredFallbackModel(params: {
|
||||
provider,
|
||||
modelId,
|
||||
providerParams: providerConfig?.params,
|
||||
configuredParams: configuredModel?.params,
|
||||
configuredParams: metadataModel?.params,
|
||||
});
|
||||
const fallbackTransport = resolveProviderTransport({
|
||||
provider,
|
||||
|
||||
@@ -482,6 +482,37 @@ describe("wrapAnthropicStreamWithRecovery", () => {
|
||||
const anthropicThinkingError = new Error(
|
||||
"thinking or redacted_thinking blocks in the latest assistant message cannot be modified",
|
||||
);
|
||||
const terminalThinkingSignatureError =
|
||||
"ValidationException: invalid signature on thinking block in message history";
|
||||
|
||||
function createTestAssistantMessage(
|
||||
overrides: Partial<AssistantMessage> & Pick<AssistantMessage, "content" | "stopReason">,
|
||||
): AssistantMessage {
|
||||
return castAgentMessage({
|
||||
role: "assistant",
|
||||
api: "anthropic-messages",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4-6",
|
||||
usage: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 0,
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||
},
|
||||
timestamp: 0,
|
||||
...overrides,
|
||||
}) as AssistantMessage;
|
||||
}
|
||||
|
||||
function createTestStreamErrorMessage(errorMessage: string): AssistantMessage {
|
||||
return createTestAssistantMessage({
|
||||
content: [{ type: "text", text: "stream failed" }],
|
||||
stopReason: "error",
|
||||
errorMessage,
|
||||
});
|
||||
}
|
||||
|
||||
it("retries once with omitted-reasoning text when the request is rejected before streaming", async () => {
|
||||
let callCount = 0;
|
||||
@@ -584,6 +615,131 @@ describe("wrapAnthropicStreamWithRecovery", () => {
|
||||
expect(callCount).toBe(2);
|
||||
});
|
||||
|
||||
it("retries pre-content terminal stream-error events with omitted-reasoning text", async () => {
|
||||
let callCount = 0;
|
||||
const contexts: Array<{ messages?: AgentMessage[] }> = [];
|
||||
const finalMessage = createTestAssistantMessage({
|
||||
content: [{ type: "text", text: "recovered" }],
|
||||
stopReason: "stop",
|
||||
});
|
||||
const wrapped = wrapAnthropicStreamWithRecovery(
|
||||
((_model, context) => {
|
||||
callCount += 1;
|
||||
const attempt = callCount;
|
||||
contexts.push(context as { messages?: AgentMessage[] });
|
||||
const stream = createAssistantMessageEventStream();
|
||||
queueMicrotask(() => {
|
||||
if (attempt === 1) {
|
||||
stream.push({
|
||||
type: "error",
|
||||
reason: "error",
|
||||
error: createTestStreamErrorMessage(terminalThinkingSignatureError),
|
||||
});
|
||||
} else {
|
||||
stream.push({ type: "done", reason: "stop", message: finalMessage });
|
||||
}
|
||||
stream.end();
|
||||
});
|
||||
return stream;
|
||||
}) as Parameters<typeof wrapAnthropicStreamWithRecovery>[0],
|
||||
{ id: "test-session" },
|
||||
);
|
||||
|
||||
const response = wrapped(
|
||||
{} as never,
|
||||
{
|
||||
messages: castAgentMessages([
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "thinking", thinking: "secret", thinkingSignature: "sig" }],
|
||||
},
|
||||
]),
|
||||
} as never,
|
||||
{} as never,
|
||||
) as { result: () => Promise<unknown> } & AsyncIterable<unknown>;
|
||||
const events: unknown[] = [];
|
||||
for await (const event of response) {
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
expect(events).toEqual([{ type: "done", reason: "stop", message: finalMessage }]);
|
||||
await expect(response.result()).resolves.toEqual(finalMessage);
|
||||
expect(callCount).toBe(2);
|
||||
const retryMessage = contexts[1]?.messages?.[0];
|
||||
if (!retryMessage || retryMessage.role !== "assistant") {
|
||||
throw new Error("Expected Anthropic recovery retry to start with an assistant message");
|
||||
}
|
||||
expect(retryMessage.content).toEqual([
|
||||
{ type: "text", text: OMITTED_ASSISTANT_REASONING_TEXT },
|
||||
]);
|
||||
});
|
||||
|
||||
it("does not retry non-thinking terminal stream-error events", async () => {
|
||||
let callCount = 0;
|
||||
const errorMessage = createTestStreamErrorMessage("rate limit exceeded");
|
||||
const wrapped = wrapAnthropicStreamWithRecovery(
|
||||
(() => {
|
||||
callCount += 1;
|
||||
const stream = createAssistantMessageEventStream();
|
||||
queueMicrotask(() => {
|
||||
stream.push({ type: "error", reason: "error", error: errorMessage });
|
||||
stream.end();
|
||||
});
|
||||
return stream;
|
||||
}) as Parameters<typeof wrapAnthropicStreamWithRecovery>[0],
|
||||
{ id: "test-session" },
|
||||
);
|
||||
|
||||
const response = wrapped({} as never, { messages: [] } as never, {} as never) as {
|
||||
result: () => Promise<unknown>;
|
||||
} & AsyncIterable<unknown>;
|
||||
const events: unknown[] = [];
|
||||
for await (const event of response) {
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
expect(events).toEqual([{ type: "error", reason: "error", error: errorMessage }]);
|
||||
await expect(response.result()).resolves.toEqual(errorMessage);
|
||||
expect(callCount).toBe(1);
|
||||
});
|
||||
|
||||
it("does not retry terminal stream-error events after output was yielded", async () => {
|
||||
let callCount = 0;
|
||||
const partialMessage = createTestAssistantMessage({
|
||||
content: [{ type: "text", text: "" }],
|
||||
stopReason: "stop",
|
||||
});
|
||||
const errorMessage = createTestStreamErrorMessage(terminalThinkingSignatureError);
|
||||
const wrapped = wrapAnthropicStreamWithRecovery(
|
||||
(() => {
|
||||
callCount += 1;
|
||||
const stream = createAssistantMessageEventStream();
|
||||
queueMicrotask(() => {
|
||||
stream.push({ type: "start", partial: partialMessage });
|
||||
stream.push({ type: "error", reason: "error", error: errorMessage });
|
||||
stream.end();
|
||||
});
|
||||
return stream;
|
||||
}) as Parameters<typeof wrapAnthropicStreamWithRecovery>[0],
|
||||
{ id: "test-session" },
|
||||
);
|
||||
|
||||
const response = wrapped({} as never, { messages: [] } as never, {} as never) as {
|
||||
result: () => Promise<unknown>;
|
||||
} & AsyncIterable<unknown>;
|
||||
const events: unknown[] = [];
|
||||
for await (const event of response) {
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
expect(events).toEqual([
|
||||
{ type: "start", partial: partialMessage },
|
||||
{ type: "error", reason: "error", error: errorMessage },
|
||||
]);
|
||||
await expect(response.result()).resolves.toEqual(errorMessage);
|
||||
expect(callCount).toBe(1);
|
||||
});
|
||||
|
||||
it("does not retry when the stream fails after yielding a chunk", async () => {
|
||||
let callCount = 0;
|
||||
const wrapped = wrapAnthropicStreamWithRecovery(
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import type { AssistantMessageEvent } from "../../llm/types.js";
|
||||
import { createAssistantMessageEventStream } from "../../llm/utils/event-stream.js";
|
||||
import type { AgentMessage, StreamFn } from "../runtime/index.js";
|
||||
import { log } from "./logger.js";
|
||||
@@ -427,7 +428,13 @@ function shouldRecoverAnthropicThinkingError(
|
||||
error: unknown,
|
||||
sessionMeta: RecoverySessionMeta,
|
||||
): boolean {
|
||||
const message = formatErrorMessage(error);
|
||||
return shouldRecoverAnthropicThinkingErrorMessage(formatErrorMessage(error), sessionMeta);
|
||||
}
|
||||
|
||||
function shouldRecoverAnthropicThinkingErrorMessage(
|
||||
message: string,
|
||||
sessionMeta: RecoverySessionMeta,
|
||||
): boolean {
|
||||
if (!THINKING_BLOCK_ERROR_PATTERN.test(message)) {
|
||||
return false;
|
||||
}
|
||||
@@ -440,17 +447,66 @@ function shouldRecoverAnthropicThinkingError(
|
||||
return true;
|
||||
}
|
||||
|
||||
function isAssistantMessageErrorEvent(
|
||||
event: unknown,
|
||||
): event is Extract<AssistantMessageEvent, { type: "error" }> {
|
||||
return (
|
||||
Boolean(event) && typeof event === "object" && (event as { type?: unknown }).type === "error"
|
||||
);
|
||||
}
|
||||
|
||||
function getAssistantMessageErrorText(
|
||||
event: Extract<AssistantMessageEvent, { type: "error" }>,
|
||||
): string {
|
||||
const errorMessage = (event.error as { errorMessage?: unknown }).errorMessage;
|
||||
return typeof errorMessage === "string" ? errorMessage : "";
|
||||
}
|
||||
|
||||
async function retryStreamWithoutThinking(
|
||||
outer: ReturnType<typeof createAssistantMessageEventStream>,
|
||||
retry: () => ReturnType<StreamFn>,
|
||||
): Promise<AssistantMessage> {
|
||||
const retryStream = retry();
|
||||
const resolvedRetry = retryStream instanceof Promise ? await retryStream : retryStream;
|
||||
for await (const chunk of resolvedRetry as AsyncIterable<unknown>) {
|
||||
outer.push(chunk as Parameters<typeof outer.push>[0]);
|
||||
}
|
||||
const result = await (resolvedRetry as { result?: () => Promise<AssistantMessage> }).result?.();
|
||||
return result as AssistantMessage;
|
||||
}
|
||||
|
||||
async function pumpStreamWithRecovery(
|
||||
outer: ReturnType<typeof createAssistantMessageEventStream>,
|
||||
stream: ReturnType<StreamFn>,
|
||||
sessionMeta: RecoverySessionMeta,
|
||||
retry: () => ReturnType<StreamFn>,
|
||||
): Promise<AssistantMessage> {
|
||||
let yieldedChunk = false;
|
||||
let yieldedOutput = false;
|
||||
try {
|
||||
const resolved = stream instanceof Promise ? await stream : stream;
|
||||
for await (const chunk of resolved as AsyncIterable<unknown>) {
|
||||
yieldedChunk = true;
|
||||
if (isAssistantMessageErrorEvent(chunk)) {
|
||||
if (
|
||||
shouldRecoverAnthropicThinkingErrorMessage(
|
||||
getAssistantMessageErrorText(chunk),
|
||||
sessionMeta,
|
||||
)
|
||||
) {
|
||||
if (yieldedOutput) {
|
||||
log.warn(
|
||||
`[session-recovery] Anthropic thinking error occurred after streaming began; skipping retry to avoid duplicate chunks: sessionId=${sessionMeta.id}`,
|
||||
);
|
||||
} else {
|
||||
sessionMeta.recoveredAnthropicThinking = true;
|
||||
log.warn(
|
||||
`[session-recovery] Anthropic thinking stream error; retrying once without thinking blocks: sessionId=${sessionMeta.id}`,
|
||||
);
|
||||
return retryStreamWithoutThinking(outer, retry);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
yieldedOutput = true;
|
||||
}
|
||||
outer.push(chunk as Parameters<typeof outer.push>[0]);
|
||||
}
|
||||
const result = await (resolved as { result?: () => Promise<AssistantMessage> }).result?.();
|
||||
@@ -459,7 +515,7 @@ async function pumpStreamWithRecovery(
|
||||
if (!shouldRecoverAnthropicThinkingError(error, sessionMeta)) {
|
||||
throw error;
|
||||
}
|
||||
if (yieldedChunk) {
|
||||
if (yieldedOutput) {
|
||||
log.warn(
|
||||
`[session-recovery] Anthropic thinking error occurred after streaming began; skipping retry to avoid duplicate chunks: sessionId=${sessionMeta.id}`,
|
||||
);
|
||||
@@ -469,13 +525,7 @@ async function pumpStreamWithRecovery(
|
||||
log.warn(
|
||||
`[session-recovery] Anthropic thinking error during stream; retrying once without thinking blocks: sessionId=${sessionMeta.id}`,
|
||||
);
|
||||
const retryStream = retry();
|
||||
const resolvedRetry = retryStream instanceof Promise ? await retryStream : retryStream;
|
||||
for await (const chunk of resolvedRetry as AsyncIterable<unknown>) {
|
||||
outer.push(chunk as Parameters<typeof outer.push>[0]);
|
||||
}
|
||||
const result = await (resolvedRetry as { result?: () => Promise<AssistantMessage> }).result?.();
|
||||
return result as AssistantMessage;
|
||||
return retryStreamWithoutThinking(outer, retry);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user