mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-21 15:01:03 +00:00
fix(gateway): skip seq-gap broadcast for stale post-lifecycle events (#43751)
* fix: stop stale gateway seq-gap errors (#43751) (thanks @caesargattuso) * fix: keep agent.request run ids session-scoped --------- Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
@@ -118,6 +118,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/config validation: stop treating the implicit default memory slot as a required explicit plugin config, so startup no longer fails with `plugins.slots.memory: plugin not found: memory-core` when `memory-core` was only inferred. (#47494) Thanks @ngutman.
|
||||
- Tlon: honor explicit empty allowlists and defer cite expansion. (#46788) Thanks @zpbrent and @vincentkoc.
|
||||
- Tlon/DM auth: defer cited-message expansion until after DM authorization and owner command handling, so unauthorized DMs and owner approval/admin commands no longer trigger cross-channel cite fetches before the deny or command path.
|
||||
- Gateway/agent events: stop broadcasting false end-of-run `seq gap` errors to clients, and isolate node-driven ingress turns with per-turn run IDs so stale tail events cannot leak into later session runs. (#43751) Thanks @caesargattuso.
|
||||
- Docs/security audit: spell out that `gateway.controlUi.allowedOrigins: ["*"]` is an explicit allow-all browser-origin policy and should be avoided outside tightly controlled local testing.
|
||||
- Gateway/auth: clear self-declared scopes for device-less trusted-proxy Control UI sessions so proxy-authenticated connects cannot claim admin or secrets scopes without a bound device identity.
|
||||
- Nodes/pending actions: re-check queued foreground actions against the current node command policy before returning them to the node. (#46815) Thanks @zpbrent and @vincentkoc.
|
||||
|
||||
@@ -487,6 +487,46 @@ describe("agent event handler", () => {
|
||||
nowSpy?.mockRestore();
|
||||
});
|
||||
|
||||
it("drops stale events that arrive after lifecycle completion", () => {
|
||||
const { broadcast, nodeSendToSession, chatRunState, handler, nowSpy } = createHarness({
|
||||
now: 2_500,
|
||||
});
|
||||
chatRunState.registry.add("run-stale-tail", {
|
||||
sessionKey: "session-stale-tail",
|
||||
clientRunId: "client-stale-tail",
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-stale-tail",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "done" },
|
||||
});
|
||||
emitLifecycleEnd(handler, "run-stale-tail");
|
||||
const errorCallsBeforeStaleEvent = broadcast.mock.calls.filter(
|
||||
([event, payload]) =>
|
||||
event === "agent" && (payload as { stream?: string }).stream === "error",
|
||||
).length;
|
||||
const sessionChatCallsBeforeStaleEvent = sessionChatCalls(nodeSendToSession).length;
|
||||
|
||||
handler({
|
||||
runId: "run-stale-tail",
|
||||
seq: 3,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "late tail" },
|
||||
});
|
||||
|
||||
const errorCalls = broadcast.mock.calls.filter(
|
||||
([event, payload]) =>
|
||||
event === "agent" && (payload as { stream?: string }).stream === "error",
|
||||
);
|
||||
expect(errorCalls).toHaveLength(errorCallsBeforeStaleEvent);
|
||||
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(sessionChatCallsBeforeStaleEvent);
|
||||
nowSpy?.mockRestore();
|
||||
});
|
||||
|
||||
it("flushes buffered chat delta before tool start events", () => {
|
||||
let now = 12_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
|
||||
@@ -710,7 +710,7 @@ export function createAgentEventHandler({
|
||||
: { ...eventForClients, data };
|
||||
})()
|
||||
: agentPayload;
|
||||
if (evt.seq !== last + 1) {
|
||||
if (last > 0 && evt.seq !== last + 1) {
|
||||
broadcast("agent", {
|
||||
runId: eventRunId,
|
||||
stream: "error",
|
||||
|
||||
@@ -410,7 +410,9 @@ describe("voice transcript events", () => {
|
||||
});
|
||||
|
||||
it("forwards transcript with voice provenance", async () => {
|
||||
const addChatRun = vi.fn();
|
||||
const ctx = buildCtx();
|
||||
ctx.addChatRun = addChatRun;
|
||||
|
||||
await handleNodeEvent(ctx, "node-v2", {
|
||||
event: "voice.transcript",
|
||||
@@ -432,6 +434,12 @@ describe("voice transcript events", () => {
|
||||
sourceTool: "gateway.voice.transcript",
|
||||
},
|
||||
});
|
||||
expect(typeof opts.runId).toBe("string");
|
||||
expect(opts.runId).not.toBe(opts.sessionId);
|
||||
expect(addChatRun).toHaveBeenCalledWith(
|
||||
opts.runId,
|
||||
expect.objectContaining({ clientRunId: expect.stringMatching(/^voice-/) }),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not block agent dispatch when session-store touch fails", async () => {
|
||||
@@ -674,5 +682,6 @@ describe("agent request events", () => {
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
});
|
||||
expect(opts.runId).toBe(opts.sessionId);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -288,16 +288,18 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
sessionId,
|
||||
now,
|
||||
});
|
||||
const runId = randomUUID();
|
||||
|
||||
// Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send).
|
||||
// This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId).
|
||||
ctx.addChatRun(sessionId, {
|
||||
// This maps agent bus events (keyed by per-turn runId) to chat events (keyed by clientRunId).
|
||||
ctx.addChatRun(runId, {
|
||||
sessionKey: canonicalKey,
|
||||
clientRunId: `voice-${randomUUID()}`,
|
||||
});
|
||||
|
||||
void agentCommandFromIngress(
|
||||
{
|
||||
runId,
|
||||
message: text,
|
||||
sessionId,
|
||||
sessionKey: canonicalKey,
|
||||
@@ -404,7 +406,6 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
const deliver = deliverRequested && Boolean(channel && to);
|
||||
const deliveryChannel = deliver ? channel : undefined;
|
||||
const deliveryTo = deliver ? to : undefined;
|
||||
|
||||
if (deliverRequested && !deliver) {
|
||||
ctx.logGateway.warn(
|
||||
`agent delivery disabled node=${nodeId}: missing session delivery route (channel=${channel ?? "-"} to=${to ?? "-"})`,
|
||||
@@ -430,6 +431,7 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
|
||||
void agentCommandFromIngress(
|
||||
{
|
||||
runId: sessionId,
|
||||
message,
|
||||
images,
|
||||
sessionId,
|
||||
|
||||
Reference in New Issue
Block a user