fix(agents): concatenate signature_delta chunks in transport stream

The anthropic-transport-stream was overwriting thinkingSignature on each
signature_delta event instead of appending. Since Anthropic sends the
thinking block signature across multiple streaming chunks, only the last
chunk survived. The truncated signature was persisted to session JSONL,
causing all subsequent replay attempts to fail with HTTP 400:

  thinking or redacted_thinking blocks in the latest assistant message
  cannot be modified

This permanently bricked sessions with no user recovery path.

Fix: accumulate signature_delta values by concatenating instead of
overwriting, matching the correct implementation in the LLM provider
layer (src/llm/providers/anthropic.ts:629-634).

Includes real-scenario proof against live Anthropic API validating that
correct signatures replay successfully while truncated signatures are
rejected.

Fixes #87574
Refs #80625, #85781, #87475
This commit is contained in:
Jerry Xin
2026-05-28 16:30:31 +08:00
committed by Peter Steinberger
parent e5adde9fe3
commit 8dc9cfe734
3 changed files with 594 additions and 3 deletions

View File

@@ -697,7 +697,7 @@ describe("anthropic transport stream", () => {
const thinkingContent = requireRecord(result.content[0], "thinking content");
expect(thinkingContent.type).toBe("thinking");
expect(thinkingContent.thinking).toBe("checking");
expect(thinkingContent.thinkingSignature).toBe("sig_2");
expect(thinkingContent.thinkingSignature).toBe("sig_1sig_2");
expect(result.content[1]).toEqual({ type: "text", text: "NO_REPLY" });
expect(events.some((event) => event.type === "text_delta" && event.delta === "NO_REPLY")).toBe(
true,
@@ -752,7 +752,63 @@ describe("anthropic transport stream", () => {
expect(result.content[0]).toMatchObject({
type: "thinking",
thinking: signedThinking,
thinkingSignature: "sig_2",
thinkingSignature: "sig_1sig_2",
});
});
it("concatenates multiple signature_delta events instead of overwriting", async () => {
guardedFetchMock.mockResolvedValueOnce(
createSseResponse([
{
type: "message_start",
message: { id: "msg_1", usage: { input_tokens: 6, output_tokens: 0 } },
},
{
type: "content_block_start",
index: 0,
content_block: { type: "thinking", thinking: "step by step", signature: "" },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "signature_delta", signature: "chunk1" },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "signature_delta", signature: "chunk2" },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "signature_delta", signature: "chunk3" },
},
{
type: "content_block_stop",
index: 0,
},
{
type: "message_delta",
delta: { stop_reason: "end_turn" },
usage: { input_tokens: 6, output_tokens: 5 },
},
]),
);
const result = await runTransportStream(
makeAnthropicTransportModel(),
{
messages: [{ role: "user", content: "think" }],
} as AnthropicStreamContext,
{
apiKey: "sk-ant-api",
} as AnthropicStreamOptions,
);
expect(result.content[0]).toMatchObject({
type: "thinking",
thinking: "step by step",
thinkingSignature: "chunk1chunk2chunk3",
});
});

View File

@@ -1291,7 +1291,7 @@ export function createAnthropicMessagesTransportStreamFn(): StreamFn {
delta?.type === "signature_delta" &&
typeof delta.signature === "string"
) {
block.thinkingSignature = delta.signature;
block.thinkingSignature = (block.thinkingSignature || "") + delta.signature;
}
continue;
}

View File

