mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:20:43 +00:00
fix: speed up exact session lookups
This commit is contained in:
@@ -147,7 +147,7 @@ The current bridge exposes these MCP tools:
|
||||
|
||||
</Accordion>
|
||||
<Accordion title="conversation_get">
|
||||
Returns one conversation by `session_key`.
|
||||
Returns one conversation by `session_key` using a direct Gateway session lookup.
|
||||
</Accordion>
|
||||
<Accordion title="messages_read">
|
||||
Reads recent transcript messages for one session-backed conversation.
|
||||
|
||||
@@ -399,6 +399,7 @@ enumeration of `src/gateway/server-methods/*.ts`.
|
||||
- `sessions.subscribe` and `sessions.unsubscribe` toggle session change event subscriptions for the current WS client.
|
||||
- `sessions.messages.subscribe` and `sessions.messages.unsubscribe` toggle transcript/message event subscriptions for one session.
|
||||
- `sessions.preview` returns bounded transcript previews for specific session keys.
|
||||
- `sessions.describe` returns one Gateway session row for an exact session key.
|
||||
- `sessions.resolve` resolves or canonicalizes a session target.
|
||||
- `sessions.create` creates a new session entry.
|
||||
- `sessions.send` sends a message into an existing session.
|
||||
|
||||
@@ -97,6 +97,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"sessions.list",
|
||||
"sessions.get",
|
||||
"sessions.preview",
|
||||
"sessions.describe",
|
||||
"sessions.resolve",
|
||||
"sessions.compaction.list",
|
||||
"sessions.compaction.get",
|
||||
|
||||
@@ -270,6 +270,8 @@ import {
|
||||
SessionsCreateParamsSchema,
|
||||
type SessionsDeleteParams,
|
||||
SessionsDeleteParamsSchema,
|
||||
type SessionsDescribeParams,
|
||||
SessionsDescribeParamsSchema,
|
||||
type SessionsListParams,
|
||||
SessionsListParamsSchema,
|
||||
type SessionsMessagesSubscribeParams,
|
||||
@@ -452,6 +454,9 @@ export const validateSessionsCleanupParams = ajv.compile<SessionsCleanupParams>(
|
||||
export const validateSessionsPreviewParams = ajv.compile<SessionsPreviewParams>(
|
||||
SessionsPreviewParamsSchema,
|
||||
);
|
||||
export const validateSessionsDescribeParams = ajv.compile<SessionsDescribeParams>(
|
||||
SessionsDescribeParamsSchema,
|
||||
);
|
||||
export const validateSessionsResolveParams = ajv.compile<SessionsResolveParams>(
|
||||
SessionsResolveParamsSchema,
|
||||
);
|
||||
@@ -700,6 +705,7 @@ export {
|
||||
SessionsListParamsSchema,
|
||||
SessionsCleanupParamsSchema,
|
||||
SessionsPreviewParamsSchema,
|
||||
SessionsDescribeParamsSchema,
|
||||
SessionsResolveParamsSchema,
|
||||
SessionsCompactionListParamsSchema,
|
||||
SessionsCompactionGetParamsSchema,
|
||||
@@ -926,6 +932,7 @@ export type {
|
||||
SessionsListParams,
|
||||
SessionsCleanupParams,
|
||||
SessionsPreviewParams,
|
||||
SessionsDescribeParams,
|
||||
SessionsResolveParams,
|
||||
SessionsPatchParams,
|
||||
SessionsPatchResult,
|
||||
|
||||
@@ -204,6 +204,7 @@ import {
|
||||
SessionsCleanupParamsSchema,
|
||||
SessionsCreateParamsSchema,
|
||||
SessionsDeleteParamsSchema,
|
||||
SessionsDescribeParamsSchema,
|
||||
SessionsListParamsSchema,
|
||||
SessionsMessagesSubscribeParamsSchema,
|
||||
SessionsMessagesUnsubscribeParamsSchema,
|
||||
@@ -278,6 +279,7 @@ export const ProtocolSchemas = {
|
||||
SessionsListParams: SessionsListParamsSchema,
|
||||
SessionsCleanupParams: SessionsCleanupParamsSchema,
|
||||
SessionsPreviewParams: SessionsPreviewParamsSchema,
|
||||
SessionsDescribeParams: SessionsDescribeParamsSchema,
|
||||
SessionsResolveParams: SessionsResolveParamsSchema,
|
||||
SessionCompactionCheckpoint: SessionCompactionCheckpointSchema,
|
||||
SessionsCompactionListParams: SessionsCompactionListParamsSchema,
|
||||
|
||||
@@ -80,6 +80,15 @@ export const SessionsPreviewParamsSchema = Type.Object(
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const SessionsDescribeParamsSchema = Type.Object(
|
||||
{
|
||||
key: NonEmptyString,
|
||||
includeDerivedTitles: Type.Optional(Type.Boolean()),
|
||||
includeLastMessage: Type.Optional(Type.Boolean()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const SessionsResolveParamsSchema = Type.Object(
|
||||
{
|
||||
key: Type.Optional(NonEmptyString),
|
||||
|
||||
@@ -46,6 +46,7 @@ export type PushTestResult = SchemaType<"PushTestResult">;
|
||||
export type SessionsListParams = SchemaType<"SessionsListParams">;
|
||||
export type SessionsCleanupParams = SchemaType<"SessionsCleanupParams">;
|
||||
export type SessionsPreviewParams = SchemaType<"SessionsPreviewParams">;
|
||||
export type SessionsDescribeParams = SchemaType<"SessionsDescribeParams">;
|
||||
export type SessionsResolveParams = SchemaType<"SessionsResolveParams">;
|
||||
export type SessionCompactionCheckpoint = SchemaType<"SessionCompactionCheckpoint">;
|
||||
export type SessionsCompactionListParams = SchemaType<"SessionsCompactionListParams">;
|
||||
|
||||
@@ -96,6 +96,7 @@ const BASE_METHODS = [
|
||||
"sessions.messages.subscribe",
|
||||
"sessions.messages.unsubscribe",
|
||||
"sessions.preview",
|
||||
"sessions.describe",
|
||||
"sessions.compaction.list",
|
||||
"sessions.compaction.get",
|
||||
"sessions.compaction.branch",
|
||||
|
||||
@@ -57,6 +57,7 @@ import {
|
||||
validateSessionsCompactionRestoreParams,
|
||||
validateSessionsCreateParams,
|
||||
validateSessionsDeleteParams,
|
||||
validateSessionsDescribeParams,
|
||||
validateSessionsListParams,
|
||||
validateSessionsMessagesSubscribeParams,
|
||||
validateSessionsMessagesUnsubscribeParams,
|
||||
@@ -76,6 +77,7 @@ import {
|
||||
import { reactivateCompletedSubagentSession } from "../session-subagent-reactivation.js";
|
||||
import {
|
||||
archiveFileOnDisk,
|
||||
buildGatewaySessionRow,
|
||||
listSessionsFromStoreAsync,
|
||||
loadCombinedSessionStoreForGateway,
|
||||
loadGatewaySessionRow,
|
||||
@@ -824,6 +826,34 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
|
||||
respond(true, { ts: Date.now(), previews } satisfies SessionsPreviewResult, undefined);
|
||||
},
|
||||
"sessions.describe": ({ params, respond, context }) => {
|
||||
if (!assertValidParams(params, validateSessionsDescribeParams, "sessions.describe", respond)) {
|
||||
return;
|
||||
}
|
||||
const key = requireSessionKey(params.key, respond);
|
||||
if (!key) {
|
||||
return;
|
||||
}
|
||||
const cfg = context.getRuntimeConfig();
|
||||
const { target, storePath } = resolveGatewaySessionTargetFromKey(key, cfg);
|
||||
const store = loadSessionStore(storePath);
|
||||
const entry = resolveFreshestSessionEntryFromStoreKeys(store, target.storeKeys);
|
||||
if (!entry) {
|
||||
respond(true, { session: null }, undefined);
|
||||
return;
|
||||
}
|
||||
const row = buildGatewaySessionRow({
|
||||
cfg,
|
||||
storePath,
|
||||
store,
|
||||
key: target.canonicalKey,
|
||||
entry,
|
||||
includeDerivedTitles: params.includeDerivedTitles,
|
||||
includeLastMessage: params.includeLastMessage,
|
||||
transcriptUsageMaxBytes: 64 * 1024,
|
||||
});
|
||||
respond(true, { session: row }, undefined);
|
||||
},
|
||||
"sessions.resolve": async ({ params, respond, context }) => {
|
||||
if (!assertValidParams(params, validateSessionsResolveParams, "sessions.resolve", respond)) {
|
||||
return;
|
||||
|
||||
@@ -1688,7 +1688,7 @@ export function loadGatewaySessionRow(
|
||||
*/
|
||||
const SESSIONS_LIST_YIELD_BATCH_SIZE = 10;
|
||||
|
||||
function filterAndSortSessionEntries(params: {
|
||||
export function filterAndSortSessionEntries(params: {
|
||||
store: Record<string, SessionEntry>;
|
||||
opts: import("./protocol/index.js").SessionsListParams;
|
||||
now: number;
|
||||
|
||||
@@ -190,9 +190,6 @@ describe("resolveSessionKeyFromResolveParams", () => {
|
||||
storePath,
|
||||
store: { [deletedAgentKey]: { sessionId: "sess-orphan", updatedAt: 1 } },
|
||||
});
|
||||
hoisted.listSessionsFromStoreMock.mockReturnValue({
|
||||
sessions: [{ key: deletedAgentKey, sessionId: "sess-orphan" }],
|
||||
});
|
||||
hoisted.listAgentIdsMock.mockReturnValue(["main"]);
|
||||
|
||||
const result = await resolveSessionKeyFromResolveParams({
|
||||
@@ -209,6 +206,27 @@ describe("resolveSessionKeyFromResolveParams", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("resolves sessionId matches from raw store metadata without hydrating session rows", async () => {
|
||||
hoisted.loadCombinedSessionStoreForGatewayMock.mockReturnValue({
|
||||
storePath,
|
||||
store: {
|
||||
"agent:main:noisy": { sessionId: "sess-noisy", updatedAt: 2 },
|
||||
"agent:main:target": { sessionId: "sess-target", updatedAt: 1 },
|
||||
},
|
||||
});
|
||||
hoisted.listSessionsFromStoreMock.mockImplementation(() => {
|
||||
throw new Error("session rows should not be materialized for exact sessionId lookup");
|
||||
});
|
||||
|
||||
const result = await resolveSessionKeyFromResolveParams({
|
||||
cfg: {},
|
||||
p: { sessionId: "sess-target" },
|
||||
});
|
||||
|
||||
expect(result).toEqual({ ok: true, key: "agent:main:target" });
|
||||
expect(hoisted.listSessionsFromStoreMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("rejects sessions belonging to a deleted agent (label-based lookup)", async () => {
|
||||
const deletedAgentKey = "agent:deleted-agent:main";
|
||||
hoisted.loadCombinedSessionStoreForGatewayMock.mockReturnValue({
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { loadSessionStore, updateSessionStore } from "../config/sessions.js";
|
||||
import { loadSessionStore, updateSessionStore, type SessionEntry } from "../config/sessions.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { resolveSessionIdMatchSelection } from "../sessions/session-id-resolution.js";
|
||||
import { parseSessionLabel } from "../sessions/session-label.js";
|
||||
import { normalizeOptionalString } from "../shared/string-coerce.js";
|
||||
import {
|
||||
@@ -9,6 +10,7 @@ import {
|
||||
type SessionsResolveParams,
|
||||
} from "./protocol/index.js";
|
||||
import {
|
||||
filterAndSortSessionEntries,
|
||||
listSessionsFromStore,
|
||||
loadCombinedSessionStoreForGateway,
|
||||
migrateAndPruneGatewaySessionStoreKey,
|
||||
@@ -70,6 +72,22 @@ function isResolvedSessionKeyVisible(params: {
|
||||
}).sessions.some((session) => session.key === params.key);
|
||||
}
|
||||
|
||||
function findVisibleSessionIdMatches(params: {
|
||||
store: Record<string, SessionEntry>;
|
||||
p: SessionsResolveParams;
|
||||
sessionId: string;
|
||||
}): Array<[string, SessionEntry]> {
|
||||
const now = Date.now();
|
||||
const entries = filterAndSortSessionEntries({
|
||||
store: params.store,
|
||||
now,
|
||||
opts: resolveSessionVisibilityFilterOptions(params.p),
|
||||
});
|
||||
return entries.filter(
|
||||
([key, entry]) => entry?.sessionId === params.sessionId || key === params.sessionId,
|
||||
);
|
||||
}
|
||||
|
||||
export async function resolveSessionKeyFromResolveParams(params: {
|
||||
cfg: OpenClawConfig;
|
||||
p: SessionsResolveParams;
|
||||
@@ -148,29 +166,17 @@ export async function resolveSessionKeyFromResolveParams(params: {
|
||||
}
|
||||
|
||||
if (hasSessionId) {
|
||||
const { storePath, store } = loadCombinedSessionStoreForGateway(cfg);
|
||||
const list = listSessionsFromStore({
|
||||
cfg,
|
||||
storePath,
|
||||
store,
|
||||
opts: {
|
||||
includeGlobal: p.includeGlobal === true,
|
||||
includeUnknown: p.includeUnknown === true,
|
||||
spawnedBy: p.spawnedBy,
|
||||
agentId: p.agentId,
|
||||
},
|
||||
});
|
||||
const matches = list.sessions.filter(
|
||||
(session) => session.sessionId === sessionId || session.key === sessionId,
|
||||
);
|
||||
if (matches.length === 0) {
|
||||
const { store } = loadCombinedSessionStoreForGateway(cfg);
|
||||
const matches = findVisibleSessionIdMatches({ store, p, sessionId });
|
||||
const selection = resolveSessionIdMatchSelection(matches, sessionId);
|
||||
if (selection.kind === "none") {
|
||||
return {
|
||||
ok: false,
|
||||
error: errorShape(ErrorCodes.INVALID_REQUEST, `No session found: ${sessionId}`),
|
||||
};
|
||||
}
|
||||
if (matches.length > 1) {
|
||||
const keys = matches.map((session) => session.key).join(", ");
|
||||
if (selection.kind === "ambiguous") {
|
||||
const keys = selection.sessionKeys.join(", ");
|
||||
return {
|
||||
ok: false,
|
||||
error: errorShape(
|
||||
@@ -179,11 +185,11 @@ export async function resolveSessionKeyFromResolveParams(params: {
|
||||
),
|
||||
};
|
||||
}
|
||||
const agentCheckSessionId = validateSessionAgentExists(cfg, matches[0].key);
|
||||
const agentCheckSessionId = validateSessionAgentExists(cfg, selection.sessionKey);
|
||||
if (agentCheckSessionId) {
|
||||
return agentCheckSessionId;
|
||||
}
|
||||
return { ok: true, key: matches[0].key };
|
||||
return { ok: true, key: selection.sessionKey };
|
||||
}
|
||||
|
||||
const parsedLabel = parseSessionLabel(p.label);
|
||||
|
||||
@@ -18,6 +18,7 @@ import type {
|
||||
ConversationDescriptor,
|
||||
PendingApproval,
|
||||
QueueEvent,
|
||||
SessionDescribeResult,
|
||||
SessionListResult,
|
||||
SessionMessagePayload,
|
||||
WaitFilter,
|
||||
@@ -206,10 +207,13 @@ export class OpenClawChannelBridge {
|
||||
if (!normalizedSessionKey) {
|
||||
return null;
|
||||
}
|
||||
const conversations = await this.listConversations({ limit: 500, includeLastMessage: true });
|
||||
return (
|
||||
conversations.find((conversation) => conversation.sessionKey === normalizedSessionKey) ?? null
|
||||
);
|
||||
await this.waitUntilReady();
|
||||
const response: SessionDescribeResult = await this.requestGateway("sessions.describe", {
|
||||
key: normalizedSessionKey,
|
||||
includeDerivedTitles: true,
|
||||
includeLastMessage: true,
|
||||
});
|
||||
return response.session ? toConversation(response.session) : null;
|
||||
}
|
||||
|
||||
async readMessages(
|
||||
|
||||
@@ -323,6 +323,46 @@ describe("openclaw channel mcp server", () => {
|
||||
);
|
||||
});
|
||||
|
||||
test("gets one conversation through sessions.describe without broad listing", async () => {
|
||||
const bridge = new OpenClawChannelBridge({} as never, {
|
||||
claudeChannelMode: "off",
|
||||
verbose: false,
|
||||
});
|
||||
const gatewayRequest = vi.fn(async (method: string) => {
|
||||
if (method === "sessions.describe") {
|
||||
return {
|
||||
session: {
|
||||
key: "agent:main:main",
|
||||
deliveryContext: {
|
||||
channel: "telegram",
|
||||
to: "-100123",
|
||||
accountId: "acct-1",
|
||||
},
|
||||
lastMessagePreview: "latest message",
|
||||
},
|
||||
};
|
||||
}
|
||||
throw new Error(`unexpected gateway method ${method}`);
|
||||
});
|
||||
|
||||
attachReadyGateway(bridge, gatewayRequest);
|
||||
|
||||
await expect(bridge.getConversation("agent:main:main")).resolves.toEqual(
|
||||
expect.objectContaining({
|
||||
sessionKey: "agent:main:main",
|
||||
channel: "telegram",
|
||||
to: "-100123",
|
||||
accountId: "acct-1",
|
||||
lastMessagePreview: "latest message",
|
||||
}),
|
||||
);
|
||||
expect(gatewayRequest).toHaveBeenCalledWith("sessions.describe", {
|
||||
key: "agent:main:main",
|
||||
includeDerivedTitles: true,
|
||||
includeLastMessage: true,
|
||||
});
|
||||
});
|
||||
|
||||
test("lists routed sessions from deliveryContext without mirrored route fields", async () => {
|
||||
const bridge = new OpenClawChannelBridge({} as never, {
|
||||
claudeChannelMode: "off",
|
||||
|
||||
@@ -46,6 +46,10 @@ export type SessionListResult = {
|
||||
sessions?: SessionRow[];
|
||||
};
|
||||
|
||||
export type SessionDescribeResult = {
|
||||
session?: SessionRow | null;
|
||||
};
|
||||
|
||||
export type ChatHistoryResult = {
|
||||
messages?: Array<{ id?: string; role?: string; content?: unknown; [key: string]: unknown }>;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user