From bfc4e1dcb54c3bb9a611c9990c4e99c8ff9d06d8 Mon Sep 17 00:00:00 2001 From: Eva Date: Wed, 1 Jul 2026 09:34:35 -0700 Subject: [PATCH] fix(telegram): restore active-run steering --- .../app-server/run-attempt.steering.test.ts | 70 +++++++++++++++++++ .../codex/src/app-server/run-attempt.ts | 5 +- .../telegram/src/sequential-key.test.ts | 47 +++++++++++++ extensions/telegram/src/sequential-key.ts | 49 +++++++++++++ .../reply/commands-steer.runtime.ts | 1 + src/auto-reply/reply/commands-steer.test.ts | 70 +++++++++++++++++++ src/auto-reply/reply/commands-steer.ts | 46 +++++++++--- 7 files changed, 278 insertions(+), 10 deletions(-) diff --git a/extensions/codex/src/app-server/run-attempt.steering.test.ts b/extensions/codex/src/app-server/run-attempt.steering.test.ts index a26be137e343..0f39fc21e83a 100644 --- a/extensions/codex/src/app-server/run-attempt.steering.test.ts +++ b/extensions/codex/src/app-server/run-attempt.steering.test.ts @@ -17,6 +17,30 @@ import { turnStartResult, } from "./run-attempt-test-harness.js"; +const activeRunRegistrationMocks = vi.hoisted(() => ({ + clearActiveEmbeddedRun: vi.fn(), + setActiveEmbeddedRun: vi.fn(), +})); + +vi.mock("openclaw/plugin-sdk/agent-harness-runtime", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + clearActiveEmbeddedRun: ( + ...args: Parameters + ): ReturnType => { + activeRunRegistrationMocks.clearActiveEmbeddedRun(...args); + return actual.clearActiveEmbeddedRun(...args); + }, + setActiveEmbeddedRun: ( + ...args: Parameters + ): ReturnType => { + activeRunRegistrationMocks.setActiveEmbeddedRun(...args); + return actual.setActiveEmbeddedRun(...args); + }, + }; +}); + setupRunAttemptTestHooks(); let steeringSessionIndex = 0; @@ -121,6 +145,52 @@ describe("runCodexAppServerAttempt steering", () => { await run; }); + it("passes session files through active Codex app-server registration for command lookup", async () => { + const { requests, waitForMethod, completeTurn } = createStartedThreadHarness(); + const params = createSteeringParams(); + activeRunRegistrationMocks.setActiveEmbeddedRun.mockClear(); + activeRunRegistrationMocks.clearActiveEmbeddedRun.mockClear(); + + const run = runCodexAppServerAttempt(params); + await waitForMethod("turn/start"); + + expect(activeRunRegistrationMocks.setActiveEmbeddedRun).toHaveBeenCalledWith( + params.sessionId, + expect.anything(), + params.sessionKey, + params.sessionFile, + ); + + await waitAndQueueActiveRunMessage(params.sessionId, "session-file registered", { + debounceMs: 0, + }); + + await vi.waitFor( + () => + expect(requests.filter((entry) => entry.method === "turn/steer")).toEqual([ + { + method: "turn/steer", + params: { + threadId: "thread-1", + expectedTurnId: "turn-1", + input: [{ type: "text", text: "session-file registered", text_elements: [] }], + }, + }, + ]), + fastWait, + ); + + await completeTurn({ threadId: "thread-1", turnId: "turn-1" }); + await run; + + expect(activeRunRegistrationMocks.clearActiveEmbeddedRun).toHaveBeenCalledWith( + params.sessionId, + expect.anything(), + params.sessionKey, + params.sessionFile, + ); + }); + it("flushes batched default queued steering during normal turn cleanup", async () => { const { requests, waitForMethod, completeTurn } = createStartedThreadHarness(); const params = createSteeringParams(); diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 7f7a6383c1a0..209301dd7ed8 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -2886,12 +2886,13 @@ export async function runCodexAppServerAttempt( queueMessage: async (text: string, optionsLocal?: CodexSteeringQueueOptions) => activeSteeringQueue.queue(text, optionsLocal), isStreaming: () => !completed && !runAbortController.signal.aborted, + isStopped: () => completed || timedOut || runAbortController.signal.aborted, isCompacting: () => projectorRef.current?.isCompacting() ?? false, sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, cancel: () => runAbortController.abort("cancelled"), abort: () => runAbortController.abort("aborted"), }; - setActiveEmbeddedRun(params.sessionId, handle, params.sessionKey); + setActiveEmbeddedRun(params.sessionId, handle, params.sessionKey, params.sessionFile); const notifyUserMessagePersisted = createCodexAppServerUserMessagePersistenceNotifier(params); void mirrorPromptAtTurnStartBestEffort({ params, @@ -3281,7 +3282,7 @@ export async function runCodexAppServerAttempt( runAbortController.signal.removeEventListener("abort", abortListener); params.abortSignal?.removeEventListener("abort", abortFromUpstream); steeringQueueRef.current?.cancel(); - clearActiveEmbeddedRun(params.sessionId, handle, params.sessionKey); + clearActiveEmbeddedRun(params.sessionId, handle, params.sessionKey, params.sessionFile); } } diff --git a/extensions/telegram/src/sequential-key.test.ts b/extensions/telegram/src/sequential-key.test.ts index 4bb1ba07eb7e..49d694735f57 100644 --- a/extensions/telegram/src/sequential-key.test.ts +++ b/extensions/telegram/src/sequential-key.test.ts @@ -79,6 +79,18 @@ describe("getTelegramSequentialKey", () => { { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/stop" }) }, "telegram:123:control", ], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/steer keep going" }) }, + "telegram:123:control", + ], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/tell use the cache" }) }, + "telegram:123:control", + ], + [ + { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/queue status" }) }, + "telegram:123:control", + ], [ { message: mockMessage({ @@ -90,6 +102,41 @@ describe("getTelegramSequentialKey", () => { }, "telegram:-100:control", ], + [ + { + message: mockMessage({ + chat: mockChat({ id: -100, type: "supergroup", is_forum: true }), + is_topic_message: true, + message_thread_id: 5907, + text: "/steer@vacs_tars_bot keep going", + }), + }, + "telegram:-100:control", + ], + [ + { + me: { username: "openclaw_bot" } as never, + message: mockMessage({ + chat: mockChat({ id: -100, type: "supergroup", is_forum: true }), + is_topic_message: true, + message_thread_id: 5907, + text: "/tell@openclaw_bot keep going!", + }), + }, + "telegram:-100:control", + ], + [ + { + me: { username: "openclaw_bot" } as never, + message: mockMessage({ + chat: mockChat({ id: -100, type: "supergroup", is_forum: true }), + is_topic_message: true, + message_thread_id: 5907, + text: "/queue@some_other_bot status", + }), + }, + "telegram:-100:topic:5907", + ], [ { me: { username: "openclaw_bot" } as never, diff --git a/extensions/telegram/src/sequential-key.ts b/extensions/telegram/src/sequential-key.ts index 6fd5c7e9ae00..736760daa141 100644 --- a/extensions/telegram/src/sequential-key.ts +++ b/extensions/telegram/src/sequential-key.ts @@ -25,6 +25,8 @@ const TELEGRAM_READ_ONLY_STATUS_COMMAND_KEYS = new Set([ "whoami", ]); +const TELEGRAM_ACTIVE_RUN_CONTROL_COMMAND_KEYS = new Set(["queue", "steer"]); + type TelegramSequentialKeyContext = { chat?: { id?: number }; me?: UserFromGetMe; @@ -80,6 +82,50 @@ function isTelegramTargetedStopCommand(rawText?: string, botUsername?: string): return match[1]?.toLowerCase() === normalizedBotUsername; } +function resolveTelegramCommandAliasForControlLane( + rawText?: string, + botUsername?: string, +): string | undefined { + const trimmed = rawText?.trim(); + if (!trimmed?.startsWith("/")) { + return undefined; + } + + const targetedMatch = trimmed.match( + /^\/([A-Za-z0-9_-]+)(?:@([A-Za-z0-9_]+))?(?:$|\s|[.!?…,,。;;::'"’”)\]}])/iu, + ); + const targetBotUsername = targetedMatch?.[2]?.trim().toLowerCase(); + const normalizedBotUsername = botUsername?.trim().toLowerCase(); + if (targetBotUsername && normalizedBotUsername && targetBotUsername !== normalizedBotUsername) { + return undefined; + } + + if (targetBotUsername && !normalizedBotUsername) { + const commandAlias = `/${targetedMatch?.[1]?.toLowerCase() ?? ""}`; + return commandAlias === "/" ? undefined : commandAlias; + } + + return ( + maybeResolveTextAlias( + normalizeCommandBody(trimmed, botUsername ? { botUsername } : undefined), + ) ?? undefined + ); +} + +function isTelegramActiveRunControlLaneText(params: { + rawText?: string; + botUsername?: string; +}): boolean { + const alias = resolveTelegramCommandAliasForControlLane(params.rawText, params.botUsername); + if (!alias) { + return false; + } + const command = listChatCommands().find((entry) => + entry.textAliases.some((candidate) => candidate.trim().toLowerCase() === alias), + ); + return command ? TELEGRAM_ACTIVE_RUN_CONTROL_COMMAND_KEYS.has(command.key) : false; +} + export function isTelegramControlLaneText(params: { rawText?: string; botUsername?: string; @@ -95,6 +141,9 @@ export function isTelegramControlLaneText(params: { if (isTelegramTargetedStopCommand(params.rawText, params.botUsername)) { return true; } + if (isTelegramActiveRunControlLaneText(params)) { + return true; + } return isTelegramReadOnlyControlLaneText(params); } diff --git a/src/auto-reply/reply/commands-steer.runtime.ts b/src/auto-reply/reply/commands-steer.runtime.ts index 7c8f784ea0a6..9c4ca4458b10 100644 --- a/src/auto-reply/reply/commands-steer.runtime.ts +++ b/src/auto-reply/reply/commands-steer.runtime.ts @@ -5,4 +5,5 @@ export { queueEmbeddedAgentMessage, queueEmbeddedAgentMessageWithOutcomeAsync, resolveActiveEmbeddedRunSessionId, + resolveActiveEmbeddedRunSessionIdBySessionFile, } from "../../agents/embedded-agent-runner/runs.js"; diff --git a/src/auto-reply/reply/commands-steer.test.ts b/src/auto-reply/reply/commands-steer.test.ts index eec6c039d7c9..b0d18dcbcd66 100644 --- a/src/auto-reply/reply/commands-steer.test.ts +++ b/src/auto-reply/reply/commands-steer.test.ts @@ -8,6 +8,7 @@ const steerRuntimeMocks = vi.hoisted(() => ({ isEmbeddedAgentRunActive: vi.fn(), queueEmbeddedAgentMessageWithOutcomeAsync: vi.fn(), resolveActiveEmbeddedRunSessionId: vi.fn(), + resolveActiveEmbeddedRunSessionIdBySessionFile: vi.fn(), })); vi.mock("./commands-steer.runtime.js", () => steerRuntimeMocks); @@ -38,6 +39,9 @@ describe("handleSteerCommand", () => { gatewayHealth: "live", }); steerRuntimeMocks.resolveActiveEmbeddedRunSessionId.mockReset().mockReturnValue(undefined); + steerRuntimeMocks.resolveActiveEmbeddedRunSessionIdBySessionFile + .mockReset() + .mockReturnValue(undefined); }); it("queues steering for the active current text-command session", async () => { @@ -107,6 +111,72 @@ describe("handleSteerCommand", () => { ); }); + it("resolves an active run from the target session file before stored session id fallback", async () => { + steerRuntimeMocks.resolveActiveEmbeddedRunSessionIdBySessionFile.mockReturnValue( + "session-file-active", + ); + + const params = buildParams("/steer check the active file"); + params.ctx.CommandSource = "native"; + params.ctx.CommandTargetSessionKey = "agent:main:telegram:topic:5907"; + params.sessionKey = "agent:main:telegram:control"; + params.sessionStore = { + "agent:main:telegram:topic:5907": { + sessionId: "stored-session-id", + sessionFile: "/tmp/openclaw-topic-5907.jsonl", + updatedAt: Date.now(), + }, + }; + + await handleSteerCommand(params, true); + + expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionId).toHaveBeenCalledWith( + "agent:main:telegram:topic:5907", + ); + expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionIdBySessionFile).toHaveBeenCalledWith( + "/tmp/openclaw-topic-5907.jsonl", + ); + expect(steerRuntimeMocks.isEmbeddedAgentRunActive).not.toHaveBeenCalledWith( + "stored-session-id", + ); + expect(steerRuntimeMocks.queueEmbeddedAgentMessageWithOutcomeAsync).toHaveBeenCalledWith( + "session-file-active", + "check the active file", + { + steeringMode: "all", + debounceMs: 0, + }, + ); + }); + + it("falls back from a slash-lane command session to an active direct sibling", async () => { + steerRuntimeMocks.resolveActiveEmbeddedRunSessionId.mockImplementation((key: string) => + key === "agent:main:telegram:direct:123" ? "session-direct-active" : undefined, + ); + + const params = buildParams("/steer use the active direct lane"); + params.sessionKey = "agent:main:telegram:slash:123"; + + await handleSteerCommand(params, true); + + expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionId).toHaveBeenNthCalledWith( + 1, + "agent:main:telegram:slash:123", + ); + expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionId).toHaveBeenNthCalledWith( + 2, + "agent:main:telegram:direct:123", + ); + expect(steerRuntimeMocks.queueEmbeddedAgentMessageWithOutcomeAsync).toHaveBeenCalledWith( + "session-direct-active", + "use the active direct lane", + { + steeringMode: "all", + debounceMs: 0, + }, + ); + }); + it("returns usage for an empty steer command", async () => { const result = await handleSteerCommand(buildParams("/steer"), true); diff --git a/src/auto-reply/reply/commands-steer.ts b/src/auto-reply/reply/commands-steer.ts index 392aefa31627..e317917587cb 100644 --- a/src/auto-reply/reply/commands-steer.ts +++ b/src/auto-reply/reply/commands-steer.ts @@ -13,6 +13,7 @@ import { isEmbeddedAgentRunActive, queueEmbeddedAgentMessageWithOutcomeAsync, resolveActiveEmbeddedRunSessionId, + resolveActiveEmbeddedRunSessionIdBySessionFile, } from "./commands-steer.runtime.js"; import type { CommandHandler, @@ -57,21 +58,50 @@ function resolveStoredSessionEntry( return undefined; } +function listSteerCandidateSessionKeys(targetSessionKey: string): string[] { + const candidates = [targetSessionKey]; + if (targetSessionKey.includes(":slash:")) { + candidates.push( + targetSessionKey.replace(":slash:", ":direct:"), + targetSessionKey.replace(":slash:", ":dm:"), + ); + } + return [...new Set(candidates)]; +} + function resolveSteerSessionId(params: { commandParams: HandleCommandsParams; targetSessionKey: string; }): string | undefined { - const activeSessionId = resolveActiveEmbeddedRunSessionId(params.targetSessionKey); - if (activeSessionId) { - return activeSessionId; + const candidateKeys = listSteerCandidateSessionKeys(params.targetSessionKey); + for (const candidateKey of candidateKeys) { + const activeSessionId = resolveActiveEmbeddedRunSessionId(candidateKey); + if (activeSessionId) { + return activeSessionId; + } } - const entry = resolveStoredSessionEntry(params.commandParams, params.targetSessionKey); - const sessionId = normalizeOptionalString(entry?.sessionId); - if (!sessionId || !isEmbeddedAgentRunActive(sessionId)) { - return undefined; + for (const candidateKey of candidateKeys) { + const entry = resolveStoredSessionEntry(params.commandParams, candidateKey); + const sessionFile = normalizeOptionalString(entry?.sessionFile); + if (!sessionFile) { + continue; + } + const activeSessionId = resolveActiveEmbeddedRunSessionIdBySessionFile(sessionFile); + if (activeSessionId) { + return activeSessionId; + } } - return sessionId; + + for (const candidateKey of candidateKeys) { + const entry = resolveStoredSessionEntry(params.commandParams, candidateKey); + const sessionId = normalizeOptionalString(entry?.sessionId); + if (sessionId && isEmbeddedAgentRunActive(sessionId)) { + return sessionId; + } + } + + return undefined; } function applySteerFallbackPrompt(ctx: HandleCommandsParams["ctx"], message: string): void {