fix: preserve canonical restart sentinel routes (#64391)

Merged via squash.

Prepared head SHA: 0183c1782f
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Gustavo Madeira Santana
2026-04-10 12:44:07 -04:00
committed by GitHub
parent dffad08529
commit 9c44f10026
5 changed files with 180 additions and 24 deletions

View File

@@ -161,4 +161,58 @@ describe("extractDeliveryInfo", () => {
threadId: undefined,
});
});
it("derives delivery info from stored last route metadata when deliveryContext is missing", () => {
const sessionKey = "agent:main:matrix:channel:!lowercased:example.org";
storeState.store[sessionKey] = {
sessionId: "session-1",
updatedAt: Date.now(),
origin: {
provider: "matrix",
},
lastChannel: "matrix",
lastTo: "room:!MixedCase:example.org",
};
const result = extractDeliveryInfo(sessionKey);
expect(result).toEqual({
deliveryContext: {
channel: "matrix",
to: "room:!MixedCase:example.org",
accountId: undefined,
},
threadId: undefined,
});
});
it("falls back to the base session when a thread entry only has partial route metadata", () => {
const baseKey = "agent:main:matrix:channel:!MixedCase:example.org";
const threadKey = `${baseKey}:thread:$thread-event`;
storeState.store[threadKey] = {
sessionId: "thread-session",
updatedAt: Date.now(),
origin: {
provider: "matrix",
threadId: "$thread-event",
},
};
storeState.store[baseKey] = {
sessionId: "base-session",
updatedAt: Date.now(),
lastChannel: "matrix",
lastTo: "room:!MixedCase:example.org",
};
const result = extractDeliveryInfo(threadKey);
expect(result).toEqual({
deliveryContext: {
channel: "matrix",
to: "room:!MixedCase:example.org",
accountId: undefined,
},
threadId: "$thread-event",
});
});
});

View File

