refactor(channels): add shared turn kernel

This commit is contained in:
Peter Steinberger
2026-04-29 22:28:47 +01:00
parent 4396361f35
commit 9a9cd0c0ab
62 changed files with 3449 additions and 1333 deletions

View File

@@ -119,6 +119,29 @@ export function createMatrixHandlerTestHarness(
counts: { final: 0, block: 0, tool: 0 },
}));
const enqueueSystemEvent = options.enqueueSystemEvent ?? vi.fn();
const runPrepared = vi.fn(
async (
turn: Parameters<MatrixMonitorHandlerParams["core"]["channel"]["turn"]["runPrepared"]>[0],
) => {
await turn.recordInboundSession({
storePath: turn.storePath,
sessionKey: turn.ctxPayload.SessionKey ?? turn.routeSessionKey,
ctx: turn.ctxPayload,
groupResolution: turn.record?.groupResolution,
createIfMissing: turn.record?.createIfMissing,
updateLastRoute: turn.record?.updateLastRoute,
onRecordError: turn.record?.onRecordError ?? (() => undefined),
});
const dispatchResult = await turn.runDispatch();
return {
admission: { kind: "dispatch" as const },
dispatched: true,
ctxPayload: turn.ctxPayload,
routeSessionKey: turn.routeSessionKey,
dispatchResult,
};
},
);
const dmPolicy = options.dmPolicy ?? "open";
const allowFrom = options.allowFrom ?? (dmPolicy === "open" ? ["*"] : []);
const cfgForHandler =
@@ -205,6 +228,10 @@ export function createMatrixHandlerTestHarness(
}
}),
},
turn: {
runPrepared,
dispatchAssembled: vi.fn(),
},
reactions: {
shouldAckReaction: options.shouldAckReaction ?? (() => false),
},

View File

@@ -1352,40 +1352,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
OriginatingTo: `room:${roomId}`,
});
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? _route.sessionKey,
ctx: ctxPayload,
updateLastRoute: isDirectMessage
? {
sessionKey: _route.mainSessionKey,
channel: "matrix",
to: `room:${roomId}`,
accountId: _route.accountId,
}
: undefined,
onRecordError: (err) => {
logger.warn("failed updating session meta", {
error: String(err),
storePath,
sessionKey: ctxPayload.SessionKey ?? _route.sessionKey,
});
},
});
if (sharedDmContextNotice && markTrackedRoomIfFirst(sharedDmContextNoticeRooms, roomId)) {
client
.sendMessage(roomId, {
msgtype: "m.notice",
body: sharedDmContextNotice,
})
.catch((err) => {
logVerboseMessage(
`matrix: failed sending shared DM session notice room=${roomId}: ${String(err)}`,
);
});
}
const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n");
logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`);
@@ -1862,58 +1828,107 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
onIdle: typingCallbacks.onIdle,
});
const { queuedFinal, counts } = await core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => {
markDispatchIdle();
},
run: async () => {
try {
return await core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: roomConfig?.skills,
// Keep block streaming enabled when explicitly requested, even
// with draft previews on. The draft remains the live preview
// for the current assistant block, while block deliveries
// finalize completed blocks into their own preserved events.
disableBlockStreaming: !blockStreamingEnabled,
onPartialReply: draftStream
? (payload) => {
latestDraftFullText = payload.text ?? "";
suppressPreviewToolProgressForAnswerText(latestDraftFullText);
updateDraftFromLatestFullText();
}
: undefined,
onBlockReplyQueued: draftStream
? (payload, context) => {
if (payload.isCompactionNotice === true) {
return;
}
queueDraftBlockBoundary(payload, context);
}
: undefined,
// Reset draft boundary bookkeeping on assistant message
// boundaries so post-tool blocks stream from a fresh
// cumulative payload (payload.text resets upstream).
onAssistantMessageStart: draftStream
? () => {
resetDraftBlockOffsets();
resetPreviewToolProgress();
}
: undefined,
...buildPreviewToolProgressReplyOptions(),
onModelSelected,
},
const { dispatchResult } = await core.channel.turn.runPrepared({
channel: "matrix",
accountId: _route.accountId,
routeSessionKey: _route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
updateLastRoute: isDirectMessage
? {
sessionKey: _route.mainSessionKey,
channel: "matrix",
to: `room:${roomId}`,
accountId: _route.accountId,
}
: undefined,
onRecordError: (err) => {
logger.warn("failed updating session meta", {
error: String(err),
storePath,
sessionKey: ctxPayload.SessionKey ?? _route.sessionKey,
});
} finally {
markRunComplete();
},
},
onPreDispatchFailure: () =>
core.channel.reply.settleReplyDispatcher({
dispatcher,
onSettled: () => {
markRunComplete();
markDispatchIdle();
},
}),
runDispatch: async () => {
if (sharedDmContextNotice && markTrackedRoomIfFirst(sharedDmContextNoticeRooms, roomId)) {
client
.sendMessage(roomId, {
msgtype: "m.notice",
body: sharedDmContextNotice,
})
.catch((err) => {
logVerboseMessage(
`matrix: failed sending shared DM session notice room=${roomId}: ${String(err)}`,
);
});
}
return await core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => {
markDispatchIdle();
},
run: async () => {
try {
return await core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: roomConfig?.skills,
// Keep block streaming enabled when explicitly requested, even
// with draft previews on. The draft remains the live preview
// for the current assistant block, while block deliveries
// finalize completed blocks into their own preserved events.
disableBlockStreaming: !blockStreamingEnabled,
onPartialReply: draftStream
? (payload) => {
latestDraftFullText = payload.text ?? "";
suppressPreviewToolProgressForAnswerText(latestDraftFullText);
updateDraftFromLatestFullText();
}
: undefined,
onBlockReplyQueued: draftStream
? (payload, context) => {
if (payload.isCompactionNotice === true) {
return;
}
queueDraftBlockBoundary(payload, context);
}
: undefined,
// Reset draft boundary bookkeeping on assistant message
// boundaries so post-tool blocks stream from a fresh
// cumulative payload (payload.text resets upstream).
onAssistantMessageStart: draftStream
? () => {
resetDraftBlockOffsets();
resetPreviewToolProgress();
}
: undefined,
...buildPreviewToolProgressReplyOptions(),
onModelSelected,
},
});
} finally {
markRunComplete();
}
},
});
},
});
const { queuedFinal, counts } = dispatchResult;
if (finalReplyDeliveryFailed) {
if (retryableReplyDeliveryFailed) {
logVerboseMessage(