diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 4c29ce17071..e74b1fb69d9 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -967,6 +967,7 @@ export async function startGatewayServer( lastChannel: sessionRow.lastChannel, lastTo: sessionRow.lastTo, lastAccountId: sessionRow.lastAccountId, + lastThreadId: sessionRow.lastThreadId, totalTokens: sessionRow.totalTokens, totalTokensFresh: sessionRow.totalTokensFresh, contextTokens: sessionRow.contextTokens, @@ -1062,6 +1063,7 @@ export async function startGatewayServer( lastChannel: sessionRow.lastChannel, lastTo: sessionRow.lastTo, lastAccountId: sessionRow.lastAccountId, + lastThreadId: sessionRow.lastThreadId, totalTokens: sessionRow.totalTokens, totalTokensFresh: sessionRow.totalTokensFresh, contextTokens: sessionRow.contextTokens, diff --git a/src/gateway/session-message-events.test.ts b/src/gateway/session-message-events.test.ts index 89856254651..2269207d1b1 100644 --- a/src/gateway/session-message-events.test.ts +++ b/src/gateway/session-message-events.test.ts @@ -385,6 +385,84 @@ describe("session.message websocket events", () => { } }); + test("includes route thread metadata on session.message and sessions.changed transcript events", async () => { + const storePath = await createSessionStoreFile(); + const transcriptPath = path.join(path.dirname(storePath), "sess-thread.jsonl"); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-thread", + sessionFile: transcriptPath, + updatedAt: Date.now(), + lastChannel: "telegram", + lastTo: "-100123", + lastAccountId: "acct-1", + lastThreadId: 42, + }, + }, + storePath, + }); + const transcriptMessage = { + role: "assistant", + content: [{ type: "text", text: "thread route snapshot" }], + timestamp: Date.now(), + }; + await fs.writeFile( + transcriptPath, + [ + JSON.stringify({ type: "session", version: 1, id: "sess-thread" }), + JSON.stringify({ id: "msg-thread", message: transcriptMessage }), + ].join("\n"), + "utf-8", + ); + + const harness = await createGatewaySuiteHarness(); + try { + await withOperatorSessionSubscriber(harness, async (ws) => { + const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:main"); + const changedEventPromise = onceMessage( + ws, + (message) => + message.type === "event" && + message.event === "sessions.changed" && + (message.payload as { phase?: string; sessionKey?: string } | undefined)?.phase === + "message" && + (message.payload as { sessionKey?: string } | undefined)?.sessionKey === + "agent:main:main", + ); + + emitSessionTranscriptUpdate({ + sessionFile: transcriptPath, + sessionKey: "agent:main:main", + message: transcriptMessage, + messageId: "msg-thread", + }); + + const [messageEvent, changedEvent] = await Promise.all([ + messageEventPromise, + changedEventPromise, + ]); + expect(messageEvent.payload).toMatchObject({ + sessionKey: "agent:main:main", + lastChannel: "telegram", + lastTo: "-100123", + lastAccountId: "acct-1", + lastThreadId: 42, + }); + expect(changedEvent.payload).toMatchObject({ + sessionKey: "agent:main:main", + phase: "message", + lastChannel: "telegram", + lastTo: "-100123", + lastAccountId: "acct-1", + lastThreadId: 42, + }); + }); + } finally { + await harness.close(); + } + }); + test("sessions.messages.subscribe only delivers transcript events for the requested session", async () => { const storePath = await createSessionStoreFile(); await writeSessionStore({