@@ -1,3 +1,4 @@
import { deliveryContextFromSession } from "../../utils/delivery-context.js";
import { loadConfig } from "../io.js";
import { resolveStorePath } from "./paths.js";
import { loadSessionStore } from "./store.js";
@@ -10,6 +11,17 @@ export function extractDeliveryInfo(sessionKey: string | undefined): {
| undefined;
threadId: string | undefined;
} {
const hasRoutableDeliveryContext = (context?: {
channel?: string;
to?: string;
accountId?: string;
threadId?: string | number;
}): context is {
channel: string;
to: string;
accountId?: string;
threadId?: string | number;
} => Boolean(context?.channel && context?.to);
const { baseSessionKey, threadId } = parseSessionThreadInfo(sessionKey);
if (!sessionKey || !baseSessionKey) {
return { deliveryContext: undefined, threadId };
@@ -23,17 +35,20 @@ export function extractDeliveryInfo(sessionKey: string | undefined): {
const storePath = resolveStorePath(cfg.session?.store);
const store = loadSessionStore(storePath);
let entry = store[sessionKey];
if (!entry?.deliveryContext && baseSessionKey !== sessionKey) {
let storedDeliveryContext = deliveryContextFromSession(entry);
if (!hasRoutableDeliveryContext(storedDeliveryContext) && baseSessionKey !== sessionKey) {
entry = store[baseSessionKey];
storedDeliveryContext = deliveryContextFromSession(entry);
}
if (entry?.deliveryContext) {
const resolvedThreadId =
entry.deliveryContext.threadId ?? entry.lastThreadId ?? entry.origin?.threadId;
if (hasRoutableDeliveryContext(storedDeliveryContext)) {
deliveryContext = {
channel: entry.deliveryContext.channel,
to: entry.deliveryContext.to,
accountId: entry.deliveryContext.accountId,
threadId: resolvedThreadId != null ? String(resolvedThreadId) : undefined,
channel: storedDeliveryContext.channel,
to: storedDeliveryContext.to,
accountId: storedDeliveryContext.accountId,
threadId:
storedDeliveryContext.threadId != null
? String(storedDeliveryContext.threadId)
: undefined,
};
}
} catch {

View File

@@ -16,10 +16,18 @@ const mocks = vi.hoisted(() => ({
formatRestartSentinelMessage: vi.fn(() => "restart message"),
summarizeRestartSentinel: vi.fn(() => "restart summary"),
resolveMainSessionKeyFromConfig: vi.fn(() => "agent:main:main"),
parseSessionThreadInfo: vi.fn(() => ({ baseSessionKey: null, threadId: undefined })),
parseSessionThreadInfo: vi.fn(
(): { baseSessionKey: string | null | undefined; threadId: string | undefined } => ({
baseSessionKey: null,
threadId: undefined,
}),
),
loadSessionEntry: vi.fn(() => ({ cfg: {}, entry: {} })),
resolveAnnounceTargetFromKey: vi.fn(() => null),
deliveryContextFromSession: vi.fn(() => undefined),
deliveryContextFromSession: vi.fn(
():
| { channel?: string; to?: string; accountId?: string; threadId?: string | number }
| undefined => undefined,
),
mergeDeliveryContext: vi.fn((a?: Record<string, unknown>, b?: Record<string, unknown>) => ({
...b,
...a,
@@ -50,7 +58,7 @@ vi.mock("../config/sessions.js", () => ({
resolveMainSessionKeyFromConfig: mocks.resolveMainSessionKeyFromConfig,
}));
vi.mock("../config/sessions/delivery-info.js", () => ({
vi.mock("../config/sessions/thread-info.js", () => ({
parseSessionThreadInfo: mocks.parseSessionThreadInfo,
}));
@@ -58,10 +66,6 @@ vi.mock("./session-utils.js", () => ({
loadSessionEntry: mocks.loadSessionEntry,
}));
vi.mock("../agents/tools/sessions-send-helpers.js", () => ({
resolveAnnounceTargetFromKey: mocks.resolveAnnounceTargetFromKey,
}));
vi.mock("../utils/delivery-context.js", () => ({
deliveryContextFromSession: mocks.deliveryContextFromSession,
mergeDeliveryContext: mocks.mergeDeliveryContext,
@@ -126,6 +130,14 @@ describe("scheduleRestartSentinelWake", () => {
},
},
});
mocks.parseSessionThreadInfo.mockReset();
mocks.parseSessionThreadInfo.mockReturnValue({ baseSessionKey: null, threadId: undefined });
mocks.loadSessionEntry.mockReset();
mocks.loadSessionEntry.mockReturnValue({ cfg: {}, entry: {} });
mocks.deliveryContextFromSession.mockReset();
mocks.deliveryContextFromSession.mockReturnValue(undefined);
mocks.resolveOutboundTarget.mockReset();
mocks.resolveOutboundTarget.mockReturnValue({ ok: true as const, to: "+15550002" });
mocks.deliverOutboundPayloads.mockReset();
mocks.deliverOutboundPayloads.mockResolvedValue([{ channel: "whatsapp", messageId: "msg-1" }]);
mocks.enqueueDelivery.mockReset();
@@ -278,4 +290,73 @@ describe("scheduleRestartSentinelWake", () => {
expect(mocks.requestHeartbeatNow).not.toHaveBeenCalled();
expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
});
it("skips outbound restart notice when no canonical delivery context survives restart", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:matrix:channel:!lowercased:example.org",
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.parseSessionThreadInfo.mockReturnValue({
baseSessionKey: "agent:main:matrix:channel:!lowercased:example.org",
threadId: undefined,
});
mocks.deliveryContextFromSession.mockReturnValue(undefined);
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith(
"restart message",
expect.objectContaining({
sessionKey: "agent:main:matrix:channel:!lowercased:example.org",
}),
);
expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
expect(mocks.enqueueDelivery).not.toHaveBeenCalled();
expect(mocks.resolveOutboundTarget).not.toHaveBeenCalled();
});
it("falls back to the base session when the thread entry only has partial route metadata", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:matrix:channel:!lowercased:example.org:thread:$thread-event",
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.parseSessionThreadInfo.mockReturnValue({
baseSessionKey: "agent:main:matrix:channel:!lowercased:example.org",
threadId: "$thread-event",
});
mocks.loadSessionEntry
.mockReturnValueOnce({
cfg: {},
entry: { origin: { provider: "matrix", threadId: "$thread-event" } },
})
.mockReturnValueOnce({
cfg: {},
entry: { lastChannel: "matrix", lastTo: "room:!MixedCase:example.org" },
});
mocks.deliveryContextFromSession
.mockReturnValueOnce({ channel: "matrix", threadId: "$thread-event" })
.mockReturnValueOnce({ channel: "matrix", to: "room:!MixedCase:example.org" });
mocks.resolveOutboundTarget.mockReturnValue({
ok: true as const,
to: "room:!MixedCase:example.org",
});
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith(
expect.objectContaining({
channel: "matrix",
to: "room:!MixedCase:example.org",
}),
);
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
channel: "matrix",
to: "room:!MixedCase:example.org",
threadId: "$thread-event",
}),
);
});
});

View File

@@ -1,4 +1,3 @@
import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js";
import { getChannelPlugin, normalizeChannelId } from "../channels/plugins/index.js";
import type { CliDeps } from "../cli/deps.js";
import { resolveMainSessionKeyFromConfig } from "../config/sessions.js";
@@ -23,6 +22,13 @@ const log = createSubsystemLogger("gateway/restart-sentinel");
const OUTBOUND_RETRY_DELAY_MS = 750;
const OUTBOUND_MAX_ATTEMPTS = 2;
function hasRoutableDeliveryContext(context?: {
channel?: string;
to?: string;
}): context is { channel: string; to: string } {
return Boolean(context?.channel && context?.to);
}
function enqueueRestartSentinelWake(
message: string,
sessionKey: string,
@@ -144,21 +150,21 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey);
const { cfg, entry } = loadSessionEntry(sessionKey);
const parsedTarget = resolveAnnounceTargetFromKey(baseSessionKey ?? sessionKey);
// Prefer delivery context from sentinel (captured at restart) over session store
// Handles race condition where store wasn't flushed before restart
const sentinelContext = payload.deliveryContext;
let sessionDeliveryContext = deliveryContextFromSession(entry);
if (!sessionDeliveryContext && baseSessionKey && baseSessionKey !== sessionKey) {
if (
!hasRoutableDeliveryContext(sessionDeliveryContext) &&
baseSessionKey &&
baseSessionKey !== sessionKey
) {
const { entry: baseEntry } = loadSessionEntry(baseSessionKey);
sessionDeliveryContext = deliveryContextFromSession(baseEntry);
}
const origin = mergeDeliveryContext(
sentinelContext,
mergeDeliveryContext(sessionDeliveryContext, parsedTarget ?? undefined),
);
const origin = mergeDeliveryContext(sentinelContext, sessionDeliveryContext);
const channelRaw = origin?.channel;
const channel = channelRaw ? normalizeChannelId(channelRaw) : null;
@@ -180,7 +186,6 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
const threadId =
payload.threadId ??
parsedTarget?.threadId ?? // From resolveAnnounceTargetFromKey (extracts :topic:N)
sessionThreadId ??
(origin?.threadId != null ? String(origin.threadId) : undefined);