mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-29 10:02:04 +00:00
Implement reply routing based on OriginatingChannel/OriginatingTo fields. This ensures replies go back to the provider where the message originated instead of using the session's lastChannel.
Changes:
- Add OriginatingChannel/OriginatingTo fields to MsgContext (templating.ts)
- Add originatingChannel/originatingTo fields to FollowupRun (queue.ts)
- Create route-reply.ts with provider-agnostic router
- Update all providers (Telegram, Slack, Discord, Signal, iMessage) to pass originating channel info
- Update reply.ts to pass originating channel to followupRun
- Update followup-runner.ts to use route-reply for originating channels
This addresses the issue where messages from one provider (e.g., Slack) would receive replies on a different provider (e.g., Telegram) because the queue used the last active dispatcher instead of the originating one.
253 lines
9.0 KiB
TypeScript
253 lines
9.0 KiB
TypeScript
import crypto from "node:crypto";
|
|
import { lookupContextTokens } from "../../agents/context.js";
|
|
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
|
|
import { runWithModelFallback } from "../../agents/model-fallback.js";
|
|
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
|
|
import { hasNonzeroUsage } from "../../agents/usage.js";
|
|
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
|
|
import { logVerbose } from "../../globals.js";
|
|
import { registerAgentRunContext } from "../../infra/agent-events.js";
|
|
import { defaultRuntime } from "../../runtime.js";
|
|
import { stripHeartbeatToken } from "../heartbeat.js";
|
|
import { SILENT_REPLY_TOKEN } from "../tokens.js";
|
|
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
|
import type { FollowupRun } from "./queue.js";
|
|
import { extractReplyToTag } from "./reply-tags.js";
|
|
import { isRoutableChannel, routeReply } from "./route-reply.js";
|
|
import { incrementCompactionCount } from "./session-updates.js";
|
|
import type { TypingController } from "./typing.js";
|
|
|
|
/**
 * Builds the runner that executes one queued followup turn and delivers its
 * replies.
 *
 * For each queued run the returned function:
 *  1. runs the embedded agent through `runWithModelFallback`,
 *  2. strips heartbeat tokens and reply-to tags from the resulting payloads,
 *  3. records auto-compaction and token usage on the session store entry, and
 *  4. sends the remaining payloads, preferring the run's originating channel.
 *
 * @param params.opts - Reply options; `onBlockReply` is used as the dispatcher
 *   when the run has no routable originating channel, and as a fallback when
 *   routing to the originating channel fails.
 * @param params.typing - Typing controller; `startTypingOnText` is called
 *   before each payload is sent and `markRunComplete` once the run finishes
 *   (successfully or not).
 * @param params.sessionEntry - Session entry forwarded to
 *   `incrementCompactionCount` and used as a context-token fallback.
 * @param params.sessionStore - Session store that is updated in place with
 *   usage/model information under `sessionKey`.
 * @param params.sessionKey - Key of the session inside `sessionStore`.
 * @param params.storePath - When set, the store is persisted to this path
 *   after it has been updated.
 * @param params.defaultModel - Model name recorded when neither the agent
 *   metadata nor the fallback result names one.
 * @param params.agentCfgContextTokens - Context-token value that takes
 *   precedence over the per-model lookup.
 * @returns An async function that processes a single `FollowupRun`. Agent
 *   failures are reported through `defaultRuntime.error` and do not reject.
 */
