diff --git a/src/channels/turn/kernel.test.ts b/src/channels/turn/kernel.test.ts index 67bc4af93ba..37d72ec6a22 100644 --- a/src/channels/turn/kernel.test.ts +++ b/src/channels/turn/kernel.test.ts @@ -83,6 +83,77 @@ function createDurableSendResult(messageIds: string[]) { }; } +type DurableSendRequest = { + accountId?: string; + channel?: string; + durability?: string; + payloads?: ReplyPayload[]; + replyToMode?: string; + session?: { + key?: string; + agentId?: string; + requesterAccountId?: string; + requesterSenderId?: string; + conversationType?: string; + }; + threadId?: string | number | null; + to?: string; +}; + +type DurableSupportRequest = { + channel?: string; + requirements?: Record; +}; + +type DeliveryResult = { + messageIds?: string[]; + receipt?: { platformMessageIds?: string[] }; + visibleReplySent?: boolean; +}; + +type FinalizeResult = { + admission?: unknown; + dispatched?: boolean; + routeSessionKey?: string; +}; + +type TurnLogEvent = { + event?: string; + messageId?: string; + stage?: string; +}; + +function latestDurableSendRequest(): DurableSendRequest { + const [request] = sendDurableMessageBatch.mock.calls.at(-1) as unknown as [DurableSendRequest]; + return request; +} + +function latestDurableSupportRequest(): DurableSupportRequest { + const [request] = resolveOutboundDurableFinalDeliverySupport.mock.calls.at(-1) as unknown as [ + DurableSupportRequest, + ]; + return request; +} + +function deliveryResult(value: unknown): DeliveryResult { + return value as DeliveryResult; +} + +function finalizeResult(value: unknown): FinalizeResult { + return value as FinalizeResult; +} + +function loggedEvents(log: ReturnType): TurnLogEvent[] { + return log.mock.calls.map(([event]) => { + const entry = event as TurnLogEvent; + return { + stage: entry.stage, + event: entry.event, + ...(entry.messageId === undefined ? {} : { messageId: entry.messageId }), + }; + }); +} + describe("channel turn kernel", () => { beforeEach(() => { vi.clearAllMocks(); @@ -117,34 +188,30 @@ describe("channel turn kernel", () => { expect(result.dispatched).toBe(true); expect(deliver).not.toHaveBeenCalled(); - expect(sendDurableMessageBatch).toHaveBeenCalledWith( - expect.objectContaining({ - channel: "telegram", - to: "123", - accountId: "acct", - payloads: [expect.objectContaining({ text: "reply" })], - durability: "best_effort", - replyToMode: "first", - threadId: 777, - session: expect.objectContaining({ - key: "agent:main:test:peer", - agentId: "main", - requesterAccountId: "acct", - requesterSenderId: "sender-1", - conversationType: "group", - }), - }), - ); - expect(resolveOutboundDurableFinalDeliverySupport).toHaveBeenCalledWith( - expect.objectContaining({ - channel: "telegram", - requirements: { - text: true, - thread: true, - messageSendingHooks: true, - }, - }), - ); + expect(sendDurableMessageBatch).toHaveBeenCalledTimes(1); + const sendRequest = latestDurableSendRequest(); + expect(sendRequest.channel).toBe("telegram"); + expect(sendRequest.to).toBe("123"); + expect(sendRequest.accountId).toBe("acct"); + expect(sendRequest.payloads?.[0]?.text).toBe("reply"); + expect(sendRequest.durability).toBe("best_effort"); + expect(sendRequest.replyToMode).toBe("first"); + expect(sendRequest.threadId).toBe(777); + expect(sendRequest.session).toEqual({ + key: "agent:main:test:peer", + agentId: "main", + requesterAccountId: "acct", + requesterSenderId: "sender-1", + conversationType: "group", + }); + expect(resolveOutboundDurableFinalDeliverySupport).toHaveBeenCalledTimes(1); + const supportRequest = latestDurableSupportRequest(); + expect(supportRequest.channel).toBe("telegram"); + expect(supportRequest.requirements).toEqual({ + text: true, + thread: true, + messageSendingHooks: true, + }); }); it("returns durable delivery result to the buffered dispatcher", async () => { @@ -176,15 +243,10 @@ describe("channel turn kernel", () => { delivery: { deliver: vi.fn(), durable: { replyToMode: "first" } }, }); - expect(deliveredResult).toEqual( - expect.objectContaining({ - messageIds: ["tg-1", "tg-2"], - receipt: expect.objectContaining({ - platformMessageIds: ["tg-1", "tg-2"], - }), - visibleReplySent: true, - }), - ); + const delivered = deliveryResult(deliveredResult); + expect(delivered.messageIds).toEqual(["tg-1", "tg-2"]); + expect(delivered.receipt?.platformMessageIds).toEqual(["tg-1", "tg-2"]); + expect(delivered.visibleReplySent).toBe(true); }); it("prepares payloads before durable enqueue and observes handled delivery", async () => { @@ -216,23 +278,18 @@ describe("channel turn kernel", () => { }, }); - expect(sendDurableMessageBatch).toHaveBeenCalledWith( - expect.objectContaining({ - payloads: [expect.objectContaining({ text: "reply\n\n_[Generated by test]_" })], - }), - ); - expect(resolveOutboundDurableFinalDeliverySupport).toHaveBeenCalledWith( - expect.objectContaining({ - requirements: { - text: true, - }, - }), - ); - expect(onDelivered).toHaveBeenCalledWith( - expect.objectContaining({ text: "reply\n\n_[Generated by test]_" }), - { kind: "final" }, - expect.objectContaining({ visibleReplySent: true }), - ); + expect(sendDurableMessageBatch).toHaveBeenCalledTimes(1); + expect(latestDurableSendRequest().payloads?.[0]?.text).toBe("reply\n\n_[Generated by test]_"); + expect(resolveOutboundDurableFinalDeliverySupport).toHaveBeenCalledTimes(1); + expect(latestDurableSupportRequest().requirements).toEqual({ + text: true, + }); + expect(onDelivered).toHaveBeenCalledTimes(1); + const [deliveredPayload, deliveredInfo, deliveredResult] = onDelivered.mock + .calls[0] as unknown as [ReplyPayload, unknown, DeliveryResult]; + expect(deliveredPayload.text).toBe("reply\n\n_[Generated by test]_"); + expect(deliveredInfo).toEqual({ kind: "final" }); + expect(deliveredResult.visibleReplySent).toBe(true); }); it("falls back before queueing when durable outbound delivery is unsupported", async () => { @@ -268,23 +325,18 @@ describe("channel turn kernel", () => { delivery: { deliver, durable: { replyToMode: "first" } }, }); - expect(resolveOutboundDurableFinalDeliverySupport).toHaveBeenCalledWith( - expect.objectContaining({ - channel: "telegram", - requirements: { - text: true, - messageSendingHooks: true, - }, - }), - ); + expect(resolveOutboundDurableFinalDeliverySupport).toHaveBeenCalledTimes(1); + const supportRequest = latestDurableSupportRequest(); + expect(supportRequest.channel).toBe("telegram"); + expect(supportRequest.requirements).toEqual({ + text: true, + messageSendingHooks: true, + }); expect(deliverOutboundPayloads).not.toHaveBeenCalled(); expect(deliver).toHaveBeenCalledWith({ text: "reply" }, { kind: "final" }); - expect(deliveredResult).toEqual( - expect.objectContaining({ - messageIds: ["legacy-1"], - visibleReplySent: true, - }), - ); + const delivered = deliveryResult(deliveredResult); + expect(delivered.messageIds).toEqual(["legacy-1"]); + expect(delivered.visibleReplySent).toBe(true); }); it("treats durable outbound support preflight failures as terminal", async () => { @@ -341,12 +393,9 @@ describe("channel turn kernel", () => { }, }); - expect(deliveredResult).toEqual( - expect.objectContaining({ - messageIds: ["local-1"], - visibleReplySent: true, - }), - ); + const delivered = deliveryResult(deliveredResult); + expect(delivered.messageIds).toEqual(["local-1"]); + expect(delivered.visibleReplySent).toBe(true); }); it("does not use durable outbound delivery when durable options are omitted", async () => { @@ -392,11 +441,13 @@ describe("channel turn kernel", () => { }); expect(deliver).toHaveBeenCalledWith({ text: "reply!" }, { kind: "final" }); - expect(onDelivered).toHaveBeenCalledWith( - { text: "reply!" }, - { kind: "final" }, - expect.objectContaining({ messageIds: ["local-1"], visibleReplySent: true }), - ); + expect(onDelivered).toHaveBeenCalledTimes(1); + const [deliveredPayload, deliveredInfo, deliveredResult] = onDelivered.mock + .calls[0] as unknown as [ReplyPayload, unknown, DeliveryResult]; + expect(deliveredPayload).toEqual({ text: "reply!" }); + expect(deliveredInfo).toEqual({ kind: "final" }); + expect(deliveredResult.messageIds).toEqual(["local-1"]); + expect(deliveredResult.visibleReplySent).toBe(true); }); it("assembles channel message reply pipeline options inside the turn kernel", async () => { @@ -461,12 +512,11 @@ describe("channel turn kernel", () => { expect(result.dispatched).toBe(true); expect(result.dispatchResult?.counts.final).toBe(1); expect(events).toEqual(["record", "dispatch", "deliver"]); - expect(recordInboundSession).toHaveBeenCalledWith( - expect.objectContaining({ - sessionKey: "agent:main:test:peer", - storePath: "/tmp/sessions.json", - }), - ); + expect(recordInboundSession).toHaveBeenCalledTimes(1); + const [recordRequest] = (recordInboundSession as unknown as ReturnType).mock + .calls[0] as unknown as [{ sessionKey?: string; storePath?: string }]; + expect(recordRequest.sessionKey).toBe("agent:main:test:peer"); + expect(recordRequest.storePath).toBe("/tmp/sessions.json"); expect(deliver).toHaveBeenCalledWith({ text: "reply" }, { kind: "final" }); }); @@ -498,18 +548,12 @@ describe("channel turn kernel", () => { expect(events).toEqual(["record", "dispatch"]); expect(result.dispatchResult?.queuedFinal).toBe(true); - expect(log).toHaveBeenCalledWith( - expect.objectContaining({ stage: "record", event: "start", messageId: "msg-1" }), - ); - expect(log).toHaveBeenCalledWith( - expect.objectContaining({ stage: "record", event: "done", messageId: "msg-1" }), - ); - expect(log).toHaveBeenCalledWith( - expect.objectContaining({ stage: "dispatch", event: "start", messageId: "msg-1" }), - ); - expect(log).toHaveBeenCalledWith( - expect.objectContaining({ stage: "dispatch", event: "done", messageId: "msg-1" }), - ); + expect(loggedEvents(log)).toEqual([ + { stage: "record", event: "start", messageId: "msg-1" }, + { stage: "record", event: "done", messageId: "msg-1" }, + { stage: "dispatch", event: "start", messageId: "msg-1" }, + { stage: "dispatch", event: "done", messageId: "msg-1" }, + ]); }); it("suppresses direct prepared dispatches for observe-only admission", async () => { @@ -602,7 +646,7 @@ describe("channel turn kernel", () => { expect(events).toEqual(["record", "cleanup"]); expect(runDispatch).not.toHaveBeenCalled(); expect(onPreDispatchFailure).toHaveBeenCalledWith(recordError); - expect(log).toHaveBeenCalledWith(expect.objectContaining({ stage: "record", event: "error" })); + expect(loggedEvents(log)).toContainEqual({ stage: "record", event: "error" }); }); it("normalizes visible dispatch checks", () => { @@ -720,13 +764,15 @@ describe("channel turn kernel", () => { expect(result.dispatched).toBe(true); expect(events).toEqual(["record", "dispatch"]); expect(deliver).not.toHaveBeenCalled(); - expect(onFinalize).toHaveBeenCalledWith( - expect.objectContaining({ - admission: { kind: "observeOnly", reason: "broadcast-observer" }, - dispatched: true, - routeSessionKey: "agent:observer:test:peer", - }), - ); + expect(onFinalize).toHaveBeenCalledTimes(1); + const [finalized] = onFinalize.mock.calls[0] as unknown as [unknown]; + const finalizedResult = finalizeResult(finalized); + expect(finalizedResult.admission).toEqual({ + kind: "observeOnly", + reason: "broadcast-observer", + }); + expect(finalizedResult.dispatched).toBe(true); + expect(finalizedResult.routeSessionKey).toBe("agent:observer:test:peer"); }); it("runs custom prepared dispatch from a full turn adapter", async () => { @@ -797,12 +843,14 @@ describe("channel turn kernel", () => { throw new Error("expected dispatch"); } expect(hasFinalChannelTurnDispatch(result.dispatchResult)).toBe(false); - expect(onFinalize).toHaveBeenCalledWith( - expect.objectContaining({ - admission: { kind: "observeOnly", reason: "broadcast-observer" }, - dispatched: true, - }), - ); + expect(onFinalize).toHaveBeenCalledTimes(1); + const [finalized] = onFinalize.mock.calls[0] as unknown as [unknown]; + const finalizedResult = finalizeResult(finalized); + expect(finalizedResult.admission).toEqual({ + kind: "observeOnly", + reason: "broadcast-observer", + }); + expect(finalizedResult.dispatched).toBe(true); }); it("finalizes failed dispatches before rethrowing", async () => { @@ -837,12 +885,11 @@ describe("channel turn kernel", () => { }), ).rejects.toThrow(dispatchError); - expect(onFinalize).toHaveBeenCalledWith( - expect.objectContaining({ - admission: { kind: "dispatch" }, - dispatched: false, - routeSessionKey: "agent:main:test:peer", - }), - ); + expect(onFinalize).toHaveBeenCalledTimes(1); + const [finalized] = onFinalize.mock.calls[0] as unknown as [unknown]; + const finalizedResult = finalizeResult(finalized); + expect(finalizedResult.admission).toEqual({ kind: "dispatch" }); + expect(finalizedResult.dispatched).toBe(false); + expect(finalizedResult.routeSessionKey).toBe("agent:main:test:peer"); }); });