mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 18:50:42 +00:00
fix(telegram): recover sticky fallback transport
This commit is contained in:
@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Channels/streaming: make progress draft labels scroll away with other progress lines, render structured tool rows as compact emoji/title/details, show web-search queries from provider-native argument shapes, and skip empty Discord apply-patch starts until a patch summary exists. (#79146)
|
||||
- Telegram: preserve the channel-specific 10-option poll cap in the unified outbound adapter so over-limit polls are rejected before send. (#78762) Thanks @obviyus.
|
||||
- Slack: route handled top-level channel turns in implicit-conversation channels to thread-scoped sessions when Slack reply threading is enabled, keeping the root turn and later thread replies on one OpenClaw session. (#78522) Thanks @zeroth-blip.
|
||||
- Telegram: re-probe the primary fetch transport after repeated sticky fallback success so transient IPv4 or pinned-IP fallback promotion can recover without a gateway restart. Fixes #77088. (#77157) Thanks @MkDev11.
|
||||
- Runtime/install: raise the supported Node 22 floor to `22.16+` so native SQLite query handling can rely on the `node:sqlite` statement metadata API while continuing to recommend Node 24. (#78921)
|
||||
- Discord/voice: include a bounded one-line STT transcript preview in verbose voice logs so live voice debugging shows what speakers said before the agent reply.
|
||||
- Codex app-server: pin the managed Codex harness and Codex CLI smoke package to `@openai/codex@0.129.0`, defer OpenClaw integration dynamic tools behind Codex tool search by default, and accept current Codex service-tier values so legacy `fast` settings survive the stable harness upgrade as `priority`.
|
||||
|
||||
@@ -787,8 +787,11 @@ describe("resolveTelegramFetch", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("retries once and then keeps sticky IPv4 dispatcher for subsequent requests", async () => {
|
||||
primeStickyFallbackRetry("ETIMEDOUT");
|
||||
it("retries once, keeps sticky IPv4, then recovers to primary dispatcher", async () => {
|
||||
undiciFetch.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"));
|
||||
for (let i = 0; i < 7; i += 1) {
|
||||
undiciFetch.mockResolvedValueOnce({ ok: true } as Response);
|
||||
}
|
||||
|
||||
const resolved = resolveTelegramFetchOrThrow(undefined, {
|
||||
network: {
|
||||
@@ -797,20 +800,30 @@ describe("resolveTelegramFetch", () => {
|
||||
});
|
||||
|
||||
await resolved("https://api.telegram.org/botx/sendMessage");
|
||||
await resolved("https://api.telegram.org/botx/sendChatAction");
|
||||
for (let i = 0; i < 4; i += 1) {
|
||||
await resolved(`https://api.telegram.org/botx/sendChatAction?sticky=${i}`);
|
||||
}
|
||||
await resolved("https://api.telegram.org/botx/getMe");
|
||||
await resolved("https://api.telegram.org/botx/deleteWebhook");
|
||||
|
||||
expect(undiciFetch).toHaveBeenCalledTimes(3);
|
||||
expect(undiciFetch).toHaveBeenCalledTimes(8);
|
||||
|
||||
const firstDispatcher = getDispatcherFromUndiciCall(1);
|
||||
const secondDispatcher = getDispatcherFromUndiciCall(2);
|
||||
const thirdDispatcher = getDispatcherFromUndiciCall(3);
|
||||
const sixthDispatcher = getDispatcherFromUndiciCall(6);
|
||||
const seventhDispatcher = getDispatcherFromUndiciCall(7);
|
||||
const eighthDispatcher = getDispatcherFromUndiciCall(8);
|
||||
|
||||
expect(firstDispatcher).toBeDefined();
|
||||
expect(secondDispatcher).toBeDefined();
|
||||
expect(thirdDispatcher).toBeDefined();
|
||||
expect(sixthDispatcher).toBeDefined();
|
||||
expect(seventhDispatcher).toBeDefined();
|
||||
expect(eighthDispatcher).toBeDefined();
|
||||
|
||||
expect(firstDispatcher).not.toBe(secondDispatcher);
|
||||
expect(secondDispatcher).toBe(thirdDispatcher);
|
||||
expect(secondDispatcher).toBe(sixthDispatcher);
|
||||
expect(seventhDispatcher).toBe(firstDispatcher);
|
||||
expect(eighthDispatcher).toBe(firstDispatcher);
|
||||
|
||||
expectStickyAutoSelectDispatcher(firstDispatcher);
|
||||
expect(secondDispatcher?.options?.connect).toEqual(
|
||||
@@ -822,17 +835,21 @@ describe("resolveTelegramFetch", () => {
|
||||
expect(loggerDebug).toHaveBeenCalledWith(
|
||||
expect.stringContaining("fetch fallback: enabling sticky IPv4-only dispatcher"),
|
||||
);
|
||||
expect(loggerDebug).toHaveBeenCalledWith(
|
||||
expect.stringContaining("fetch fallback: recovered from attempt 1 to attempt 0"),
|
||||
);
|
||||
expect(loggerWarn).not.toHaveBeenCalledWith(
|
||||
expect.stringContaining("fetch fallback: enabling sticky IPv4-only dispatcher"),
|
||||
);
|
||||
});
|
||||
|
||||
it("escalates from IPv4 fallback to pinned Telegram IP and keeps it sticky", async () => {
|
||||
it("escalates from IPv4 fallback to pinned Telegram IP and recovers to primary", async () => {
|
||||
undiciFetch
|
||||
.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"))
|
||||
.mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH"))
|
||||
.mockResolvedValueOnce({ ok: true } as Response)
|
||||
.mockResolvedValueOnce({ ok: true } as Response);
|
||||
.mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH"));
|
||||
for (let i = 0; i < 7; i += 1) {
|
||||
undiciFetch.mockResolvedValueOnce({ ok: true } as Response);
|
||||
}
|
||||
|
||||
const resolved = resolveTelegramFetchOrThrow(undefined, {
|
||||
network: {
|
||||
@@ -842,20 +859,72 @@ describe("resolveTelegramFetch", () => {
|
||||
});
|
||||
|
||||
await resolved("https://api.telegram.org/botx/sendMessage");
|
||||
await resolved("https://api.telegram.org/botx/sendChatAction");
|
||||
for (let i = 0; i < 4; i += 1) {
|
||||
await resolved(`https://api.telegram.org/botx/sendChatAction?sticky=${i}`);
|
||||
}
|
||||
await resolved("https://api.telegram.org/botx/getMe");
|
||||
await resolved("https://api.telegram.org/botx/deleteWebhook");
|
||||
|
||||
expect(undiciFetch).toHaveBeenCalledTimes(4);
|
||||
expect(undiciFetch).toHaveBeenCalledTimes(9);
|
||||
|
||||
const firstDispatcher = getDispatcherFromUndiciCall(1);
|
||||
const secondDispatcher = getDispatcherFromUndiciCall(2);
|
||||
const thirdDispatcher = getDispatcherFromUndiciCall(3);
|
||||
const fourthDispatcher = getDispatcherFromUndiciCall(4);
|
||||
const seventhDispatcher = getDispatcherFromUndiciCall(7);
|
||||
const eighthDispatcher = getDispatcherFromUndiciCall(8);
|
||||
const ninthDispatcher = getDispatcherFromUndiciCall(9);
|
||||
|
||||
expect(secondDispatcher).not.toBe(thirdDispatcher);
|
||||
expect(thirdDispatcher).toBe(fourthDispatcher);
|
||||
expect(thirdDispatcher).toBe(seventhDispatcher);
|
||||
expect(eighthDispatcher).toBe(firstDispatcher);
|
||||
expect(ninthDispatcher).toBe(firstDispatcher);
|
||||
expectPinnedFallbackIpDispatcher(3);
|
||||
expect(loggerWarn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("fetch fallback: DNS-resolved IP unreachable"),
|
||||
);
|
||||
expect(loggerDebug).toHaveBeenCalledWith(
|
||||
expect.stringContaining("fetch fallback: recovered from attempt 2 to attempt 0"),
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps sticky fallback after a failed primary recovery probe", async () => {
|
||||
undiciFetch
|
||||
.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"))
|
||||
.mockResolvedValueOnce({ ok: true } as Response)
|
||||
.mockResolvedValueOnce({ ok: true } as Response)
|
||||
.mockResolvedValueOnce({ ok: true } as Response)
|
||||
.mockResolvedValueOnce({ ok: true } as Response)
|
||||
.mockResolvedValueOnce({ ok: true } as Response)
|
||||
.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"))
|
||||
.mockResolvedValueOnce({ ok: true } as Response)
|
||||
.mockResolvedValueOnce({ ok: true } as Response);
|
||||
|
||||
const resolved = resolveTelegramFetchOrThrow(undefined, {
|
||||
network: {
|
||||
autoSelectFamily: true,
|
||||
},
|
||||
});
|
||||
|
||||
await resolved("https://api.telegram.org/botx/sendMessage");
|
||||
for (let i = 0; i < 4; i += 1) {
|
||||
await resolved(`https://api.telegram.org/botx/sendChatAction?sticky=${i}`);
|
||||
}
|
||||
await resolved("https://api.telegram.org/botx/getMe");
|
||||
await resolved("https://api.telegram.org/botx/deleteWebhook");
|
||||
|
||||
expect(undiciFetch).toHaveBeenCalledTimes(9);
|
||||
|
||||
const firstDispatcher = getDispatcherFromUndiciCall(1);
|
||||
const secondDispatcher = getDispatcherFromUndiciCall(2);
|
||||
|
||||
expect(firstDispatcher).not.toBe(secondDispatcher);
|
||||
expect(getDispatcherFromUndiciCall(6)).toBe(secondDispatcher);
|
||||
expect(getDispatcherFromUndiciCall(7)).toBe(firstDispatcher);
|
||||
expect(getDispatcherFromUndiciCall(8)).toBe(secondDispatcher);
|
||||
expect(getDispatcherFromUndiciCall(9)).toBe(secondDispatcher);
|
||||
expect(loggerDebug).toHaveBeenCalledWith(
|
||||
expect.stringContaining("fetch fallback: re-probing primary dispatcher"),
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps the armed fallback sticky when all attempts fail", async () => {
|
||||
|
||||
@@ -41,6 +41,7 @@ const TELEGRAM_DISPATCHER_KEEP_ALIVE_TIMEOUT_MS = 30_000;
|
||||
const TELEGRAM_DISPATCHER_KEEP_ALIVE_MAX_TIMEOUT_MS = 600_000;
|
||||
const TELEGRAM_DISPATCHER_CONNECTIONS_PER_ORIGIN = 10;
|
||||
const TELEGRAM_DISPATCHER_PIPELINING = 1;
|
||||
const TELEGRAM_STICKY_FALLBACK_PRIMARY_PROBE_SUCCESS_THRESHOLD = 5;
|
||||
|
||||
type TelegramAgentPoolOptions = {
|
||||
allowH2: false;
|
||||
@@ -640,6 +641,14 @@ export function resolveTelegramTransport(
|
||||
});
|
||||
|
||||
let stickyAttemptIndex = 0;
|
||||
let stickySuccessCount = 0;
|
||||
let primaryProbeDue = false;
|
||||
|
||||
const resetStickyRecoveryProbe = (): void => {
|
||||
stickySuccessCount = 0;
|
||||
primaryProbeDue = false;
|
||||
};
|
||||
|
||||
const promoteStickyAttempt = (nextIndex: number, err: unknown, reason?: string): boolean => {
|
||||
if (nextIndex <= stickyAttemptIndex || nextIndex >= transportAttempts.length) {
|
||||
return false;
|
||||
@@ -655,14 +664,48 @@ export function resolveTelegramTransport(
|
||||
}
|
||||
}
|
||||
stickyAttemptIndex = nextIndex;
|
||||
resetStickyRecoveryProbe();
|
||||
return true;
|
||||
};
|
||||
|
||||
const recordSuccessfulAttempt = (attemptIndex: number): void => {
|
||||
if (stickyAttemptIndex === 0) {
|
||||
resetStickyRecoveryProbe();
|
||||
return;
|
||||
}
|
||||
|
||||
if (attemptIndex < stickyAttemptIndex) {
|
||||
log.debug(
|
||||
`fetch fallback: recovered from attempt ${stickyAttemptIndex} to attempt ${attemptIndex}`,
|
||||
);
|
||||
stickyAttemptIndex = attemptIndex;
|
||||
resetStickyRecoveryProbe();
|
||||
return;
|
||||
}
|
||||
|
||||
if (attemptIndex !== stickyAttemptIndex) {
|
||||
return;
|
||||
}
|
||||
|
||||
stickySuccessCount += 1;
|
||||
if (stickySuccessCount >= TELEGRAM_STICKY_FALLBACK_PRIMARY_PROBE_SUCCESS_THRESHOLD) {
|
||||
stickySuccessCount = 0;
|
||||
primaryProbeDue = true;
|
||||
log.debug("fetch fallback: scheduling primary dispatcher recovery probe");
|
||||
}
|
||||
};
|
||||
|
||||
const resolvedFetch = (async (input: RequestInfo | URL, init?: RequestInit) => {
|
||||
const callerProvidedDispatcher = Boolean(
|
||||
(init as RequestInitWithDispatcher | undefined)?.dispatcher,
|
||||
);
|
||||
const startIndex = Math.min(stickyAttemptIndex, transportAttempts.length - 1);
|
||||
const stickyStartIndex = Math.min(stickyAttemptIndex, transportAttempts.length - 1);
|
||||
const primaryProbe = !callerProvidedDispatcher && primaryProbeDue && stickyStartIndex > 0;
|
||||
const startIndex = primaryProbe ? 0 : stickyStartIndex;
|
||||
if (primaryProbe) {
|
||||
primaryProbeDue = false;
|
||||
log.debug("fetch fallback: re-probing primary dispatcher after sticky fallback successes");
|
||||
}
|
||||
let err: unknown;
|
||||
|
||||
try {
|
||||
@@ -679,6 +722,9 @@ export function resolveTelegramTransport(
|
||||
flowId: randomUUID(),
|
||||
meta: { subsystem: "telegram-fetch" },
|
||||
});
|
||||
if (!callerProvidedDispatcher) {
|
||||
recordSuccessfulAttempt(startIndex);
|
||||
}
|
||||
return response;
|
||||
} catch (caught) {
|
||||
err = caught;
|
||||
@@ -708,6 +754,7 @@ export function resolveTelegramTransport(
|
||||
flowId: randomUUID(),
|
||||
meta: { subsystem: "telegram-fetch", fallbackAttempt: nextIndex },
|
||||
});
|
||||
recordSuccessfulAttempt(nextIndex);
|
||||
return response;
|
||||
} catch (caught) {
|
||||
err = caught;
|
||||
|
||||
Reference in New Issue
Block a user