fix(telegram): restore active-run steering

This commit is contained in:
Eva
2026-07-01 09:34:35 -07:00
committed by Ayaan Zaidi
parent 4895a00b94
commit bfc4e1dcb5
7 changed files with 278 additions and 10 deletions

View File

@@ -17,6 +17,30 @@ import {
turnStartResult,
} from "./run-attempt-test-harness.js";
const activeRunRegistrationMocks = vi.hoisted(() => ({
clearActiveEmbeddedRun: vi.fn(),
setActiveEmbeddedRun: vi.fn(),
}));
vi.mock("openclaw/plugin-sdk/agent-harness-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/agent-harness-runtime")>();
return {
...actual,
clearActiveEmbeddedRun: (
...args: Parameters<typeof actual.clearActiveEmbeddedRun>
): ReturnType<typeof actual.clearActiveEmbeddedRun> => {
activeRunRegistrationMocks.clearActiveEmbeddedRun(...args);
return actual.clearActiveEmbeddedRun(...args);
},
setActiveEmbeddedRun: (
...args: Parameters<typeof actual.setActiveEmbeddedRun>
): ReturnType<typeof actual.setActiveEmbeddedRun> => {
activeRunRegistrationMocks.setActiveEmbeddedRun(...args);
return actual.setActiveEmbeddedRun(...args);
},
};
});
setupRunAttemptTestHooks();
let steeringSessionIndex = 0;
@@ -121,6 +145,52 @@ describe("runCodexAppServerAttempt steering", () => {
await run;
});
it("passes session files through active Codex app-server registration for command lookup", async () => {
const { requests, waitForMethod, completeTurn } = createStartedThreadHarness();
const params = createSteeringParams();
activeRunRegistrationMocks.setActiveEmbeddedRun.mockClear();
activeRunRegistrationMocks.clearActiveEmbeddedRun.mockClear();
const run = runCodexAppServerAttempt(params);
await waitForMethod("turn/start");
expect(activeRunRegistrationMocks.setActiveEmbeddedRun).toHaveBeenCalledWith(
params.sessionId,
expect.anything(),
params.sessionKey,
params.sessionFile,
);
await waitAndQueueActiveRunMessage(params.sessionId, "session-file registered", {
debounceMs: 0,
});
await vi.waitFor(
() =>
expect(requests.filter((entry) => entry.method === "turn/steer")).toEqual([
{
method: "turn/steer",
params: {
threadId: "thread-1",
expectedTurnId: "turn-1",
input: [{ type: "text", text: "session-file registered", text_elements: [] }],
},
},
]),
fastWait,
);
await completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
expect(activeRunRegistrationMocks.clearActiveEmbeddedRun).toHaveBeenCalledWith(
params.sessionId,
expect.anything(),
params.sessionKey,
params.sessionFile,
);
});
it("flushes batched default queued steering during normal turn cleanup", async () => {
const { requests, waitForMethod, completeTurn } = createStartedThreadHarness();
const params = createSteeringParams();

View File

@@ -2886,12 +2886,13 @@ export async function runCodexAppServerAttempt(
queueMessage: async (text: string, optionsLocal?: CodexSteeringQueueOptions) =>
activeSteeringQueue.queue(text, optionsLocal),
isStreaming: () => !completed && !runAbortController.signal.aborted,
isStopped: () => completed || timedOut || runAbortController.signal.aborted,
isCompacting: () => projectorRef.current?.isCompacting() ?? false,
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
cancel: () => runAbortController.abort("cancelled"),
abort: () => runAbortController.abort("aborted"),
};
setActiveEmbeddedRun(params.sessionId, handle, params.sessionKey);
setActiveEmbeddedRun(params.sessionId, handle, params.sessionKey, params.sessionFile);
const notifyUserMessagePersisted = createCodexAppServerUserMessagePersistenceNotifier(params);
void mirrorPromptAtTurnStartBestEffort({
params,
@@ -3281,7 +3282,7 @@ export async function runCodexAppServerAttempt(
runAbortController.signal.removeEventListener("abort", abortListener);
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
steeringQueueRef.current?.cancel();
clearActiveEmbeddedRun(params.sessionId, handle, params.sessionKey);
clearActiveEmbeddedRun(params.sessionId, handle, params.sessionKey, params.sessionFile);
}
}

View File

@@ -79,6 +79,18 @@ describe("getTelegramSequentialKey", () => {
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/stop" }) },
"telegram:123:control",
],
[
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/steer keep going" }) },
"telegram:123:control",
],
[
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/tell use the cache" }) },
"telegram:123:control",
],
[
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/queue status" }) },
"telegram:123:control",
],
[
{
message: mockMessage({
@@ -90,6 +102,41 @@ describe("getTelegramSequentialKey", () => {
},
"telegram:-100:control",
],
[
{
message: mockMessage({
chat: mockChat({ id: -100, type: "supergroup", is_forum: true }),
is_topic_message: true,
message_thread_id: 5907,
text: "/steer@vacs_tars_bot keep going",
}),
},
"telegram:-100:control",
],
[
{
me: { username: "openclaw_bot" } as never,
message: mockMessage({
chat: mockChat({ id: -100, type: "supergroup", is_forum: true }),
is_topic_message: true,
message_thread_id: 5907,
text: "/tell@openclaw_bot keep going!",
}),
},
"telegram:-100:control",
],
[
{
me: { username: "openclaw_bot" } as never,
message: mockMessage({
chat: mockChat({ id: -100, type: "supergroup", is_forum: true }),
is_topic_message: true,
message_thread_id: 5907,
text: "/queue@some_other_bot status",
}),
},
"telegram:-100:topic:5907",
],
[
{
me: { username: "openclaw_bot" } as never,

View File

@@ -25,6 +25,8 @@ const TELEGRAM_READ_ONLY_STATUS_COMMAND_KEYS = new Set([
"whoami",
]);
const TELEGRAM_ACTIVE_RUN_CONTROL_COMMAND_KEYS = new Set(["queue", "steer"]);
type TelegramSequentialKeyContext = {
chat?: { id?: number };
me?: UserFromGetMe;
@@ -80,6 +82,50 @@ function isTelegramTargetedStopCommand(rawText?: string, botUsername?: string):
return match[1]?.toLowerCase() === normalizedBotUsername;
}
function resolveTelegramCommandAliasForControlLane(
rawText?: string,
botUsername?: string,
): string | undefined {
const trimmed = rawText?.trim();
if (!trimmed?.startsWith("/")) {
return undefined;
}
const targetedMatch = trimmed.match(
/^\/([A-Za-z0-9_-]+)(?:@([A-Za-z0-9_]+))?(?:$|\s|[.!?,;:'")\]}])/iu,
);
const targetBotUsername = targetedMatch?.[2]?.trim().toLowerCase();
const normalizedBotUsername = botUsername?.trim().toLowerCase();
if (targetBotUsername && normalizedBotUsername && targetBotUsername !== normalizedBotUsername) {
return undefined;
}
if (targetBotUsername && !normalizedBotUsername) {
const commandAlias = `/${targetedMatch?.[1]?.toLowerCase() ?? ""}`;
return commandAlias === "/" ? undefined : commandAlias;
}
return (
maybeResolveTextAlias(
normalizeCommandBody(trimmed, botUsername ? { botUsername } : undefined),
) ?? undefined
);
}
function isTelegramActiveRunControlLaneText(params: {
rawText?: string;
botUsername?: string;
}): boolean {
const alias = resolveTelegramCommandAliasForControlLane(params.rawText, params.botUsername);
if (!alias) {
return false;
}
const command = listChatCommands().find((entry) =>
entry.textAliases.some((candidate) => candidate.trim().toLowerCase() === alias),
);
return command ? TELEGRAM_ACTIVE_RUN_CONTROL_COMMAND_KEYS.has(command.key) : false;
}
export function isTelegramControlLaneText(params: {
rawText?: string;
botUsername?: string;
@@ -95,6 +141,9 @@ export function isTelegramControlLaneText(params: {
if (isTelegramTargetedStopCommand(params.rawText, params.botUsername)) {
return true;
}
if (isTelegramActiveRunControlLaneText(params)) {
return true;
}
return isTelegramReadOnlyControlLaneText(params);
}

View File

@@ -5,4 +5,5 @@ export {
queueEmbeddedAgentMessage,
queueEmbeddedAgentMessageWithOutcomeAsync,
resolveActiveEmbeddedRunSessionId,
resolveActiveEmbeddedRunSessionIdBySessionFile,
} from "../../agents/embedded-agent-runner/runs.js";

View File

@@ -8,6 +8,7 @@ const steerRuntimeMocks = vi.hoisted(() => ({
isEmbeddedAgentRunActive: vi.fn(),
queueEmbeddedAgentMessageWithOutcomeAsync: vi.fn(),
resolveActiveEmbeddedRunSessionId: vi.fn(),
resolveActiveEmbeddedRunSessionIdBySessionFile: vi.fn(),
}));
vi.mock("./commands-steer.runtime.js", () => steerRuntimeMocks);
@@ -38,6 +39,9 @@ describe("handleSteerCommand", () => {
gatewayHealth: "live",
});
steerRuntimeMocks.resolveActiveEmbeddedRunSessionId.mockReset().mockReturnValue(undefined);
steerRuntimeMocks.resolveActiveEmbeddedRunSessionIdBySessionFile
.mockReset()
.mockReturnValue(undefined);
});
it("queues steering for the active current text-command session", async () => {
@@ -107,6 +111,72 @@ describe("handleSteerCommand", () => {
);
});
it("resolves an active run from the target session file before stored session id fallback", async () => {
steerRuntimeMocks.resolveActiveEmbeddedRunSessionIdBySessionFile.mockReturnValue(
"session-file-active",
);
const params = buildParams("/steer check the active file");
params.ctx.CommandSource = "native";
params.ctx.CommandTargetSessionKey = "agent:main:telegram:topic:5907";
params.sessionKey = "agent:main:telegram:control";
params.sessionStore = {
"agent:main:telegram:topic:5907": {
sessionId: "stored-session-id",
sessionFile: "/tmp/openclaw-topic-5907.jsonl",
updatedAt: Date.now(),
},
};
await handleSteerCommand(params, true);
expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionId).toHaveBeenCalledWith(
"agent:main:telegram:topic:5907",
);
expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionIdBySessionFile).toHaveBeenCalledWith(
"/tmp/openclaw-topic-5907.jsonl",
);
expect(steerRuntimeMocks.isEmbeddedAgentRunActive).not.toHaveBeenCalledWith(
"stored-session-id",
);
expect(steerRuntimeMocks.queueEmbeddedAgentMessageWithOutcomeAsync).toHaveBeenCalledWith(
"session-file-active",
"check the active file",
{
steeringMode: "all",
debounceMs: 0,
},
);
});
it("falls back from a slash-lane command session to an active direct sibling", async () => {
steerRuntimeMocks.resolveActiveEmbeddedRunSessionId.mockImplementation((key: string) =>
key === "agent:main:telegram:direct:123" ? "session-direct-active" : undefined,
);
const params = buildParams("/steer use the active direct lane");
params.sessionKey = "agent:main:telegram:slash:123";
await handleSteerCommand(params, true);
expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionId).toHaveBeenNthCalledWith(
1,
"agent:main:telegram:slash:123",
);
expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionId).toHaveBeenNthCalledWith(
2,
"agent:main:telegram:direct:123",
);
expect(steerRuntimeMocks.queueEmbeddedAgentMessageWithOutcomeAsync).toHaveBeenCalledWith(
"session-direct-active",
"use the active direct lane",
{
steeringMode: "all",
debounceMs: 0,
},
);
});
it("returns usage for an empty steer command", async () => {
const result = await handleSteerCommand(buildParams("/steer"), true);

View File

@@ -13,6 +13,7 @@ import {
isEmbeddedAgentRunActive,
queueEmbeddedAgentMessageWithOutcomeAsync,
resolveActiveEmbeddedRunSessionId,
resolveActiveEmbeddedRunSessionIdBySessionFile,
} from "./commands-steer.runtime.js";
import type {
CommandHandler,
@@ -57,21 +58,50 @@ function resolveStoredSessionEntry(
return undefined;
}
function listSteerCandidateSessionKeys(targetSessionKey: string): string[] {
const candidates = [targetSessionKey];
if (targetSessionKey.includes(":slash:")) {
candidates.push(
targetSessionKey.replace(":slash:", ":direct:"),
targetSessionKey.replace(":slash:", ":dm:"),
);
}
return [...new Set(candidates)];
}
function resolveSteerSessionId(params: {
commandParams: HandleCommandsParams;
targetSessionKey: string;
}): string | undefined {
const activeSessionId = resolveActiveEmbeddedRunSessionId(params.targetSessionKey);
if (activeSessionId) {
return activeSessionId;
const candidateKeys = listSteerCandidateSessionKeys(params.targetSessionKey);
for (const candidateKey of candidateKeys) {
const activeSessionId = resolveActiveEmbeddedRunSessionId(candidateKey);
if (activeSessionId) {
return activeSessionId;
}
}
const entry = resolveStoredSessionEntry(params.commandParams, params.targetSessionKey);
const sessionId = normalizeOptionalString(entry?.sessionId);
if (!sessionId || !isEmbeddedAgentRunActive(sessionId)) {
return undefined;
for (const candidateKey of candidateKeys) {
const entry = resolveStoredSessionEntry(params.commandParams, candidateKey);
const sessionFile = normalizeOptionalString(entry?.sessionFile);
if (!sessionFile) {
continue;
}
const activeSessionId = resolveActiveEmbeddedRunSessionIdBySessionFile(sessionFile);
if (activeSessionId) {
return activeSessionId;
}
}
return sessionId;
for (const candidateKey of candidateKeys) {
const entry = resolveStoredSessionEntry(params.commandParams, candidateKey);
const sessionId = normalizeOptionalString(entry?.sessionId);
if (sessionId && isEmbeddedAgentRunActive(sessionId)) {
return sessionId;
}
}
return undefined;
}
function applySteerFallbackPrompt(ctx: HandleCommandsParams["ctx"], message: string): void {