diff --git a/CHANGELOG.md b/CHANGELOG.md index a940daf05a1..278c6b96a1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ Docs: https://docs.openclaw.ai - Cron: keep legacy string schedules and blank system-event jobs available for runtime repair/skip handling instead of dropping them as malformed persisted rows. - Task persistence: drop malformed array/scalar requester-origin JSON from task and task-flow SQLite sidecars instead of restoring it as delivery metadata. - Agents/timeouts: clarify model idle-timeout errors and docs so provider `timeoutSeconds` is shown as bounded by the whole agent/run timeout ceiling. +- Agents/OpenAI streams: yield cooperatively while processing bursty Completions and Responses chunks, keeping aborts, channel liveness timers, and startup heartbeats responsive under noisy model output. Refs #82462. - Release tooling: align the published launcher Node floor, `npm start`, package script checks, sharded lint locking, Vitest root project coverage, and plugin-SDK declaration build cache metadata so release/package validation does not silently skip or ship stale surfaces. - Cron/agents: honor configured subagent model fallbacks for isolated scheduled runs and forward that fallback policy into embedded agent timeout failover. Fixes #74985. Thanks @chrisgwynne. - Codex app-server/MCP: scope user MCP servers to specific OpenClaw agent ids through an optional `mcp.servers..codex.agents` list and accept `codex.defaultToolsApprovalMode` (`auto`/`prompt`/`approve`) for native Codex approval defaults; OpenClaw strips the `codex` block before handing `mcp_servers` config to Codex. (#82180) Thanks @sercada. diff --git a/src/agents/openai-transport-stream.test.ts b/src/agents/openai-transport-stream.test.ts index ed8969ba5d8..0a555b061d7 100644 --- a/src/agents/openai-transport-stream.test.ts +++ b/src/agents/openai-transport-stream.test.ts @@ -1034,6 +1034,85 @@ describe("openai transport stream", () => { }); }); + it("yields to aborts during bursty OpenAI-compatible streams", async () => { + const model = { + id: "deepseek-v4-flash", + name: "DeepSeek V4 Flash", + api: "openai-completions", + provider: "opencode-go", + baseUrl: "http://localhost:8000/v1", + reasoning: false, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 128000, + maxTokens: 4096, + } satisfies Model<"openai-completions">; + const output = createAssistantOutput(model); + const abort = new AbortController(); + const stream = { push: vi.fn() }; + let yieldedToTimer = false; + + async function* mockStream() { + for (let index = 0; index < 512; index += 1) { + yield { + id: "chatcmpl-bursty", + object: "chat.completion.chunk" as const, + created: 1775425651, + model: model.id, + choices: [ + { + index: 0, + delta: { role: "assistant" as const, content: "x" }, + logprobs: null, + finish_reason: null, + }, + ], + }; + } + } + + setTimeout(() => { + yieldedToTimer = true; + abort.abort(); + }, 0); + + await expect( + __testing.processOpenAICompletionsStream(mockStream(), output, model, stream, { + signal: abort.signal, + }), + ).rejects.toThrow("Request was aborted"); + expect(yieldedToTimer).toBe(true); + expect(stream.push.mock.calls.length).toBeLessThan(512); + }); + + it("yields to aborts during bursty Responses streams", async () => { + const model = createAzureResponsesModel(); + const output = createResponsesAssistantOutput(model); + const abort = new AbortController(); + const stream = { push: vi.fn() }; + let yieldedToTimer = false; + + async function* mockStream() { + yield { type: "response.output_item.added", item: { type: "message" } }; + for (let index = 0; index < 512; index += 1) { + yield { type: "response.output_text.delta", delta: "x" }; + } + } + + setTimeout(() => { + yieldedToTimer = true; + abort.abort(); + }, 0); + + await expect( + __testing.processResponsesStream(mockStream(), output, stream, model, { + signal: abort.signal, + }), + ).rejects.toThrow("Request was aborted"); + expect(yieldedToTimer).toBe(true); + expect(stream.push.mock.calls.length).toBeLessThan(512); + }); + it("skips null and non-object OpenAI-compatible stream chunks", async () => { const model = { id: "glm-5", diff --git a/src/agents/openai-transport-stream.ts b/src/agents/openai-transport-stream.ts index f72b1f696b5..3cc8061794a 100644 --- a/src/agents/openai-transport-stream.ts +++ b/src/agents/openai-transport-stream.ts @@ -73,6 +73,8 @@ const DEFAULT_AZURE_OPENAI_API_VERSION = "preview"; const OPENAI_CODEX_RESPONSES_EMPTY_INPUT_TEXT = " "; const GEMINI_THOUGHT_SIGNATURE_VALIDATOR_SKIP = "skip_thought_signature_validator"; const AZURE_RESPONSES_FIRST_EVENT_TIMEOUT_MS = 30_000; +const MODEL_STREAM_COOPERATIVE_YIELD_INTERVAL_MS = 12; +const MODEL_STREAM_COOPERATIVE_YIELD_MAX_EVENTS = 64; const log = createSubsystemLogger("openai-transport"); type ReplayableResponseOutputMessage = Omit & { id?: string }; @@ -92,6 +94,42 @@ type BaseStreamOptions = { responseFormat?: Record; }; +type ModelStreamCooperativeScheduler = { + afterEvent: () => Promise; +}; + +function throwIfModelStreamAborted(signal?: AbortSignal): void { + if (signal?.aborted) { + throw new Error("Request was aborted"); + } +} + +function createModelStreamCooperativeScheduler( + signal?: AbortSignal, +): ModelStreamCooperativeScheduler { + let lastYieldedAt = Date.now(); + let eventsSinceYield = 0; + return { + async afterEvent() { + throwIfModelStreamAborted(signal); + eventsSinceYield += 1; + const now = Date.now(); + if ( + eventsSinceYield < MODEL_STREAM_COOPERATIVE_YIELD_MAX_EVENTS && + now - lastYieldedAt < MODEL_STREAM_COOPERATIVE_YIELD_INTERVAL_MS + ) { + return; + } + eventsSinceYield = 0; + lastYieldedAt = now; + await new Promise((resolve) => { + setImmediate(resolve); + }); + throwIfModelStreamAborted(signal); + }, + }; +} + type OpenAIResponsesOptions = BaseStreamOptions & { reasoning?: OpenAIReasoningEffort; reasoningEffort?: OpenAIReasoningEffort; @@ -722,6 +760,7 @@ async function processResponsesStream( serviceTier?: ResponseCreateParamsStreaming["service_tier"], ) => void; firstEventTimeoutMs?: number; + signal?: AbortSignal; }, ) { let currentItem: Record | null = null; @@ -736,7 +775,9 @@ async function processResponsesStream( model, options?.firstEventTimeoutMs, ); + const cooperativeScheduler = createModelStreamCooperativeScheduler(options?.signal); for await (const rawEvent of guardedStream) { + throwIfModelStreamAborted(options?.signal); const event = rawEvent as Record; const type = stringifyUnknown(event.type); eventCount += 1; @@ -933,6 +974,7 @@ async function processResponsesStream( : "Unknown error (no error details in response)"; throw new Error(msg); } + await cooperativeScheduler.afterEvent(); } const eventTypeSummary = [...eventTypes.entries()] .slice(0, 12) @@ -1141,6 +1183,7 @@ export function createOpenAIResponsesTransportStreamFn(): StreamFn { await processResponsesStream(responseStream, output, stream, model, { serviceTier: (options as OpenAIResponsesOptions | undefined)?.serviceTier, applyServiceTierPricing, + signal: options?.signal, }); if (options?.signal?.aborted) { throw new Error("Request was aborted"); @@ -1538,6 +1581,7 @@ export function createAzureOpenAIResponsesTransportStreamFn(): StreamFn { stream.push({ type: "start", partial: output as never }); await processResponsesStream(responseStream, output, stream, model, { firstEventTimeoutMs: AZURE_RESPONSES_FIRST_EVENT_TIMEOUT_MS, + signal: options?.signal, }); if (options?.signal?.aborted) { throw new Error("Request was aborted"); @@ -1738,7 +1782,9 @@ export function createOpenAICompletionsTransportStreamFn(): StreamFn { buildOpenAISdkRequestOptions(model, options?.signal), )) as unknown as AsyncIterable; stream.push({ type: "start", partial: output as never }); - await processOpenAICompletionsStream(responseStream, output, model, stream); + await processOpenAICompletionsStream(responseStream, output, model, stream, { + signal: options?.signal, + }); if (options?.signal?.aborted) { throw new Error("Request was aborted"); } @@ -1760,6 +1806,7 @@ async function processOpenAICompletionsStream( output: MutableAssistantOutput, model: Model, stream: { push(event: unknown): void }, + options?: { signal?: AbortSignal }, ) { const MAX_POST_TOOL_CALL_BUFFER_BYTES = 256_000; const MAX_TOOL_CALL_ARGUMENT_BUFFER_BYTES = 256_000; @@ -1907,8 +1954,11 @@ async function processOpenAICompletionsStream( appendVisibleTextDelta(part); } }; + const cooperativeScheduler = createModelStreamCooperativeScheduler(options?.signal); for await (const rawChunk of responseStream as AsyncIterable) { + throwIfModelStreamAborted(options?.signal); if (!rawChunk || typeof rawChunk !== "object") { + await cooperativeScheduler.afterEvent(); continue; } const chunk = rawChunk as ChatCompletionChunk; @@ -1918,6 +1968,7 @@ async function processOpenAICompletionsStream( } const choice = Array.isArray(chunk.choices) ? chunk.choices[0] : undefined; if (!choice) { + await cooperativeScheduler.afterEvent(); continue; } const choiceUsage = (choice as unknown as { usage?: ChatCompletionChunk["usage"] }).usage; @@ -1935,6 +1986,7 @@ async function processOpenAICompletionsStream( choice.delta ?? (choice as unknown as { message?: ChatCompletionChunk["choices"][number]["delta"] }).message; if (!choiceDelta) { + await cooperativeScheduler.afterEvent(); continue; } if (choiceDelta.content) { @@ -2026,6 +2078,7 @@ async function processOpenAICompletionsStream( } } flushPendingPostToolCallDeltas(); + await cooperativeScheduler.afterEvent(); } flushDeepSeekTextFilterAtEnd(); finishCurrentBlock();