mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:40:44 +00:00
fix(auto-reply): poison inbound dedupe after partial turn failure
* fix(auto-reply): poison inbound dedupe after replay-unsafe failures * fix(clownfish): address review for ghcrawl-165980-agentic-merge (1)
This commit is contained in:
@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Auto-reply: poison inbound message dedupe after replay-unsafe provider/runtime failures so retries stay safe before visible progress but cannot duplicate messages after block output, tool side effects, or session progress. Fixes #69303; keeps #58549 and #64606 as duplicate validation. Thanks @martingarramon, @NikolaFC, and @zeroth-blip.
|
||||
- Gateway/Bonjour: keep @homebridge/ciao cancellation handlers registered across advertiser restarts so late probing cancellations cannot crash Linux and other mDNS-churned gateways. Thanks @codex.
|
||||
- Plugins/startup: load the default `memory-core` slot during Gateway startup when permitted so active-memory recall can call `memory_search` and `memory_get` without requiring an explicit `plugins.slots.memory` entry, while preserving `plugins.slots.memory: "none"`. Thanks @codex.
|
||||
- Plugins/CLI: prefer native require for compiled bundled plugin JavaScript before jiti so read-only config, status, device, and node commands avoid unnecessary transform overhead on slow hosts. Fixes #62842. Thanks @Effet.
|
||||
|
||||
@@ -3392,6 +3392,95 @@ describe("dispatchReplyFromConfig", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("poisons inbound dedupe when dispatch fails after a block reply", async () => {
|
||||
setNoAbort();
|
||||
const ctx = buildTestCtx({
|
||||
Provider: "whatsapp",
|
||||
OriginatingChannel: "whatsapp",
|
||||
OriginatingTo: "whatsapp:+15555550125",
|
||||
To: "whatsapp:+15555550125",
|
||||
AccountId: "default",
|
||||
MessageSid: "msg-dup-block-error",
|
||||
SessionKey: "agent:main:whatsapp:direct:+15555550125",
|
||||
CommandBody: "hello",
|
||||
RawBody: "hello",
|
||||
Body: "hello",
|
||||
});
|
||||
const firstDispatcher = createDispatcher();
|
||||
const replyResolver = vi.fn(
|
||||
async (_ctx: MsgContext, opts?: GetReplyOptions): Promise<ReplyPayload | undefined> => {
|
||||
await opts?.onBlockReply?.({ text: "partial answer" });
|
||||
throw new Error("provider failed after block");
|
||||
},
|
||||
);
|
||||
|
||||
await expect(
|
||||
dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher: firstDispatcher,
|
||||
replyResolver,
|
||||
}),
|
||||
).rejects.toThrow("provider failed after block");
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher: createDispatcher(),
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
expect(firstDispatcher.sendBlockReply).toHaveBeenCalledWith({ text: "partial answer" });
|
||||
expect(replyResolver).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("poisons inbound dedupe when dispatch fails after a suppressed tool result", async () => {
|
||||
setNoAbort();
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "deny",
|
||||
};
|
||||
const ctx = buildTestCtx({
|
||||
Provider: "whatsapp",
|
||||
OriginatingChannel: "whatsapp",
|
||||
OriginatingTo: "whatsapp:+15555550126",
|
||||
To: "whatsapp:+15555550126",
|
||||
AccountId: "default",
|
||||
MessageSid: "msg-dup-tool-error",
|
||||
SessionKey: "agent:main:whatsapp:direct:+15555550126",
|
||||
CommandBody: "hello",
|
||||
RawBody: "hello",
|
||||
Body: "hello",
|
||||
});
|
||||
const firstDispatcher = createDispatcher();
|
||||
const replyResolver = vi.fn(
|
||||
async (_ctx: MsgContext, opts?: GetReplyOptions): Promise<ReplyPayload | undefined> => {
|
||||
await opts?.onToolResult?.({ text: "tool touched external state" });
|
||||
throw new Error("provider failed after tool");
|
||||
},
|
||||
);
|
||||
|
||||
await expect(
|
||||
dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher: firstDispatcher,
|
||||
replyResolver,
|
||||
}),
|
||||
).rejects.toThrow("provider failed after tool");
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher: createDispatcher(),
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
expect(firstDispatcher.sendToolResult).not.toHaveBeenCalled();
|
||||
expect(replyResolver).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("passes configOverride to replyResolver when provided", async () => {
|
||||
setNoAbort();
|
||||
const cfg = emptyConfig;
|
||||
|
||||
@@ -343,6 +343,10 @@ export async function dispatchReplyFromConfig(
|
||||
recordProcessed("skipped", { reason: "duplicate" });
|
||||
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
|
||||
}
|
||||
let inboundDedupeReplayUnsafe = false;
|
||||
const markInboundDedupeReplayUnsafe = () => {
|
||||
inboundDedupeReplayUnsafe = true;
|
||||
};
|
||||
|
||||
const initialSessionStoreEntry = resolveSessionStoreLookup(ctx, cfg);
|
||||
const boundAcpDispatchSessionKey = resolveBoundAcpDispatchSessionKey({ ctx, cfg });
|
||||
@@ -473,6 +477,7 @@ export async function dispatchReplyFromConfig(
|
||||
if (!shouldRouteToOriginating || !routeReplyChannel || !routeReplyTo || !routeReplyRuntime) {
|
||||
return null;
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
return await routeReplyRuntime.routeReply({
|
||||
payload,
|
||||
channel: routeReplyChannel,
|
||||
@@ -538,6 +543,7 @@ export async function dispatchReplyFromConfig(
|
||||
}
|
||||
return result.ok;
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
return mode === "additive"
|
||||
? dispatcher.sendToolResult(payload)
|
||||
: dispatcher.sendFinalReply(payload);
|
||||
@@ -721,6 +727,7 @@ export async function dispatchReplyFromConfig(
|
||||
);
|
||||
}
|
||||
} else {
|
||||
markInboundDedupeReplayUnsafe();
|
||||
queuedFinal = dispatcher.sendFinalReply(payload);
|
||||
}
|
||||
} else {
|
||||
@@ -744,6 +751,9 @@ export async function dispatchReplyFromConfig(
|
||||
const sendFinalPayload = async (
|
||||
payload: ReplyPayload,
|
||||
): Promise<{ queuedFinal: boolean; routedFinalCount: number }> => {
|
||||
if (resolveSendableOutboundReplyParts(payload).hasContent) {
|
||||
markInboundDedupeReplayUnsafe();
|
||||
}
|
||||
const ttsPayload = await maybeApplyTtsToReplyPayload({
|
||||
payload,
|
||||
cfg,
|
||||
@@ -767,6 +777,7 @@ export async function dispatchReplyFromConfig(
|
||||
routedFinalCount: result.ok ? 1 : 0,
|
||||
};
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
return {
|
||||
queuedFinal: dispatcher.sendFinalReply(normalizedPayload),
|
||||
routedFinalCount: 0,
|
||||
@@ -898,6 +909,7 @@ export async function dispatchReplyFromConfig(
|
||||
await sendPayloadAsync(payload, undefined, false);
|
||||
return;
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
dispatcher.sendToolResult(payload);
|
||||
};
|
||||
const sendPlanUpdate = async (payload: {
|
||||
@@ -914,6 +926,7 @@ export async function dispatchReplyFromConfig(
|
||||
await sendPayloadAsync(replyPayload, undefined, false);
|
||||
return;
|
||||
}
|
||||
markInboundDedupeReplayUnsafe();
|
||||
dispatcher.sendToolResult(replyPayload);
|
||||
};
|
||||
const summarizeApprovalLabel = (payload: {
|
||||
@@ -1019,6 +1032,7 @@ export async function dispatchReplyFromConfig(
|
||||
suppressTyping: typing.suppressTyping,
|
||||
onToolResult: (payload: ReplyPayload) => {
|
||||
const run = async () => {
|
||||
markInboundDedupeReplayUnsafe();
|
||||
await onToolResultFromReplyOptions?.(payload);
|
||||
if (suppressDelivery) {
|
||||
return;
|
||||
@@ -1055,12 +1069,14 @@ export async function dispatchReplyFromConfig(
|
||||
if (shouldRouteToOriginating) {
|
||||
await sendPayloadAsync(deliveryPayload, undefined, false);
|
||||
} else {
|
||||
markInboundDedupeReplayUnsafe();
|
||||
dispatcher.sendToolResult(deliveryPayload);
|
||||
}
|
||||
};
|
||||
return run();
|
||||
},
|
||||
onPlanUpdate: async (payload) => {
|
||||
markInboundDedupeReplayUnsafe();
|
||||
await onPlanUpdateFromReplyOptions?.(payload);
|
||||
if (payload.phase !== "update" || suppressDefaultToolProgressMessages) {
|
||||
return;
|
||||
@@ -1068,6 +1084,7 @@ export async function dispatchReplyFromConfig(
|
||||
await sendPlanUpdate({ explanation: payload.explanation, steps: payload.steps });
|
||||
},
|
||||
onApprovalEvent: async (payload) => {
|
||||
markInboundDedupeReplayUnsafe();
|
||||
await onApprovalEventFromReplyOptions?.(payload);
|
||||
if (payload.phase !== "requested" || suppressDefaultToolProgressMessages) {
|
||||
return;
|
||||
@@ -1083,6 +1100,7 @@ export async function dispatchReplyFromConfig(
|
||||
await maybeSendWorkingStatus(label);
|
||||
},
|
||||
onPatchSummary: async (payload) => {
|
||||
markInboundDedupeReplayUnsafe();
|
||||
await onPatchSummaryFromReplyOptions?.(payload);
|
||||
if (payload.phase !== "end" || suppressDefaultToolProgressMessages) {
|
||||
return;
|
||||
@@ -1095,6 +1113,12 @@ export async function dispatchReplyFromConfig(
|
||||
},
|
||||
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => {
|
||||
const run = async () => {
|
||||
if (
|
||||
payload.isReasoning !== true &&
|
||||
resolveSendableOutboundReplyParts(payload).hasContent
|
||||
) {
|
||||
markInboundDedupeReplayUnsafe();
|
||||
}
|
||||
if (suppressDelivery) {
|
||||
return;
|
||||
}
|
||||
@@ -1156,6 +1180,7 @@ export async function dispatchReplyFromConfig(
|
||||
if (shouldRouteToOriginating) {
|
||||
await sendPayloadAsync(normalizedPayload, context?.abortSignal, false);
|
||||
} else {
|
||||
markInboundDedupeReplayUnsafe();
|
||||
dispatcher.sendBlockReply(normalizedPayload);
|
||||
}
|
||||
};
|
||||
@@ -1268,6 +1293,7 @@ export async function dispatchReplyFromConfig(
|
||||
);
|
||||
}
|
||||
} else {
|
||||
markInboundDedupeReplayUnsafe();
|
||||
const didQueue = dispatcher.sendFinalReply(normalizedTtsOnlyPayload);
|
||||
queuedFinal = didQueue || queuedFinal;
|
||||
}
|
||||
@@ -1293,7 +1319,11 @@ export async function dispatchReplyFromConfig(
|
||||
return { queuedFinal, counts };
|
||||
} catch (err) {
|
||||
if (inboundDedupeClaim.status === "claimed") {
|
||||
releaseInboundDedupe(inboundDedupeClaim.key);
|
||||
if (inboundDedupeReplayUnsafe) {
|
||||
commitInboundDedupe(inboundDedupeClaim.key);
|
||||
} else {
|
||||
releaseInboundDedupe(inboundDedupeClaim.key);
|
||||
}
|
||||
}
|
||||
recordProcessed("error", { error: String(err) });
|
||||
markIdle("message_error");
|
||||
|
||||
@@ -72,4 +72,33 @@ describe("inbound dedupe", () => {
|
||||
inboundB.resetInboundDedupe();
|
||||
}
|
||||
});
|
||||
|
||||
it("shares claim/commit state across distinct module instances", async () => {
|
||||
const inboundA = await importFreshModule<typeof import("./inbound-dedupe.js")>(
|
||||
import.meta.url,
|
||||
"./inbound-dedupe.js?scope=commit-a",
|
||||
);
|
||||
const inboundB = await importFreshModule<typeof import("./inbound-dedupe.js")>(
|
||||
import.meta.url,
|
||||
"./inbound-dedupe.js?scope=commit-b",
|
||||
);
|
||||
|
||||
inboundA.resetInboundDedupe();
|
||||
inboundB.resetInboundDedupe();
|
||||
|
||||
try {
|
||||
const firstClaim = inboundA.claimInboundDedupe(sharedInboundContext);
|
||||
expect(firstClaim).toMatchObject({ status: "claimed" });
|
||||
if (firstClaim.status !== "claimed") {
|
||||
throw new Error("expected claimed inbound dedupe result");
|
||||
}
|
||||
inboundA.commitInboundDedupe(firstClaim.key);
|
||||
expect(inboundB.claimInboundDedupe(sharedInboundContext)).toMatchObject({
|
||||
status: "duplicate",
|
||||
});
|
||||
} finally {
|
||||
inboundA.resetInboundDedupe();
|
||||
inboundB.resetInboundDedupe();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user