From 145cb6d35aeaa0f68ab79e8728a6cf96357ec42d Mon Sep 17 00:00:00 2001 From: samzong Date: Sat, 9 May 2026 15:36:48 +0800 Subject: [PATCH] fix(agents): suppress DeepSeek DSML stream artifacts --- CHANGELOG.md | 4 + src/agents/deepseek-text-filter.test.ts | 63 ++++++++ src/agents/deepseek-text-filter.ts | 91 ++++++++++++ src/agents/openai-transport-stream.test.ts | 159 +++++++++++++++++++++ src/agents/openai-transport-stream.ts | 42 +++++- 5 files changed, 355 insertions(+), 4 deletions(-) create mode 100644 src/agents/deepseek-text-filter.test.ts create mode 100644 src/agents/deepseek-text-filter.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index b0dc0ed0373..7591854e350 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -263,6 +263,10 @@ Docs: https://docs.openclaw.ai - Cron/heartbeat: let restricted cron-triggered runs read their own status and current-job list metadata again, preventing heartbeat STATUS freshness checks from going stale while preserving self-remove-only mutation limits. Fixes #78208. Thanks @amknight. - Channels/cron: ignore stale runtime conversation bindings that point at completed isolated cron run sessions, so follow-up DMs fall back to their normal route instead of reusing a closed cron task prompt. Fixes #78074. Thanks @amknight. - ACP: preserve streamed chunk boundaries in background-task progress summaries so CJK text, paths, URLs, and identifiers are no longer split with synthetic spaces. Fixes #78312. Thanks @amknight. +- Control UI/sessions: fire the documented `/new` command and lifecycle hooks only for explicit Control UI session creation, restoring session-memory and custom hook capture without changing SDK parent-session creates. Fixes #76957. Thanks @BunsDev. +- Exec approvals: fall back to a guarded copy when Windows rejects rename-overwrite for `exec-approvals.json`, while preserving symlink, hard-link, and owner-only permission safeguards. Fixes #77785. (#77907) Thanks @Alex-Alaniz and @MilleniumGenAI. +- Agents/DeepSeek: suppress provider-private DSML transport syntax (tool-use-error, tool-call, function-call shadow blocks) so it never leaks into assistant-visible text; native `delta.tool_calls` remains the only authoritative tool-call source. Thanks @samzong. +- Slack: preserve Socket Mode SDK error context and structured Slack API fields in reconnect logs, so startup failures no longer collapse to a bare `unknown error`. - Agents/subagents: preserve the delegated task prompt when a spawned target agent uses `systemPromptOverride`, so `sessions_spawn(mode: "run")` child runs still see their assigned task. Fixes #77950. Thanks @amknight. - Node/Windows: fall back to the Startup-folder launcher when Spanish-localized `schtasks` reports `Acceso denegado`, matching the existing access-denied fallback path. Fixes #77993. Thanks @jackonedev. - Agents/compaction: treat visible custom-message, bash, and branch-summary entries as real conversation anchors so safeguard mode does not write empty fallback summaries for cron and split-turn sessions with substantive tool work. Fixes #78300. Thanks @amknight. diff --git a/src/agents/deepseek-text-filter.test.ts b/src/agents/deepseek-text-filter.test.ts new file mode 100644 index 00000000000..fb7ff80014f --- /dev/null +++ b/src/agents/deepseek-text-filter.test.ts @@ -0,0 +1,63 @@ +import { describe, expect, it } from "vitest"; +import { createDeepSeekTextFilter } from "./deepseek-text-filter.js"; + +function filteredText(chunks: readonly string[]) { + const filter = createDeepSeekTextFilter(); + return [...chunks.flatMap((chunk) => filter.push(chunk)), ...filter.flush()].join(""); +} + +describe("createDeepSeekTextFilter", () => { + it.each([ + { + name: "tool_use_error in visible text", + chunks: [ + "before <|DSML|tool_use_error>write after", + ], + expected: "before after", + }, + { + name: "split open token", + chunks: ["before ", "<|DS", "ML|tool_calls>body", " after"], + expected: "before after", + }, + { + name: "singular tool_call close", + chunks: ["<|DSML|tool_call>read visible"], + expected: " visible", + }, + { + name: "singular open plural close", + chunks: ["<|DS", "ML|tool_call>read\n", ""], + expected: "", + }, + { + name: "unterminated block", + chunks: ["visible <|DSML|tool_calls>partial body, no close"], + expected: "visible ", + }, + { + name: "multiple blocks", + chunks: [ + "a<|DSML|tool_use_error>xb<|DSML|function_calls>yc", + ], + expected: "abc", + }, + ])("drops DSML: $name", ({ chunks, expected }) => { + const text = filteredText(chunks); + expect(text).toBe(expected); + expect(text).not.toContain("DSML"); + }); + + it("holds a partial open token until it can classify it", () => { + const filter = createDeepSeekTextFilter(); + const mid = filter.push("safe text<|DSM"); + expect(mid.join("")).not.toContain("<|DSM"); + + const all = [ + ...mid, + ...filter.push("L|tool_calls>body done"), + ...filter.flush(), + ]; + expect(all.join("")).toBe("safe text done"); + }); +}); diff --git a/src/agents/deepseek-text-filter.ts b/src/agents/deepseek-text-filter.ts new file mode 100644 index 00000000000..7ae7a9626e4 --- /dev/null +++ b/src/agents/deepseek-text-filter.ts @@ -0,0 +1,91 @@ +const DSML_KINDS = ["tool_use_error", "tool_calls", "tool_call", "function_calls"] as const; +const DSML_BARS = ["|", "|"] as const; + +const DSML_OPEN_TOKENS = DSML_BARS.flatMap((bar) => + DSML_KINDS.map((kind) => `<${bar}DSML${bar}${kind}>`), +); +const DSML_CLOSE_TOKENS = DSML_BARS.flatMap((bar) => + DSML_KINDS.map((kind) => ``), +); +const MAX_OPEN_TOKEN_LEN = Math.max(...DSML_OPEN_TOKENS.map((token) => token.length)); +const MAX_CLOSE_TOKEN_LEN = Math.max(...DSML_CLOSE_TOKENS.map((token) => token.length)); + +export interface DeepSeekTextFilter { + push(chunk: string): string[]; + flush(): string[]; +} + +export function createDeepSeekTextFilter(): DeepSeekTextFilter { + let buffer = ""; + let insideDsml = false; + + const consume = (final: boolean): string[] => { + const output: string[] = []; + const emit = (text: string) => { + if (text) { + output.push(text); + } + }; + + while (buffer) { + if (insideDsml) { + const close = findEarliestToken(buffer, DSML_CLOSE_TOKENS); + if (close) { + buffer = buffer.slice(close.index + close.token.length); + insideDsml = false; + continue; + } + const keep = final ? 0 : Math.min(buffer.length, MAX_CLOSE_TOKEN_LEN - 1); + buffer = buffer.slice(buffer.length - keep); + if (final) { + insideDsml = false; + } + return output; + } + + const open = findEarliestToken(buffer, DSML_OPEN_TOKENS); + if (open) { + emit(buffer.slice(0, open.index)); + buffer = buffer.slice(open.index + open.token.length); + insideDsml = true; + continue; + } + + if (final) { + emit(buffer); + buffer = ""; + return output; + } + + const emitLength = buffer.length - Math.min(buffer.length, MAX_OPEN_TOKEN_LEN - 1); + if (emitLength <= 0) { + return output; + } + emit(buffer.slice(0, emitLength)); + buffer = buffer.slice(emitLength); + return output; + } + return output; + }; + + return { + push(chunk: string) { + buffer += chunk; + return consume(false); + }, + flush() { + return consume(true); + }, + }; +} + +function findEarliestToken(text: string, tokens: readonly string[]) { + let best: { index: number; token: string } | null = null; + for (const token of tokens) { + const index = text.indexOf(token); + if (index !== -1 && (!best || index < best.index)) { + best = { index, token }; + } + } + return best; +} diff --git a/src/agents/openai-transport-stream.test.ts b/src/agents/openai-transport-stream.test.ts index e634d5ccc64..045a4d2b2da 100644 --- a/src/agents/openai-transport-stream.test.ts +++ b/src/agents/openai-transport-stream.test.ts @@ -21,6 +21,51 @@ import { } from "./provider-transport-stream.js"; import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js"; +type OpenAICompletionsOutput = Parameters[1]; + +type CapturedStreamEvent = { type?: string; delta?: string }; + +function createDeepSeekCompletionsModel(): Model<"openai-completions"> { + return { + id: "deepseek-v4-pro", + name: "DeepSeek V4 Pro", + api: "openai-completions", + provider: "deepseek", + baseUrl: "https://api.deepseek.com", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 1_000_000, + maxTokens: 384_000, + }; +} + +function createAssistantOutput(model: Model<"openai-completions">): OpenAICompletionsOutput { + return { + role: "assistant" as const, + content: [], + api: model.api, + provider: model.provider, + model: model.id, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop", + timestamp: Date.now(), + }; +} + +async function* streamChunks(chunks: readonly unknown[]): AsyncGenerator { + for (const chunk of chunks) { + yield chunk as never; + } +} + describe("openai transport stream", () => { it("adds OpenClaw attribution to native OpenAI transport headers and protects it from pi", () => { vi.stubEnv("OPENCLAW_VERSION", "2026.3.22"); @@ -781,6 +826,120 @@ describe("openai transport stream", () => { expect(output.stopReason).toBe("stop"); }); + it("filters DeepSeek DSML content without disturbing native tool calls", async () => { + const model = createDeepSeekCompletionsModel(); + const output = createAssistantOutput(model); + const events: CapturedStreamEvent[] = []; + + await __testing.processOpenAICompletionsStream( + streamChunks([ + { + id: "chatcmpl-deepseek-dsml", + object: "chat.completion.chunk", + created: 1, + model: model.id, + choices: [ + { + index: 0, + delta: { + content: "before <|DSML|tool_use_error>body after", + }, + logprobs: null, + finish_reason: null, + }, + ], + }, + { + id: "chatcmpl-deepseek-dsml", + object: "chat.completion.chunk", + created: 1, + model: model.id, + choices: [ + { + index: 0, + delta: { + content: "<|DSML|tool_calls>shadow", + tool_calls: [ + { + index: 0, + id: "call_native_1", + type: "function", + function: { name: "read", arguments: '{"path":"/tmp/native.md"}' }, + }, + ], + }, + logprobs: null, + finish_reason: "tool_calls", + }, + ], + }, + ]), + output, + model, + { push: (event) => events.push(event as CapturedStreamEvent) }, + ); + + expect(output.content).toEqual([ + { type: "text", text: "before after" }, + { + type: "toolCall", + id: "call_native_1", + name: "read", + arguments: { path: "/tmp/native.md" }, + partialArgs: '{"path":"/tmp/native.md"}', + }, + ]); + expect(JSON.stringify(events)).not.toContain("DSML"); + }); + + it("preserves DeepSeek visible content before same-chunk native tool calls", async () => { + const model = createDeepSeekCompletionsModel(); + const output = createAssistantOutput(model); + + await __testing.processOpenAICompletionsStream( + streamChunks([ + { + id: "chatcmpl-deepseek-native-tool", + object: "chat.completion.chunk", + created: 1, + model: model.id, + choices: [ + { + index: 0, + delta: { + content: "I'll check", + tool_calls: [ + { + index: 0, + id: "call_native_1", + type: "function", + function: { name: "read", arguments: '{"path":"/tmp/native.md"}' }, + }, + ], + }, + logprobs: null, + finish_reason: "tool_calls", + }, + ], + }, + ]), + output, + model, + { push() {} }, + ); + + expect(output.content).toEqual([ + { type: "text", text: "I'll check" }, + { + type: "toolCall", + id: "call_native_1", + name: "read", + arguments: { path: "/tmp/native.md" }, + partialArgs: '{"path":"/tmp/native.md"}', + }, + ]); + }); + it("keeps OpenRouter thinking format for declared OpenRouter providers on custom proxy URLs", () => { const params = buildOpenAICompletionsParams( attachModelProviderRequestTransport( diff --git a/src/agents/openai-transport-stream.ts b/src/agents/openai-transport-stream.ts index 3c57e1e92ab..7e3c1ec9006 100644 --- a/src/agents/openai-transport-stream.ts +++ b/src/agents/openai-transport-stream.ts @@ -27,6 +27,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import type { ProviderRuntimeModel } from "../plugins/provider-runtime-model.types.js"; import { resolveProviderTransportTurnStateWithPlugin } from "../plugins/provider-runtime.js"; import { buildCopilotDynamicHeaders, hasCopilotVisionInput } from "./copilot-dynamic-headers.js"; +import { createDeepSeekTextFilter } from "./deepseek-text-filter.js"; import { detectOpenAICompletionsCompat } from "./openai-completions-compat.js"; import { flattenCompletionMessagesToStringContent } from "./openai-completions-string-content.js"; import { resolveOpenAIReasoningEffortMap } from "./openai-reasoning-compat.js"; @@ -1351,6 +1352,9 @@ async function processOpenAICompletionsStream( const MAX_POST_TOOL_CALL_BUFFER_BYTES = 256_000; const MAX_TOOL_CALL_ARGUMENT_BUFFER_BYTES = 256_000; const compat = getCompat(model as OpenAIModeModel); + const deepSeekTextFilter = shouldFilterDeepSeekDsmlText(compat) + ? createDeepSeekTextFilter() + : null; let currentBlock: | { type: "text"; text: string } | { type: "thinking"; thinking: string; thinkingSignature?: string } @@ -1466,6 +1470,24 @@ async function processOpenAICompletionsStream( flushPendingPostToolCallDeltas(); appendTextDeltaInternal(text); }; + const appendVisibleTextDelta = (text: string) => { + if (!text) { + return; + } + if (currentBlock?.type === "toolCall") { + queuePostToolCallDelta({ kind: "text", text }); + } else { + appendTextDelta(text); + } + }; + const flushDeepSeekTextFilter = () => { + if (!deepSeekTextFilter) { + return; + } + for (const event of deepSeekTextFilter.flush()) { + appendVisibleTextDelta(event); + } + }; for await (const rawChunk of responseStream as AsyncIterable) { if (!rawChunk || typeof rawChunk !== "object") { continue; @@ -1501,19 +1523,26 @@ async function processOpenAICompletionsStream( if (currentBlock?.type === "toolCall") { queuePostToolCallDelta(contentDelta); } else if (contentDelta.kind === "text") { - appendTextDelta(contentDelta.text); + const filtered = deepSeekTextFilter?.push(contentDelta.text) ?? [contentDelta.text]; + for (const part of filtered) { + appendVisibleTextDelta(part); + } } else { appendThinkingDelta(contentDelta); } } - if (contentDeltas.length > 0) { - continue; - } + } + if (!choice.delta.content) { + flushDeepSeekTextFilter(); } const reasoningDeltas = getCompletionsReasoningDeltas( choice.delta as Record, compat.visibleReasoningDetailTypes, ); + const hasNativeToolCalls = Boolean(choice.delta.tool_calls?.length); + if (choice.delta.content && (reasoningDeltas.length > 0 || hasNativeToolCalls)) { + flushDeepSeekTextFilter(); + } for (const reasoningDelta of reasoningDeltas) { if (currentBlock?.type === "toolCall") { queuePostToolCallDelta({ ...reasoningDelta }); @@ -1586,6 +1615,7 @@ async function processOpenAICompletionsStream( } flushPendingPostToolCallDeltas(); } + flushDeepSeekTextFilter(); finishCurrentBlock(); if (currentBlock?.type === "toolCall") { currentBlock = null; @@ -1608,6 +1638,10 @@ type CompletionsReasoningDelta = text: string; }; +function shouldFilterDeepSeekDsmlText(compat: ReturnType) { + return compat.thinkingFormat === "deepseek"; +} + function getCompletionsContentDeltas(content: unknown): CompletionsReasoningDelta[] { if (typeof content === "string") { return content ? [{ kind: "text", text: content }] : [];