diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index 17d6eabf000..ea3031a6cc4 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -129,6 +129,44 @@ function collectMessagingMediaUrlsFromToolResult(result: unknown): string[] { return urls; } +function emitToolResultOutput(params: { + ctx: ToolHandlerContext; + toolName: string; + meta?: string; + isToolError: boolean; + result: unknown; + sanitizedResult: unknown; +}) { + const { ctx, toolName, meta, isToolError, result, sanitizedResult } = params; + if (!ctx.params.onToolResult) { + return; + } + + if (ctx.shouldEmitToolOutput()) { + const outputText = extractToolResultText(sanitizedResult); + if (outputText) { + ctx.emitToolOutput(toolName, meta, outputText); + } + return; + } + + if (isToolError) { + return; + } + + // emitToolOutput() already handles MEDIA: directives when enabled; this path + // only sends raw media URLs for non-verbose delivery mode. + const mediaPaths = filterToolResultMediaUrls(toolName, extractToolResultMediaPaths(result)); + if (mediaPaths.length === 0) { + return; + } + try { + void ctx.params.onToolResult({ mediaUrls: mediaPaths }); + } catch { + // ignore delivery failures + } +} + export async function handleToolExecutionStart( ctx: ToolHandlerContext, evt: AgentEvent & { toolName: string; toolCallId: string; args: unknown }, @@ -371,26 +409,7 @@ export async function handleToolExecutionEnd( `embedded run tool end: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`, ); - if (ctx.params.onToolResult && ctx.shouldEmitToolOutput()) { - const outputText = extractToolResultText(sanitizedResult); - if (outputText) { - ctx.emitToolOutput(toolName, meta, outputText); - } - } - - // Deliver media from tool results when the verbose emitToolOutput path is off. - // When shouldEmitToolOutput() is true, emitToolOutput already delivers media - // via parseReplyDirectives (MEDIA: text extraction), so skip to avoid duplicates. - if (ctx.params.onToolResult && !isToolError && !ctx.shouldEmitToolOutput()) { - const mediaPaths = filterToolResultMediaUrls(toolName, extractToolResultMediaPaths(result)); - if (mediaPaths.length > 0) { - try { - void ctx.params.onToolResult({ mediaUrls: mediaPaths }); - } catch { - // ignore delivery failures - } - } - } + emitToolResultOutput({ ctx, toolName, meta, isToolError, result, sanitizedResult }); // Run after_tool_call plugin hook (fire-and-forget) const hookRunnerAfter = ctx.hookRunner ?? getGlobalHookRunner(); diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 373bb66a5bf..443555fdd73 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -27,6 +27,13 @@ import type { TelegramStreamMode } from "./bot/types.js"; import type { TelegramInlineButtons } from "./button-types.js"; import { createTelegramDraftStream } from "./draft-stream.js"; import { renderTelegramHtmlText } from "./format.js"; +import { + type ArchivedPreview, + createLaneDeliveryStateTracker, + createLaneTextDeliverer, + type DraftLaneState, + type LaneName, +} from "./lane-delivery.js"; import { createTelegramReasoningStepState, splitTelegramReasoningText, @@ -149,13 +156,6 @@ export const dispatchTelegramMessage = async ({ replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined; const draftMinInitialChars = DRAFT_MIN_INITIAL_CHARS; const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); - type LaneName = "answer" | "reasoning"; - type DraftLaneState = { - stream: ReturnType | undefined; - lastPartialText: string; - hasStreamedMessage: boolean; - }; - type ArchivedPreview = { messageId: number; textSnapshot: string }; const archivedAnswerPreviews: ArchivedPreview[] = []; const archivedReasoningPreviewIds: number[] = []; const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => { @@ -332,11 +332,7 @@ export const dispatchTelegramMessage = async ({ ctxPayload.ReplyToIsQuote && ctxPayload.ReplyToBody ? ctxPayload.ReplyToBody.trim() || undefined : undefined; - const deliveryState = { - delivered: false, - skippedNonSilent: 0, - failedNonSilent: 0, - }; + const deliveryState = createLaneDeliveryStateTracker(); const finalizedPreviewByLane: Record = { answer: false, reasoning: false, @@ -360,78 +356,6 @@ export const dispatchTelegramMessage = async ({ linkPreview: telegramCfg.linkPreview, replyQuoteText, }; - const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText; - const tryUpdatePreviewForLane = async (params: { - lane: DraftLaneState; - laneName: LaneName; - text: string; - previewButtons?: TelegramInlineButtons; - stopBeforeEdit?: boolean; - updateLaneSnapshot?: boolean; - skipRegressive: "always" | "existingOnly"; - context: "final" | "update"; - previewMessageId?: number; - previewTextSnapshot?: string; - }): Promise => { - const { - lane, - laneName, - text, - previewButtons, - stopBeforeEdit = false, - updateLaneSnapshot = false, - skipRegressive, - context, - previewMessageId: previewMessageIdOverride, - previewTextSnapshot, - } = params; - if (!lane.stream) { - return false; - } - const lanePreviewMessageId = lane.stream.messageId(); - const hadPreviewMessage = - typeof previewMessageIdOverride === "number" || typeof lanePreviewMessageId === "number"; - if (stopBeforeEdit) { - await lane.stream.stop(); - } - const previewMessageId = - typeof previewMessageIdOverride === "number" - ? previewMessageIdOverride - : lane.stream.messageId(); - if (typeof previewMessageId !== "number") { - return false; - } - const currentPreviewText = previewTextSnapshot ?? getLanePreviewText(lane); - const shouldSkipRegressive = - Boolean(currentPreviewText) && - currentPreviewText.startsWith(text) && - text.length < currentPreviewText.length && - (skipRegressive === "always" || hadPreviewMessage); - if (shouldSkipRegressive) { - // Avoid regressive punctuation/wording flicker from occasional shorter finals. - deliveryState.delivered = true; - return true; - } - try { - await editMessageTelegram(chatId, previewMessageId, text, { - api: bot.api, - cfg, - accountId: route.accountId, - linkPreview: telegramCfg.linkPreview, - buttons: previewButtons, - }); - if (updateLaneSnapshot) { - lane.lastPartialText = text; - } - deliveryState.delivered = true; - return true; - } catch (err) { - logVerbose( - `telegram: ${laneName} preview ${context} edit failed; falling back to standard send (${String(err)})`, - ); - return false; - } - }; const applyTextToPayload = (payload: ReplyPayload, text: string): ReplyPayload => { if (payload.text === text) { return payload; @@ -445,138 +369,38 @@ export const dispatchTelegramMessage = async ({ onVoiceRecording: sendRecordVoice, }); if (result.delivered) { - deliveryState.delivered = true; + deliveryState.markDelivered(); } return result.delivered; }; - type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped"; - const consumeArchivedAnswerPreviewForFinal = async (params: { - lane: DraftLaneState; - text: string; - payload: ReplyPayload; - previewButtons?: TelegramInlineButtons; - canEditViaPreview: boolean; - }): Promise => { - const archivedPreview = archivedAnswerPreviews.shift(); - if (!archivedPreview) { - return undefined; - } - if (params.canEditViaPreview) { - const finalized = await tryUpdatePreviewForLane({ - lane: params.lane, - laneName: "answer", - text: params.text, - previewButtons: params.previewButtons, - stopBeforeEdit: false, - skipRegressive: "existingOnly", - context: "final", - previewMessageId: archivedPreview.messageId, - previewTextSnapshot: archivedPreview.textSnapshot, - }); - if (finalized) { - return "preview-finalized"; - } - } - try { - await bot.api.deleteMessage(chatId, archivedPreview.messageId); - } catch (err) { - logVerbose( - `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, - ); - } - const delivered = await sendPayload(applyTextToPayload(params.payload, params.text)); - return delivered ? "sent" : "skipped"; - }; - const deliverLaneText = async (params: { - laneName: LaneName; - text: string; - payload: ReplyPayload; - infoKind: string; - previewButtons?: TelegramInlineButtons; - allowPreviewUpdateForNonFinal?: boolean; - }): Promise => { - const { - laneName, - text, - payload, - infoKind, - previewButtons, - allowPreviewUpdateForNonFinal = false, - } = params; - const lane = lanes[laneName]; - const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - const canEditViaPreview = - !hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError; - - if (infoKind === "final") { - if (laneName === "answer") { - const archivedResult = await consumeArchivedAnswerPreviewForFinal({ - lane, - text, - payload, - previewButtons, - canEditViaPreview, - }); - if (archivedResult) { - return archivedResult; - } - } - if (canEditViaPreview && !finalizedPreviewByLane[laneName]) { - await flushDraftLane(lane); - if (laneName === "answer") { - const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({ - lane, - text, - payload, - previewButtons, - canEditViaPreview, - }); - if (archivedResultAfterFlush) { - return archivedResultAfterFlush; - } - } - const finalized = await tryUpdatePreviewForLane({ - lane, - laneName, - text, - previewButtons, - stopBeforeEdit: true, - skipRegressive: "existingOnly", - context: "final", - }); - if (finalized) { - finalizedPreviewByLane[laneName] = true; - return "preview-finalized"; - } - } else if (!hasMedia && !payload.isError && text.length > draftMaxChars) { - logVerbose( - `telegram: preview final too long for edit (${text.length} > ${draftMaxChars}); falling back to standard send`, - ); - } + const deliverLaneText = createLaneTextDeliverer({ + lanes, + archivedAnswerPreviews, + finalizedPreviewByLane, + draftMaxChars, + applyTextToPayload, + sendPayload, + flushDraftLane, + stopDraftLane: async (lane) => { await lane.stream?.stop(); - const delivered = await sendPayload(applyTextToPayload(payload, text)); - return delivered ? "sent" : "skipped"; - } - - if (allowPreviewUpdateForNonFinal && canEditViaPreview) { - const updated = await tryUpdatePreviewForLane({ - lane, - laneName, - text, - previewButtons, - stopBeforeEdit: false, - updateLaneSnapshot: true, - skipRegressive: "always", - context: "update", + }, + editPreview: async ({ messageId, text, previewButtons }) => { + await editMessageTelegram(chatId, messageId, text, { + api: bot.api, + cfg, + accountId: route.accountId, + linkPreview: telegramCfg.linkPreview, + buttons: previewButtons, }); - if (updated) { - return "preview-updated"; - } - } - - const delivered = await sendPayload(applyTextToPayload(payload, text)); - return delivered ? "sent" : "skipped"; - }; + }, + deletePreviewMessage: async (messageId) => { + await bot.api.deleteMessage(chatId, messageId); + }, + log: logVerbose, + markDelivered: () => { + deliveryState.markDelivered(); + }, + }); let queuedFinal = false; @@ -675,11 +499,11 @@ export const dispatchTelegramMessage = async ({ }, onSkip: (_payload, info) => { if (info.reason !== "silent") { - deliveryState.skippedNonSilent += 1; + deliveryState.markNonSilentSkip(); } }, onError: (err, info) => { - deliveryState.failedNonSilent += 1; + deliveryState.markNonSilentFailure(); runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); }, onReplyStart: createTypingCallbacks({ @@ -793,9 +617,10 @@ export const dispatchTelegramMessage = async ({ } } let sentFallback = false; + const deliverySummary = deliveryState.snapshot(); if ( - !deliveryState.delivered && - (deliveryState.skippedNonSilent > 0 || deliveryState.failedNonSilent > 0) + !deliverySummary.delivered && + (deliverySummary.skippedNonSilent > 0 || deliverySummary.failedNonSilent > 0) ) { const result = await deliverReplies({ replies: [{ text: EMPTY_RESPONSE_FALLBACK }], diff --git a/src/telegram/lane-delivery.ts b/src/telegram/lane-delivery.ts new file mode 100644 index 00000000000..91aa59dc888 --- /dev/null +++ b/src/telegram/lane-delivery.ts @@ -0,0 +1,286 @@ +import type { ReplyPayload } from "../auto-reply/types.js"; +import type { TelegramInlineButtons } from "./button-types.js"; +import type { TelegramDraftStream } from "./draft-stream.js"; + +export type LaneName = "answer" | "reasoning"; + +export type DraftLaneState = { + stream: TelegramDraftStream | undefined; + lastPartialText: string; + hasStreamedMessage: boolean; +}; + +export type ArchivedPreview = { + messageId: number; + textSnapshot: string; +}; + +export type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped"; + +export type LaneDeliverySnapshot = { + delivered: boolean; + skippedNonSilent: number; + failedNonSilent: number; +}; + +export type LaneDeliveryStateTracker = { + markDelivered: () => void; + markNonSilentSkip: () => void; + markNonSilentFailure: () => void; + snapshot: () => LaneDeliverySnapshot; +}; + +export function createLaneDeliveryStateTracker(): LaneDeliveryStateTracker { + const state: LaneDeliverySnapshot = { + delivered: false, + skippedNonSilent: 0, + failedNonSilent: 0, + }; + return { + markDelivered: () => { + state.delivered = true; + }, + markNonSilentSkip: () => { + state.skippedNonSilent += 1; + }, + markNonSilentFailure: () => { + state.failedNonSilent += 1; + }, + snapshot: () => ({ ...state }), + }; +} + +type CreateLaneTextDelivererParams = { + lanes: Record; + archivedAnswerPreviews: ArchivedPreview[]; + finalizedPreviewByLane: Record; + draftMaxChars: number; + applyTextToPayload: (payload: ReplyPayload, text: string) => ReplyPayload; + sendPayload: (payload: ReplyPayload) => Promise; + flushDraftLane: (lane: DraftLaneState) => Promise; + stopDraftLane: (lane: DraftLaneState) => Promise; + editPreview: (params: { + laneName: LaneName; + messageId: number; + text: string; + context: "final" | "update"; + previewButtons?: TelegramInlineButtons; + }) => Promise; + deletePreviewMessage: (messageId: number) => Promise; + log: (message: string) => void; + markDelivered: () => void; +}; + +type DeliverLaneTextParams = { + laneName: LaneName; + text: string; + payload: ReplyPayload; + infoKind: string; + previewButtons?: TelegramInlineButtons; + allowPreviewUpdateForNonFinal?: boolean; +}; + +type TryUpdatePreviewParams = { + lane: DraftLaneState; + laneName: LaneName; + text: string; + previewButtons?: TelegramInlineButtons; + stopBeforeEdit?: boolean; + updateLaneSnapshot?: boolean; + skipRegressive: "always" | "existingOnly"; + context: "final" | "update"; + previewMessageId?: number; + previewTextSnapshot?: string; +}; + +type ConsumeArchivedAnswerPreviewParams = { + lane: DraftLaneState; + text: string; + payload: ReplyPayload; + previewButtons?: TelegramInlineButtons; + canEditViaPreview: boolean; +}; + +export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { + const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText; + + const tryUpdatePreviewForLane = async ({ + lane, + laneName, + text, + previewButtons, + stopBeforeEdit = false, + updateLaneSnapshot = false, + skipRegressive, + context, + previewMessageId: previewMessageIdOverride, + previewTextSnapshot, + }: TryUpdatePreviewParams): Promise => { + if (!lane.stream) { + return false; + } + const lanePreviewMessageId = lane.stream.messageId(); + const hadPreviewMessage = + typeof previewMessageIdOverride === "number" || typeof lanePreviewMessageId === "number"; + if (stopBeforeEdit) { + await params.stopDraftLane(lane); + } + const previewMessageId = + typeof previewMessageIdOverride === "number" + ? previewMessageIdOverride + : lane.stream.messageId(); + if (typeof previewMessageId !== "number") { + return false; + } + const currentPreviewText = previewTextSnapshot ?? getLanePreviewText(lane); + const shouldSkipRegressive = + Boolean(currentPreviewText) && + currentPreviewText.startsWith(text) && + text.length < currentPreviewText.length && + (skipRegressive === "always" || hadPreviewMessage); + if (shouldSkipRegressive) { + params.markDelivered(); + return true; + } + try { + await params.editPreview({ + laneName, + messageId: previewMessageId, + text, + previewButtons, + context, + }); + if (updateLaneSnapshot) { + lane.lastPartialText = text; + } + params.markDelivered(); + return true; + } catch (err) { + params.log( + `telegram: ${laneName} preview ${context} edit failed; falling back to standard send (${String(err)})`, + ); + return false; + } + }; + + const consumeArchivedAnswerPreviewForFinal = async ({ + lane, + text, + payload, + previewButtons, + canEditViaPreview, + }: ConsumeArchivedAnswerPreviewParams): Promise => { + const archivedPreview = params.archivedAnswerPreviews.shift(); + if (!archivedPreview) { + return undefined; + } + if (canEditViaPreview) { + const finalized = await tryUpdatePreviewForLane({ + lane, + laneName: "answer", + text, + previewButtons, + stopBeforeEdit: false, + skipRegressive: "existingOnly", + context: "final", + previewMessageId: archivedPreview.messageId, + previewTextSnapshot: archivedPreview.textSnapshot, + }); + if (finalized) { + return "preview-finalized"; + } + } + try { + await params.deletePreviewMessage(archivedPreview.messageId); + } catch (err) { + params.log( + `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, + ); + } + const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); + return delivered ? "sent" : "skipped"; + }; + + return async ({ + laneName, + text, + payload, + infoKind, + previewButtons, + allowPreviewUpdateForNonFinal = false, + }: DeliverLaneTextParams): Promise => { + const lane = params.lanes[laneName]; + const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + const canEditViaPreview = + !hasMedia && text.length > 0 && text.length <= params.draftMaxChars && !payload.isError; + + if (infoKind === "final") { + if (laneName === "answer") { + const archivedResult = await consumeArchivedAnswerPreviewForFinal({ + lane, + text, + payload, + previewButtons, + canEditViaPreview, + }); + if (archivedResult) { + return archivedResult; + } + } + if (canEditViaPreview && !params.finalizedPreviewByLane[laneName]) { + await params.flushDraftLane(lane); + if (laneName === "answer") { + const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({ + lane, + text, + payload, + previewButtons, + canEditViaPreview, + }); + if (archivedResultAfterFlush) { + return archivedResultAfterFlush; + } + } + const finalized = await tryUpdatePreviewForLane({ + lane, + laneName, + text, + previewButtons, + stopBeforeEdit: true, + skipRegressive: "existingOnly", + context: "final", + }); + if (finalized) { + params.finalizedPreviewByLane[laneName] = true; + return "preview-finalized"; + } + } else if (!hasMedia && !payload.isError && text.length > params.draftMaxChars) { + params.log( + `telegram: preview final too long for edit (${text.length} > ${params.draftMaxChars}); falling back to standard send`, + ); + } + await params.stopDraftLane(lane); + const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); + return delivered ? "sent" : "skipped"; + } + + if (allowPreviewUpdateForNonFinal && canEditViaPreview) { + const updated = await tryUpdatePreviewForLane({ + lane, + laneName, + text, + previewButtons, + stopBeforeEdit: false, + updateLaneSnapshot: true, + skipRegressive: "always", + context: "update", + }); + if (updated) { + return "preview-updated"; + } + } + + const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); + return delivered ? "sent" : "skipped"; + }; +} diff --git a/test/gateway.multi.e2e.test.ts b/test/gateway.multi.e2e.test.ts index 61f9de79b25..9d020f754d0 100644 --- a/test/gateway.multi.e2e.test.ts +++ b/test/gateway.multi.e2e.test.ts @@ -1,398 +1,32 @@ -import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; import { randomUUID } from "node:crypto"; -import fs from "node:fs/promises"; -import { request as httpRequest } from "node:http"; -import net from "node:net"; -import os from "node:os"; -import path from "node:path"; import { afterAll, describe, expect, it } from "vitest"; import { GatewayClient } from "../src/gateway/client.js"; import { connectGatewayClient } from "../src/gateway/test-helpers.e2e.js"; -import { loadOrCreateDeviceIdentity } from "../src/infra/device-identity.js"; -import { sleep } from "../src/utils.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../src/utils/message-channel.js"; +import { + type ChatEventPayload, + type GatewayInstance, + connectNode, + extractFirstTextBlock, + postJson, + spawnGatewayInstance, + stopGatewayInstance, + waitForChatFinalEvent, + waitForNodeStatus, +} from "./helpers/gateway-e2e-harness.js"; -type GatewayInstance = { - name: string; - port: number; - hookToken: string; - gatewayToken: string; - homeDir: string; - stateDir: string; - configPath: string; - child: ChildProcessWithoutNullStreams; - stdout: string[]; - stderr: string[]; -}; - -type NodeListPayload = { - nodes?: Array<{ nodeId?: string; connected?: boolean; paired?: boolean }>; -}; - -type ChatEventPayload = { - runId?: string; - sessionKey?: string; - state?: string; - message?: unknown; -}; - -const GATEWAY_START_TIMEOUT_MS = 20_000; -const GATEWAY_STOP_TIMEOUT_MS = 1_500; const E2E_TIMEOUT_MS = 120_000; -const GATEWAY_CONNECT_STATUS_TIMEOUT_MS = 2_000; -const GATEWAY_NODE_STATUS_TIMEOUT_MS = 4_000; -const GATEWAY_NODE_STATUS_POLL_MS = 20; - -const getFreePort = async () => { - const srv = net.createServer(); - await new Promise((resolve) => srv.listen(0, "127.0.0.1", resolve)); - const addr = srv.address(); - if (!addr || typeof addr === "string") { - srv.close(); - throw new Error("failed to bind ephemeral port"); - } - await new Promise((resolve) => srv.close(() => resolve())); - return addr.port; -}; - -const waitForPortOpen = async ( - proc: ChildProcessWithoutNullStreams, - chunksOut: string[], - chunksErr: string[], - port: number, - timeoutMs: number, -) => { - const startedAt = Date.now(); - while (Date.now() - startedAt < timeoutMs) { - if (proc.exitCode !== null) { - const stdout = chunksOut.join(""); - const stderr = chunksErr.join(""); - throw new Error( - `gateway exited before listening (code=${String(proc.exitCode)} signal=${String(proc.signalCode)})\n` + - `--- stdout ---\n${stdout}\n--- stderr ---\n${stderr}`, - ); - } - - try { - await new Promise((resolve, reject) => { - const socket = net.connect({ host: "127.0.0.1", port }); - socket.once("connect", () => { - socket.destroy(); - resolve(); - }); - socket.once("error", (err) => { - socket.destroy(); - reject(err); - }); - }); - return; - } catch { - // keep polling - } - - await sleep(10); - } - const stdout = chunksOut.join(""); - const stderr = chunksErr.join(""); - throw new Error( - `timeout waiting for gateway to listen on port ${port}\n` + - `--- stdout ---\n${stdout}\n--- stderr ---\n${stderr}`, - ); -}; - -const spawnGatewayInstance = async (name: string): Promise => { - const port = await getFreePort(); - const hookToken = `token-${name}-${randomUUID()}`; - const gatewayToken = `gateway-${name}-${randomUUID()}`; - const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), `openclaw-e2e-${name}-`)); - const configDir = path.join(homeDir, ".openclaw"); - await fs.mkdir(configDir, { recursive: true }); - const configPath = path.join(configDir, "openclaw.json"); - const stateDir = path.join(configDir, "state"); - const config = { - gateway: { port, auth: { mode: "token", token: gatewayToken } }, - hooks: { enabled: true, token: hookToken, path: "/hooks" }, - }; - await fs.writeFile(configPath, JSON.stringify(config, null, 2), "utf8"); - - const stdout: string[] = []; - const stderr: string[] = []; - let child: ChildProcessWithoutNullStreams | null = null; - - try { - child = spawn( - "node", - [ - "dist/index.js", - "gateway", - "--port", - String(port), - "--bind", - "loopback", - "--allow-unconfigured", - ], - { - cwd: process.cwd(), - env: { - ...process.env, - HOME: homeDir, - OPENCLAW_CONFIG_PATH: configPath, - OPENCLAW_STATE_DIR: stateDir, - OPENCLAW_GATEWAY_TOKEN: "", - OPENCLAW_GATEWAY_PASSWORD: "", - OPENCLAW_SKIP_CHANNELS: "1", - OPENCLAW_SKIP_BROWSER_CONTROL_SERVER: "1", - OPENCLAW_SKIP_CANVAS_HOST: "1", - }, - stdio: ["ignore", "pipe", "pipe"], - }, - ); - - child.stdout?.setEncoding("utf8"); - child.stderr?.setEncoding("utf8"); - child.stdout?.on("data", (d) => stdout.push(String(d))); - child.stderr?.on("data", (d) => stderr.push(String(d))); - - await waitForPortOpen(child, stdout, stderr, port, GATEWAY_START_TIMEOUT_MS); - - return { - name, - port, - hookToken, - gatewayToken, - homeDir, - stateDir, - configPath, - child, - stdout, - stderr, - }; - } catch (err) { - if (child && child.exitCode === null && !child.killed) { - try { - child.kill("SIGKILL"); - } catch { - // ignore - } - } - await fs.rm(homeDir, { recursive: true, force: true }); - throw err; - } -}; - -const stopGatewayInstance = async (inst: GatewayInstance) => { - if (inst.child.exitCode === null && !inst.child.killed) { - try { - inst.child.kill("SIGTERM"); - } catch { - // ignore - } - } - const exited = await Promise.race([ - new Promise((resolve) => { - if (inst.child.exitCode !== null) { - return resolve(true); - } - inst.child.once("exit", () => resolve(true)); - }), - sleep(GATEWAY_STOP_TIMEOUT_MS).then(() => false), - ]); - if (!exited && inst.child.exitCode === null && !inst.child.killed) { - try { - inst.child.kill("SIGKILL"); - } catch { - // ignore - } - } - await fs.rm(inst.homeDir, { recursive: true, force: true }); -}; - -const postJson = async (url: string, body: unknown, headers?: Record) => { - const payload = JSON.stringify(body); - const parsed = new URL(url); - return await new Promise<{ status: number; json: unknown }>((resolve, reject) => { - const req = httpRequest( - { - method: "POST", - hostname: parsed.hostname, - port: Number(parsed.port), - path: `${parsed.pathname}${parsed.search}`, - headers: { - "Content-Type": "application/json", - "Content-Length": Buffer.byteLength(payload), - ...headers, - }, - }, - (res) => { - let data = ""; - res.setEncoding("utf8"); - res.on("data", (chunk) => { - data += chunk; - }); - res.on("end", () => { - let json: unknown = null; - if (data.trim()) { - try { - json = JSON.parse(data); - } catch { - json = data; - } - } - resolve({ status: res.statusCode ?? 0, json }); - }); - }, - ); - req.on("error", reject); - req.write(payload); - req.end(); - }); -}; - -const connectNode = async ( - inst: GatewayInstance, - label: string, -): Promise<{ client: GatewayClient; nodeId: string }> => { - const identityPath = path.join(inst.homeDir, `${label}-device.json`); - const deviceIdentity = loadOrCreateDeviceIdentity(identityPath); - const nodeId = deviceIdentity.deviceId; - const client = await connectGatewayClient({ - url: `ws://127.0.0.1:${inst.port}`, - token: inst.gatewayToken, - clientName: GATEWAY_CLIENT_NAMES.NODE_HOST, - clientDisplayName: label, - clientVersion: "1.0.0", - platform: "ios", - mode: GATEWAY_CLIENT_MODES.NODE, - role: "node", - scopes: [], - caps: ["system"], - commands: ["system.run"], - deviceIdentity, - timeoutMessage: `timeout waiting for ${label} to connect`, - }); - return { client, nodeId }; -}; - -const connectStatusClient = async ( - inst: GatewayInstance, - timeoutMs = GATEWAY_CONNECT_STATUS_TIMEOUT_MS, -): Promise => { - let settled = false; - let timer: NodeJS.Timeout | null = null; - - return await new Promise((resolve, reject) => { - const finish = (err?: Error) => { - if (settled) { - return; - } - settled = true; - if (timer) { - clearTimeout(timer); - } - if (err) { - reject(err); - return; - } - resolve(client); - }; - - const client = new GatewayClient({ - url: `ws://127.0.0.1:${inst.port}`, - connectDelayMs: 0, - token: inst.gatewayToken, - clientName: GATEWAY_CLIENT_NAMES.CLI, - clientDisplayName: `status-${inst.name}`, - clientVersion: "1.0.0", - platform: "test", - mode: GATEWAY_CLIENT_MODES.CLI, - onHelloOk: () => { - finish(); - }, - onConnectError: (err) => finish(err), - onClose: (code, reason) => { - finish(new Error(`gateway closed (${code}): ${reason}`)); - }, - }); - - timer = setTimeout(() => { - finish(new Error("timeout waiting for node.list")); - }, timeoutMs); - - client.start(); - }); -}; - -const waitForNodeStatus = async ( - inst: GatewayInstance, - nodeId: string, - timeoutMs = GATEWAY_NODE_STATUS_TIMEOUT_MS, -) => { - const deadline = Date.now() + timeoutMs; - const client = await connectStatusClient( - inst, - Math.min(GATEWAY_CONNECT_STATUS_TIMEOUT_MS, timeoutMs), - ); - try { - while (Date.now() < deadline) { - const list = await client.request("node.list", {}); - const match = list.nodes?.find((n) => n.nodeId === nodeId); - if (match?.connected && match?.paired) { - return; - } - await sleep(GATEWAY_NODE_STATUS_POLL_MS); - } - } finally { - client.stop(); - } - throw new Error(`timeout waiting for node status for ${nodeId}`); -}; - -function extractFirstTextBlock(message: unknown): string | undefined { - if (!message || typeof message !== "object") { - return undefined; - } - const content = (message as { content?: unknown }).content; - if (!Array.isArray(content) || content.length === 0) { - return undefined; - } - const first = content[0]; - if (!first || typeof first !== "object") { - return undefined; - } - const text = (first as { text?: unknown }).text; - return typeof text === "string" ? text : undefined; -} - -const waitForChatFinalEvent = async (params: { - events: ChatEventPayload[]; - runId: string; - sessionKey: string; - timeoutMs?: number; -}): Promise => { - const deadline = Date.now() + (params.timeoutMs ?? 15_000); - while (Date.now() < deadline) { - const match = params.events.find( - (evt) => - evt.runId === params.runId && evt.sessionKey === params.sessionKey && evt.state === "final", - ); - if (match) { - return match; - } - await sleep(20); - } - throw new Error(`timeout waiting for final chat event (runId=${params.runId})`); -}; describe("gateway multi-instance e2e", () => { const instances: GatewayInstance[] = []; const nodeClients: GatewayClient[] = []; - const webchatClients: GatewayClient[] = []; + const chatClients: GatewayClient[] = []; afterAll(async () => { for (const client of nodeClients) { client.stop(); } - for (const client of webchatClients) { + for (const client of chatClients) { client.stop(); } for (const inst of instances) { @@ -451,32 +85,29 @@ describe("gateway multi-instance e2e", () => { instances.push(gw); const chatEvents: ChatEventPayload[] = []; - const webchatClient = await connectGatewayClient({ + const chatClient = await connectGatewayClient({ url: `ws://127.0.0.1:${gw.port}`, token: gw.gatewayToken, - clientName: GATEWAY_CLIENT_NAMES.CONTROL_UI, - clientDisplayName: "chat-e2e", + clientName: GATEWAY_CLIENT_NAMES.CLI, + clientDisplayName: "chat-e2e-cli", clientVersion: "1.0.0", - platform: "web", - mode: GATEWAY_CLIENT_MODES.WEBCHAT, + platform: "test", + mode: GATEWAY_CLIENT_MODES.CLI, onEvent: (evt) => { if (evt.event === "chat" && evt.payload && typeof evt.payload === "object") { chatEvents.push(evt.payload as ChatEventPayload); } }, }); - webchatClients.push(webchatClient); + chatClients.push(chatClient); const sessionKey = "agent:main:telegram:direct:123456"; const idempotencyKey = `idem-${randomUUID()}`; - const sendRes = await webchatClient.request<{ runId?: string; status?: string }>( - "chat.send", - { - sessionKey, - message: "/context list", - idempotencyKey, - }, - ); + const sendRes = await chatClient.request<{ runId?: string; status?: string }>("chat.send", { + sessionKey, + message: "/context list", + idempotencyKey, + }); expect(sendRes.status).toBe("started"); const runId = sendRes.runId; expect(typeof runId).toBe("string"); diff --git a/test/helpers/gateway-e2e-harness.ts b/test/helpers/gateway-e2e-harness.ts new file mode 100644 index 00000000000..8a0990a18e7 --- /dev/null +++ b/test/helpers/gateway-e2e-harness.ts @@ -0,0 +1,395 @@ +import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; +import { randomUUID } from "node:crypto"; +import fs from "node:fs/promises"; +import { request as httpRequest } from "node:http"; +import net from "node:net"; +import os from "node:os"; +import path from "node:path"; +import { GatewayClient } from "../../src/gateway/client.js"; +import { connectGatewayClient } from "../../src/gateway/test-helpers.e2e.js"; +import { loadOrCreateDeviceIdentity } from "../../src/infra/device-identity.js"; +import { sleep } from "../../src/utils.js"; +import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../../src/utils/message-channel.js"; + +type NodeListPayload = { + nodes?: Array<{ nodeId?: string; connected?: boolean; paired?: boolean }>; +}; + +export type ChatEventPayload = { + runId?: string; + sessionKey?: string; + state?: string; + message?: unknown; +}; + +export type GatewayInstance = { + name: string; + port: number; + hookToken: string; + gatewayToken: string; + homeDir: string; + stateDir: string; + configPath: string; + child: ChildProcessWithoutNullStreams; + stdout: string[]; + stderr: string[]; +}; + +const GATEWAY_START_TIMEOUT_MS = 60_000; +const GATEWAY_STOP_TIMEOUT_MS = 1_500; +const GATEWAY_CONNECT_STATUS_TIMEOUT_MS = 2_000; +const GATEWAY_NODE_STATUS_TIMEOUT_MS = 4_000; +const GATEWAY_NODE_STATUS_POLL_MS = 20; + +const getFreePort = async () => { + const srv = net.createServer(); + await new Promise((resolve) => srv.listen(0, "127.0.0.1", resolve)); + const addr = srv.address(); + if (!addr || typeof addr === "string") { + srv.close(); + throw new Error("failed to bind ephemeral port"); + } + await new Promise((resolve) => srv.close(() => resolve())); + return addr.port; +}; + +async function waitForPortOpen( + proc: ChildProcessWithoutNullStreams, + chunksOut: string[], + chunksErr: string[], + port: number, + timeoutMs: number, +) { + const startedAt = Date.now(); + while (Date.now() - startedAt < timeoutMs) { + if (proc.exitCode !== null) { + const stdout = chunksOut.join(""); + const stderr = chunksErr.join(""); + throw new Error( + `gateway exited before listening (code=${String(proc.exitCode)} signal=${String(proc.signalCode)})\n` + + `--- stdout ---\n${stdout}\n--- stderr ---\n${stderr}`, + ); + } + + try { + await new Promise((resolve, reject) => { + const socket = net.connect({ host: "127.0.0.1", port }); + socket.once("connect", () => { + socket.destroy(); + resolve(); + }); + socket.once("error", (err) => { + socket.destroy(); + reject(err); + }); + }); + return; + } catch { + // keep polling + } + + await sleep(10); + } + const stdout = chunksOut.join(""); + const stderr = chunksErr.join(""); + throw new Error( + `timeout waiting for gateway to listen on port ${port}\n` + + `--- stdout ---\n${stdout}\n--- stderr ---\n${stderr}`, + ); +} + +export async function spawnGatewayInstance(name: string): Promise { + const port = await getFreePort(); + const hookToken = `token-${name}-${randomUUID()}`; + const gatewayToken = `gateway-${name}-${randomUUID()}`; + const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), `openclaw-e2e-${name}-`)); + const configDir = path.join(homeDir, ".openclaw"); + await fs.mkdir(configDir, { recursive: true }); + const configPath = path.join(configDir, "openclaw.json"); + const stateDir = path.join(configDir, "state"); + const config = { + gateway: { + port, + auth: { mode: "token", token: gatewayToken }, + controlUi: { enabled: false }, + }, + hooks: { enabled: true, token: hookToken, path: "/hooks" }, + }; + await fs.writeFile(configPath, JSON.stringify(config, null, 2), "utf8"); + + const stdout: string[] = []; + const stderr: string[] = []; + let child: ChildProcessWithoutNullStreams | null = null; + + try { + child = spawn( + "node", + [ + "dist/index.js", + "gateway", + "--port", + String(port), + "--bind", + "loopback", + "--allow-unconfigured", + ], + { + cwd: process.cwd(), + env: { + ...process.env, + HOME: homeDir, + OPENCLAW_CONFIG_PATH: configPath, + OPENCLAW_STATE_DIR: stateDir, + OPENCLAW_GATEWAY_TOKEN: "", + OPENCLAW_GATEWAY_PASSWORD: "", + OPENCLAW_SKIP_CHANNELS: "1", + OPENCLAW_SKIP_PROVIDERS: "1", + OPENCLAW_SKIP_GMAIL_WATCHER: "1", + OPENCLAW_SKIP_CRON: "1", + OPENCLAW_SKIP_BROWSER_CONTROL_SERVER: "1", + OPENCLAW_SKIP_CANVAS_HOST: "1", + OPENCLAW_TEST_MINIMAL_GATEWAY: "1", + VITEST: "1", + }, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + + child.stdout?.setEncoding("utf8"); + child.stderr?.setEncoding("utf8"); + child.stdout?.on("data", (d) => stdout.push(String(d))); + child.stderr?.on("data", (d) => stderr.push(String(d))); + + await waitForPortOpen(child, stdout, stderr, port, GATEWAY_START_TIMEOUT_MS); + + return { + name, + port, + hookToken, + gatewayToken, + homeDir, + stateDir, + configPath, + child, + stdout, + stderr, + }; + } catch (err) { + if (child && child.exitCode === null && !child.killed) { + try { + child.kill("SIGKILL"); + } catch { + // ignore + } + } + await fs.rm(homeDir, { recursive: true, force: true }); + throw err; + } +} + +export async function stopGatewayInstance(inst: GatewayInstance) { + if (inst.child.exitCode === null && !inst.child.killed) { + try { + inst.child.kill("SIGTERM"); + } catch { + // ignore + } + } + const exited = await Promise.race([ + new Promise((resolve) => { + if (inst.child.exitCode !== null) { + return resolve(true); + } + inst.child.once("exit", () => resolve(true)); + }), + sleep(GATEWAY_STOP_TIMEOUT_MS).then(() => false), + ]); + if (!exited && inst.child.exitCode === null && !inst.child.killed) { + try { + inst.child.kill("SIGKILL"); + } catch { + // ignore + } + } + await fs.rm(inst.homeDir, { recursive: true, force: true }); +} + +export async function postJson( + url: string, + body: unknown, + headers?: Record, +): Promise<{ status: number; json: unknown }> { + const payload = JSON.stringify(body); + const parsed = new URL(url); + return await new Promise<{ status: number; json: unknown }>((resolve, reject) => { + const req = httpRequest( + { + method: "POST", + hostname: parsed.hostname, + port: Number(parsed.port), + path: `${parsed.pathname}${parsed.search}`, + headers: { + "Content-Type": "application/json", + "Content-Length": Buffer.byteLength(payload), + ...headers, + }, + }, + (res) => { + let data = ""; + res.setEncoding("utf8"); + res.on("data", (chunk) => { + data += chunk; + }); + res.on("end", () => { + let json: unknown = null; + if (data.trim()) { + try { + json = JSON.parse(data); + } catch { + json = data; + } + } + resolve({ status: res.statusCode ?? 0, json }); + }); + }, + ); + req.on("error", reject); + req.write(payload); + req.end(); + }); +} + +export async function connectNode( + inst: GatewayInstance, + label: string, +): Promise<{ client: GatewayClient; nodeId: string }> { + const identityPath = path.join(inst.homeDir, `${label}-device.json`); + const deviceIdentity = loadOrCreateDeviceIdentity(identityPath); + const nodeId = deviceIdentity.deviceId; + const client = await connectGatewayClient({ + url: `ws://127.0.0.1:${inst.port}`, + token: inst.gatewayToken, + clientName: GATEWAY_CLIENT_NAMES.NODE_HOST, + clientDisplayName: label, + clientVersion: "1.0.0", + platform: "ios", + mode: GATEWAY_CLIENT_MODES.NODE, + role: "node", + scopes: [], + caps: ["system"], + commands: ["system.run"], + deviceIdentity, + timeoutMessage: `timeout waiting for ${label} to connect`, + }); + return { client, nodeId }; +} + +async function connectStatusClient( + inst: GatewayInstance, + timeoutMs = GATEWAY_CONNECT_STATUS_TIMEOUT_MS, +): Promise { + let settled = false; + let timer: NodeJS.Timeout | null = null; + + return await new Promise((resolve, reject) => { + const finish = (err?: Error) => { + if (settled) { + return; + } + settled = true; + if (timer) { + clearTimeout(timer); + } + if (err) { + reject(err); + return; + } + resolve(client); + }; + + const client = new GatewayClient({ + url: `ws://127.0.0.1:${inst.port}`, + connectDelayMs: 0, + token: inst.gatewayToken, + clientName: GATEWAY_CLIENT_NAMES.CLI, + clientDisplayName: `status-${inst.name}`, + clientVersion: "1.0.0", + platform: "test", + mode: GATEWAY_CLIENT_MODES.CLI, + onHelloOk: () => { + finish(); + }, + onConnectError: (err) => finish(err), + onClose: (code, reason) => { + finish(new Error(`gateway closed (${code}): ${reason}`)); + }, + }); + + timer = setTimeout(() => { + finish(new Error("timeout waiting for node.list")); + }, timeoutMs); + + client.start(); + }); +} + +export async function waitForNodeStatus( + inst: GatewayInstance, + nodeId: string, + timeoutMs = GATEWAY_NODE_STATUS_TIMEOUT_MS, +) { + const deadline = Date.now() + timeoutMs; + const client = await connectStatusClient( + inst, + Math.min(GATEWAY_CONNECT_STATUS_TIMEOUT_MS, timeoutMs), + ); + try { + while (Date.now() < deadline) { + const list = await client.request("node.list", {}); + const match = list.nodes?.find((n) => n.nodeId === nodeId); + if (match?.connected && match?.paired) { + return; + } + await sleep(GATEWAY_NODE_STATUS_POLL_MS); + } + } finally { + client.stop(); + } + throw new Error(`timeout waiting for node status for ${nodeId}`); +} + +export function extractFirstTextBlock(message: unknown): string | undefined { + if (!message || typeof message !== "object") { + return undefined; + } + const content = (message as { content?: unknown }).content; + if (!Array.isArray(content) || content.length === 0) { + return undefined; + } + const first = content[0]; + if (!first || typeof first !== "object") { + return undefined; + } + const text = (first as { text?: unknown }).text; + return typeof text === "string" ? text : undefined; +} + +export async function waitForChatFinalEvent(params: { + events: ChatEventPayload[]; + runId: string; + sessionKey: string; + timeoutMs?: number; +}): Promise { + const deadline = Date.now() + (params.timeoutMs ?? 15_000); + while (Date.now() < deadline) { + const match = params.events.find( + (evt) => + evt.runId === params.runId && evt.sessionKey === params.sessionKey && evt.state === "final", + ); + if (match) { + return match; + } + await sleep(20); + } + throw new Error(`timeout waiting for final chat event (runId=${params.runId})`); +}