fix(agents): yield during model stream bursts

This commit is contained in:
Peter Steinberger
2026-05-16 12:08:27 +01:00
parent f50c65f124
commit 210ff7d318
3 changed files with 134 additions and 1 deletions

View File

@@ -47,6 +47,7 @@ Docs: https://docs.openclaw.ai
- Cron: keep legacy string schedules and blank system-event jobs available for runtime repair/skip handling instead of dropping them as malformed persisted rows.
- Task persistence: drop malformed array/scalar requester-origin JSON from task and task-flow SQLite sidecars instead of restoring it as delivery metadata.
- Agents/timeouts: clarify model idle-timeout errors and docs so provider `timeoutSeconds` is shown as bounded by the whole agent/run timeout ceiling.
- Agents/OpenAI streams: yield cooperatively while processing bursty Completions and Responses chunks, keeping aborts, channel liveness timers, and startup heartbeats responsive under noisy model output. Refs #82462.
- Release tooling: align the published launcher Node floor, `npm start`, package script checks, sharded lint locking, Vitest root project coverage, and plugin-SDK declaration build cache metadata so release/package validation does not silently skip or ship stale surfaces.
- Cron/agents: honor configured subagent model fallbacks for isolated scheduled runs and forward that fallback policy into embedded agent timeout failover. Fixes #74985. Thanks @chrisgwynne.
- Codex app-server/MCP: scope user MCP servers to specific OpenClaw agent ids through an optional `mcp.servers.<name>.codex.agents` list and accept `codex.defaultToolsApprovalMode` (`auto`/`prompt`/`approve`) for native Codex approval defaults; OpenClaw strips the `codex` block before handing `mcp_servers` config to Codex. (#82180) Thanks @sercada.

View File

@@ -1034,6 +1034,85 @@ describe("openai transport stream", () => {
});
});
it("yields to aborts during bursty OpenAI-compatible streams", async () => {
const model = {
id: "deepseek-v4-flash",
name: "DeepSeek V4 Flash",
api: "openai-completions",
provider: "opencode-go",
baseUrl: "http://localhost:8000/v1",
reasoning: false,
input: ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 128000,
maxTokens: 4096,
} satisfies Model<"openai-completions">;
const output = createAssistantOutput(model);
const abort = new AbortController();
const stream = { push: vi.fn() };
let yieldedToTimer = false;
async function* mockStream() {
for (let index = 0; index < 512; index += 1) {
yield {
id: "chatcmpl-bursty",
object: "chat.completion.chunk" as const,
created: 1775425651,
model: model.id,
choices: [
{
index: 0,
delta: { role: "assistant" as const, content: "x" },
logprobs: null,
finish_reason: null,
},
],
};
}
}
setTimeout(() => {
yieldedToTimer = true;
abort.abort();
}, 0);
await expect(
__testing.processOpenAICompletionsStream(mockStream(), output, model, stream, {
signal: abort.signal,
}),
).rejects.toThrow("Request was aborted");
expect(yieldedToTimer).toBe(true);
expect(stream.push.mock.calls.length).toBeLessThan(512);
});
it("yields to aborts during bursty Responses streams", async () => {
const model = createAzureResponsesModel();
const output = createResponsesAssistantOutput(model);
const abort = new AbortController();
const stream = { push: vi.fn() };
let yieldedToTimer = false;
async function* mockStream() {
yield { type: "response.output_item.added", item: { type: "message" } };
for (let index = 0; index < 512; index += 1) {
yield { type: "response.output_text.delta", delta: "x" };
}
}
setTimeout(() => {
yieldedToTimer = true;
abort.abort();
}, 0);
await expect(
__testing.processResponsesStream(mockStream(), output, stream, model, {
signal: abort.signal,
}),
).rejects.toThrow("Request was aborted");
expect(yieldedToTimer).toBe(true);
expect(stream.push.mock.calls.length).toBeLessThan(512);
});
it("skips null and non-object OpenAI-compatible stream chunks", async () => {
const model = {
id: "glm-5",

View File

@@ -73,6 +73,8 @@ const DEFAULT_AZURE_OPENAI_API_VERSION = "preview";
const OPENAI_CODEX_RESPONSES_EMPTY_INPUT_TEXT = " ";
const GEMINI_THOUGHT_SIGNATURE_VALIDATOR_SKIP = "skip_thought_signature_validator";
const AZURE_RESPONSES_FIRST_EVENT_TIMEOUT_MS = 30_000;
const MODEL_STREAM_COOPERATIVE_YIELD_INTERVAL_MS = 12;
const MODEL_STREAM_COOPERATIVE_YIELD_MAX_EVENTS = 64;
const log = createSubsystemLogger("openai-transport");
type ReplayableResponseOutputMessage = Omit<ResponseOutputMessage, "id"> & { id?: string };
@@ -92,6 +94,42 @@ type BaseStreamOptions = {
responseFormat?: Record<string, unknown>;
};
type ModelStreamCooperativeScheduler = {
afterEvent: () => Promise<void>;
};
function throwIfModelStreamAborted(signal?: AbortSignal): void {
if (signal?.aborted) {
throw new Error("Request was aborted");
}
}
function createModelStreamCooperativeScheduler(
signal?: AbortSignal,
): ModelStreamCooperativeScheduler {
let lastYieldedAt = Date.now();
let eventsSinceYield = 0;
return {
async afterEvent() {
throwIfModelStreamAborted(signal);
eventsSinceYield += 1;
const now = Date.now();
if (
eventsSinceYield < MODEL_STREAM_COOPERATIVE_YIELD_MAX_EVENTS &&
now - lastYieldedAt < MODEL_STREAM_COOPERATIVE_YIELD_INTERVAL_MS
) {
return;
}
eventsSinceYield = 0;
lastYieldedAt = now;
await new Promise<void>((resolve) => {
setImmediate(resolve);
});
throwIfModelStreamAborted(signal);
},
};
}
type OpenAIResponsesOptions = BaseStreamOptions & {
reasoning?: OpenAIReasoningEffort;
reasoningEffort?: OpenAIReasoningEffort;
@@ -722,6 +760,7 @@ async function processResponsesStream(
serviceTier?: ResponseCreateParamsStreaming["service_tier"],
) => void;
firstEventTimeoutMs?: number;
signal?: AbortSignal;
},
) {
let currentItem: Record<string, unknown> | null = null;
@@ -736,7 +775,9 @@ async function processResponsesStream(
model,
options?.firstEventTimeoutMs,
);
const cooperativeScheduler = createModelStreamCooperativeScheduler(options?.signal);
for await (const rawEvent of guardedStream) {
throwIfModelStreamAborted(options?.signal);
const event = rawEvent as Record<string, unknown>;
const type = stringifyUnknown(event.type);
eventCount += 1;
@@ -933,6 +974,7 @@ async function processResponsesStream(
: "Unknown error (no error details in response)";
throw new Error(msg);
}
await cooperativeScheduler.afterEvent();
}
const eventTypeSummary = [...eventTypes.entries()]
.slice(0, 12)
@@ -1141,6 +1183,7 @@ export function createOpenAIResponsesTransportStreamFn(): StreamFn {
await processResponsesStream(responseStream, output, stream, model, {
serviceTier: (options as OpenAIResponsesOptions | undefined)?.serviceTier,
applyServiceTierPricing,
signal: options?.signal,
});
if (options?.signal?.aborted) {
throw new Error("Request was aborted");
@@ -1538,6 +1581,7 @@ export function createAzureOpenAIResponsesTransportStreamFn(): StreamFn {
stream.push({ type: "start", partial: output as never });
await processResponsesStream(responseStream, output, stream, model, {
firstEventTimeoutMs: AZURE_RESPONSES_FIRST_EVENT_TIMEOUT_MS,
signal: options?.signal,
});
if (options?.signal?.aborted) {
throw new Error("Request was aborted");
@@ -1738,7 +1782,9 @@ export function createOpenAICompletionsTransportStreamFn(): StreamFn {
buildOpenAISdkRequestOptions(model, options?.signal),
)) as unknown as AsyncIterable<ChatCompletionChunk>;
stream.push({ type: "start", partial: output as never });
await processOpenAICompletionsStream(responseStream, output, model, stream);
await processOpenAICompletionsStream(responseStream, output, model, stream, {
signal: options?.signal,
});
if (options?.signal?.aborted) {
throw new Error("Request was aborted");
}
@@ -1760,6 +1806,7 @@ async function processOpenAICompletionsStream(
output: MutableAssistantOutput,
model: Model<Api>,
stream: { push(event: unknown): void },
options?: { signal?: AbortSignal },
) {
const MAX_POST_TOOL_CALL_BUFFER_BYTES = 256_000;
const MAX_TOOL_CALL_ARGUMENT_BUFFER_BYTES = 256_000;
@@ -1907,8 +1954,11 @@ async function processOpenAICompletionsStream(
appendVisibleTextDelta(part);
}
};
const cooperativeScheduler = createModelStreamCooperativeScheduler(options?.signal);
for await (const rawChunk of responseStream as AsyncIterable<unknown>) {
throwIfModelStreamAborted(options?.signal);
if (!rawChunk || typeof rawChunk !== "object") {
await cooperativeScheduler.afterEvent();
continue;
}
const chunk = rawChunk as ChatCompletionChunk;
@@ -1918,6 +1968,7 @@ async function processOpenAICompletionsStream(
}
const choice = Array.isArray(chunk.choices) ? chunk.choices[0] : undefined;
if (!choice) {
await cooperativeScheduler.afterEvent();
continue;
}
const choiceUsage = (choice as unknown as { usage?: ChatCompletionChunk["usage"] }).usage;
@@ -1935,6 +1986,7 @@ async function processOpenAICompletionsStream(
choice.delta ??
(choice as unknown as { message?: ChatCompletionChunk["choices"][number]["delta"] }).message;
if (!choiceDelta) {
await cooperativeScheduler.afterEvent();
continue;
}
if (choiceDelta.content) {
@@ -2026,6 +2078,7 @@ async function processOpenAICompletionsStream(
}
}
flushPendingPostToolCallDeltas();
await cooperativeScheduler.afterEvent();
}
flushDeepSeekTextFilterAtEnd();
finishCurrentBlock();