mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 12:41:12 +00:00
Gateway: surface blocked ACP task outcomes (#57203)
This commit is contained in:
@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Config/runtime: pin the first successful config load in memory for the running process and refresh that snapshot on successful writes/reloads, so hot paths stop reparsing `openclaw.json` between watcher-driven swaps.
|
||||
- Config/legacy cleanup: stop probing obsolete alternate legacy config names and service labels during local config/service detection, while keeping the active `~/.openclaw/openclaw.json` path canonical.
|
||||
- ACP/sessions_spawn: register ACP child runs for completion tracking and lifecycle cleanup, and make registration-failure cleanup explicitly best-effort so callers do not assume an already-started ACP turn was fully aborted. (#40885) Thanks @xaeon2026 and @vincentkoc.
|
||||
- ACP/tasks: mark cleanly exited ACP runs as blocked when they end on deterministic write or authorization blockers, and wake the parent session with a follow-up instead of falsely reporting success.
|
||||
- ACPX/runtime: derive the bundled ACPX expected version from the extension package metadata instead of hardcoding a separate literal, so plugin-local ACPX installs stop drifting out of health-check parity after version bumps. (#49089) Thanks @jiejiesks and @vincentkoc.
|
||||
- Gateway/auth: make local-direct `trusted-proxy` fallback require the configured shared token instead of silently authenticating same-host callers, while keeping same-host reverse proxy identity-header flows on the normal trusted-proxy path. Thanks @zhangning-agent and @vincentkoc.
|
||||
- Agents/sandbox: honor `tools.sandbox.tools.alsoAllow`, let explicit sandbox re-allows remove matching built-in default-deny tools, and keep sandbox explain/error guidance aligned with the effective sandbox tool policy. (#54492) Thanks @ngutman.
|
||||
|
||||
@@ -104,6 +104,37 @@ function resolveBackgroundTaskFailureStatus(error: AcpRuntimeError): "failed" |
|
||||
return /\btimed out\b/i.test(error.message) ? "timed_out" : "failed";
|
||||
}
|
||||
|
||||
function resolveBackgroundTaskTerminalResult(progressSummary: string): {
|
||||
terminalOutcome?: "blocked";
|
||||
terminalSummary?: string;
|
||||
} {
|
||||
const normalized = normalizeText(progressSummary)?.replace(/\s+/g, " ").trim();
|
||||
if (!normalized) {
|
||||
return {};
|
||||
}
|
||||
const permissionDeniedMatch = normalized.match(
|
||||
/\b(?:write failed:\s*)?permission denied(?: for (?<path>\S+))?\.?/i,
|
||||
);
|
||||
if (permissionDeniedMatch) {
|
||||
const path = permissionDeniedMatch.groups?.path?.trim().replace(/[.,;:!?]+$/, "");
|
||||
return {
|
||||
terminalOutcome: "blocked",
|
||||
terminalSummary: path ? `Permission denied for ${path}.` : "Permission denied.",
|
||||
};
|
||||
}
|
||||
if (
|
||||
/\bneed a writable session\b/i.test(normalized) ||
|
||||
/\bfilesystem authorization\b/i.test(normalized) ||
|
||||
/`?apply_patch`?/i.test(normalized)
|
||||
) {
|
||||
return {
|
||||
terminalOutcome: "blocked",
|
||||
terminalSummary: "Writable session or apply_patch authorization required.",
|
||||
};
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
type BackgroundTaskContext = {
|
||||
requesterSessionKey: string;
|
||||
requesterOrigin?: DeliveryContext;
|
||||
@@ -800,13 +831,15 @@ export class AcpSessionManager {
|
||||
startedAt: turnStartedAt,
|
||||
});
|
||||
if (taskContext) {
|
||||
const terminalResult = resolveBackgroundTaskTerminalResult(taskProgressSummary);
|
||||
this.updateBackgroundTaskState(taskContext.runId, {
|
||||
status: "done",
|
||||
endedAt: Date.now(),
|
||||
lastEventAt: Date.now(),
|
||||
error: undefined,
|
||||
progressSummary: taskProgressSummary || null,
|
||||
terminalSummary: null,
|
||||
terminalSummary: terminalResult.terminalSummary ?? null,
|
||||
terminalOutcome: terminalResult.terminalOutcome,
|
||||
});
|
||||
}
|
||||
await this.setSessionState({
|
||||
|
||||
@@ -312,6 +312,8 @@ describe("AcpSessionManager", () => {
|
||||
task: "Implement the feature and report back",
|
||||
status: "done",
|
||||
progressSummary: "Write failed: permission denied for /root/oc-acp-write-should-fail.txt.",
|
||||
terminalOutcome: "blocked",
|
||||
terminalSummary: "Permission denied for /root/oc-acp-write-should-fail.txt.",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js";
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
import { resetHeartbeatWakeStateForTests } from "../infra/heartbeat-wake.js";
|
||||
import {
|
||||
hasPendingHeartbeatWake,
|
||||
resetHeartbeatWakeStateForTests,
|
||||
} from "../infra/heartbeat-wake.js";
|
||||
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
|
||||
import { withTempDir } from "../test-helpers/temp-dir.js";
|
||||
import {
|
||||
@@ -245,6 +248,44 @@ describe("task-registry", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("still wakes the parent when blocked delivery misses the outward channel", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
hoisted.sendMessageMock.mockRejectedValueOnce(new Error("telegram unavailable"));
|
||||
|
||||
createTaskRecord({
|
||||
source: "sessions_spawn",
|
||||
runtime: "acp",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterOrigin: {
|
||||
channel: "telegram",
|
||||
to: "telegram:123",
|
||||
},
|
||||
childSessionKey: "agent:main:acp:child",
|
||||
runId: "run-delivery-blocked",
|
||||
task: "Port the repo changes",
|
||||
status: "done",
|
||||
deliveryStatus: "pending",
|
||||
terminalOutcome: "blocked",
|
||||
terminalSummary: "Writable session or apply_patch authorization required.",
|
||||
});
|
||||
|
||||
await waitForAssertion(() =>
|
||||
expect(findTaskByRunId("run-delivery-blocked")).toMatchObject({
|
||||
status: "done",
|
||||
deliveryStatus: "failed",
|
||||
terminalOutcome: "blocked",
|
||||
}),
|
||||
);
|
||||
expect(peekSystemEvents("agent:main:main")).toEqual([
|
||||
"Background task blocked: ACP background task (run run-deli). Writable session or apply_patch authorization required.",
|
||||
"Task needs follow-up: ACP background task (run run-deli). Writable session or apply_patch authorization required.",
|
||||
]);
|
||||
expect(hasPendingHeartbeatWake()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
it("marks internal fallback delivery as session queued instead of delivered", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
@@ -284,6 +325,39 @@ describe("task-registry", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("wakes the parent for blocked tasks even when delivery falls back to the session", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
|
||||
createTaskRecord({
|
||||
source: "sessions_spawn",
|
||||
runtime: "acp",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
childSessionKey: "agent:main:acp:child",
|
||||
runId: "run-session-blocked",
|
||||
task: "Port the repo changes",
|
||||
status: "done",
|
||||
deliveryStatus: "pending",
|
||||
terminalOutcome: "blocked",
|
||||
terminalSummary: "Writable session or apply_patch authorization required.",
|
||||
});
|
||||
|
||||
await waitForAssertion(() =>
|
||||
expect(findTaskByRunId("run-session-blocked")).toMatchObject({
|
||||
status: "done",
|
||||
deliveryStatus: "session_queued",
|
||||
}),
|
||||
);
|
||||
expect(peekSystemEvents("agent:main:main")).toEqual([
|
||||
"Background task blocked: ACP background task (run run-sess). Writable session or apply_patch authorization required.",
|
||||
"Task needs follow-up: ACP background task (run run-sess). Writable session or apply_patch authorization required.",
|
||||
]);
|
||||
expect(hasPendingHeartbeatWake()).toBe(true);
|
||||
expect(hoisted.sendMessageMock).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("does not include internal progress detail in the terminal channel message", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
@@ -335,6 +409,88 @@ describe("task-registry", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("surfaces blocked outcomes separately from completed tasks", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
hoisted.sendMessageMock.mockResolvedValue({
|
||||
channel: "telegram",
|
||||
to: "telegram:123",
|
||||
via: "direct",
|
||||
});
|
||||
|
||||
createTaskRecord({
|
||||
source: "sessions_spawn",
|
||||
runtime: "acp",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterOrigin: {
|
||||
channel: "telegram",
|
||||
to: "telegram:123",
|
||||
},
|
||||
childSessionKey: "agent:main:acp:child",
|
||||
runId: "run-blocked-outcome",
|
||||
task: "Port the repo changes",
|
||||
status: "done",
|
||||
deliveryStatus: "pending",
|
||||
terminalOutcome: "blocked",
|
||||
terminalSummary: "Writable session or apply_patch authorization required.",
|
||||
});
|
||||
|
||||
await waitForAssertion(() =>
|
||||
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
content:
|
||||
"Background task blocked: ACP background task (run run-bloc). Writable session or apply_patch authorization required.",
|
||||
}),
|
||||
),
|
||||
);
|
||||
expect(peekSystemEvents("agent:main:main")).toEqual([
|
||||
"Task needs follow-up: ACP background task (run run-bloc). Writable session or apply_patch authorization required.",
|
||||
]);
|
||||
expect(hasPendingHeartbeatWake()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
it("does not queue an unblock follow-up for ordinary completed tasks", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
hoisted.sendMessageMock.mockResolvedValue({
|
||||
channel: "telegram",
|
||||
to: "telegram:123",
|
||||
via: "direct",
|
||||
});
|
||||
|
||||
createTaskRecord({
|
||||
source: "sessions_spawn",
|
||||
runtime: "acp",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterOrigin: {
|
||||
channel: "telegram",
|
||||
to: "telegram:123",
|
||||
},
|
||||
childSessionKey: "agent:main:acp:child",
|
||||
runId: "run-succeeded-outcome",
|
||||
task: "Create the file and verify it",
|
||||
status: "done",
|
||||
deliveryStatus: "pending",
|
||||
terminalSummary: "Created /tmp/file.txt and verified contents.",
|
||||
terminalOutcome: "succeeded",
|
||||
});
|
||||
|
||||
await waitForAssertion(() =>
|
||||
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
content:
|
||||
"Background task done: ACP background task (run run-succ). Created /tmp/file.txt and verified contents.",
|
||||
}),
|
||||
),
|
||||
);
|
||||
expect(peekSystemEvents("agent:main:main")).toEqual([]);
|
||||
expect(hasPendingHeartbeatWake()).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps distinct task records when different producers share a runId", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
@@ -493,6 +649,8 @@ describe("task-registry", () => {
|
||||
task: "Investigate issue",
|
||||
status: "done",
|
||||
deliveryStatus: "pending",
|
||||
terminalOutcome: "blocked",
|
||||
terminalSummary: "Writable session or apply_patch authorization required.",
|
||||
});
|
||||
|
||||
const first = maybeDeliverTaskTerminalUpdate(task.taskId);
|
||||
@@ -502,9 +660,9 @@ describe("task-registry", () => {
|
||||
expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
idempotencyKey: `task-terminal:${task.taskId}:done`,
|
||||
idempotencyKey: `task-terminal:${task.taskId}:done:blocked`,
|
||||
mirror: expect.objectContaining({
|
||||
idempotencyKey: `task-terminal:${task.taskId}:done`,
|
||||
idempotencyKey: `task-terminal:${task.taskId}:done:blocked`,
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
@@ -26,6 +26,7 @@ import type {
|
||||
TaskRuntime,
|
||||
TaskSource,
|
||||
TaskStatus,
|
||||
TaskTerminalOutcome,
|
||||
} from "./task-registry.types.js";
|
||||
|
||||
const log = createSubsystemLogger("tasks/registry");
|
||||
@@ -93,6 +94,12 @@ function normalizeTaskSummary(value: string | null | undefined): string | undefi
|
||||
return normalized || undefined;
|
||||
}
|
||||
|
||||
function normalizeTaskTerminalOutcome(
|
||||
value: TaskTerminalOutcome | null | undefined,
|
||||
): TaskTerminalOutcome | undefined {
|
||||
return value === "succeeded" || value === "blocked" ? value : undefined;
|
||||
}
|
||||
|
||||
const TASK_RECENT_EVENT_LIMIT = 12;
|
||||
|
||||
function appendTaskEvent(
|
||||
@@ -271,7 +278,8 @@ function mergeExistingTaskForCreate(
|
||||
}
|
||||
|
||||
function taskTerminalDeliveryIdempotencyKey(task: TaskRecord): string {
|
||||
return `task-terminal:${task.taskId}:${task.status}`;
|
||||
const outcome = task.status === "done" ? (task.terminalOutcome ?? "default") : "default";
|
||||
return `task-terminal:${task.taskId}:${task.status}:${outcome}`;
|
||||
}
|
||||
|
||||
function restoreTaskRegistryOnce() {
|
||||
@@ -334,6 +342,11 @@ function formatTaskTerminalEvent(task: TaskRecord): string {
|
||||
const runLabel = task.runId ? ` (run ${task.runId.slice(0, 8)})` : "";
|
||||
const summary = task.terminalSummary?.trim();
|
||||
if (task.status === "done") {
|
||||
if (task.terminalOutcome === "blocked") {
|
||||
return summary
|
||||
? `Background task blocked: ${title}${runLabel}. ${summary}`
|
||||
: `Background task blocked: ${title}${runLabel}.`;
|
||||
}
|
||||
return summary
|
||||
? `Background task done: ${title}${runLabel}. ${summary}`
|
||||
: `Background task done: ${title}${runLabel}.`;
|
||||
@@ -377,6 +390,35 @@ function queueTaskSystemEvent(task: TaskRecord, text: string) {
|
||||
return true;
|
||||
}
|
||||
|
||||
function queueBlockedTaskFollowup(task: TaskRecord) {
|
||||
if (task.status !== "done" || task.terminalOutcome !== "blocked") {
|
||||
return false;
|
||||
}
|
||||
const requesterSessionKey = task.requesterSessionKey.trim();
|
||||
if (!requesterSessionKey) {
|
||||
return false;
|
||||
}
|
||||
const title =
|
||||
task.label?.trim() ||
|
||||
(task.runtime === "acp"
|
||||
? "ACP background task"
|
||||
: task.runtime === "subagent"
|
||||
? "Subagent task"
|
||||
: task.task.trim() || "Background task");
|
||||
const runLabel = task.runId ? ` (run ${task.runId.slice(0, 8)})` : "";
|
||||
const summary = task.terminalSummary?.trim() || "Task is blocked and needs follow-up.";
|
||||
enqueueSystemEvent(`Task needs follow-up: ${title}${runLabel}. ${summary}`, {
|
||||
sessionKey: requesterSessionKey,
|
||||
contextKey: `task:${task.taskId}:blocked-followup`,
|
||||
deliveryContext: task.requesterOrigin,
|
||||
});
|
||||
requestHeartbeatNow({
|
||||
reason: "background-task-blocked",
|
||||
sessionKey: requesterSessionKey,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
function formatTaskStateChangeEvent(task: TaskRecord, event: TaskEventRecord): string | null {
|
||||
const title =
|
||||
task.label?.trim() ||
|
||||
@@ -464,6 +506,9 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
|
||||
if (!canDeliverTaskToRequesterOrigin(latest)) {
|
||||
try {
|
||||
queueTaskSystemEvent(latest, eventText);
|
||||
if (latest.terminalOutcome === "blocked") {
|
||||
queueBlockedTaskFollowup(latest);
|
||||
}
|
||||
return updateTask(taskId, {
|
||||
deliveryStatus: "session_queued",
|
||||
lastEventAt: Date.now(),
|
||||
@@ -498,6 +543,9 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
|
||||
idempotencyKey: taskTerminalDeliveryIdempotencyKey(latest),
|
||||
},
|
||||
});
|
||||
if (latest.terminalOutcome === "blocked") {
|
||||
queueBlockedTaskFollowup(latest);
|
||||
}
|
||||
return updateTask(taskId, {
|
||||
deliveryStatus: "delivered",
|
||||
lastEventAt: Date.now(),
|
||||
@@ -511,6 +559,9 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
|
||||
});
|
||||
try {
|
||||
queueTaskSystemEvent(latest, eventText);
|
||||
if (latest.terminalOutcome === "blocked") {
|
||||
queueBlockedTaskFollowup(latest);
|
||||
}
|
||||
} catch (fallbackError) {
|
||||
log.warn("Failed to queue background task fallback event", {
|
||||
taskId,
|
||||
@@ -685,6 +736,7 @@ export function createTaskRecord(params: {
|
||||
lastEventAt?: number;
|
||||
progressSummary?: string | null;
|
||||
terminalSummary?: string | null;
|
||||
terminalOutcome?: TaskTerminalOutcome | null;
|
||||
transcriptPath?: string;
|
||||
streamLogPath?: string;
|
||||
backend?: string;
|
||||
@@ -725,6 +777,7 @@ export function createTaskRecord(params: {
|
||||
lastEventAt,
|
||||
progressSummary: normalizeTaskSummary(params.progressSummary),
|
||||
terminalSummary: normalizeTaskSummary(params.terminalSummary),
|
||||
terminalOutcome: normalizeTaskTerminalOutcome(params.terminalOutcome),
|
||||
recentEvents: appendTaskEvent(
|
||||
{
|
||||
taskId,
|
||||
@@ -767,6 +820,7 @@ export function updateTaskStateByRunId(params: {
|
||||
error?: string;
|
||||
progressSummary?: string | null;
|
||||
terminalSummary?: string | null;
|
||||
terminalOutcome?: TaskTerminalOutcome | null;
|
||||
eventSummary?: string | null;
|
||||
}) {
|
||||
ensureTaskRegistryReady();
|
||||
@@ -804,6 +858,9 @@ export function updateTaskStateByRunId(params: {
|
||||
if (params.terminalSummary !== undefined) {
|
||||
patch.terminalSummary = normalizeTaskSummary(params.terminalSummary);
|
||||
}
|
||||
if (params.terminalOutcome !== undefined) {
|
||||
patch.terminalOutcome = normalizeTaskTerminalOutcome(params.terminalOutcome);
|
||||
}
|
||||
const eventSummary =
|
||||
normalizeTaskSummary(params.eventSummary) ??
|
||||
(nextStatus === "failed"
|
||||
|
||||
@@ -21,6 +21,8 @@ export type TaskDeliveryStatus =
|
||||
|
||||
export type TaskNotifyPolicy = "done_only" | "state_changes" | "silent";
|
||||
|
||||
export type TaskTerminalOutcome = "succeeded" | "blocked";
|
||||
|
||||
export type TaskBindingTargetKind = "subagent" | "session";
|
||||
|
||||
export type TaskSource = "sessions_spawn" | "background_cli" | "unknown";
|
||||
@@ -54,6 +56,7 @@ export type TaskRecord = {
|
||||
error?: string;
|
||||
progressSummary?: string;
|
||||
terminalSummary?: string;
|
||||
terminalOutcome?: TaskTerminalOutcome;
|
||||
recentEvents?: TaskEventRecord[];
|
||||
lastNotifiedEventAt?: number;
|
||||
transcriptPath?: string;
|
||||
|
||||
Reference in New Issue
Block a user