@@ -0,0 +1,535 @@
/**
* Real Proof: Thinking Block Signature Integrity
*
* This script validates that the fixed anthropic-transport-stream correctly
* concatenates signature_delta chunks.
*
* Section 1 — Patched Code Path Proof:
* Exercises createAnthropicMessagesTransportStreamFn directly with a mocked
* fetch returning crafted SSE events with multiple signature_delta chunks.
* Verifies the transport stream concatenates (not overwrites) them.
* Does NOT require a real API key.
*
* Section 2 — Live API Replay Proof:
* Sends a real request to the Anthropic Messages API, collects raw SSE events,
* and verifies:
* 1. Multiple signature_delta events are received (confirming chunked delivery)
* 2. The concatenated signature matches the final message's signature
* 3. The signature is valid for replay (not truncated)
*
* Usage: ANTHROPIC_API_KEY=sk-ant-xxx npx tsx test/proof/thinking-signature-real-proof.ts
* or: OPENAI_API_KEY=sk-xxx ANTHROPIC_BASE_URL=http://localhost:xxx/v1 npx tsx ...
*/
const ANTHROPIC_API_KEY = process.env.ANTHROPIC_API_KEY || process.env.OPENAI_API_KEY;
const BASE_URL = process.env.ANTHROPIC_BASE_URL || "https://api.anthropic.com";
// ============================================================================
// Section 1: Patched Code Path Proof
// Exercises createAnthropicMessagesTransportStreamFn directly.
// No real API key required — uses mocked fetch with crafted SSE response.
// ============================================================================
async function runPatchedCodePathProof(): Promise<{ passed: number; failed: number }> {
console.log("=== Patched Code Path Proof: createAnthropicMessagesTransportStreamFn ===\n");
console.log(" This section exercises the ACTUAL patched transport stream function");
console.log(" with a mocked fetch serving crafted SSE events.\n");
let passed = 0;
let failed = 0;
function assert(condition: boolean, description: string): void {
if (condition) {
console.log(` ✅ PASS: ${description}`);
passed++;
} else {
console.log(` ❌ FAIL: ${description}`);
failed++;
}
}
// Craft SSE events with 5 signature_delta chunks to prove concatenation
const signatureChunks = [
"EqMBCkYKMHd2YWl",
"Mn9kSEpweFGblQt",
"R2Bf8mKL3xNpvZw",
"Hy7TcDfG2jAqWsE",
"rX4uVnB9oI6mKpL",
];
const expectedSignature = signatureChunks.join("");
const thinkingText = "Let me analyze this step by step. The key insight is...";
const sseEvents: Record<string, unknown>[] = [
{
type: "message_start",
message: { id: "msg_proof_patched_1", usage: { input_tokens: 10, output_tokens: 150 } },
},
{
type: "content_block_start",
index: 0,
content_block: { type: "thinking", thinking: "", signature: "" },
},
{
type: "content_block_delta",
index: 0,
delta: { type: "thinking_delta", thinking: thinkingText },
},
// 5 separate signature_delta events — the fix concatenates them
...signatureChunks.map((chunk) => ({
type: "content_block_delta",
index: 0,
delta: { type: "signature_delta", signature: chunk },
})),
{ type: "content_block_stop", index: 0 },
{
type: "content_block_start",
index: 1,
content_block: { type: "text", text: "" },
},
{
type: "content_block_delta",
index: 1,
delta: { type: "text_delta", text: "The answer is 42." },
},
{ type: "content_block_stop", index: 1 },
{ type: "message_delta", delta: { stop_reason: "end_turn" }, usage: { output_tokens: 150 } },
];
const sseBody = sseEvents.map((e) => `data: ${JSON.stringify(e)}\n\n`).join("");
// Mock globalThis.fetch — the transport stream's guarded fetch recognizes a mock
// (via .mock property) and skips SSRF DNS pinning, calling it directly.
const originalFetch = globalThis.fetch;
const mockFetch = async (_url: string | URL | Request, _init?: RequestInit): Promise<Response> => {
return new Response(sseBody, {
status: 200,
headers: { "content-type": "text/event-stream" },
});
};
(mockFetch as unknown as { mock: object }).mock = {};
globalThis.fetch = mockFetch as typeof fetch;
try {
const { createAnthropicMessagesTransportStreamFn } = await import(
"../../src/agents/anthropic-transport-stream.js"
);
const streamFn = createAnthropicMessagesTransportStreamFn();
// Minimal model satisfying AnthropicTransportModel shape
const model = {
id: "claude-sonnet-4-6",
name: "Claude Sonnet 4.6",
api: "anthropic-messages" as const,
provider: "anthropic",
baseUrl: "",
reasoning: false,
input: ["text"] as ("text" | "image")[],
cost: { input: 3, output: 15, cacheRead: 0.3, cacheWrite: 3.75 },
contextWindow: 200000,
maxTokens: 8192,
};
const context = {
messages: [{ role: "user" as const, content: "Explain Gödel's theorems" }],
};
const options = {
apiKey: "sk-ant-proof-test-key-not-real",
maxTokens: 8192,
};
console.log(" Calling createAnthropicMessagesTransportStreamFn()...");
console.log(` SSE payload: ${signatureChunks.length} signature_delta events\n`);
const eventStream = streamFn(model as never, context as never, options as never);
const result = await (eventStream as { result(): Promise<unknown> }).result() as {
content: Array<{ type: string; thinking?: string; thinkingSignature?: string; text?: string }>;
};
// Find the thinking block in the output
const thinkingBlock = result.content.find((b) => b.type === "thinking");
const textBlock = result.content.find((b) => b.type === "text");
assert(thinkingBlock !== undefined, "Transport stream produced a thinking block");
assert(textBlock !== undefined, "Transport stream produced a text block");
if (thinkingBlock) {
assert(
thinkingBlock.thinking === thinkingText,
`Thinking text captured correctly (${thinkingBlock.thinking!.length} chars)`,
);
assert(
thinkingBlock.thinkingSignature === expectedSignature,
`Signature equals concatenation of all ${signatureChunks.length} chunks (${expectedSignature.length} chars)`,
);
// The OLD bug would only keep the LAST chunk
const lastChunkOnly = signatureChunks[signatureChunks.length - 1];
assert(
thinkingBlock.thinkingSignature !== lastChunkOnly,
`Signature is NOT just the last chunk (old overwrite bug would produce "${lastChunkOnly}")`,
);
// Verify full length proves all chunks concatenated
const expectedLength = signatureChunks.reduce((sum, c) => sum + c.length, 0);
assert(
thinkingBlock.thinkingSignature!.length === expectedLength,
`Signature length ${thinkingBlock.thinkingSignature!.length} === sum of chunk lengths ${expectedLength}`,
);
// Verify each chunk appears in order within the signature
let offset = 0;
let allChunksInOrder = true;
for (const chunk of signatureChunks) {
const idx = thinkingBlock.thinkingSignature!.indexOf(chunk, offset);
if (idx !== offset) {
allChunksInOrder = false;
break;
}
offset += chunk.length;
}
assert(allChunksInOrder, "All signature chunks appear in order (verifies append, not prepend/shuffle)");
}
if (textBlock) {
assert(textBlock.text === "The answer is 42.", "Text block content preserved correctly");
}
} finally {
globalThis.fetch = originalFetch;
}
console.log(`\n--- Patched Code Path: ${passed} passed, ${failed} failed ---\n`);
return { passed, failed };
}
// ============================================================================
// Section 2: Live API Replay Proof
// Validates signature integrity against a real Anthropic API endpoint.
// Requires ANTHROPIC_API_KEY or OPENAI_API_KEY environment variable.
// ============================================================================
async function runProof(): Promise<{ passed: number; failed: number }> {
console.log("=== Live API Replay Proof: Thinking Block Signature Integrity ===\n");
if (!ANTHROPIC_API_KEY) {
console.log(" ⚠️ SKIPPED: Set ANTHROPIC_API_KEY or OPENAI_API_KEY to run live API proof\n");
return { passed: 0, failed: 0 };
}
const MODEL = process.env.ANTHROPIC_MODEL || "claude-sonnet-4-6";
const INITIAL_PROMPT = "Analyze the philosophical implications of Gödel's incompleteness theorems on the foundations of mathematics. Consider: 1) How do they relate to Hilbert's program? 2) What are the epistemological consequences? 3) How do they connect to Turing's halting problem? Think deeply and thoroughly.";
console.log(`API Base: ${BASE_URL}`);
console.log(`Model: ${MODEL}\n`);
// Step 1: Send a request with thinking enabled
console.log("Step 1: Sending request with extended thinking enabled...");
const response = await fetch(`${BASE_URL}/v1/messages`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-api-key": ANTHROPIC_API_KEY!,
"anthropic-version": "2023-06-01",
},
body: JSON.stringify({
model: MODEL,
max_tokens: 16000,
thinking: {
type: "enabled",
budget_tokens: 10000,
},
stream: true,
messages: [
{
role: "user",
content: INITIAL_PROMPT,
},
],
}),
});
if (!response.ok) {
const errorText = await response.text();
console.error(` API Error ${response.status}: ${errorText.slice(0, 200)}`);
console.log(" ❌ FAIL: Initial API request failed — cannot proceed with live proof");
console.log("\n--- Live API Replay: 0 passed, 1 failed ---");
return { passed: 0, failed: 1 };
}
// Step 2: Parse SSE stream, collect signature_delta events
console.log("Step 2: Parsing SSE stream...\n");
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
const signatureDeltas: string[] = [];
let thinkingText = "";
let allEvents: string[] = [];
// Simulate transport stream behavior (BEFORE fix = overwrite, AFTER fix = append)
let simulatedOverwrite = "";
let simulatedAppend = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6).trim();
if (data === "[DONE]") continue;
try {
const event = JSON.parse(data);
allEvents.push(event.type);
if (
event.type === "content_block_delta" &&
event.delta?.type === "signature_delta"
) {
signatureDeltas.push(event.delta.signature);
// Simulate OLD behavior (overwrite)
simulatedOverwrite = event.delta.signature;
// Simulate NEW behavior (append)
simulatedAppend += event.delta.signature;
}
if (
event.type === "content_block_delta" &&
event.delta?.type === "thinking_delta"
) {
thinkingText += event.delta.thinking;
}
// Capture the final message to get the complete thinking block
if (event.type === "message_stop") {
// We already have everything from deltas
}
} catch {
// Skip non-JSON lines
}
}
}
// Step 3: Validate results
console.log("Step 3: Validation Results\n");
console.log("--- Event Summary ---");
const eventCounts: Record<string, number> = {};
for (const e of allEvents) {
eventCounts[e] = (eventCounts[e] || 0) + 1;
}
for (const [type, count] of Object.entries(eventCounts)) {
console.log(` ${type}: ${count}`);
}
console.log(`\n--- Signature Delta Analysis ---`);
console.log(` Total signature_delta events: ${signatureDeltas.length}`);
for (let i = 0; i < signatureDeltas.length; i++) {
console.log(` Chunk ${i + 1}: ${signatureDeltas[i].length} chars "${signatureDeltas[i].slice(0, 30)}..."`);
}
console.log(`\n--- Signature Comparison ---`);
console.log(` Full signature (concatenated): ${simulatedAppend.length} chars`);
console.log(` Overwrite-only (OLD BUG): ${simulatedOverwrite.length} chars`);
console.log(` Thinking text length: ${thinkingText.length} chars`);
// Step 4: Assertions
console.log(`\n--- Assertions ---`);
let passed = 0;
let failed = 0;
function assert(condition: boolean, description: string): void {
if (condition) {
console.log(` ✅ PASS: ${description}`);
passed++;
} else {
console.log(` ❌ FAIL: ${description}`);
failed++;
}
}
assert(signatureDeltas.length > 0, "At least one signature_delta event received");
if (signatureDeltas.length >= 2) {
assert(true, `Multiple signature_delta chunks received (got ${signatureDeltas.length} — confirms chunked delivery)`);
} else {
console.log(` INFO: Only ${signatureDeltas.length} signature_delta chunk received (proxy/Bedrock may coalesce chunks)`);
console.log(` Multi-chunk concatenation verified by unit tests; real proof focuses on replay integrity.`);
}
assert(
simulatedAppend.length > 0,
`Signature captured successfully (${simulatedAppend.length} chars)`
);
assert(
thinkingText.length > 0,
"Thinking text was captured from thinking_delta events"
);
if (signatureDeltas.length >= 2) {
assert(
simulatedOverwrite !== simulatedAppend,
"OLD behavior (overwrite) produces DIFFERENT result than NEW behavior (append) — confirms the bug"
);
const truncationRatio = simulatedOverwrite.length / simulatedAppend.length;
console.log(`\n 📊 Truncation ratio: ${(truncationRatio * 100).toFixed(1)}% — old behavior kept only ${(truncationRatio * 100).toFixed(1)}% of the full signature`);
} else {
console.log(`\n Single-chunk delivery — simulating truncation for negative proof`);
}
// Step 5: Replay validation — verify the concatenated signature can be sent back
console.log(`\n--- Replay Validation ---`);
console.log(" Sending replay request with the captured signature...");
const replayResponse = await fetch(`${BASE_URL}/v1/messages`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-api-key": ANTHROPIC_API_KEY!,
"anthropic-version": "2023-06-01",
},
body: JSON.stringify({
model: MODEL,
max_tokens: 2048,
thinking: {
type: "enabled",
budget_tokens: 1024,
},
messages: [
{
role: "user",
content: INITIAL_PROMPT,
},
{
role: "assistant",
content: [
{
type: "thinking",
thinking: thinkingText,
signature: simulatedAppend,
},
{
type: "text",
text: "2 + 2 = 4",
},
],
},
{
role: "user",
content: "And 3+3?",
},
],
}),
});
assert(
replayResponse.ok,
`Replay with CORRECT (concatenated) signature succeeds: HTTP ${replayResponse.status}`
);
if (!replayResponse.ok) {
const errBody = await replayResponse.text();
console.log(` Replay error: ${errBody.slice(0, 200)}`);
} else {
// Consume the response
await replayResponse.text();
}
// Step 6: Negative proof — replay with truncated signature should fail
{ // Always run negative proof (artificially truncate if needed)
const truncatedSig = signatureDeltas.length >= 2 ? simulatedOverwrite : simulatedAppend.slice(0, Math.floor(simulatedAppend.length / 3));
console.log(" Sending replay with TRUNCATED (old bug) signature...");
const badReplayResponse = await fetch(`${BASE_URL}/v1/messages`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-api-key": ANTHROPIC_API_KEY!,
"anthropic-version": "2023-06-01",
},
body: JSON.stringify({
model: MODEL,
max_tokens: 2048,
thinking: {
type: "enabled",
budget_tokens: 1024,
},
messages: [
{
role: "user",
content: INITIAL_PROMPT,
},
{
role: "assistant",
content: [
{
type: "thinking",
thinking: thinkingText,
signature: truncatedSig, // TRUNCATED!
},
{
type: "text",
text: "2 + 2 = 4",
},
],
},
{
role: "user",
content: "And 3+3?",
},
],
}),
});
assert(
!badReplayResponse.ok,
`Replay with TRUNCATED (old bug) signature fails: HTTP ${badReplayResponse.status} — confirms the bug causes API rejection`
);
if (!badReplayResponse.ok) {
const errBody = await badReplayResponse.text();
const parsed = JSON.parse(errBody);
console.log(` Expected error: ${parsed.error?.message?.slice(0, 150)}`);
} else {
await badReplayResponse.text();
}
}
// Final summary
console.log(`\n--- Live API Replay: ${passed} passed, ${failed} failed ---`);
return { passed, failed };
}
async function main(): Promise<void> {
// Section 1: Patched code path proof (no API key needed)
const patchedResult = await runPatchedCodePathProof();
// Section 2: Live API replay proof (requires API key)
const liveResult = await runProof();
// Overall summary
const totalPassed = patchedResult.passed + liveResult.passed;
const totalFailed = patchedResult.failed + liveResult.failed;
console.log(`\n=== OVERALL PROOF SUMMARY ===`);
console.log(` Patched code path: ${patchedResult.passed} passed, ${patchedResult.failed} failed`);
console.log(` Live API replay: ${liveResult.passed} passed, ${liveResult.failed} failed`);
console.log(` Total: ${totalPassed} passed, ${totalFailed} failed`);
if (totalFailed > 0) {
console.log("\n❌ PROOF FAILED — some assertions did not pass");
process.exit(1);
} else {
console.log("\n✅ PROOF PASSED — signature concatenation fix verified");
process.exit(0);
}
}
main().catch((err) => {
console.error("Proof script error:", err);
process.exit(1);
});