mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 22:20:42 +00:00
Stop Mattermost preview placeholders from surviving successful finals
The latest hardening pass on PR #47838 still left `↓ See below.` in the transcript after healthy final delivery. This change moves Mattermost preview finalization into one helper that only clears the draft after a confirmed normal final send, keeps in-place finalization for safe single-final turns, and preserves the preview when final delivery fails. Constraint: Keep the fix scoped to the Mattermost preview state machine already under review in PR #47838 Rejected: Keep rewriting the preview to `↓ See below.` after successful normal sends | leaves transcript noise on healthy turns Rejected: Delete the preview before attempting the real final send | risks losing all visible output when delivery fails Confidence: high Scope-risk: narrow Directive: Successful normal finals must leave either the finalized preview or the real final post, never an extra placeholder note Tested: `pnpm test extensions/mattermost/src/mattermost/monitor.test.ts`; `pnpm test extensions/mattermost/src/mattermost/draft-stream.test.ts`; `pnpm lint -- extensions/mattermost/src/mattermost/monitor.ts extensions/mattermost/src/mattermost/monitor.test.ts`; `pnpm build` Not-tested: Fresh Mattermost smoke on the PR branch itself Related: PR #47838
This commit is contained in:
@@ -545,6 +545,15 @@ export async function updateMattermostPost(
|
||||
});
|
||||
}
|
||||
|
||||
export async function deleteMattermostPost(
|
||||
client: MattermostClient,
|
||||
postId: string,
|
||||
): Promise<void> {
|
||||
await client.request<void>(`/posts/${postId}`, {
|
||||
method: "DELETE",
|
||||
});
|
||||
}
|
||||
|
||||
export async function uploadMattermostFile(
|
||||
client: MattermostClient,
|
||||
params: {
|
||||
|
||||
@@ -10,11 +10,14 @@ type RequestRecord = {
|
||||
function createMockClient(): {
|
||||
client: MattermostClient;
|
||||
calls: RequestRecord[];
|
||||
request: ReturnType<typeof vi.fn>;
|
||||
requestMock: ReturnType<typeof vi.fn>;
|
||||
} {
|
||||
const calls: RequestRecord[] = [];
|
||||
let nextId = 1;
|
||||
const request = vi.fn(async <T>(path: string, init?: RequestInit): Promise<T> => {
|
||||
const requestImpl: MattermostClient["request"] = async <T>(
|
||||
path: string,
|
||||
init?: RequestInit,
|
||||
): Promise<T> => {
|
||||
calls.push({ path, init });
|
||||
if (path === "/posts") {
|
||||
return { id: `post-${nextId++}` } as T;
|
||||
@@ -23,15 +26,16 @@ function createMockClient(): {
|
||||
return { id: "patched" } as T;
|
||||
}
|
||||
return {} as T;
|
||||
});
|
||||
};
|
||||
const requestMock = vi.fn(requestImpl);
|
||||
const client: MattermostClient = {
|
||||
baseUrl: "https://chat.example.com",
|
||||
apiBaseUrl: "https://chat.example.com/api/v4",
|
||||
token: "token",
|
||||
request: request as MattermostClient["request"],
|
||||
fetchImpl: vi.fn(async () => new Response(null, { status: 204 })),
|
||||
request: requestMock as MattermostClient["request"],
|
||||
fetchImpl: vi.fn() as MattermostClient["fetchImpl"],
|
||||
};
|
||||
return { client, calls, request };
|
||||
return { client, calls, requestMock };
|
||||
}
|
||||
|
||||
describe("createMattermostDraftStream", () => {
|
||||
@@ -52,7 +56,7 @@ describe("createMattermostDraftStream", () => {
|
||||
expect(calls).toHaveLength(1);
|
||||
expect(calls[0]?.path).toBe("/posts");
|
||||
|
||||
const createBody = JSON.parse(calls[0]?.init?.body as string);
|
||||
const createBody = JSON.parse((calls[0]?.init?.body as string | undefined) ?? "{}");
|
||||
expect(createBody).toMatchObject({
|
||||
channel_id: "channel-1",
|
||||
root_id: "root-1",
|
||||
@@ -77,17 +81,61 @@ describe("createMattermostDraftStream", () => {
|
||||
expect(calls).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("clears the preview post when no final reply is delivered", async () => {
|
||||
const { client, calls } = createMockClient();
|
||||
const stream = createMattermostDraftStream({
|
||||
client,
|
||||
channelId: "channel-1",
|
||||
rootId: "root-1",
|
||||
throttleMs: 0,
|
||||
});
|
||||
|
||||
stream.update("Working...");
|
||||
await stream.flush();
|
||||
await stream.clear();
|
||||
|
||||
expect(calls).toHaveLength(2);
|
||||
expect(calls[1]?.path).toBe("/posts/post-1");
|
||||
expect(calls[1]?.init?.method).toBe("DELETE");
|
||||
expect(stream.postId()).toBeUndefined();
|
||||
});
|
||||
|
||||
it("stop flushes the last pending update and ignores later ones", async () => {
|
||||
const { client, calls } = createMockClient();
|
||||
const stream = createMattermostDraftStream({
|
||||
client,
|
||||
channelId: "channel-1",
|
||||
rootId: "root-1",
|
||||
throttleMs: 1000,
|
||||
});
|
||||
|
||||
stream.update("Working...");
|
||||
await stream.flush();
|
||||
stream.update("Stale partial");
|
||||
await stream.stop();
|
||||
stream.update("Late partial");
|
||||
await stream.flush();
|
||||
|
||||
expect(calls).toHaveLength(2);
|
||||
expect(calls[0]?.path).toBe("/posts");
|
||||
expect(calls[1]?.path).toBe("/posts/post-1");
|
||||
expect(JSON.parse((calls[1]?.init?.body as string | undefined) ?? "{}")).toMatchObject({
|
||||
message: "Stale partial",
|
||||
});
|
||||
});
|
||||
|
||||
it("warns and stops when preview creation fails", async () => {
|
||||
const warn = vi.fn();
|
||||
const request = vi.fn(async () => {
|
||||
const requestImpl: MattermostClient["request"] = async () => {
|
||||
throw new Error("boom");
|
||||
});
|
||||
};
|
||||
const requestMock = vi.fn(requestImpl);
|
||||
const client: MattermostClient = {
|
||||
baseUrl: "https://chat.example.com",
|
||||
apiBaseUrl: "https://chat.example.com/api/v4",
|
||||
token: "token",
|
||||
request: request as MattermostClient["request"],
|
||||
fetchImpl: vi.fn(async () => new Response(null, { status: 204 })),
|
||||
request: requestMock as MattermostClient["request"],
|
||||
fetchImpl: vi.fn() as MattermostClient["fetchImpl"],
|
||||
};
|
||||
const stream = createMattermostDraftStream({
|
||||
client,
|
||||
@@ -102,19 +150,65 @@ describe("createMattermostDraftStream", () => {
|
||||
await stream.flush();
|
||||
|
||||
expect(warn).toHaveBeenCalled();
|
||||
expect(request).toHaveBeenCalledTimes(1);
|
||||
expect(requestMock).toHaveBeenCalledTimes(1);
|
||||
expect(stream.postId()).toBeUndefined();
|
||||
});
|
||||
|
||||
it("does not resend after an update failure followed by stop", async () => {
|
||||
const warn = vi.fn();
|
||||
const calls: RequestRecord[] = [];
|
||||
let failNextPatch = true;
|
||||
const requestImpl: MattermostClient["request"] = async <T>(
|
||||
path: string,
|
||||
init?: RequestInit,
|
||||
): Promise<T> => {
|
||||
calls.push({ path, init });
|
||||
if (path === "/posts") {
|
||||
return { id: "post-1" } as T;
|
||||
}
|
||||
if (path === "/posts/post-1") {
|
||||
if (failNextPatch) {
|
||||
failNextPatch = false;
|
||||
throw new Error("patch failed");
|
||||
}
|
||||
return { id: "patched" } as T;
|
||||
}
|
||||
return {} as T;
|
||||
};
|
||||
const requestMock = vi.fn(requestImpl);
|
||||
const client: MattermostClient = {
|
||||
baseUrl: "https://chat.example.com",
|
||||
apiBaseUrl: "https://chat.example.com/api/v4",
|
||||
token: "token",
|
||||
request: requestMock as MattermostClient["request"],
|
||||
fetchImpl: vi.fn() as MattermostClient["fetchImpl"],
|
||||
};
|
||||
const stream = createMattermostDraftStream({
|
||||
client,
|
||||
channelId: "channel-1",
|
||||
throttleMs: 1000,
|
||||
warn,
|
||||
});
|
||||
|
||||
stream.update("Working...");
|
||||
await stream.flush();
|
||||
stream.update("Will fail");
|
||||
await stream.flush();
|
||||
await stream.stop();
|
||||
|
||||
expect(warn).toHaveBeenCalledWith("mattermost stream preview failed: patch failed");
|
||||
expect(calls).toHaveLength(2);
|
||||
expect(calls[0]?.path).toBe("/posts");
|
||||
expect(calls[1]?.path).toBe("/posts/post-1");
|
||||
});
|
||||
});
|
||||
|
||||
describe("buildMattermostToolStatusText", () => {
|
||||
it("renders a start status when phase is absent", () => {
|
||||
it("renders a status with the tool name", () => {
|
||||
expect(buildMattermostToolStatusText({ name: "read" })).toBe("Running `read`…");
|
||||
});
|
||||
|
||||
it("renders an update status when phase is update", () => {
|
||||
expect(buildMattermostToolStatusText({ name: "exec", phase: "update" })).toBe(
|
||||
"Running `exec`…",
|
||||
);
|
||||
it("falls back to a generic running tool status", () => {
|
||||
expect(buildMattermostToolStatusText({ name: "exec" })).toBe("Running `exec`…");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
import { createFinalizableDraftStreamControlsForState } from "openclaw/plugin-sdk/channel-lifecycle";
|
||||
import { createMattermostPost, updateMattermostPost, type MattermostClient } from "./client.js";
|
||||
import { createFinalizableDraftLifecycle } from "openclaw/plugin-sdk/channel-lifecycle";
|
||||
import {
|
||||
createMattermostPost,
|
||||
deleteMattermostPost,
|
||||
updateMattermostPost,
|
||||
type MattermostClient,
|
||||
} from "./client.js";
|
||||
|
||||
const MATTERMOST_STREAM_MAX_CHARS = 4000;
|
||||
const DEFAULT_THROTTLE_MS = 1000;
|
||||
@@ -8,6 +13,7 @@ export type MattermostDraftStream = {
|
||||
update: (text: string) => void;
|
||||
flush: () => Promise<void>;
|
||||
postId: () => string | undefined;
|
||||
clear: () => Promise<void>;
|
||||
stop: () => Promise<void>;
|
||||
forceNewMessage: () => void;
|
||||
};
|
||||
@@ -89,10 +95,20 @@ export function createMattermostDraftStream(params: {
|
||||
}
|
||||
};
|
||||
|
||||
const { loop, update, stop } = createFinalizableDraftStreamControlsForState({
|
||||
const { loop, update, stop, clear } = createFinalizableDraftLifecycle({
|
||||
throttleMs,
|
||||
state: streamState,
|
||||
sendOrEditStreamMessage,
|
||||
readMessageId: () => streamPostId,
|
||||
clearMessageId: () => {
|
||||
streamPostId = undefined;
|
||||
},
|
||||
isValidMessageId: (value): value is string => typeof value === "string" && value.length > 0,
|
||||
deleteMessage: async (postId) => {
|
||||
await deleteMattermostPost(params.client, postId);
|
||||
},
|
||||
warn: params.warn,
|
||||
warnPrefix: "mattermost stream preview cleanup failed",
|
||||
});
|
||||
|
||||
const forceNewMessage = () => {
|
||||
@@ -108,6 +124,7 @@ export function createMattermostDraftStream(params: {
|
||||
update,
|
||||
flush: loop.flush,
|
||||
postId: () => streamPostId,
|
||||
clear,
|
||||
stop,
|
||||
forceNewMessage,
|
||||
};
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../runtime-api.js";
|
||||
import { resolveMattermostAccount } from "./accounts.js";
|
||||
import * as clientModule from "./client.js";
|
||||
import type { MattermostClient } from "./client.js";
|
||||
import {
|
||||
buildMattermostModelPickerSelectMessageSid,
|
||||
didDeliverAllMattermostDeferredFinalReplies,
|
||||
canFinalizeMattermostPreviewInPlace,
|
||||
deliverMattermostReplyWithDraftPreview,
|
||||
evaluateMattermostMentionGate,
|
||||
MattermostRetryableInboundError,
|
||||
processMattermostReplayGuardedPost,
|
||||
@@ -12,6 +15,8 @@ import {
|
||||
resolveMattermostEffectiveReplyToId,
|
||||
resolveMattermostReplyRootId,
|
||||
resolveMattermostThreadSessionContext,
|
||||
shouldFinalizeMattermostPreviewAfterDispatch,
|
||||
shouldClearMattermostDraftPreview,
|
||||
type MattermostMentionGateInput,
|
||||
type MattermostRequireMentionResolverInput,
|
||||
} from "./monitor.js";
|
||||
@@ -42,6 +47,33 @@ function resolveRequireMentionForTest(params: MattermostRequireMentionResolverIn
|
||||
return true;
|
||||
}
|
||||
|
||||
const updateMattermostPostSpy = vi.spyOn(clientModule, "updateMattermostPost");
|
||||
|
||||
function createMattermostClientMock(): MattermostClient {
|
||||
return {
|
||||
baseUrl: "https://chat.example.com",
|
||||
apiBaseUrl: "https://chat.example.com/api/v4",
|
||||
token: "token",
|
||||
request: vi.fn(async () => ({})) as MattermostClient["request"],
|
||||
fetchImpl: vi.fn(
|
||||
async () => new Response(null, { status: 200 }),
|
||||
) as MattermostClient["fetchImpl"],
|
||||
};
|
||||
}
|
||||
|
||||
function createDraftStreamMock(postId: string | undefined = "preview-post-1") {
|
||||
return {
|
||||
flush: vi.fn(async () => {}),
|
||||
postId: vi.fn(() => postId),
|
||||
clear: vi.fn(async () => {}),
|
||||
};
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
updateMattermostPostSpy.mockResolvedValue({ id: "patched" } as never);
|
||||
});
|
||||
|
||||
function evaluateMentionGateForMessage(params: { cfg: OpenClawConfig; threadRootId?: string }) {
|
||||
const account = resolveMattermostAccount({ cfg: params.cfg, accountId: "default" });
|
||||
const resolver = vi.fn(resolveRequireMentionForTest);
|
||||
@@ -168,6 +200,182 @@ describe("resolveMattermostReplyRootId", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("canFinalizeMattermostPreviewInPlace", () => {
|
||||
it("allows in-place finalization when the final reply target matches the preview thread", () => {
|
||||
expect(
|
||||
canFinalizeMattermostPreviewInPlace({
|
||||
previewRootId: "thread-root-456",
|
||||
threadRootId: "thread-root-456",
|
||||
replyToId: "child-post-789",
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("prevents in-place finalization when a top-level preview would become a threaded reply", () => {
|
||||
expect(
|
||||
canFinalizeMattermostPreviewInPlace({
|
||||
replyToId: "child-post-789",
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("shouldClearMattermostDraftPreview", () => {
|
||||
it("deletes the preview after successful normal final delivery", () => {
|
||||
expect(
|
||||
shouldClearMattermostDraftPreview({
|
||||
finalizedViaPreviewPost: false,
|
||||
finalReplyDelivered: true,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("keeps the preview when final delivery failed", () => {
|
||||
expect(
|
||||
shouldClearMattermostDraftPreview({
|
||||
finalizedViaPreviewPost: false,
|
||||
finalReplyDelivered: false,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("keeps the preview when it already became the final reply", () => {
|
||||
expect(
|
||||
shouldClearMattermostDraftPreview({
|
||||
finalizedViaPreviewPost: true,
|
||||
finalReplyDelivered: true,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("deliverMattermostReplyWithDraftPreview", () => {
|
||||
it("deletes the preview after a successful normal final send", async () => {
|
||||
const draftStream = createDraftStreamMock();
|
||||
const deliverFinal = vi.fn(async () => {});
|
||||
|
||||
await deliverMattermostReplyWithDraftPreview({
|
||||
payload: { text: "All good", replyToId: "reply-1" } as never,
|
||||
info: { kind: "final" },
|
||||
client: createMattermostClientMock(),
|
||||
draftStream,
|
||||
resolvePreviewFinalText: (text) => text?.trim(),
|
||||
previewState: { finalizedViaPreviewPost: false },
|
||||
logVerboseMessage: vi.fn(),
|
||||
deliverFinal,
|
||||
});
|
||||
|
||||
expect(deliverFinal).toHaveBeenCalledTimes(1);
|
||||
expect(draftStream.clear).toHaveBeenCalledTimes(1);
|
||||
expect(updateMattermostPostSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("deletes the preview after a successful non-finalizable media final", async () => {
|
||||
const draftStream = createDraftStreamMock();
|
||||
const deliverFinal = vi.fn(async () => {});
|
||||
|
||||
await deliverMattermostReplyWithDraftPreview({
|
||||
payload: {
|
||||
text: "Photo",
|
||||
replyToId: "reply-1",
|
||||
mediaUrl: "https://example.com/a.png",
|
||||
} as never,
|
||||
info: { kind: "final" },
|
||||
client: createMattermostClientMock(),
|
||||
draftStream,
|
||||
effectiveReplyToId: "thread-root-1",
|
||||
resolvePreviewFinalText: (text) => text?.trim(),
|
||||
previewState: { finalizedViaPreviewPost: false },
|
||||
logVerboseMessage: vi.fn(),
|
||||
deliverFinal,
|
||||
});
|
||||
|
||||
expect(deliverFinal).toHaveBeenCalledTimes(1);
|
||||
expect(draftStream.clear).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("finalizes the preview in place when the final targets the same thread", async () => {
|
||||
const draftStream = createDraftStreamMock();
|
||||
const deliverFinal = vi.fn(async () => {});
|
||||
|
||||
await deliverMattermostReplyWithDraftPreview({
|
||||
payload: { text: "Final answer", replyToId: "child-post-789" } as never,
|
||||
info: { kind: "final" },
|
||||
client: createMattermostClientMock(),
|
||||
draftStream,
|
||||
effectiveReplyToId: "thread-root-456",
|
||||
resolvePreviewFinalText: (text) => text?.trim(),
|
||||
previewState: { finalizedViaPreviewPost: false },
|
||||
logVerboseMessage: vi.fn(),
|
||||
deliverFinal,
|
||||
});
|
||||
|
||||
expect(updateMattermostPostSpy).toHaveBeenCalledWith(
|
||||
expect.anything(),
|
||||
"preview-post-1",
|
||||
expect.objectContaining({ message: "Final answer" }),
|
||||
);
|
||||
expect(deliverFinal).not.toHaveBeenCalled();
|
||||
expect(draftStream.clear).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps the existing preview unchanged when final delivery fails", async () => {
|
||||
const draftStream = createDraftStreamMock();
|
||||
const deliverFinal = vi.fn(async () => {
|
||||
throw new Error("send failed");
|
||||
});
|
||||
|
||||
await expect(
|
||||
deliverMattermostReplyWithDraftPreview({
|
||||
payload: { text: "Broken", replyToId: "reply-1" } as never,
|
||||
info: { kind: "final" },
|
||||
client: createMattermostClientMock(),
|
||||
draftStream,
|
||||
resolvePreviewFinalText: (text) => text?.trim(),
|
||||
previewState: { finalizedViaPreviewPost: false },
|
||||
logVerboseMessage: vi.fn(),
|
||||
deliverFinal,
|
||||
}),
|
||||
).rejects.toThrow("send failed");
|
||||
|
||||
expect(draftStream.clear).not.toHaveBeenCalled();
|
||||
expect(updateMattermostPostSpy).not.toHaveBeenCalledWith(
|
||||
expect.anything(),
|
||||
"preview-post-1",
|
||||
expect.objectContaining({ message: "↓ See below." }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("shouldFinalizeMattermostPreviewAfterDispatch", () => {
|
||||
it("reuses the preview only for a single eligible final payload", () => {
|
||||
expect(
|
||||
shouldFinalizeMattermostPreviewAfterDispatch({
|
||||
finalCount: 1,
|
||||
canFinalizeInPlace: true,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("falls back to normal sends for multi-payload finals", () => {
|
||||
expect(
|
||||
shouldFinalizeMattermostPreviewAfterDispatch({
|
||||
finalCount: 2,
|
||||
canFinalizeInPlace: true,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("falls back to normal sends when the final cannot be edited into the preview", () => {
|
||||
expect(
|
||||
shouldFinalizeMattermostPreviewAfterDispatch({
|
||||
finalCount: 1,
|
||||
canFinalizeInPlace: false,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveMattermostEffectiveReplyToId", () => {
|
||||
it("keeps an existing thread root", () => {
|
||||
expect(
|
||||
@@ -437,23 +645,3 @@ describe("resolveMattermostReactionChannelId", () => {
|
||||
expect(resolveMattermostReactionChannelId({})).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe("didDeliverAllMattermostDeferredFinalReplies", () => {
|
||||
it("returns true when all deferred finals were delivered", () => {
|
||||
expect(
|
||||
didDeliverAllMattermostDeferredFinalReplies({
|
||||
deliveredCount: 2,
|
||||
deferredCount: 2,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("returns false when a later deferred final failed", () => {
|
||||
expect(
|
||||
didDeliverAllMattermostDeferredFinalReplies({
|
||||
deliveredCount: 1,
|
||||
deferredCount: 2,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
fetchMattermostMe,
|
||||
normalizeMattermostBaseUrl,
|
||||
updateMattermostPost,
|
||||
type MattermostClient,
|
||||
type MattermostPost,
|
||||
type MattermostUser,
|
||||
} from "./client.js";
|
||||
@@ -238,11 +239,112 @@ export function resolveMattermostReplyRootId(params: {
|
||||
return normalizeOptionalString(params.replyToId);
|
||||
}
|
||||
|
||||
export function didDeliverAllMattermostDeferredFinalReplies(params: {
|
||||
deliveredCount: number;
|
||||
deferredCount: number;
|
||||
export function canFinalizeMattermostPreviewInPlace(params: {
|
||||
previewRootId?: string;
|
||||
threadRootId?: string;
|
||||
replyToId?: string;
|
||||
}): boolean {
|
||||
return params.deferredCount > 0 && params.deliveredCount === params.deferredCount;
|
||||
return (
|
||||
resolveMattermostReplyRootId({
|
||||
threadRootId: params.threadRootId,
|
||||
replyToId: params.replyToId,
|
||||
}) === params.previewRootId?.trim()
|
||||
);
|
||||
}
|
||||
|
||||
export function shouldClearMattermostDraftPreview(params: {
|
||||
finalizedViaPreviewPost: boolean;
|
||||
finalReplyDelivered: boolean;
|
||||
}): boolean {
|
||||
return params.finalReplyDelivered && !params.finalizedViaPreviewPost;
|
||||
}
|
||||
|
||||
export function shouldFinalizeMattermostPreviewAfterDispatch(params: {
|
||||
finalCount: number;
|
||||
canFinalizeInPlace: boolean;
|
||||
}): boolean {
|
||||
return params.finalCount === 1 && params.canFinalizeInPlace;
|
||||
}
|
||||
|
||||
type MattermostDraftPreviewState = {
|
||||
finalizedViaPreviewPost: boolean;
|
||||
};
|
||||
|
||||
type MattermostDraftPreviewDeliverParams = {
|
||||
payload: ReplyPayload;
|
||||
info: { kind: "tool" | "block" | "final" };
|
||||
client: MattermostClient;
|
||||
draftStream: Pick<ReturnType<typeof createMattermostDraftStream>, "flush" | "postId" | "clear">;
|
||||
effectiveReplyToId?: string;
|
||||
resolvePreviewFinalText: (text?: string) => string | undefined;
|
||||
previewState: MattermostDraftPreviewState;
|
||||
logVerboseMessage: (message: string) => void;
|
||||
deliverFinal: () => Promise<void>;
|
||||
};
|
||||
|
||||
export async function deliverMattermostReplyWithDraftPreview(
|
||||
params: MattermostDraftPreviewDeliverParams,
|
||||
): Promise<void> {
|
||||
if (params.payload.isReasoning) {
|
||||
return;
|
||||
}
|
||||
|
||||
const isFinal = params.info.kind === "final";
|
||||
let previewPostId: string | undefined;
|
||||
if (isFinal) {
|
||||
await params.draftStream.flush();
|
||||
const hasMedia =
|
||||
Boolean(params.payload.mediaUrl) || (params.payload.mediaUrls?.length ?? 0) > 0;
|
||||
const previewFinalText = params.resolvePreviewFinalText(params.payload.text);
|
||||
previewPostId = params.draftStream.postId();
|
||||
|
||||
if (
|
||||
typeof previewPostId === "string" &&
|
||||
!hasMedia &&
|
||||
typeof previewFinalText === "string" &&
|
||||
!params.payload.isError &&
|
||||
canFinalizeMattermostPreviewInPlace({
|
||||
previewRootId: params.effectiveReplyToId,
|
||||
threadRootId: params.effectiveReplyToId,
|
||||
replyToId: params.payload.replyToId,
|
||||
})
|
||||
) {
|
||||
try {
|
||||
await updateMattermostPost(params.client, previewPostId, {
|
||||
message: previewFinalText,
|
||||
});
|
||||
params.previewState.finalizedViaPreviewPost = true;
|
||||
return;
|
||||
} catch (err) {
|
||||
params.logVerboseMessage(
|
||||
`mattermost preview final edit failed; falling back to normal send (${String(err)})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let finalReplyDelivered = false;
|
||||
try {
|
||||
await params.deliverFinal();
|
||||
finalReplyDelivered = true;
|
||||
} finally {
|
||||
if (
|
||||
isFinal &&
|
||||
typeof previewPostId === "string" &&
|
||||
shouldClearMattermostDraftPreview({
|
||||
finalizedViaPreviewPost: params.previewState.finalizedViaPreviewPost,
|
||||
finalReplyDelivered,
|
||||
})
|
||||
) {
|
||||
try {
|
||||
await params.draftStream.clear();
|
||||
} catch (err) {
|
||||
params.logVerboseMessage(
|
||||
`mattermost draft preview clear failed after successful final delivery (${String(err)})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function resolveMattermostEffectiveReplyToId(params: {
|
||||
@@ -1525,52 +1627,156 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
|
||||
},
|
||||
},
|
||||
});
|
||||
const { dispatcher, replyOptions, markDispatchIdle } =
|
||||
const draftStream = createMattermostDraftStream({
|
||||
client,
|
||||
channelId,
|
||||
rootId: effectiveReplyToId,
|
||||
throttleMs: 1200,
|
||||
log: logVerboseMessage,
|
||||
warn: logVerboseMessage,
|
||||
});
|
||||
let lastPartialText = "";
|
||||
const previewState: MattermostDraftPreviewState = {
|
||||
finalizedViaPreviewPost: false,
|
||||
};
|
||||
|
||||
const resolvePreviewFinalText = (text?: string) => {
|
||||
if (typeof text !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const formatted = core.channel.text.convertMarkdownTables(text, tableMode);
|
||||
const chunkMode = core.channel.text.resolveChunkMode(
|
||||
cfg,
|
||||
"mattermost",
|
||||
account.accountId,
|
||||
);
|
||||
const chunks = core.channel.text.chunkMarkdownTextWithMode(
|
||||
formatted,
|
||||
textLimit,
|
||||
chunkMode,
|
||||
);
|
||||
if (!chunks.length && formatted) {
|
||||
chunks.push(formatted);
|
||||
}
|
||||
if (chunks.length != 1) {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = chunks[0]?.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
if (
|
||||
lastPartialText &&
|
||||
lastPartialText.startsWith(trimmed) &&
|
||||
trimmed.length < lastPartialText.length
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
return trimmed;
|
||||
};
|
||||
|
||||
const updateDraftFromPartial = (text?: string) => {
|
||||
const cleaned = text?.trim();
|
||||
if (!cleaned) {
|
||||
return;
|
||||
}
|
||||
if (cleaned === lastPartialText) {
|
||||
return;
|
||||
}
|
||||
if (
|
||||
lastPartialText &&
|
||||
lastPartialText.startsWith(cleaned) &&
|
||||
cleaned.length < lastPartialText.length
|
||||
) {
|
||||
return;
|
||||
}
|
||||
lastPartialText = cleaned;
|
||||
draftStream.update(cleaned);
|
||||
};
|
||||
|
||||
const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
|
||||
core.channel.reply.createReplyDispatcherWithTyping({
|
||||
...replyPipeline,
|
||||
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
|
||||
typingCallbacks,
|
||||
deliver: async (payload: ReplyPayload) => {
|
||||
await deliverMattermostReplyPayload({
|
||||
core,
|
||||
cfg,
|
||||
deliver: async (payload: ReplyPayload, info) => {
|
||||
await deliverMattermostReplyWithDraftPreview({
|
||||
payload,
|
||||
to,
|
||||
accountId: account.accountId,
|
||||
agentId: route.agentId,
|
||||
replyToId: resolveMattermostReplyRootId({
|
||||
threadRootId: effectiveReplyToId,
|
||||
replyToId: payload.replyToId,
|
||||
}),
|
||||
textLimit,
|
||||
tableMode,
|
||||
sendMessage: sendMessageMattermost,
|
||||
info,
|
||||
client,
|
||||
draftStream,
|
||||
effectiveReplyToId,
|
||||
resolvePreviewFinalText,
|
||||
previewState,
|
||||
logVerboseMessage,
|
||||
deliverFinal: async () => {
|
||||
await deliverMattermostReplyPayload({
|
||||
core,
|
||||
cfg,
|
||||
payload,
|
||||
to,
|
||||
accountId: account.accountId,
|
||||
agentId: route.agentId,
|
||||
replyToId: resolveMattermostReplyRootId({
|
||||
threadRootId: effectiveReplyToId,
|
||||
replyToId: payload.replyToId,
|
||||
}),
|
||||
textLimit,
|
||||
tableMode,
|
||||
sendMessage: sendMessageMattermost,
|
||||
});
|
||||
runtime.log?.(`delivered reply to ${to}`);
|
||||
},
|
||||
});
|
||||
runtime.log?.(`delivered reply to ${to}`);
|
||||
},
|
||||
onError: (err, info) => {
|
||||
runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`);
|
||||
},
|
||||
});
|
||||
|
||||
await core.channel.reply.withReplyDispatcher({
|
||||
dispatcher,
|
||||
onSettled: () => {
|
||||
markDispatchIdle();
|
||||
},
|
||||
run: () =>
|
||||
core.channel.reply.dispatchReplyFromConfig({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions: {
|
||||
...replyOptions,
|
||||
disableBlockStreaming:
|
||||
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined,
|
||||
onModelSelected,
|
||||
},
|
||||
}),
|
||||
});
|
||||
try {
|
||||
await core.channel.reply.withReplyDispatcher({
|
||||
dispatcher,
|
||||
onSettled: () => {
|
||||
markDispatchIdle();
|
||||
},
|
||||
run: () =>
|
||||
core.channel.reply.dispatchReplyFromConfig({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions: {
|
||||
...replyOptions,
|
||||
disableBlockStreaming: true,
|
||||
onModelSelected,
|
||||
onPartialReply: (payload) => {
|
||||
updateDraftFromPartial(payload.text);
|
||||
},
|
||||
onAssistantMessageStart: () => {
|
||||
lastPartialText = "";
|
||||
},
|
||||
onReasoningEnd: () => {
|
||||
lastPartialText = "";
|
||||
},
|
||||
onReasoningStream: async () => {
|
||||
if (!lastPartialText) {
|
||||
draftStream.update("Thinking…");
|
||||
}
|
||||
},
|
||||
onToolStart: async (payload) => {
|
||||
draftStream.update(buildMattermostToolStatusText(payload));
|
||||
},
|
||||
},
|
||||
}),
|
||||
});
|
||||
} finally {
|
||||
try {
|
||||
await draftStream.stop();
|
||||
} catch (err) {
|
||||
logVerboseMessage(`mattermost draft preview cleanup failed: ${String(err)}`);
|
||||
}
|
||||
markRunComplete();
|
||||
}
|
||||
if (historyKey) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: channelHistories,
|
||||
@@ -1586,675 +1792,6 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
|
||||
);
|
||||
return;
|
||||
}
|
||||
const senderId = post.user_id ?? payload.broadcast?.user_id;
|
||||
if (!senderId) {
|
||||
logVerboseMessage("mattermost: drop post (missing sender id)");
|
||||
return;
|
||||
}
|
||||
if (senderId === botUserId) {
|
||||
logVerboseMessage(`mattermost: drop post (self sender=${senderId})`);
|
||||
return;
|
||||
}
|
||||
if (isSystemPost(post)) {
|
||||
logVerboseMessage(`mattermost: drop post (system post type=${post.type ?? "unknown"})`);
|
||||
return;
|
||||
}
|
||||
|
||||
const channelInfo = await resolveChannelInfo(channelId);
|
||||
const channelType = payload.data?.channel_type ?? channelInfo?.type ?? undefined;
|
||||
const kind = mapMattermostChannelTypeToChatType(channelType);
|
||||
const chatType = channelChatType(kind);
|
||||
|
||||
const senderName =
|
||||
payload.data?.sender_name?.trim() ||
|
||||
(await resolveUserInfo(senderId))?.username?.trim() ||
|
||||
senderId;
|
||||
const rawText = post.message?.trim() || "";
|
||||
const dmPolicy = account.config.dmPolicy ?? "pairing";
|
||||
const normalizedAllowFrom = normalizeMattermostAllowList(account.config.allowFrom ?? []);
|
||||
const normalizedGroupAllowFrom = normalizeMattermostAllowList(
|
||||
account.config.groupAllowFrom ?? [],
|
||||
);
|
||||
const storeAllowFrom = normalizeMattermostAllowList(
|
||||
await readStoreAllowFromForDmPolicy({
|
||||
provider: "mattermost",
|
||||
accountId: account.accountId,
|
||||
dmPolicy,
|
||||
readStore: pairing.readStoreForDmPolicy,
|
||||
}),
|
||||
);
|
||||
const accessDecision = resolveDmGroupAccessWithLists({
|
||||
isGroup: kind !== "direct",
|
||||
dmPolicy,
|
||||
groupPolicy,
|
||||
allowFrom: normalizedAllowFrom,
|
||||
groupAllowFrom: normalizedGroupAllowFrom,
|
||||
storeAllowFrom,
|
||||
isSenderAllowed: (allowFrom) =>
|
||||
isMattermostSenderAllowed({
|
||||
senderId,
|
||||
senderName,
|
||||
allowFrom,
|
||||
allowNameMatching,
|
||||
}),
|
||||
});
|
||||
const effectiveAllowFrom = accessDecision.effectiveAllowFrom;
|
||||
const effectiveGroupAllowFrom = accessDecision.effectiveGroupAllowFrom;
|
||||
const allowTextCommands = core.channel.commands.shouldHandleTextCommands({
|
||||
cfg,
|
||||
surface: "mattermost",
|
||||
});
|
||||
const hasControlCommand = core.channel.text.hasControlCommand(rawText, cfg);
|
||||
const isControlCommand = allowTextCommands && hasControlCommand;
|
||||
const useAccessGroups = cfg.commands?.useAccessGroups !== false;
|
||||
const commandDmAllowFrom = kind === "direct" ? effectiveAllowFrom : normalizedAllowFrom;
|
||||
const senderAllowedForCommands = isMattermostSenderAllowed({
|
||||
senderId,
|
||||
senderName,
|
||||
allowFrom: commandDmAllowFrom,
|
||||
allowNameMatching,
|
||||
});
|
||||
const groupAllowedForCommands = isMattermostSenderAllowed({
|
||||
senderId,
|
||||
senderName,
|
||||
allowFrom: effectiveGroupAllowFrom,
|
||||
allowNameMatching,
|
||||
});
|
||||
const commandGate = resolveControlCommandGate({
|
||||
useAccessGroups,
|
||||
authorizers: [
|
||||
{ configured: commandDmAllowFrom.length > 0, allowed: senderAllowedForCommands },
|
||||
{
|
||||
configured: effectiveGroupAllowFrom.length > 0,
|
||||
allowed: groupAllowedForCommands,
|
||||
},
|
||||
],
|
||||
allowTextCommands,
|
||||
hasControlCommand,
|
||||
});
|
||||
const commandAuthorized = commandGate.commandAuthorized;
|
||||
|
||||
if (accessDecision.decision !== "allow") {
|
||||
if (kind === "direct") {
|
||||
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.DM_POLICY_DISABLED) {
|
||||
logVerboseMessage(`mattermost: drop dm (dmPolicy=disabled sender=${senderId})`);
|
||||
return;
|
||||
}
|
||||
if (accessDecision.decision === "pairing") {
|
||||
const { code, created } = await pairing.upsertPairingRequest({
|
||||
id: senderId,
|
||||
meta: { name: senderName },
|
||||
});
|
||||
logVerboseMessage(`mattermost: pairing request sender=${senderId} created=${created}`);
|
||||
if (created) {
|
||||
try {
|
||||
await sendMessageMattermost(
|
||||
`user:${senderId}`,
|
||||
core.channel.pairing.buildPairingReply({
|
||||
channel: "mattermost",
|
||||
idLine: `Your Mattermost user id: ${senderId}`,
|
||||
code,
|
||||
}),
|
||||
{ cfg, accountId: account.accountId },
|
||||
);
|
||||
opts.statusSink?.({ lastOutboundAt: Date.now() });
|
||||
} catch (err) {
|
||||
logVerboseMessage(`mattermost: pairing reply failed for ${senderId}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
logVerboseMessage(`mattermost: drop dm sender=${senderId} (dmPolicy=${dmPolicy})`);
|
||||
return;
|
||||
}
|
||||
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_DISABLED) {
|
||||
logVerboseMessage("mattermost: drop group message (groupPolicy=disabled)");
|
||||
return;
|
||||
}
|
||||
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) {
|
||||
logVerboseMessage("mattermost: drop group message (no group allowlist)");
|
||||
return;
|
||||
}
|
||||
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_NOT_ALLOWLISTED) {
|
||||
logVerboseMessage(`mattermost: drop group sender=${senderId} (not in groupAllowFrom)`);
|
||||
return;
|
||||
}
|
||||
logVerboseMessage(
|
||||
`mattermost: drop group message (groupPolicy=${groupPolicy} reason=${accessDecision.reason})`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (kind !== "direct" && commandGate.shouldBlock) {
|
||||
logInboundDrop({
|
||||
log: logVerboseMessage,
|
||||
channel: "mattermost",
|
||||
reason: "control command (unauthorized)",
|
||||
target: senderId,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const teamId = payload.data?.team_id ?? channelInfo?.team_id ?? undefined;
|
||||
const channelName = payload.data?.channel_name ?? channelInfo?.name ?? "";
|
||||
const channelDisplay =
|
||||
payload.data?.channel_display_name ?? channelInfo?.display_name ?? channelName;
|
||||
const roomLabel = channelName ? `#${channelName}` : channelDisplay || `#${channelId}`;
|
||||
|
||||
const route = core.channel.routing.resolveAgentRoute({
|
||||
cfg,
|
||||
channel: "mattermost",
|
||||
accountId: account.accountId,
|
||||
teamId,
|
||||
peer: {
|
||||
kind,
|
||||
id: kind === "direct" ? senderId : channelId,
|
||||
},
|
||||
});
|
||||
|
||||
const baseSessionKey = route.sessionKey;
|
||||
const threadRootId = post.root_id?.trim() || undefined;
|
||||
const replyToMode = resolveMattermostReplyToMode(account, kind);
|
||||
const threadContext = resolveMattermostThreadSessionContext({
|
||||
baseSessionKey,
|
||||
kind,
|
||||
postId: post.id,
|
||||
replyToMode,
|
||||
threadRootId,
|
||||
});
|
||||
const { effectiveReplyToId, sessionKey, parentSessionKey } = threadContext;
|
||||
const historyKey = kind === "direct" ? null : sessionKey;
|
||||
|
||||
const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg, route.agentId);
|
||||
const wasMentioned =
|
||||
kind !== "direct" &&
|
||||
((botUsername ? rawText.toLowerCase().includes(`@${botUsername.toLowerCase()}`) : false) ||
|
||||
core.channel.mentions.matchesMentionPatterns(rawText, mentionRegexes));
|
||||
const pendingBody =
|
||||
rawText ||
|
||||
(post.file_ids?.length
|
||||
? `[Mattermost ${post.file_ids.length === 1 ? "file" : "files"}]`
|
||||
: "");
|
||||
const pendingSender = senderName;
|
||||
const recordPendingHistory = () => {
|
||||
const trimmed = pendingBody.trim();
|
||||
recordPendingHistoryEntryIfEnabled({
|
||||
historyMap: channelHistories,
|
||||
limit: historyLimit,
|
||||
historyKey: historyKey ?? "",
|
||||
entry:
|
||||
historyKey && trimmed
|
||||
? {
|
||||
sender: pendingSender,
|
||||
body: trimmed,
|
||||
timestamp: typeof post.create_at === "number" ? post.create_at : undefined,
|
||||
messageId: post.id ?? undefined,
|
||||
}
|
||||
: null,
|
||||
});
|
||||
};
|
||||
|
||||
const oncharEnabled = account.chatmode === "onchar" && kind !== "direct";
|
||||
const oncharPrefixes = oncharEnabled ? resolveOncharPrefixes(account.oncharPrefixes) : [];
|
||||
const oncharResult = oncharEnabled
|
||||
? stripOncharPrefix(rawText, oncharPrefixes)
|
||||
: { triggered: false, stripped: rawText };
|
||||
const oncharTriggered = oncharResult.triggered;
|
||||
const canDetectMention = Boolean(botUsername) || mentionRegexes.length > 0;
|
||||
const mentionDecision = evaluateMattermostMentionGate({
|
||||
kind,
|
||||
cfg,
|
||||
accountId: account.accountId,
|
||||
channelId,
|
||||
threadRootId,
|
||||
requireMentionOverride: account.requireMention,
|
||||
resolveRequireMention: core.channel.groups.resolveRequireMention,
|
||||
wasMentioned,
|
||||
isControlCommand,
|
||||
commandAuthorized,
|
||||
oncharEnabled,
|
||||
oncharTriggered,
|
||||
canDetectMention,
|
||||
});
|
||||
const { shouldRequireMention, shouldBypassMention } = mentionDecision;
|
||||
|
||||
if (mentionDecision.dropReason === "onchar-not-triggered") {
|
||||
logVerboseMessage(
|
||||
`mattermost: drop group message (onchar not triggered channel=${channelId} sender=${senderId})`,
|
||||
);
|
||||
recordPendingHistory();
|
||||
return;
|
||||
}
|
||||
|
||||
if (mentionDecision.dropReason === "missing-mention") {
|
||||
logVerboseMessage(
|
||||
`mattermost: drop group message (missing mention channel=${channelId} sender=${senderId} requireMention=${shouldRequireMention} bypass=${shouldBypassMention} canDetectMention=${canDetectMention})`,
|
||||
);
|
||||
recordPendingHistory();
|
||||
return;
|
||||
}
|
||||
const mediaList = await resolveMattermostMedia(post.file_ids);
|
||||
const mediaPlaceholder = buildMattermostAttachmentPlaceholder(mediaList);
|
||||
const bodySource = oncharTriggered ? oncharResult.stripped : rawText;
|
||||
const baseText = [bodySource, mediaPlaceholder].filter(Boolean).join("\n").trim();
|
||||
const bodyText = normalizeMention(baseText, botUsername);
|
||||
if (!bodyText) {
|
||||
logVerboseMessage(
|
||||
`mattermost: drop group message (empty body after normalization channel=${channelId} sender=${senderId})`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
core.channel.activity.record({
|
||||
channel: "mattermost",
|
||||
accountId: account.accountId,
|
||||
direction: "inbound",
|
||||
});
|
||||
|
||||
const fromLabel = formatInboundFromLabel({
|
||||
isGroup: kind !== "direct",
|
||||
groupLabel: channelDisplay || roomLabel,
|
||||
groupId: channelId,
|
||||
groupFallback: roomLabel || "Channel",
|
||||
directLabel: senderName,
|
||||
directId: senderId,
|
||||
});
|
||||
|
||||
const preview = bodyText.replace(/\s+/g, " ").slice(0, 160);
|
||||
const inboundLabel =
|
||||
kind === "direct"
|
||||
? `Mattermost DM from ${senderName}`
|
||||
: `Mattermost message in ${roomLabel} from ${senderName}`;
|
||||
core.system.enqueueSystemEvent(`${inboundLabel}: ${preview}`, {
|
||||
sessionKey,
|
||||
contextKey: `mattermost:message:${channelId}:${post.id ?? "unknown"}`,
|
||||
});
|
||||
|
||||
const textWithId = `${bodyText}\n[mattermost message id: ${post.id ?? "unknown"} channel: ${channelId}]`;
|
||||
const body = core.channel.reply.formatInboundEnvelope({
|
||||
channel: "Mattermost",
|
||||
from: fromLabel,
|
||||
timestamp: typeof post.create_at === "number" ? post.create_at : undefined,
|
||||
body: textWithId,
|
||||
chatType,
|
||||
sender: { name: senderName, id: senderId },
|
||||
});
|
||||
let combinedBody = body;
|
||||
if (historyKey) {
|
||||
combinedBody = buildPendingHistoryContextFromMap({
|
||||
historyMap: channelHistories,
|
||||
historyKey,
|
||||
limit: historyLimit,
|
||||
currentMessage: combinedBody,
|
||||
formatEntry: (entry) =>
|
||||
core.channel.reply.formatInboundEnvelope({
|
||||
channel: "Mattermost",
|
||||
from: fromLabel,
|
||||
timestamp: entry.timestamp,
|
||||
body: `${entry.body}${
|
||||
entry.messageId ? ` [id:${entry.messageId} channel:${channelId}]` : ""
|
||||
}`,
|
||||
chatType,
|
||||
senderLabel: entry.sender,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
const to = kind === "direct" ? `user:${senderId}` : `channel:${channelId}`;
|
||||
const mediaPayload = buildAgentMediaPayload(mediaList);
|
||||
const commandBody = rawText.trim();
|
||||
const inboundHistory =
|
||||
historyKey && historyLimit > 0
|
||||
? (channelHistories.get(historyKey) ?? []).map((entry) => ({
|
||||
sender: entry.sender,
|
||||
body: entry.body,
|
||||
timestamp: entry.timestamp,
|
||||
}))
|
||||
: undefined;
|
||||
const ctxPayload = core.channel.reply.finalizeInboundContext({
|
||||
Body: combinedBody,
|
||||
BodyForAgent: bodyText,
|
||||
InboundHistory: inboundHistory,
|
||||
RawBody: bodyText,
|
||||
CommandBody: commandBody,
|
||||
BodyForCommands: commandBody,
|
||||
From:
|
||||
kind === "direct"
|
||||
? `mattermost:${senderId}`
|
||||
: kind === "group"
|
||||
? `mattermost:group:${channelId}`
|
||||
: `mattermost:channel:${channelId}`,
|
||||
To: to,
|
||||
SessionKey: sessionKey,
|
||||
ParentSessionKey: parentSessionKey,
|
||||
AccountId: route.accountId,
|
||||
ChatType: chatType,
|
||||
ConversationLabel: fromLabel,
|
||||
GroupSubject: kind !== "direct" ? channelDisplay || roomLabel : undefined,
|
||||
GroupChannel: channelName ? `#${channelName}` : undefined,
|
||||
GroupSpace: teamId,
|
||||
SenderName: senderName,
|
||||
SenderId: senderId,
|
||||
Provider: "mattermost" as const,
|
||||
Surface: "mattermost" as const,
|
||||
MessageSid: post.id ?? undefined,
|
||||
MessageSids: allMessageIds.length > 1 ? allMessageIds : undefined,
|
||||
MessageSidFirst: allMessageIds.length > 1 ? allMessageIds[0] : undefined,
|
||||
MessageSidLast:
|
||||
allMessageIds.length > 1 ? allMessageIds[allMessageIds.length - 1] : undefined,
|
||||
ReplyToId: effectiveReplyToId,
|
||||
MessageThreadId: effectiveReplyToId,
|
||||
Timestamp: typeof post.create_at === "number" ? post.create_at : undefined,
|
||||
WasMentioned: kind !== "direct" ? mentionDecision.effectiveWasMentioned : undefined,
|
||||
CommandAuthorized: commandAuthorized,
|
||||
OriginatingChannel: "mattermost" as const,
|
||||
OriginatingTo: to,
|
||||
...mediaPayload,
|
||||
});
|
||||
|
||||
if (kind === "direct") {
|
||||
const sessionCfg = cfg.session;
|
||||
const storePath = core.channel.session.resolveStorePath(sessionCfg?.store, {
|
||||
agentId: route.agentId,
|
||||
});
|
||||
await core.channel.session.updateLastRoute({
|
||||
storePath,
|
||||
sessionKey: route.mainSessionKey,
|
||||
deliveryContext: {
|
||||
channel: "mattermost",
|
||||
to,
|
||||
accountId: route.accountId,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
const previewLine = bodyText.slice(0, 200).replace(/\n/g, "\\n");
|
||||
logVerboseMessage(
|
||||
`mattermost inbound: from=${ctxPayload.From} len=${bodyText.length} preview="${previewLine}"`,
|
||||
);
|
||||
|
||||
const textLimit = core.channel.text.resolveTextChunkLimit(
|
||||
cfg,
|
||||
"mattermost",
|
||||
account.accountId,
|
||||
{
|
||||
fallbackLimit: account.textChunkLimit ?? 4000,
|
||||
},
|
||||
);
|
||||
const tableMode = core.channel.text.resolveMarkdownTableMode({
|
||||
cfg,
|
||||
channel: "mattermost",
|
||||
accountId: account.accountId,
|
||||
});
|
||||
|
||||
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
|
||||
cfg,
|
||||
agentId: route.agentId,
|
||||
channel: "mattermost",
|
||||
accountId: account.accountId,
|
||||
typing: {
|
||||
start: () => sendTypingIndicator(channelId, effectiveReplyToId),
|
||||
onStartError: (err) => {
|
||||
logTypingFailure({
|
||||
log: (message) => logger.debug?.(message),
|
||||
channel: "mattermost",
|
||||
target: channelId,
|
||||
error: err,
|
||||
});
|
||||
},
|
||||
},
|
||||
});
|
||||
const draftStream = createMattermostDraftStream({
|
||||
client,
|
||||
channelId,
|
||||
rootId: effectiveReplyToId,
|
||||
throttleMs: 1200,
|
||||
log: logVerboseMessage,
|
||||
warn: logVerboseMessage,
|
||||
});
|
||||
let lastPartialText = "";
|
||||
let finalizedViaPreviewPost = false;
|
||||
let previewCompletionNotePosted = false;
|
||||
type DeferredMattermostFinal = {
|
||||
payload: ReplyPayload;
|
||||
replyRootId?: string;
|
||||
previewPostId?: string;
|
||||
previewFinalText?: string;
|
||||
canFinalizeInPlace: boolean;
|
||||
};
|
||||
const deferredFinalReplies: DeferredMattermostFinal[] = [];
|
||||
|
||||
const resolvePreviewFinalText = (text?: string) => {
|
||||
if (typeof text !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const formatted = core.channel.text.convertMarkdownTables(text, tableMode);
|
||||
const chunkMode = core.channel.text.resolveChunkMode(cfg, "mattermost", account.accountId);
|
||||
const chunks = core.channel.text.chunkMarkdownTextWithMode(formatted, textLimit, chunkMode);
|
||||
if (!chunks.length && formatted) {
|
||||
chunks.push(formatted);
|
||||
}
|
||||
if (chunks.length !== 1) {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = chunks[0]?.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
if (
|
||||
lastPartialText &&
|
||||
lastPartialText.startsWith(trimmed) &&
|
||||
trimmed.length < lastPartialText.length
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
return trimmed;
|
||||
};
|
||||
|
||||
const updateDraftPreviewToNormalSend = async (previewPostId?: string) => {
|
||||
if (
|
||||
previewCompletionNotePosted ||
|
||||
finalizedViaPreviewPost ||
|
||||
typeof previewPostId !== "string"
|
||||
) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await updateMattermostPost(client, previewPostId, {
|
||||
message: "↓ See below.",
|
||||
});
|
||||
previewCompletionNotePosted = true;
|
||||
} catch (err) {
|
||||
logVerboseMessage(
|
||||
`mattermost preview completion update failed; continuing with normal send (${String(err)})`,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
const deliverDeferredFinalReply = async (entry: DeferredMattermostFinal) => {
|
||||
await deliverMattermostReplyPayload({
|
||||
core,
|
||||
cfg,
|
||||
payload: entry.payload,
|
||||
to,
|
||||
accountId: account.accountId,
|
||||
agentId: route.agentId,
|
||||
replyToId: entry.replyRootId,
|
||||
textLimit,
|
||||
tableMode,
|
||||
sendMessage: sendMessageMattermost,
|
||||
});
|
||||
runtime.log?.(`delivered reply to ${to}`);
|
||||
};
|
||||
|
||||
const finalizeOrDeliverDeferredFinalReplies = async (finalCount: number) => {
|
||||
if (!deferredFinalReplies.length) {
|
||||
return;
|
||||
}
|
||||
await draftStream.flush();
|
||||
const pendingFinalReplies = deferredFinalReplies.splice(0);
|
||||
const firstFinal = pendingFinalReplies[0];
|
||||
if (
|
||||
finalCount === 1 &&
|
||||
firstFinal?.canFinalizeInPlace === true &&
|
||||
typeof firstFinal.previewPostId === "string" &&
|
||||
typeof firstFinal.previewFinalText === "string"
|
||||
) {
|
||||
try {
|
||||
await updateMattermostPost(client, firstFinal.previewPostId, {
|
||||
message: firstFinal.previewFinalText,
|
||||
});
|
||||
finalizedViaPreviewPost = true;
|
||||
return;
|
||||
} catch (err) {
|
||||
logVerboseMessage(
|
||||
`mattermost preview final edit failed; falling back to normal send (${String(err)})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const previewPostId = pendingFinalReplies.find(
|
||||
(entry) => typeof entry.previewPostId === "string",
|
||||
)?.previewPostId;
|
||||
await updateDraftPreviewToNormalSend(previewPostId);
|
||||
let deliveredDeferredCount = 0;
|
||||
for (const entry of pendingFinalReplies) {
|
||||
await deliverDeferredFinalReply(entry);
|
||||
deliveredDeferredCount += 1;
|
||||
}
|
||||
if (
|
||||
didDeliverAllMattermostDeferredFinalReplies({
|
||||
deliveredCount: deliveredDeferredCount,
|
||||
deferredCount: pendingFinalReplies.length,
|
||||
})
|
||||
) {
|
||||
}
|
||||
};
|
||||
|
||||
const updateDraftFromPartial = (text?: string) => {
|
||||
const cleaned = text?.trim();
|
||||
if (!cleaned) {
|
||||
return;
|
||||
}
|
||||
if (cleaned === lastPartialText) {
|
||||
return;
|
||||
}
|
||||
if (
|
||||
lastPartialText &&
|
||||
lastPartialText.startsWith(cleaned) &&
|
||||
cleaned.length < lastPartialText.length
|
||||
) {
|
||||
return;
|
||||
}
|
||||
lastPartialText = cleaned;
|
||||
draftStream.update(cleaned);
|
||||
};
|
||||
|
||||
const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
|
||||
core.channel.reply.createReplyDispatcherWithTyping({
|
||||
...replyPipeline,
|
||||
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
|
||||
typingCallbacks,
|
||||
deliver: async (payload: ReplyPayload, info) => {
|
||||
if (payload.isReasoning) {
|
||||
return;
|
||||
}
|
||||
const isFinal = info.kind === "final";
|
||||
if (isFinal) {
|
||||
await draftStream.flush();
|
||||
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||
const previewFinalText = resolvePreviewFinalText(payload.text);
|
||||
const previewPostId = draftStream.postId();
|
||||
deferredFinalReplies.push({
|
||||
payload,
|
||||
replyRootId: resolveMattermostReplyRootId({
|
||||
threadRootId: effectiveReplyToId,
|
||||
replyToId: payload.replyToId,
|
||||
}),
|
||||
previewPostId,
|
||||
previewFinalText,
|
||||
canFinalizeInPlace:
|
||||
typeof previewPostId === "string" &&
|
||||
!hasMedia &&
|
||||
typeof previewFinalText === "string" &&
|
||||
!payload.isError,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
await deliverMattermostReplyPayload({
|
||||
core,
|
||||
cfg,
|
||||
payload,
|
||||
to,
|
||||
accountId: account.accountId,
|
||||
agentId: route.agentId,
|
||||
replyToId: resolveMattermostReplyRootId({
|
||||
threadRootId: effectiveReplyToId,
|
||||
replyToId: payload.replyToId,
|
||||
}),
|
||||
textLimit,
|
||||
tableMode,
|
||||
sendMessage: sendMessageMattermost,
|
||||
});
|
||||
runtime.log?.(`delivered reply to ${to}`);
|
||||
},
|
||||
onError: (err, info) => {
|
||||
runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`);
|
||||
},
|
||||
});
|
||||
|
||||
try {
|
||||
const dispatchResult = await core.channel.reply.withReplyDispatcher({
|
||||
dispatcher,
|
||||
onSettled: () => {
|
||||
markDispatchIdle();
|
||||
},
|
||||
run: () =>
|
||||
core.channel.reply.dispatchReplyFromConfig({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions: {
|
||||
...replyOptions,
|
||||
disableBlockStreaming: true,
|
||||
onModelSelected,
|
||||
onPartialReply: (payload) => {
|
||||
updateDraftFromPartial(payload.text);
|
||||
},
|
||||
onAssistantMessageStart: () => {
|
||||
lastPartialText = "";
|
||||
},
|
||||
onReasoningEnd: () => {
|
||||
lastPartialText = "";
|
||||
},
|
||||
onReasoningStream: async () => {
|
||||
if (!lastPartialText) {
|
||||
draftStream.update("Thinking…");
|
||||
}
|
||||
},
|
||||
onToolStart: async (payload) => {
|
||||
draftStream.update(buildMattermostToolStatusText(payload));
|
||||
},
|
||||
},
|
||||
}),
|
||||
});
|
||||
await finalizeOrDeliverDeferredFinalReplies(dispatchResult.counts.final);
|
||||
} finally {
|
||||
try {
|
||||
await draftStream.stop();
|
||||
} catch (err) {
|
||||
logVerboseMessage(`mattermost draft preview cleanup failed: ${String(err)}`);
|
||||
}
|
||||
markRunComplete();
|
||||
}
|
||||
if (historyKey) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: channelHistories,
|
||||
historyKey,
|
||||
limit: historyLimit,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const handleReactionEvent = async (payload: MattermostEventPayload) => {
|
||||
|
||||
Reference in New Issue
Block a user