mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:40:44 +00:00
fix(feishu): back off failed streaming card starts
This commit is contained in:
@@ -89,6 +89,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Feishu: back off streaming-card creation after HTTP 400 startup failures, so unsupported card setups fall back without delaying every message. Fixes #56981. Thanks @JinnanDuan.
|
||||
- Feishu: suppress duplicate final card delivery when idle closes a streaming card before the final payload arrives. (#68491) Thanks @MoerAI.
|
||||
- Signal: preserve sender attachment filenames and resolve missing MIME types from those filenames, so Linux `signal-cli` voice notes without `contentType` still enter audio transcription. Fixes #48614. Thanks @mindfury.
|
||||
- Telegram/agents: suppress the phantom "Agent couldn't generate a response" fallback after a reply was already committed through the messaging tool. (#70623) Thanks @chinar-amrutkar.
|
||||
|
||||
@@ -12,7 +12,102 @@
|
||||
"skills": ["./skills"],
|
||||
"configSchema": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"properties": {}
|
||||
"additionalProperties": true,
|
||||
"$defs": {
|
||||
"secretRef": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"source": {
|
||||
"type": "string",
|
||||
"enum": ["env", "file", "exec"]
|
||||
},
|
||||
"provider": { "type": "string" },
|
||||
"id": { "type": "string" }
|
||||
},
|
||||
"required": ["source", "provider", "id"]
|
||||
},
|
||||
"secretInput": {
|
||||
"anyOf": [{ "type": "string", "minLength": 1 }, { "$ref": "#/$defs/secretRef" }]
|
||||
},
|
||||
"account": {
|
||||
"type": "object",
|
||||
"additionalProperties": true,
|
||||
"properties": {
|
||||
"enabled": { "type": "boolean" },
|
||||
"name": { "type": "string" },
|
||||
"appId": { "type": "string" },
|
||||
"appSecret": { "$ref": "#/$defs/secretInput" },
|
||||
"encryptKey": { "$ref": "#/$defs/secretInput" },
|
||||
"verificationToken": { "$ref": "#/$defs/secretInput" },
|
||||
"domain": {
|
||||
"anyOf": [
|
||||
{
|
||||
"type": "string",
|
||||
"enum": ["feishu", "lark"]
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"format": "uri"
|
||||
}
|
||||
]
|
||||
},
|
||||
"connectionMode": {
|
||||
"type": "string",
|
||||
"enum": ["websocket", "webhook"]
|
||||
},
|
||||
"renderMode": {
|
||||
"type": "string",
|
||||
"enum": ["auto", "raw", "card"]
|
||||
},
|
||||
"streaming": { "type": "boolean" },
|
||||
"replyInThread": {
|
||||
"type": "string",
|
||||
"enum": ["disabled", "enabled"]
|
||||
},
|
||||
"typingIndicator": { "type": "boolean" }
|
||||
}
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"enabled": { "type": "boolean" },
|
||||
"defaultAccount": { "type": "string" },
|
||||
"appId": { "type": "string" },
|
||||
"appSecret": { "$ref": "#/$defs/secretInput" },
|
||||
"encryptKey": { "$ref": "#/$defs/secretInput" },
|
||||
"verificationToken": { "$ref": "#/$defs/secretInput" },
|
||||
"domain": {
|
||||
"anyOf": [
|
||||
{
|
||||
"type": "string",
|
||||
"enum": ["feishu", "lark"]
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"format": "uri"
|
||||
}
|
||||
]
|
||||
},
|
||||
"connectionMode": {
|
||||
"type": "string",
|
||||
"enum": ["websocket", "webhook"]
|
||||
},
|
||||
"renderMode": {
|
||||
"type": "string",
|
||||
"enum": ["auto", "raw", "card"]
|
||||
},
|
||||
"streaming": { "type": "boolean" },
|
||||
"replyInThread": {
|
||||
"type": "string",
|
||||
"enum": ["disabled", "enabled"]
|
||||
},
|
||||
"typingIndicator": { "type": "boolean" },
|
||||
"accounts": {
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"$ref": "#/$defs/account"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,13 +86,17 @@ vi.mock("./streaming-card.js", () => {
|
||||
};
|
||||
});
|
||||
|
||||
import { createFeishuReplyDispatcher } from "./reply-dispatcher.js";
|
||||
import {
|
||||
clearFeishuStreamingStartBackoffForTests,
|
||||
createFeishuReplyDispatcher,
|
||||
} from "./reply-dispatcher.js";
|
||||
|
||||
describe("createFeishuReplyDispatcher streaming behavior", () => {
|
||||
type ReplyDispatcherArgs = Parameters<typeof createFeishuReplyDispatcher>[0];
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
clearFeishuStreamingStartBackoffForTests();
|
||||
streamingInstances.length = 0;
|
||||
sendMediaFeishuMock.mockResolvedValue(undefined);
|
||||
sendStructuredCardFeishuMock.mockResolvedValue(undefined);
|
||||
@@ -731,9 +735,10 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("recovers streaming after start() throws (HTTP 400)", async () => {
|
||||
it("backs off streaming retries after start() throws (HTTP 400)", async () => {
|
||||
const errorMock = vi.fn();
|
||||
let shouldFailStart = true;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockReturnValue(1_000);
|
||||
|
||||
// Intercept streaming instance creation to make first start() reject
|
||||
const origPush = streamingInstances.push.bind(streamingInstances);
|
||||
@@ -758,22 +763,33 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
|
||||
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
|
||||
|
||||
// First deliver with markdown triggers startStreaming - which will fail
|
||||
await options.deliver({ text: "```ts\nconst x = 1\n```" }, { kind: "block" });
|
||||
await options.deliver({ text: "```ts\nconst x = 1\n```" }, { kind: "final" });
|
||||
|
||||
// Wait for the async error to propagate
|
||||
await vi.waitFor(() => {
|
||||
expect(errorMock).toHaveBeenCalledWith(expect.stringContaining("streaming start failed"));
|
||||
});
|
||||
expect(streamingInstances).toHaveLength(1);
|
||||
expect(sendStructuredCardFeishuMock).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Second deliver should create a NEW streaming session (not stuck)
|
||||
// Immediate next markdown reply should skip a new streaming start and
|
||||
// fall back directly to a normal card instead of paying the 400 latency.
|
||||
await options.deliver({ text: "```ts\nconst y = 2\n```" }, { kind: "final" });
|
||||
|
||||
// Two instances created: first failed, second succeeded and closed
|
||||
expect(streamingInstances).toHaveLength(1);
|
||||
expect(sendStructuredCardFeishuMock).toHaveBeenCalledTimes(2);
|
||||
|
||||
// After the short backoff expires, retry streaming so fixed permissions
|
||||
// or transient Feishu failures recover without a process restart.
|
||||
nowSpy.mockReturnValue(62_000);
|
||||
await options.deliver({ text: "```ts\nconst z = 3\n```" }, { kind: "final" });
|
||||
|
||||
expect(streamingInstances).toHaveLength(2);
|
||||
expect(streamingInstances[1].start).toHaveBeenCalled();
|
||||
expect(streamingInstances[1].close).toHaveBeenCalled();
|
||||
} finally {
|
||||
streamingInstances.push = origPush;
|
||||
nowSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -32,6 +32,30 @@ function shouldUseCard(text: string): boolean {
|
||||
* Messages older than this are likely replays after context compaction (#30418). */
|
||||
const TYPING_INDICATOR_MAX_AGE_MS = 2 * 60_000;
|
||||
const MS_EPOCH_MIN = 1_000_000_000_000;
|
||||
const STREAMING_START_FAILURE_BACKOFF_MS = 60_000;
|
||||
const streamingStartBackoffUntilByAccount = new Map<string, number>();
|
||||
|
||||
function isStreamingStartBackedOff(accountId: string, now = Date.now()): boolean {
|
||||
const backoffUntil = streamingStartBackoffUntilByAccount.get(accountId);
|
||||
if (backoffUntil === undefined) {
|
||||
return false;
|
||||
}
|
||||
if (backoffUntil <= now) {
|
||||
streamingStartBackoffUntilByAccount.delete(accountId);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
function rememberStreamingStartFailure(accountId: string, now = Date.now()): number {
|
||||
const backoffUntil = now + STREAMING_START_FAILURE_BACKOFF_MS;
|
||||
streamingStartBackoffUntilByAccount.set(accountId, backoffUntil);
|
||||
return backoffUntil;
|
||||
}
|
||||
|
||||
export function clearFeishuStreamingStartBackoffForTests() {
|
||||
streamingStartBackoffUntilByAccount.clear();
|
||||
}
|
||||
|
||||
function normalizeEpochMs(timestamp: number | undefined): number | undefined {
|
||||
if (!Number.isFinite(timestamp) || timestamp === undefined || timestamp <= 0) {
|
||||
@@ -266,7 +290,12 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
|
||||
};
|
||||
|
||||
const startStreaming = () => {
|
||||
if (!streamingEnabled || streamingStartPromise || streaming) {
|
||||
if (
|
||||
!streamingEnabled ||
|
||||
streamingStartPromise ||
|
||||
streaming ||
|
||||
isStreamingStartBackedOff(account.accountId)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
streamingStartPromise = (async () => {
|
||||
@@ -291,10 +320,16 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
|
||||
header: cardHeader,
|
||||
note: cardNote,
|
||||
});
|
||||
streamingStartBackoffUntilByAccount.delete(account.accountId);
|
||||
} catch (error) {
|
||||
params.runtime.error?.(`feishu: streaming start failed: ${String(error)}`);
|
||||
rememberStreamingStartFailure(account.accountId);
|
||||
params.runtime.error?.(
|
||||
`feishu[${account.accountId}]: streaming start failed; using non-streaming card fallback for ${
|
||||
STREAMING_START_FAILURE_BACKOFF_MS / 1000
|
||||
}s: ${String(error)}`,
|
||||
);
|
||||
streaming = null;
|
||||
streamingStartPromise = null; // allow retry on next deliver
|
||||
streamingStartPromise = null;
|
||||
}
|
||||
})();
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user