mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:10:44 +00:00
fix(gateway): reauthorize session history SSE updates (#70237)
* fix(gateway): reauthorize session history SSE updates * docs(changelog): note session history sse reauth * fix(gateway): use live proxy config for sse reauth * fix(gateway): skip unrelated session sse reauth * fix(gateway): filter sse transcript updates early, log work failures, forward-declare cleanup bindings
This commit is contained in:
@@ -80,6 +80,7 @@ Docs: https://docs.openclaw.ai
|
||||
- CLI/channels: honor `channels.<id>.enabled=false` as a hard read-only presence opt-out, so env vars, manifest env vars, or stale persisted auth state no longer make disabled channel plugins appear in status, doctor, or setup-only discovery.
|
||||
- Channels/preview streaming: centralize draft-preview finalization so Slack, Discord, Mattermost, and Matrix no longer flush temporary preview messages for media/error finals, and preserve first-reply threading for normal fallback delivery.
|
||||
- Discord: keep slash command follow-up chunks ephemeral when the command is configured for ephemeral replies, so long `/status` output no longer leaks fallback model or runtime details into the public channel. (#69869) thanks @gumadeiras.
|
||||
- Gateway/session history: re-check current auth and `chat.history` scope before later SSE keepalives and transcript updates, so active session-history streams close before delivering post-revocation events.
|
||||
- Plugins/discovery: reject package plugin source entries that escape the package directory before explicit runtime entries or inferred built JavaScript peers can be used. (#69868) thanks @gumadeiras.
|
||||
- CLI/channels: resolve channel presence through a shared policy that keeps ambient env vars and stale persisted auth from surfacing disabled bundled plugins in status, doctor, security audit, and cron delivery validation unless the channel or plugin is effectively enabled or explicitly configured. (#69862) Thanks @gumadeiras.
|
||||
- Doctor/plugins: hydrate legacy partial interactive handler state before plugin reload clears dedupe caches, so `openclaw doctor` and post-update doctor runs no longer crash with `Cannot read properties of undefined (reading 'clear')`. (#70135) Thanks @ngutman.
|
||||
|
||||
@@ -54,6 +54,16 @@ export type AuthorizedGatewayHttpRequest = {
|
||||
trustDeclaredOperatorScopes: boolean;
|
||||
};
|
||||
|
||||
export type GatewayHttpRequestAuthCheckResult =
|
||||
| {
|
||||
ok: true;
|
||||
requestAuth: AuthorizedGatewayHttpRequest;
|
||||
}
|
||||
| {
|
||||
ok: false;
|
||||
authResult: GatewayAuthResult;
|
||||
};
|
||||
|
||||
export function resolveHttpBrowserOriginPolicy(
|
||||
req: IncomingMessage,
|
||||
cfg = loadConfig(),
|
||||
@@ -96,8 +106,24 @@ export async function authorizeGatewayHttpRequestOrReply(params: {
|
||||
allowRealIpFallback?: boolean;
|
||||
rateLimiter?: AuthRateLimiter;
|
||||
}): Promise<AuthorizedGatewayHttpRequest | null> {
|
||||
const result = await checkGatewayHttpRequestAuth(params);
|
||||
if (!result.ok) {
|
||||
sendGatewayAuthFailure(params.res, result.authResult);
|
||||
return null;
|
||||
}
|
||||
return result.requestAuth;
|
||||
}
|
||||
|
||||
export async function checkGatewayHttpRequestAuth(params: {
|
||||
req: IncomingMessage;
|
||||
auth: ResolvedGatewayAuth;
|
||||
trustedProxies?: string[];
|
||||
allowRealIpFallback?: boolean;
|
||||
rateLimiter?: AuthRateLimiter;
|
||||
cfg?: OpenClawConfig;
|
||||
}): Promise<GatewayHttpRequestAuthCheckResult> {
|
||||
const token = getBearerToken(params.req);
|
||||
const browserOriginPolicy = resolveHttpBrowserOriginPolicy(params.req);
|
||||
const browserOriginPolicy = resolveHttpBrowserOriginPolicy(params.req, params.cfg);
|
||||
const authResult = await authorizeHttpGatewayConnect({
|
||||
auth: params.auth,
|
||||
connectAuth: token ? { token, password: token } : null,
|
||||
@@ -108,16 +134,21 @@ export async function authorizeGatewayHttpRequestOrReply(params: {
|
||||
browserOriginPolicy,
|
||||
});
|
||||
if (!authResult.ok) {
|
||||
sendGatewayAuthFailure(params.res, authResult);
|
||||
return null;
|
||||
return {
|
||||
ok: false,
|
||||
authResult,
|
||||
};
|
||||
}
|
||||
return {
|
||||
authMethod: authResult.method,
|
||||
// Shared-secret bearer auth proves possession of the gateway secret, but it
|
||||
// does not prove a narrower per-request operator identity. HTTP endpoints
|
||||
// must opt in explicitly if they want to treat that shared-secret path as a
|
||||
// full trusted-operator surface.
|
||||
trustDeclaredOperatorScopes: !usesSharedSecretGatewayMethod(authResult.method),
|
||||
ok: true,
|
||||
requestAuth: {
|
||||
authMethod: authResult.method,
|
||||
// Shared-secret bearer auth proves possession of the gateway secret, but it
|
||||
// does not prove a narrower per-request operator identity. HTTP endpoints
|
||||
// must opt in explicitly if they want to treat that shared-secret path as a
|
||||
// full trusted-operator surface.
|
||||
trustDeclaredOperatorScopes: !usesSharedSecretGatewayMethod(authResult.method),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -976,6 +976,7 @@ export function createGatewayHttpServer(opts: {
|
||||
run: async () =>
|
||||
(await getSessionHistoryHttpModule()).handleSessionHistoryHttpRequest(req, res, {
|
||||
auth: resolvedAuth,
|
||||
getResolvedAuth,
|
||||
trustedProxies,
|
||||
allowRealIpFallback,
|
||||
rateLimiter,
|
||||
|
||||
277
src/gateway/sessions-history-http.revocation.test.ts
Normal file
277
src/gateway/sessions-history-http.revocation.test.ts
Normal file
@@ -0,0 +1,277 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
let transcriptUpdateHandler:
|
||||
| ((update: { sessionFile?: string; message?: unknown; messageId?: string }) => void)
|
||||
| undefined;
|
||||
let authRevoked = false;
|
||||
let gatewayConfig: {
|
||||
trustedProxies?: string[];
|
||||
allowRealIpFallback?: boolean;
|
||||
webchat: { chatHistoryMaxChars: number };
|
||||
} = {
|
||||
trustedProxies: ["10.0.0.1"],
|
||||
allowRealIpFallback: false,
|
||||
webchat: { chatHistoryMaxChars: 2000 },
|
||||
};
|
||||
let authCheckCalls = 0;
|
||||
|
||||
vi.mock("../config/config.js", () => ({
|
||||
loadConfig: () => ({
|
||||
gateway: gatewayConfig,
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../config/sessions.js", () => ({
|
||||
loadSessionStore: () => ({ entries: [] }),
|
||||
}));
|
||||
|
||||
vi.mock("../sessions/transcript-events.js", () => ({
|
||||
onSessionTranscriptUpdate: (cb: typeof transcriptUpdateHandler) => {
|
||||
transcriptUpdateHandler = cb;
|
||||
return () => {
|
||||
if (transcriptUpdateHandler === cb) {
|
||||
transcriptUpdateHandler = undefined;
|
||||
}
|
||||
};
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock("./http-utils.js", () => ({
|
||||
getHeader: (req: IncomingMessage, name: string) => {
|
||||
const value = req.headers[name.toLowerCase()];
|
||||
return Array.isArray(value) ? value[0] : value;
|
||||
},
|
||||
resolveTrustedHttpOperatorScopes: () => ["operator.read"],
|
||||
authorizeScopedGatewayHttpRequestOrReply: async () => ({
|
||||
cfg: { gateway: { webchat: { chatHistoryMaxChars: 2000 } } },
|
||||
requestAuth: { trustDeclaredOperatorScopes: true },
|
||||
}),
|
||||
checkGatewayHttpRequestAuth: async (params: {
|
||||
trustedProxies?: string[];
|
||||
allowRealIpFallback?: boolean;
|
||||
}) => {
|
||||
authCheckCalls += 1;
|
||||
if (authRevoked) {
|
||||
return {
|
||||
ok: false as const,
|
||||
authResult: { ok: false, reason: "trusted_proxy_user_not_allowed" },
|
||||
};
|
||||
}
|
||||
if (
|
||||
gatewayConfig.trustedProxies === undefined &&
|
||||
gatewayConfig.allowRealIpFallback === undefined
|
||||
) {
|
||||
return params.trustedProxies === undefined && params.allowRealIpFallback === undefined
|
||||
? {
|
||||
ok: false as const,
|
||||
authResult: { ok: false, reason: "trusted_proxy_no_proxies_configured" },
|
||||
}
|
||||
: {
|
||||
ok: true as const,
|
||||
requestAuth: { trustDeclaredOperatorScopes: true },
|
||||
};
|
||||
}
|
||||
return {
|
||||
ok: true as const,
|
||||
requestAuth: { trustDeclaredOperatorScopes: true },
|
||||
};
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock("./session-utils.js", () => ({
|
||||
resolveGatewaySessionStoreTarget: () => ({
|
||||
storePath: "/tmp",
|
||||
storeKeys: ["agent:main"],
|
||||
canonicalKey: "agent:main",
|
||||
agentId: "main",
|
||||
}),
|
||||
resolveFreshestSessionEntryFromStoreKeys: () => ({
|
||||
sessionId: "session-1",
|
||||
sessionFile: "/tmp/session-1.jsonl",
|
||||
}),
|
||||
readSessionMessages: () => [],
|
||||
resolveSessionTranscriptCandidates: () => ["/tmp/session-1.jsonl"],
|
||||
}));
|
||||
|
||||
vi.mock("./session-history-state.js", () => ({
|
||||
buildSessionHistorySnapshot: () => ({
|
||||
history: { items: [], nextCursor: null, messages: [] },
|
||||
}),
|
||||
SessionHistorySseState: {
|
||||
fromRawSnapshot: () => ({
|
||||
snapshot: () => ({ items: [], nextCursor: null, messages: [] }),
|
||||
appendInlineMessage: ({ message, messageId }: { message: unknown; messageId?: string }) => ({
|
||||
message,
|
||||
messageSeq: 1,
|
||||
messageId,
|
||||
}),
|
||||
refresh: () => ({ items: [], nextCursor: null, messages: [] }),
|
||||
}),
|
||||
},
|
||||
}));
|
||||
|
||||
import { handleSessionHistoryHttpRequest } from "./sessions-history-http.js";
|
||||
|
||||
class MockReq extends EventEmitter {
|
||||
url: string;
|
||||
method: string;
|
||||
headers: Record<string, string>;
|
||||
socket = new EventEmitter();
|
||||
|
||||
constructor(url: string) {
|
||||
super();
|
||||
this.url = url;
|
||||
this.method = "GET";
|
||||
this.headers = {
|
||||
host: "localhost",
|
||||
accept: "text/event-stream",
|
||||
authorization: "Bearer token",
|
||||
"x-openclaw-scopes": "operator.read",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
class MockRes extends EventEmitter {
|
||||
statusCode = 0;
|
||||
headers = new Map<string, string>();
|
||||
writes: string[] = [];
|
||||
writableEnded = false;
|
||||
socket = new EventEmitter();
|
||||
|
||||
setHeader(name: string, value: string) {
|
||||
this.headers.set(name.toLowerCase(), value);
|
||||
}
|
||||
|
||||
write(chunk: string) {
|
||||
this.writes.push(chunk);
|
||||
return true;
|
||||
}
|
||||
|
||||
end(chunk?: string) {
|
||||
if (chunk !== undefined) {
|
||||
this.writes.push(chunk);
|
||||
}
|
||||
this.writableEnded = true;
|
||||
this.emit("finish");
|
||||
this.emit("close");
|
||||
return this;
|
||||
}
|
||||
|
||||
flushHeaders() {}
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
transcriptUpdateHandler = undefined;
|
||||
authRevoked = false;
|
||||
authCheckCalls = 0;
|
||||
gatewayConfig = {
|
||||
trustedProxies: ["10.0.0.1"],
|
||||
allowRealIpFallback: false,
|
||||
webchat: { chatHistoryMaxChars: 2000 },
|
||||
};
|
||||
});
|
||||
|
||||
describe("session history SSE auth revocation", () => {
|
||||
it("closes the stream before delivering transcript updates after auth is revoked", async () => {
|
||||
const req = new MockReq("/sessions/agent%3Amain/history");
|
||||
const res = new MockRes();
|
||||
|
||||
const handled = await handleSessionHistoryHttpRequest(
|
||||
req as unknown as IncomingMessage,
|
||||
res as unknown as ServerResponse,
|
||||
{ auth: { mode: "trusted-proxy" } as never },
|
||||
);
|
||||
|
||||
expect(handled).toBe(true);
|
||||
expect(transcriptUpdateHandler).toBeTypeOf("function");
|
||||
expect(res.headers.get("content-type")).toContain("text/event-stream");
|
||||
|
||||
authRevoked = true;
|
||||
|
||||
transcriptUpdateHandler?.({
|
||||
sessionFile: "/tmp/session-1.jsonl",
|
||||
message: { role: "assistant", content: [{ type: "text", text: "post-revocation secret" }] },
|
||||
messageId: "m-1",
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
const joined = res.writes.join("");
|
||||
expect(joined).not.toContain("event: message");
|
||||
expect(joined).not.toContain("post-revocation secret");
|
||||
expect(res.writableEnded).toBe(true);
|
||||
});
|
||||
|
||||
it("rechecks SSE auth against live proxy config instead of startup fallbacks", async () => {
|
||||
const req = new MockReq("/sessions/agent%3Amain/history");
|
||||
const res = new MockRes();
|
||||
|
||||
const handled = await handleSessionHistoryHttpRequest(
|
||||
req as unknown as IncomingMessage,
|
||||
res as unknown as ServerResponse,
|
||||
{
|
||||
auth: { mode: "trusted-proxy" } as never,
|
||||
trustedProxies: ["10.0.0.1"],
|
||||
allowRealIpFallback: false,
|
||||
},
|
||||
);
|
||||
|
||||
expect(handled).toBe(true);
|
||||
expect(transcriptUpdateHandler).toBeTypeOf("function");
|
||||
|
||||
gatewayConfig = {
|
||||
webchat: { chatHistoryMaxChars: 2000 },
|
||||
};
|
||||
|
||||
transcriptUpdateHandler?.({
|
||||
sessionFile: "/tmp/session-1.jsonl",
|
||||
message: { role: "assistant", content: [{ type: "text", text: "stale-proxy event" }] },
|
||||
messageId: "m-2",
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
const joined = res.writes.join("");
|
||||
expect(joined).not.toContain("event: message");
|
||||
expect(joined).not.toContain("stale-proxy event");
|
||||
expect(res.writableEnded).toBe(true);
|
||||
});
|
||||
|
||||
it("skips SSE reauth for transcript updates outside this stream", async () => {
|
||||
const req = new MockReq("/sessions/agent%3Amain/history");
|
||||
const res = new MockRes();
|
||||
|
||||
const handled = await handleSessionHistoryHttpRequest(
|
||||
req as unknown as IncomingMessage,
|
||||
res as unknown as ServerResponse,
|
||||
{
|
||||
auth: { mode: "trusted-proxy" } as never,
|
||||
trustedProxies: ["10.0.0.1"],
|
||||
allowRealIpFallback: false,
|
||||
},
|
||||
);
|
||||
|
||||
expect(handled).toBe(true);
|
||||
expect(transcriptUpdateHandler).toBeTypeOf("function");
|
||||
|
||||
authCheckCalls = 0;
|
||||
gatewayConfig = {
|
||||
webchat: { chatHistoryMaxChars: 2000 },
|
||||
};
|
||||
|
||||
transcriptUpdateHandler?.({
|
||||
sessionFile: "/tmp/other-session.jsonl",
|
||||
message: { role: "assistant", content: [{ type: "text", text: "other session" }] },
|
||||
messageId: "m-3",
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
const joined = res.writes.join("");
|
||||
expect(authCheckCalls).toBe(0);
|
||||
expect(joined).not.toContain("other session");
|
||||
expect(res.writableEnded).toBe(false);
|
||||
});
|
||||
});
|
||||
@@ -1,7 +1,9 @@
|
||||
import fs from "node:fs";
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import path from "node:path";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { loadSessionStore } from "../config/sessions.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import {
|
||||
normalizeLowercaseStringOrEmpty,
|
||||
@@ -17,9 +19,11 @@ import {
|
||||
} from "./http-common.js";
|
||||
import {
|
||||
authorizeScopedGatewayHttpRequestOrReply,
|
||||
checkGatewayHttpRequestAuth,
|
||||
getHeader,
|
||||
resolveTrustedHttpOperatorScopes,
|
||||
} from "./http-utils.js";
|
||||
import { authorizeOperatorScopesForMethod } from "./method-scopes.js";
|
||||
import { DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS } from "./server-methods/chat.js";
|
||||
import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js";
|
||||
import {
|
||||
@@ -29,6 +33,8 @@ import {
|
||||
resolveSessionTranscriptCandidates,
|
||||
} from "./session-utils.js";
|
||||
|
||||
const log = createSubsystemLogger("gateway/sessions-history-sse");
|
||||
|
||||
const MAX_SESSION_HISTORY_LIMIT = 1000;
|
||||
|
||||
function resolveSessionHistoryPath(req: IncomingMessage): string | null {
|
||||
@@ -88,6 +94,7 @@ export async function handleSessionHistoryHttpRequest(
|
||||
res: ServerResponse,
|
||||
opts: {
|
||||
auth: ResolvedGatewayAuth;
|
||||
getResolvedAuth?: () => ResolvedGatewayAuth;
|
||||
trustedProxies?: string[];
|
||||
allowRealIpFallback?: boolean;
|
||||
rateLimiter?: AuthRateLimiter;
|
||||
@@ -197,50 +204,128 @@ export async function handleSessionHistoryHttpRequest(
|
||||
...sentHistory,
|
||||
});
|
||||
|
||||
const heartbeat = setInterval(() => {
|
||||
if (!res.writableEnded) {
|
||||
res.write(": keepalive\n\n");
|
||||
let cleanedUp = false;
|
||||
let streamQueue = Promise.resolve();
|
||||
// Forward-declared so `cleanup` can reference them without relying on
|
||||
// Temporal-Dead-Zone leniency. A future refactor that wires the close event
|
||||
// listeners before the `setInterval` / `onSessionTranscriptUpdate` calls
|
||||
// would otherwise hit a `ReferenceError` on the first cleanup invocation.
|
||||
let heartbeat: ReturnType<typeof setInterval> | undefined;
|
||||
let unsubscribe: (() => void) | undefined;
|
||||
|
||||
const cleanup = () => {
|
||||
if (cleanedUp) {
|
||||
return;
|
||||
}
|
||||
cleanedUp = true;
|
||||
if (heartbeat) {
|
||||
clearInterval(heartbeat);
|
||||
}
|
||||
if (unsubscribe) {
|
||||
unsubscribe();
|
||||
}
|
||||
};
|
||||
|
||||
const closeStream = () => {
|
||||
cleanup();
|
||||
if (!res.writableEnded) {
|
||||
res.end();
|
||||
}
|
||||
};
|
||||
|
||||
const queueStreamWork = (work: () => Promise<void>) => {
|
||||
streamQueue = streamQueue
|
||||
.then(async () => {
|
||||
if (cleanedUp || res.writableEnded) {
|
||||
return;
|
||||
}
|
||||
await work();
|
||||
})
|
||||
.catch((error) => {
|
||||
// Surface the underlying error so operators can distinguish transient
|
||||
// infrastructure failures (for example a `loadConfig()` read error
|
||||
// inside the reauth path) from deliberate revocation, then fail closed.
|
||||
log.warn("session history SSE stream work failed; closing stream", { error });
|
||||
closeStream();
|
||||
});
|
||||
};
|
||||
|
||||
const isStreamStillAuthorized = async (): Promise<boolean> => {
|
||||
const cfg = loadConfig();
|
||||
const currentRequestAuth = await checkGatewayHttpRequestAuth({
|
||||
req,
|
||||
auth: opts.getResolvedAuth?.() ?? opts.auth,
|
||||
trustedProxies: cfg.gateway?.trustedProxies,
|
||||
allowRealIpFallback: cfg.gateway?.allowRealIpFallback,
|
||||
rateLimiter: opts.rateLimiter,
|
||||
cfg,
|
||||
});
|
||||
if (!currentRequestAuth.ok) {
|
||||
return false;
|
||||
}
|
||||
const requestedScopes = resolveTrustedHttpOperatorScopes(req, currentRequestAuth.requestAuth);
|
||||
return authorizeOperatorScopesForMethod("chat.history", requestedScopes).allowed;
|
||||
};
|
||||
|
||||
heartbeat = setInterval(() => {
|
||||
queueStreamWork(async () => {
|
||||
if (!(await isStreamStillAuthorized())) {
|
||||
closeStream();
|
||||
return;
|
||||
}
|
||||
if (!res.writableEnded) {
|
||||
res.write(": keepalive\n\n");
|
||||
}
|
||||
});
|
||||
}, 15_000);
|
||||
|
||||
const unsubscribe = onSessionTranscriptUpdate((update) => {
|
||||
if (res.writableEnded || !entry?.sessionId) {
|
||||
unsubscribe = onSessionTranscriptUpdate((update) => {
|
||||
// Filter to candidate sessions synchronously before enqueueing any async
|
||||
// work. `onSessionTranscriptUpdate` is a global fan-out listener, so every
|
||||
// transcript write in the gateway would otherwise append a Promise-chain
|
||||
// entry capturing `update.message` to every open SSE stream's queue —
|
||||
// O(streams × updates) for busy deployments.
|
||||
if (!entry?.sessionId) {
|
||||
return;
|
||||
}
|
||||
const updatePath = canonicalizePath(update.sessionFile);
|
||||
if (!updatePath || !transcriptCandidates.has(updatePath)) {
|
||||
return;
|
||||
}
|
||||
if (update.message !== undefined) {
|
||||
if (limit === undefined && cursor === undefined) {
|
||||
const nextEvent = sseState.appendInlineMessage({
|
||||
message: update.message,
|
||||
messageId: update.messageId,
|
||||
});
|
||||
if (!nextEvent) {
|
||||
return;
|
||||
}
|
||||
sentHistory = sseState.snapshot();
|
||||
sseWrite(res, "message", {
|
||||
sessionKey: target.canonicalKey,
|
||||
message: nextEvent.message,
|
||||
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
|
||||
messageSeq: nextEvent.messageSeq,
|
||||
});
|
||||
queueStreamWork(async () => {
|
||||
if (res.writableEnded) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
sentHistory = sseState.refresh();
|
||||
sseWrite(res, "history", {
|
||||
sessionKey: target.canonicalKey,
|
||||
...sentHistory,
|
||||
if (!(await isStreamStillAuthorized())) {
|
||||
closeStream();
|
||||
return;
|
||||
}
|
||||
if (update.message !== undefined) {
|
||||
if (limit === undefined && cursor === undefined) {
|
||||
const nextEvent = sseState.appendInlineMessage({
|
||||
message: update.message,
|
||||
messageId: update.messageId,
|
||||
});
|
||||
if (!nextEvent) {
|
||||
return;
|
||||
}
|
||||
sentHistory = sseState.snapshot();
|
||||
sseWrite(res, "message", {
|
||||
sessionKey: target.canonicalKey,
|
||||
message: nextEvent.message,
|
||||
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
|
||||
messageSeq: nextEvent.messageSeq,
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
sentHistory = sseState.refresh();
|
||||
sseWrite(res, "history", {
|
||||
sessionKey: target.canonicalKey,
|
||||
...sentHistory,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
const cleanup = () => {
|
||||
clearInterval(heartbeat);
|
||||
unsubscribe();
|
||||
};
|
||||
req.on("close", cleanup);
|
||||
res.on("close", cleanup);
|
||||
res.on("finish", cleanup);
|
||||
|
||||
Reference in New Issue
Block a user