From b76edc09e6a143f18f1fbc2abf07fdd7bb4fd57c Mon Sep 17 00:00:00 2001 From: Devin Robison Date: Wed, 22 Apr 2026 16:11:32 -0600 Subject: [PATCH] fix(gateway): reauthorize session history SSE updates (#70237) * fix(gateway): reauthorize session history SSE updates * docs(changelog): note session history sse reauth * fix(gateway): use live proxy config for sse reauth * fix(gateway): skip unrelated session sse reauth * fix(gateway): filter sse transcript updates early, log work failures, forward-declare cleanup bindings --- CHANGELOG.md | 1 + src/gateway/http-utils.ts | 49 +++- src/gateway/server-http.ts | 1 + .../sessions-history-http.revocation.test.ts | 277 ++++++++++++++++++ src/gateway/sessions-history-http.ts | 147 ++++++++-- 5 files changed, 435 insertions(+), 40 deletions(-) create mode 100644 src/gateway/sessions-history-http.revocation.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 367de0d74b0..72f16c84596 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,7 @@ Docs: https://docs.openclaw.ai - CLI/channels: honor `channels..enabled=false` as a hard read-only presence opt-out, so env vars, manifest env vars, or stale persisted auth state no longer make disabled channel plugins appear in status, doctor, or setup-only discovery. - Channels/preview streaming: centralize draft-preview finalization so Slack, Discord, Mattermost, and Matrix no longer flush temporary preview messages for media/error finals, and preserve first-reply threading for normal fallback delivery. - Discord: keep slash command follow-up chunks ephemeral when the command is configured for ephemeral replies, so long `/status` output no longer leaks fallback model or runtime details into the public channel. (#69869) thanks @gumadeiras. +- Gateway/session history: re-check current auth and `chat.history` scope before later SSE keepalives and transcript updates, so active session-history streams close before delivering post-revocation events. - Plugins/discovery: reject package plugin source entries that escape the package directory before explicit runtime entries or inferred built JavaScript peers can be used. (#69868) thanks @gumadeiras. - CLI/channels: resolve channel presence through a shared policy that keeps ambient env vars and stale persisted auth from surfacing disabled bundled plugins in status, doctor, security audit, and cron delivery validation unless the channel or plugin is effectively enabled or explicitly configured. (#69862) Thanks @gumadeiras. - Doctor/plugins: hydrate legacy partial interactive handler state before plugin reload clears dedupe caches, so `openclaw doctor` and post-update doctor runs no longer crash with `Cannot read properties of undefined (reading 'clear')`. (#70135) Thanks @ngutman. diff --git a/src/gateway/http-utils.ts b/src/gateway/http-utils.ts index ba03df32f4f..9efa4db6fda 100644 --- a/src/gateway/http-utils.ts +++ b/src/gateway/http-utils.ts @@ -54,6 +54,16 @@ export type AuthorizedGatewayHttpRequest = { trustDeclaredOperatorScopes: boolean; }; +export type GatewayHttpRequestAuthCheckResult = + | { + ok: true; + requestAuth: AuthorizedGatewayHttpRequest; + } + | { + ok: false; + authResult: GatewayAuthResult; + }; + export function resolveHttpBrowserOriginPolicy( req: IncomingMessage, cfg = loadConfig(), @@ -96,8 +106,24 @@ export async function authorizeGatewayHttpRequestOrReply(params: { allowRealIpFallback?: boolean; rateLimiter?: AuthRateLimiter; }): Promise { + const result = await checkGatewayHttpRequestAuth(params); + if (!result.ok) { + sendGatewayAuthFailure(params.res, result.authResult); + return null; + } + return result.requestAuth; +} + +export async function checkGatewayHttpRequestAuth(params: { + req: IncomingMessage; + auth: ResolvedGatewayAuth; + trustedProxies?: string[]; + allowRealIpFallback?: boolean; + rateLimiter?: AuthRateLimiter; + cfg?: OpenClawConfig; +}): Promise { const token = getBearerToken(params.req); - const browserOriginPolicy = resolveHttpBrowserOriginPolicy(params.req); + const browserOriginPolicy = resolveHttpBrowserOriginPolicy(params.req, params.cfg); const authResult = await authorizeHttpGatewayConnect({ auth: params.auth, connectAuth: token ? { token, password: token } : null, @@ -108,16 +134,21 @@ export async function authorizeGatewayHttpRequestOrReply(params: { browserOriginPolicy, }); if (!authResult.ok) { - sendGatewayAuthFailure(params.res, authResult); - return null; + return { + ok: false, + authResult, + }; } return { - authMethod: authResult.method, - // Shared-secret bearer auth proves possession of the gateway secret, but it - // does not prove a narrower per-request operator identity. HTTP endpoints - // must opt in explicitly if they want to treat that shared-secret path as a - // full trusted-operator surface. - trustDeclaredOperatorScopes: !usesSharedSecretGatewayMethod(authResult.method), + ok: true, + requestAuth: { + authMethod: authResult.method, + // Shared-secret bearer auth proves possession of the gateway secret, but it + // does not prove a narrower per-request operator identity. HTTP endpoints + // must opt in explicitly if they want to treat that shared-secret path as a + // full trusted-operator surface. + trustDeclaredOperatorScopes: !usesSharedSecretGatewayMethod(authResult.method), + }, }; } diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index b047b237d0b..ab7ef2b7666 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -976,6 +976,7 @@ export function createGatewayHttpServer(opts: { run: async () => (await getSessionHistoryHttpModule()).handleSessionHistoryHttpRequest(req, res, { auth: resolvedAuth, + getResolvedAuth, trustedProxies, allowRealIpFallback, rateLimiter, diff --git a/src/gateway/sessions-history-http.revocation.test.ts b/src/gateway/sessions-history-http.revocation.test.ts new file mode 100644 index 00000000000..e01be4a1eb3 --- /dev/null +++ b/src/gateway/sessions-history-http.revocation.test.ts @@ -0,0 +1,277 @@ +import { EventEmitter } from "node:events"; +import type { IncomingMessage, ServerResponse } from "node:http"; +import { afterEach, describe, expect, it, vi } from "vitest"; + +let transcriptUpdateHandler: + | ((update: { sessionFile?: string; message?: unknown; messageId?: string }) => void) + | undefined; +let authRevoked = false; +let gatewayConfig: { + trustedProxies?: string[]; + allowRealIpFallback?: boolean; + webchat: { chatHistoryMaxChars: number }; +} = { + trustedProxies: ["10.0.0.1"], + allowRealIpFallback: false, + webchat: { chatHistoryMaxChars: 2000 }, +}; +let authCheckCalls = 0; + +vi.mock("../config/config.js", () => ({ + loadConfig: () => ({ + gateway: gatewayConfig, + }), +})); + +vi.mock("../config/sessions.js", () => ({ + loadSessionStore: () => ({ entries: [] }), +})); + +vi.mock("../sessions/transcript-events.js", () => ({ + onSessionTranscriptUpdate: (cb: typeof transcriptUpdateHandler) => { + transcriptUpdateHandler = cb; + return () => { + if (transcriptUpdateHandler === cb) { + transcriptUpdateHandler = undefined; + } + }; + }, +})); + +vi.mock("./http-utils.js", () => ({ + getHeader: (req: IncomingMessage, name: string) => { + const value = req.headers[name.toLowerCase()]; + return Array.isArray(value) ? value[0] : value; + }, + resolveTrustedHttpOperatorScopes: () => ["operator.read"], + authorizeScopedGatewayHttpRequestOrReply: async () => ({ + cfg: { gateway: { webchat: { chatHistoryMaxChars: 2000 } } }, + requestAuth: { trustDeclaredOperatorScopes: true }, + }), + checkGatewayHttpRequestAuth: async (params: { + trustedProxies?: string[]; + allowRealIpFallback?: boolean; + }) => { + authCheckCalls += 1; + if (authRevoked) { + return { + ok: false as const, + authResult: { ok: false, reason: "trusted_proxy_user_not_allowed" }, + }; + } + if ( + gatewayConfig.trustedProxies === undefined && + gatewayConfig.allowRealIpFallback === undefined + ) { + return params.trustedProxies === undefined && params.allowRealIpFallback === undefined + ? { + ok: false as const, + authResult: { ok: false, reason: "trusted_proxy_no_proxies_configured" }, + } + : { + ok: true as const, + requestAuth: { trustDeclaredOperatorScopes: true }, + }; + } + return { + ok: true as const, + requestAuth: { trustDeclaredOperatorScopes: true }, + }; + }, +})); + +vi.mock("./session-utils.js", () => ({ + resolveGatewaySessionStoreTarget: () => ({ + storePath: "/tmp", + storeKeys: ["agent:main"], + canonicalKey: "agent:main", + agentId: "main", + }), + resolveFreshestSessionEntryFromStoreKeys: () => ({ + sessionId: "session-1", + sessionFile: "/tmp/session-1.jsonl", + }), + readSessionMessages: () => [], + resolveSessionTranscriptCandidates: () => ["/tmp/session-1.jsonl"], +})); + +vi.mock("./session-history-state.js", () => ({ + buildSessionHistorySnapshot: () => ({ + history: { items: [], nextCursor: null, messages: [] }, + }), + SessionHistorySseState: { + fromRawSnapshot: () => ({ + snapshot: () => ({ items: [], nextCursor: null, messages: [] }), + appendInlineMessage: ({ message, messageId }: { message: unknown; messageId?: string }) => ({ + message, + messageSeq: 1, + messageId, + }), + refresh: () => ({ items: [], nextCursor: null, messages: [] }), + }), + }, +})); + +import { handleSessionHistoryHttpRequest } from "./sessions-history-http.js"; + +class MockReq extends EventEmitter { + url: string; + method: string; + headers: Record; + socket = new EventEmitter(); + + constructor(url: string) { + super(); + this.url = url; + this.method = "GET"; + this.headers = { + host: "localhost", + accept: "text/event-stream", + authorization: "Bearer token", + "x-openclaw-scopes": "operator.read", + }; + } +} + +class MockRes extends EventEmitter { + statusCode = 0; + headers = new Map(); + writes: string[] = []; + writableEnded = false; + socket = new EventEmitter(); + + setHeader(name: string, value: string) { + this.headers.set(name.toLowerCase(), value); + } + + write(chunk: string) { + this.writes.push(chunk); + return true; + } + + end(chunk?: string) { + if (chunk !== undefined) { + this.writes.push(chunk); + } + this.writableEnded = true; + this.emit("finish"); + this.emit("close"); + return this; + } + + flushHeaders() {} +} + +afterEach(() => { + transcriptUpdateHandler = undefined; + authRevoked = false; + authCheckCalls = 0; + gatewayConfig = { + trustedProxies: ["10.0.0.1"], + allowRealIpFallback: false, + webchat: { chatHistoryMaxChars: 2000 }, + }; +}); + +describe("session history SSE auth revocation", () => { + it("closes the stream before delivering transcript updates after auth is revoked", async () => { + const req = new MockReq("/sessions/agent%3Amain/history"); + const res = new MockRes(); + + const handled = await handleSessionHistoryHttpRequest( + req as unknown as IncomingMessage, + res as unknown as ServerResponse, + { auth: { mode: "trusted-proxy" } as never }, + ); + + expect(handled).toBe(true); + expect(transcriptUpdateHandler).toBeTypeOf("function"); + expect(res.headers.get("content-type")).toContain("text/event-stream"); + + authRevoked = true; + + transcriptUpdateHandler?.({ + sessionFile: "/tmp/session-1.jsonl", + message: { role: "assistant", content: [{ type: "text", text: "post-revocation secret" }] }, + messageId: "m-1", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + const joined = res.writes.join(""); + expect(joined).not.toContain("event: message"); + expect(joined).not.toContain("post-revocation secret"); + expect(res.writableEnded).toBe(true); + }); + + it("rechecks SSE auth against live proxy config instead of startup fallbacks", async () => { + const req = new MockReq("/sessions/agent%3Amain/history"); + const res = new MockRes(); + + const handled = await handleSessionHistoryHttpRequest( + req as unknown as IncomingMessage, + res as unknown as ServerResponse, + { + auth: { mode: "trusted-proxy" } as never, + trustedProxies: ["10.0.0.1"], + allowRealIpFallback: false, + }, + ); + + expect(handled).toBe(true); + expect(transcriptUpdateHandler).toBeTypeOf("function"); + + gatewayConfig = { + webchat: { chatHistoryMaxChars: 2000 }, + }; + + transcriptUpdateHandler?.({ + sessionFile: "/tmp/session-1.jsonl", + message: { role: "assistant", content: [{ type: "text", text: "stale-proxy event" }] }, + messageId: "m-2", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + const joined = res.writes.join(""); + expect(joined).not.toContain("event: message"); + expect(joined).not.toContain("stale-proxy event"); + expect(res.writableEnded).toBe(true); + }); + + it("skips SSE reauth for transcript updates outside this stream", async () => { + const req = new MockReq("/sessions/agent%3Amain/history"); + const res = new MockRes(); + + const handled = await handleSessionHistoryHttpRequest( + req as unknown as IncomingMessage, + res as unknown as ServerResponse, + { + auth: { mode: "trusted-proxy" } as never, + trustedProxies: ["10.0.0.1"], + allowRealIpFallback: false, + }, + ); + + expect(handled).toBe(true); + expect(transcriptUpdateHandler).toBeTypeOf("function"); + + authCheckCalls = 0; + gatewayConfig = { + webchat: { chatHistoryMaxChars: 2000 }, + }; + + transcriptUpdateHandler?.({ + sessionFile: "/tmp/other-session.jsonl", + message: { role: "assistant", content: [{ type: "text", text: "other session" }] }, + messageId: "m-3", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + const joined = res.writes.join(""); + expect(authCheckCalls).toBe(0); + expect(joined).not.toContain("other session"); + expect(res.writableEnded).toBe(false); + }); +}); diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index 60ba784acb0..02e7dabe653 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -1,7 +1,9 @@ import fs from "node:fs"; import type { IncomingMessage, ServerResponse } from "node:http"; import path from "node:path"; +import { loadConfig } from "../config/config.js"; import { loadSessionStore } from "../config/sessions.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import { normalizeLowercaseStringOrEmpty, @@ -17,9 +19,11 @@ import { } from "./http-common.js"; import { authorizeScopedGatewayHttpRequestOrReply, + checkGatewayHttpRequestAuth, getHeader, resolveTrustedHttpOperatorScopes, } from "./http-utils.js"; +import { authorizeOperatorScopesForMethod } from "./method-scopes.js"; import { DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS } from "./server-methods/chat.js"; import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js"; import { @@ -29,6 +33,8 @@ import { resolveSessionTranscriptCandidates, } from "./session-utils.js"; +const log = createSubsystemLogger("gateway/sessions-history-sse"); + const MAX_SESSION_HISTORY_LIMIT = 1000; function resolveSessionHistoryPath(req: IncomingMessage): string | null { @@ -88,6 +94,7 @@ export async function handleSessionHistoryHttpRequest( res: ServerResponse, opts: { auth: ResolvedGatewayAuth; + getResolvedAuth?: () => ResolvedGatewayAuth; trustedProxies?: string[]; allowRealIpFallback?: boolean; rateLimiter?: AuthRateLimiter; @@ -197,50 +204,128 @@ export async function handleSessionHistoryHttpRequest( ...sentHistory, }); - const heartbeat = setInterval(() => { - if (!res.writableEnded) { - res.write(": keepalive\n\n"); + let cleanedUp = false; + let streamQueue = Promise.resolve(); + // Forward-declared so `cleanup` can reference them without relying on + // Temporal-Dead-Zone leniency. A future refactor that wires the close event + // listeners before the `setInterval` / `onSessionTranscriptUpdate` calls + // would otherwise hit a `ReferenceError` on the first cleanup invocation. + let heartbeat: ReturnType | undefined; + let unsubscribe: (() => void) | undefined; + + const cleanup = () => { + if (cleanedUp) { + return; } + cleanedUp = true; + if (heartbeat) { + clearInterval(heartbeat); + } + if (unsubscribe) { + unsubscribe(); + } + }; + + const closeStream = () => { + cleanup(); + if (!res.writableEnded) { + res.end(); + } + }; + + const queueStreamWork = (work: () => Promise) => { + streamQueue = streamQueue + .then(async () => { + if (cleanedUp || res.writableEnded) { + return; + } + await work(); + }) + .catch((error) => { + // Surface the underlying error so operators can distinguish transient + // infrastructure failures (for example a `loadConfig()` read error + // inside the reauth path) from deliberate revocation, then fail closed. + log.warn("session history SSE stream work failed; closing stream", { error }); + closeStream(); + }); + }; + + const isStreamStillAuthorized = async (): Promise => { + const cfg = loadConfig(); + const currentRequestAuth = await checkGatewayHttpRequestAuth({ + req, + auth: opts.getResolvedAuth?.() ?? opts.auth, + trustedProxies: cfg.gateway?.trustedProxies, + allowRealIpFallback: cfg.gateway?.allowRealIpFallback, + rateLimiter: opts.rateLimiter, + cfg, + }); + if (!currentRequestAuth.ok) { + return false; + } + const requestedScopes = resolveTrustedHttpOperatorScopes(req, currentRequestAuth.requestAuth); + return authorizeOperatorScopesForMethod("chat.history", requestedScopes).allowed; + }; + + heartbeat = setInterval(() => { + queueStreamWork(async () => { + if (!(await isStreamStillAuthorized())) { + closeStream(); + return; + } + if (!res.writableEnded) { + res.write(": keepalive\n\n"); + } + }); }, 15_000); - const unsubscribe = onSessionTranscriptUpdate((update) => { - if (res.writableEnded || !entry?.sessionId) { + unsubscribe = onSessionTranscriptUpdate((update) => { + // Filter to candidate sessions synchronously before enqueueing any async + // work. `onSessionTranscriptUpdate` is a global fan-out listener, so every + // transcript write in the gateway would otherwise append a Promise-chain + // entry capturing `update.message` to every open SSE stream's queue — + // O(streams × updates) for busy deployments. + if (!entry?.sessionId) { return; } const updatePath = canonicalizePath(update.sessionFile); if (!updatePath || !transcriptCandidates.has(updatePath)) { return; } - if (update.message !== undefined) { - if (limit === undefined && cursor === undefined) { - const nextEvent = sseState.appendInlineMessage({ - message: update.message, - messageId: update.messageId, - }); - if (!nextEvent) { - return; - } - sentHistory = sseState.snapshot(); - sseWrite(res, "message", { - sessionKey: target.canonicalKey, - message: nextEvent.message, - ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), - messageSeq: nextEvent.messageSeq, - }); + queueStreamWork(async () => { + if (res.writableEnded) { return; } - } - sentHistory = sseState.refresh(); - sseWrite(res, "history", { - sessionKey: target.canonicalKey, - ...sentHistory, + if (!(await isStreamStillAuthorized())) { + closeStream(); + return; + } + if (update.message !== undefined) { + if (limit === undefined && cursor === undefined) { + const nextEvent = sseState.appendInlineMessage({ + message: update.message, + messageId: update.messageId, + }); + if (!nextEvent) { + return; + } + sentHistory = sseState.snapshot(); + sseWrite(res, "message", { + sessionKey: target.canonicalKey, + message: nextEvent.message, + ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), + messageSeq: nextEvent.messageSeq, + }); + return; + } + } + sentHistory = sseState.refresh(); + sseWrite(res, "history", { + sessionKey: target.canonicalKey, + ...sentHistory, + }); }); }); - - const cleanup = () => { - clearInterval(heartbeat); - unsubscribe(); - }; req.on("close", cleanup); res.on("close", cleanup); res.on("finish", cleanup);