mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-31 10:28:34 +00:00
fix(agents): handle seeded Anthropic signatures
This commit is contained in:
@@ -697,7 +697,7 @@ describe("anthropic transport stream", () => {
|
||||
const thinkingContent = requireRecord(result.content[0], "thinking content");
|
||||
expect(thinkingContent.type).toBe("thinking");
|
||||
expect(thinkingContent.thinking).toBe("checking");
|
||||
expect(thinkingContent.thinkingSignature).toBe("sig_1sig_2");
|
||||
expect(thinkingContent.thinkingSignature).toBe("sig_2");
|
||||
expect(result.content[1]).toEqual({ type: "text", text: "NO_REPLY" });
|
||||
expect(events.some((event) => event.type === "text_delta" && event.delta === "NO_REPLY")).toBe(
|
||||
true,
|
||||
@@ -727,6 +727,11 @@ describe("anthropic transport stream", () => {
|
||||
index: 0,
|
||||
delta: { type: "signature_delta", signature: "sig_2" },
|
||||
},
|
||||
{
|
||||
type: "content_block_delta",
|
||||
index: 0,
|
||||
delta: { type: "signature_delta", signature: "sig_3" },
|
||||
},
|
||||
{
|
||||
type: "content_block_stop",
|
||||
index: 0,
|
||||
@@ -752,7 +757,48 @@ describe("anthropic transport stream", () => {
|
||||
expect(result.content[0]).toMatchObject({
|
||||
type: "thinking",
|
||||
thinking: signedThinking,
|
||||
thinkingSignature: "sig_1sig_2",
|
||||
thinkingSignature: "sig_2sig_3",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves provider-seeded thinking signatures when no signature_delta follows", async () => {
|
||||
guardedFetchMock.mockResolvedValueOnce(
|
||||
createSseResponse([
|
||||
{
|
||||
type: "message_start",
|
||||
message: { id: "msg_1", usage: { input_tokens: 6, output_tokens: 0 } },
|
||||
},
|
||||
{
|
||||
type: "content_block_start",
|
||||
index: 0,
|
||||
content_block: { type: "thinking", thinking: "seeded", signature: "seed_signature" },
|
||||
},
|
||||
{
|
||||
type: "content_block_stop",
|
||||
index: 0,
|
||||
},
|
||||
{
|
||||
type: "message_delta",
|
||||
delta: { stop_reason: "end_turn" },
|
||||
usage: { input_tokens: 6, output_tokens: 5 },
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
const result = await runTransportStream(
|
||||
makeAnthropicTransportModel(),
|
||||
{
|
||||
messages: [{ role: "user", content: "think" }],
|
||||
} as AnthropicStreamContext,
|
||||
{
|
||||
apiKey: "sk-ant-api",
|
||||
} as AnthropicStreamOptions,
|
||||
);
|
||||
|
||||
expect(result.content[0]).toMatchObject({
|
||||
type: "thinking",
|
||||
thinking: "seeded",
|
||||
thinkingSignature: "seed_signature",
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -959,6 +959,7 @@ export function createAnthropicMessagesTransportStreamFn(): StreamFn {
|
||||
);
|
||||
stream.push({ type: "start", partial: output as never });
|
||||
const blocks = output.content;
|
||||
const signatureDeltaIndexes = new Set<number>();
|
||||
const allowReasoningContentReplay = supportsReasoningContentReplay(model);
|
||||
const reasoningContentThinkingBlocks = new Map<number, number>();
|
||||
const reasoningContentTextBlocks = new Map<number, number>();
|
||||
@@ -1291,6 +1292,11 @@ export function createAnthropicMessagesTransportStreamFn(): StreamFn {
|
||||
delta?.type === "signature_delta" &&
|
||||
typeof delta.signature === "string"
|
||||
) {
|
||||
const signatureIndex = eventIndexKey(event.index);
|
||||
if (!signatureDeltaIndexes.has(signatureIndex)) {
|
||||
signatureDeltaIndexes.add(signatureIndex);
|
||||
block.thinkingSignature = "";
|
||||
}
|
||||
block.thinkingSignature = (block.thinkingSignature || "") + delta.signature;
|
||||
}
|
||||
continue;
|
||||
|
||||
@@ -21,8 +21,10 @@
|
||||
* or: OPENAI_API_KEY=sk-xxx ANTHROPIC_BASE_URL=http://localhost:xxx/v1 npx tsx ...
|
||||
*/
|
||||
|
||||
const ANTHROPIC_API_KEY = process.env.ANTHROPIC_API_KEY || process.env.OPENAI_API_KEY;
|
||||
const BASE_URL = process.env.ANTHROPIC_BASE_URL || "https://api.anthropic.com";
|
||||
const ANTHROPIC_API_KEY =
|
||||
process.env.ANTHROPIC_API_KEY ||
|
||||
(process.env.ANTHROPIC_BASE_URL ? process.env.OPENAI_API_KEY : undefined);
|
||||
|
||||
// ============================================================================
|
||||
// Section 1: Patched Code Path Proof
|
||||
@@ -57,6 +59,7 @@ async function runPatchedCodePathProof(): Promise<{ passed: number; failed: numb
|
||||
"rX4uVnB9oI6mKpL",
|
||||
];
|
||||
const expectedSignature = signatureChunks.join("");
|
||||
const seededStartSignature = "stale_seed_should_not_prefix_";
|
||||
const thinkingText = "Let me analyze this step by step. The key insight is...";
|
||||
|
||||
const sseEvents: Record<string, unknown>[] = [
|
||||
@@ -67,7 +70,7 @@ async function runPatchedCodePathProof(): Promise<{ passed: number; failed: numb
|
||||
{
|
||||
type: "content_block_start",
|
||||
index: 0,
|
||||
content_block: { type: "thinking", thinking: "", signature: "" },
|
||||
content_block: { type: "thinking", thinking: "", signature: seededStartSignature },
|
||||
},
|
||||
{
|
||||
type: "content_block_delta",
|
||||
@@ -100,7 +103,10 @@ async function runPatchedCodePathProof(): Promise<{ passed: number; failed: numb
|
||||
// Mock globalThis.fetch — the transport stream's guarded fetch recognizes a mock
|
||||
// (via .mock property) and skips SSRF DNS pinning, calling it directly.
|
||||
const originalFetch = globalThis.fetch;
|
||||
const mockFetch = async (_url: string | URL | Request, _init?: RequestInit): Promise<Response> => {
|
||||
const mockFetch = async (
|
||||
_url: string | URL | Request,
|
||||
_init?: RequestInit,
|
||||
): Promise<Response> => {
|
||||
return new Response(sseBody, {
|
||||
status: 200,
|
||||
headers: { "content-type": "text/event-stream" },
|
||||
@@ -110,9 +116,8 @@ async function runPatchedCodePathProof(): Promise<{ passed: number; failed: numb
|
||||
globalThis.fetch = mockFetch as typeof fetch;
|
||||
|
||||
try {
|
||||
const { createAnthropicMessagesTransportStreamFn } = await import(
|
||||
"../../src/agents/anthropic-transport-stream.js"
|
||||
);
|
||||
const { createAnthropicMessagesTransportStreamFn } =
|
||||
await import("../../src/agents/anthropic-transport-stream.js");
|
||||
|
||||
const streamFn = createAnthropicMessagesTransportStreamFn();
|
||||
|
||||
@@ -140,11 +145,18 @@ async function runPatchedCodePathProof(): Promise<{ passed: number; failed: numb
|
||||
};
|
||||
|
||||
console.log(" Calling createAnthropicMessagesTransportStreamFn()...");
|
||||
console.log(` SSE payload: ${signatureChunks.length} signature_delta events\n`);
|
||||
console.log(
|
||||
` SSE payload: seeded start signature + ${signatureChunks.length} signature_delta events\n`,
|
||||
);
|
||||
|
||||
const eventStream = streamFn(model as never, context as never, options as never);
|
||||
const result = await (eventStream as { result(): Promise<unknown> }).result() as {
|
||||
content: Array<{ type: string; thinking?: string; thinkingSignature?: string; text?: string }>;
|
||||
const result = (await (eventStream as { result(): Promise<unknown> }).result()) as {
|
||||
content: Array<{
|
||||
type: string;
|
||||
thinking?: string;
|
||||
thinkingSignature?: string;
|
||||
text?: string;
|
||||
}>;
|
||||
};
|
||||
|
||||
// Find the thinking block in the output
|
||||
@@ -162,7 +174,12 @@ async function runPatchedCodePathProof(): Promise<{ passed: number; failed: numb
|
||||
|
||||
assert(
|
||||
thinkingBlock.thinkingSignature === expectedSignature,
|
||||
`Signature equals concatenation of all ${signatureChunks.length} chunks (${expectedSignature.length} chars)`,
|
||||
`Signature equals concatenation of all ${signatureChunks.length} delta chunks (${expectedSignature.length} chars)`,
|
||||
);
|
||||
|
||||
assert(
|
||||
!thinkingBlock.thinkingSignature!.startsWith(seededStartSignature),
|
||||
"Signature delta accumulation replaces the seeded start signature instead of prefixing it",
|
||||
);
|
||||
|
||||
// The OLD bug would only keep the LAST chunk
|
||||
@@ -190,7 +207,10 @@ async function runPatchedCodePathProof(): Promise<{ passed: number; failed: numb
|
||||
}
|
||||
offset += chunk.length;
|
||||
}
|
||||
assert(allChunksInOrder, "All signature chunks appear in order (verifies append, not prepend/shuffle)");
|
||||
assert(
|
||||
allChunksInOrder,
|
||||
"All signature chunks appear in order (verifies append, not prepend/shuffle)",
|
||||
);
|
||||
}
|
||||
|
||||
if (textBlock) {
|
||||
@@ -219,7 +239,8 @@ async function runProof(): Promise<{ passed: number; failed: number }> {
|
||||
}
|
||||
|
||||
const MODEL = process.env.ANTHROPIC_MODEL || "claude-sonnet-4-6";
|
||||
const INITIAL_PROMPT = "Analyze the philosophical implications of Gödel's incompleteness theorems on the foundations of mathematics. Consider: 1) How do they relate to Hilbert's program? 2) What are the epistemological consequences? 3) How do they connect to Turing's halting problem? Think deeply and thoroughly.";
|
||||
const INITIAL_PROMPT =
|
||||
"Analyze the philosophical implications of Gödel's incompleteness theorems on the foundations of mathematics. Consider: 1) How do they relate to Hilbert's program? 2) What are the epistemological consequences? 3) How do they connect to Turing's halting problem? Think deeply and thoroughly.";
|
||||
console.log(`API Base: ${BASE_URL}`);
|
||||
console.log(`Model: ${MODEL}\n`);
|
||||
|
||||
@@ -290,10 +311,7 @@ async function runProof(): Promise<{ passed: number; failed: number }> {
|
||||
const event = JSON.parse(data);
|
||||
allEvents.push(event.type);
|
||||
|
||||
if (
|
||||
event.type === "content_block_delta" &&
|
||||
event.delta?.type === "signature_delta"
|
||||
) {
|
||||
if (event.type === "content_block_delta" && event.delta?.type === "signature_delta") {
|
||||
signatureDeltas.push(event.delta.signature);
|
||||
// Simulate OLD behavior (overwrite)
|
||||
simulatedOverwrite = event.delta.signature;
|
||||
@@ -301,10 +319,7 @@ async function runProof(): Promise<{ passed: number; failed: number }> {
|
||||
simulatedAppend += event.delta.signature;
|
||||
}
|
||||
|
||||
if (
|
||||
event.type === "content_block_delta" &&
|
||||
event.delta?.type === "thinking_delta"
|
||||
) {
|
||||
if (event.type === "content_block_delta" && event.delta?.type === "thinking_delta") {
|
||||
thinkingText += event.delta.thinking;
|
||||
}
|
||||
|
||||
@@ -332,7 +347,9 @@ async function runProof(): Promise<{ passed: number; failed: number }> {
|
||||
console.log(`\n--- Signature Delta Analysis ---`);
|
||||
console.log(` Total signature_delta events: ${signatureDeltas.length}`);
|
||||
for (let i = 0; i < signatureDeltas.length; i++) {
|
||||
console.log(` Chunk ${i + 1}: ${signatureDeltas[i].length} chars "${signatureDeltas[i].slice(0, 30)}..."`);
|
||||
console.log(
|
||||
` Chunk ${i + 1}: ${signatureDeltas[i].length} chars "${signatureDeltas[i].slice(0, 30)}..."`,
|
||||
);
|
||||
}
|
||||
|
||||
console.log(`\n--- Signature Comparison ---`);
|
||||
@@ -358,27 +375,33 @@ async function runProof(): Promise<{ passed: number; failed: number }> {
|
||||
|
||||
assert(signatureDeltas.length > 0, "At least one signature_delta event received");
|
||||
if (signatureDeltas.length >= 2) {
|
||||
assert(true, `Multiple signature_delta chunks received (got ${signatureDeltas.length} — confirms chunked delivery)`);
|
||||
assert(
|
||||
true,
|
||||
`Multiple signature_delta chunks received (got ${signatureDeltas.length} — confirms chunked delivery)`,
|
||||
);
|
||||
} else {
|
||||
console.log(` ℹ️ INFO: Only ${signatureDeltas.length} signature_delta chunk received (proxy/Bedrock may coalesce chunks)`);
|
||||
console.log(` Multi-chunk concatenation verified by unit tests; real proof focuses on replay integrity.`);
|
||||
console.log(
|
||||
` ℹ️ INFO: Only ${signatureDeltas.length} signature_delta chunk received (proxy/Bedrock may coalesce chunks)`,
|
||||
);
|
||||
console.log(
|
||||
` Multi-chunk concatenation verified by unit tests; real proof focuses on replay integrity.`,
|
||||
);
|
||||
}
|
||||
assert(
|
||||
simulatedAppend.length > 0,
|
||||
`Signature captured successfully (${simulatedAppend.length} chars)`
|
||||
);
|
||||
assert(
|
||||
thinkingText.length > 0,
|
||||
"Thinking text was captured from thinking_delta events"
|
||||
`Signature captured successfully (${simulatedAppend.length} chars)`,
|
||||
);
|
||||
assert(thinkingText.length > 0, "Thinking text was captured from thinking_delta events");
|
||||
|
||||
if (signatureDeltas.length >= 2) {
|
||||
assert(
|
||||
simulatedOverwrite !== simulatedAppend,
|
||||
"OLD behavior (overwrite) produces DIFFERENT result than NEW behavior (append) — confirms the bug"
|
||||
"OLD behavior (overwrite) produces DIFFERENT result than NEW behavior (append) — confirms the bug",
|
||||
);
|
||||
const truncationRatio = simulatedOverwrite.length / simulatedAppend.length;
|
||||
console.log(`\n 📊 Truncation ratio: ${(truncationRatio * 100).toFixed(1)}% — old behavior kept only ${(truncationRatio * 100).toFixed(1)}% of the full signature`);
|
||||
console.log(
|
||||
`\n 📊 Truncation ratio: ${(truncationRatio * 100).toFixed(1)}% — old behavior kept only ${(truncationRatio * 100).toFixed(1)}% of the full signature`,
|
||||
);
|
||||
} else {
|
||||
console.log(`\n ℹ️ Single-chunk delivery — simulating truncation for negative proof`);
|
||||
}
|
||||
@@ -430,7 +453,7 @@ async function runProof(): Promise<{ passed: number; failed: number }> {
|
||||
|
||||
assert(
|
||||
replayResponse.ok,
|
||||
`Replay with CORRECT (concatenated) signature succeeds: HTTP ${replayResponse.status}`
|
||||
`Replay with CORRECT (concatenated) signature succeeds: HTTP ${replayResponse.status}`,
|
||||
);
|
||||
if (!replayResponse.ok) {
|
||||
const errBody = await replayResponse.text();
|
||||
@@ -441,8 +464,12 @@ async function runProof(): Promise<{ passed: number; failed: number }> {
|
||||
}
|
||||
|
||||
// Step 6: Negative proof — replay with truncated signature should fail
|
||||
{ // Always run negative proof (artificially truncate if needed)
|
||||
const truncatedSig = signatureDeltas.length >= 2 ? simulatedOverwrite : simulatedAppend.slice(0, Math.floor(simulatedAppend.length / 3));
|
||||
{
|
||||
// Always run negative proof (artificially truncate if needed)
|
||||
const truncatedSig =
|
||||
signatureDeltas.length >= 2
|
||||
? simulatedOverwrite
|
||||
: simulatedAppend.slice(0, Math.floor(simulatedAppend.length / 3));
|
||||
console.log(" Sending replay with TRUNCATED (old bug) signature...");
|
||||
const badReplayResponse = await fetch(`${BASE_URL}/v1/messages`, {
|
||||
method: "POST",
|
||||
@@ -487,7 +514,7 @@ async function runProof(): Promise<{ passed: number; failed: number }> {
|
||||
|
||||
assert(
|
||||
!badReplayResponse.ok,
|
||||
`Replay with TRUNCATED (old bug) signature fails: HTTP ${badReplayResponse.status} — confirms the bug causes API rejection`
|
||||
`Replay with TRUNCATED (old bug) signature fails: HTTP ${badReplayResponse.status} — confirms the bug causes API rejection`,
|
||||
);
|
||||
if (!badReplayResponse.ok) {
|
||||
const errBody = await badReplayResponse.text();
|
||||
@@ -516,7 +543,9 @@ async function main(): Promise<void> {
|
||||
const totalFailed = patchedResult.failed + liveResult.failed;
|
||||
|
||||
console.log(`\n=== OVERALL PROOF SUMMARY ===`);
|
||||
console.log(` Patched code path: ${patchedResult.passed} passed, ${patchedResult.failed} failed`);
|
||||
console.log(
|
||||
` Patched code path: ${patchedResult.passed} passed, ${patchedResult.failed} failed`,
|
||||
);
|
||||
console.log(` Live API replay: ${liveResult.passed} passed, ${liveResult.failed} failed`);
|
||||
console.log(` Total: ${totalPassed} passed, ${totalFailed} failed`);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user