export function createFollowupRunner(params: {
  opts?: GetReplyOptions;
  typing: TypingController;
  sessionEntry?: SessionEntry;
  sessionStore?: Record<string, SessionEntry>;
  sessionKey?: string;
  storePath?: string;
  defaultModel: string;
  agentCfgContextTokens?: number;
}): (queued: FollowupRun) => Promise<void> {
  const {
    opts,
    typing,
    sessionEntry,
    sessionStore,
    sessionKey,
    storePath,
    defaultModel,
    agentCfgContextTokens,
  } = params;

  /**
   * Sends followup payloads, routing to the originating channel if set.
   *
   * When originatingChannel/originatingTo are set on the queued run,
   * replies are routed directly to that provider instead of using the
   * session's current dispatcher. This ensures replies go back to
   * where the message originated.
   *
   * If routing reports a failure and an `onBlockReply` dispatcher is
   * available, the payload is handed to the dispatcher instead of being
   * dropped, so the reply is not silently lost.
   */
  const sendFollowupPayloads = async (
    payloads: ReplyPayload[],
    queued: FollowupRun,
  ) => {
    // Check if we should route to originating channel.
    const { originatingChannel, originatingTo } = queued;
    const shouldRouteToOriginating =
      isRoutableChannel(originatingChannel) && originatingTo;

    // Without a route target and without a dispatcher there is nowhere to send.
    if (!shouldRouteToOriginating && !opts?.onBlockReply) {
      logVerbose("followup queue: no onBlockReply handler; dropping payloads");
      return;
    }

    for (const payload of payloads) {
      // Skip payloads that carry neither text nor media.
      if (!payload?.text && !payload?.mediaUrl && !payload?.mediaUrls?.length) {
        continue;
      }
      // Skip text-only payloads that consist solely of the silent-reply token.
      if (
        payload.text?.trim() === SILENT_REPLY_TOKEN &&
        !payload.mediaUrl &&
        !payload.mediaUrls?.length
      ) {
        continue;
      }
      await typing.startTypingOnText(payload.text);

      // Route to originating channel if set, otherwise fall back to dispatcher.
      if (shouldRouteToOriginating) {
        const result = await routeReply({
          payload,
          channel: originatingChannel,
          to: originatingTo,
          cfg: queued.run.config,
        });
        if (!result.ok) {
          logVerbose(
            `followup queue: route-reply failed: ${result.error ?? "unknown error"}`,
          );
          // Routing failed: deliver through the dispatcher (if any) rather
          // than dropping the reply.
          if (opts?.onBlockReply) {
            await opts.onBlockReply(payload);
          }
        }
      } else if (opts?.onBlockReply) {
        await opts.onBlockReply(payload);
      }
    }
  };

  return async (queued: FollowupRun) => {
    try {
      const runId = crypto.randomUUID();
      if (queued.run.sessionKey) {
        registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey });
      }
      // Set by the agent event callback when a compaction ends without retry.
      let autoCompactionCompleted = false;
      let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
      // Start from the requested provider/model; overwritten with whatever
      // runWithModelFallback reports as actually used.
      let fallbackProvider = queued.run.provider;
      let fallbackModel = queued.run.model;
      try {
        const fallbackResult = await runWithModelFallback({
          cfg: queued.run.config,
          provider: queued.run.provider,
          model: queued.run.model,
          run: (provider, model) =>
            runEmbeddedPiAgent({
              sessionId: queued.run.sessionId,
              sessionKey: queued.run.sessionKey,
              messageProvider: queued.run.messageProvider,
              sessionFile: queued.run.sessionFile,
              workspaceDir: queued.run.workspaceDir,
              config: queued.run.config,
              skillsSnapshot: queued.run.skillsSnapshot,
              prompt: queued.prompt,
              extraSystemPrompt: queued.run.extraSystemPrompt,
              ownerNumbers: queued.run.ownerNumbers,
              enforceFinalTag: queued.run.enforceFinalTag,
              provider,
              model,
              authProfileId: queued.run.authProfileId,
              thinkLevel: queued.run.thinkLevel,
              verboseLevel: queued.run.verboseLevel,
              bashElevated: queued.run.bashElevated,
              timeoutMs: queued.run.timeoutMs,
              runId,
              blockReplyBreak: queued.run.blockReplyBreak,
              onAgentEvent: (evt) => {
                // Only compaction events matter here.
                if (evt.stream !== "compaction") return;
                const phase =
                  typeof evt.data.phase === "string" ? evt.data.phase : "";
                const willRetry = Boolean(evt.data.willRetry);
                if (phase === "end" && !willRetry) {
                  autoCompactionCompleted = true;
                }
              },
            }),
        });
        runResult = fallbackResult.result;
        fallbackProvider = fallbackResult.provider;
        fallbackModel = fallbackResult.model;
      } catch (err) {
        // The agent never produced a reply: report and stop without rejecting.
        const message = err instanceof Error ? err.message : String(err);
        defaultRuntime.error?.(
          `Followup agent failed before reply: ${message}`,
        );
        return;
      }

      const payloadArray = runResult.payloads ?? [];
      if (payloadArray.length === 0) return;

      // Remove the heartbeat token from payloads that contain it; a payload is
      // dropped entirely when the stripper says to skip it and it has no media.
      const sanitizedPayloads = payloadArray.flatMap((payload) => {
        const text = payload.text;
        if (!text || !text.includes("HEARTBEAT_OK")) return [payload];
        const stripped = stripHeartbeatToken(text, { mode: "message" });
        const hasMedia =
          Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
        if (stripped.shouldSkip && !hasMedia) return [];
        return [{ ...payload, text: stripped.text }];
      });

      // Pull reply-to tags out of the text (an extracted id wins over the
      // payload's own replyToId) and drop payloads left with no content.
      const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
        .map((payload) => {
          const { cleaned, replyToId } = extractReplyToTag(payload.text);
          return {
            ...payload,
            text: cleaned ? cleaned : undefined,
            replyToId: replyToId ?? payload.replyToId,
          };
        })
        .filter(
          (payload) =>
            payload.text ||
            payload.mediaUrl ||
            (payload.mediaUrls && payload.mediaUrls.length > 0),
        );

      if (replyTaggedPayloads.length === 0) return;

      if (autoCompactionCompleted) {
        const count = await incrementCompactionCount({
          sessionEntry,
          sessionStore,
          sessionKey,
          storePath,
        });
        // In verbose mode, announce the compaction ahead of the actual replies.
        if (queued.run.verboseLevel === "on") {
          const suffix = typeof count === "number" ? ` (count ${count})` : "";
          replyTaggedPayloads.unshift({
            text: `🧹 Auto-compaction complete${suffix}.`,
          });
        }
      }

      if (sessionStore && sessionKey) {
        const usage = runResult.meta.agentMeta?.usage;
        const modelUsed =
          runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
        // Precedence: explicit agent config, per-model lookup, stored entry,
        // then the global default.
        const contextTokensUsed =
          agentCfgContextTokens ??
          lookupContextTokens(modelUsed) ??
          sessionEntry?.contextTokens ??
          DEFAULT_CONTEXT_TOKENS;

        if (hasNonzeroUsage(usage)) {
          const entry = sessionStore[sessionKey];
          if (entry) {
            const input = usage.input ?? 0;
            const output = usage.output ?? 0;
            // Prompt-side tokens include cache reads and writes.
            const promptTokens =
              input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
            sessionStore[sessionKey] = {
              ...entry,
              inputTokens: input,
              outputTokens: output,
              totalTokens:
                promptTokens > 0 ? promptTokens : (usage.total ?? input),
              modelProvider: fallbackProvider ?? entry.modelProvider,
              model: modelUsed,
              contextTokens: contextTokensUsed ?? entry.contextTokens,
              updatedAt: Date.now(),
            };
            if (storePath) {
              await saveSessionStore(storePath, sessionStore);
            }
          }
        } else if (modelUsed || contextTokensUsed) {
          // No usage numbers: still record which model/context size was used.
          const entry = sessionStore[sessionKey];
          if (entry) {
            sessionStore[sessionKey] = {
              ...entry,
              modelProvider: fallbackProvider ?? entry.modelProvider,
              model: modelUsed ?? entry.model,
              contextTokens: contextTokensUsed ?? entry.contextTokens,
            };
            if (storePath) {
              await saveSessionStore(storePath, sessionStore);
            }
          }
        }
      }

      await sendFollowupPayloads(replyTaggedPayloads, queued);
    } finally {
      // Always signal completion, including on early returns and failures.
      typing.markRunComplete();
    }
  };
}
|