Gateway: track background task lifecycle (#52518)

Merged via squash.

Prepared head SHA: 7c4554204e
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
Mariano
2026-03-29 12:48:02 +02:00
committed by GitHub
parent 270d0c5158
commit 17c36b5093
38 changed files with 3548 additions and 58 deletions

View File

@@ -71,6 +71,7 @@ Docs: https://docs.openclaw.ai
- Memory/plugins: move the pre-compaction memory flush plan behind the active memory plugin contract so `memory-core` owns flush prompts and target-path policy instead of hardcoded core logic.
- MiniMax: trim model catalog to M2.7 only, removing legacy M2, M2.1, M2.5, and VL-01 models. (#54487) Thanks @liyuan97.
- Plugins/runtime: expose `runHeartbeatOnce` in the plugin runtime `system` namespace so plugins can trigger a single heartbeat cycle with an explicit delivery target override (e.g. `heartbeat: { target: "last" }`). (#40299) Thanks @loveyana.
- Background tasks: keep durable lifecycle records for ACP/subagent spawned work and deliver ACP completion/failure updates through the real requester chat path instead of session-only stream events.
- Agents/compaction: preserve the post-compaction AGENTS refresh on stale-usage preflight compaction for both immediate replies and queued followups. (#49479) Thanks @jared596.
- Agents/compaction: surface safeguard-specific cancel reasons and relabel benign manual `/compact` no-op cases as skipped instead of failed. (#51072) Thanks @afurm.
- Docs: add `pnpm docs:check-links:anchors` for Mintlify anchor validation while keeping `scripts/docs-link-audit.mjs` as the stable link-audit entrypoint. (#55912) Thanks @velvet-shark.

View File

@@ -3,6 +3,8 @@ import type { OpenClawConfig } from "../../config/config.js";
import { logVerbose } from "../../globals.js";
import { normalizeAgentId } from "../../routing/session-key.js";
import { isAcpSessionKey } from "../../sessions/session-key-utils.js";
import { createTaskRecord, updateTaskStateByRunId } from "../../tasks/task-registry.js";
import type { DeliveryContext } from "../../utils/delivery-context.js";
import {
AcpRuntimeError,
toAcpRuntimeError,
@@ -75,6 +77,43 @@ import { SessionActorQueue } from "./session-actor-queue.js";
const ACP_TURN_TIMEOUT_GRACE_MS = 1_000;
const ACP_TURN_TIMEOUT_CLEANUP_GRACE_MS = 2_000;
const ACP_TURN_TIMEOUT_REASON = "turn-timeout";
const ACP_BACKGROUND_TASK_TEXT_MAX_LENGTH = 160;
const ACP_BACKGROUND_TASK_PROGRESS_MAX_LENGTH = 240;
function summarizeBackgroundTaskText(text: string): string {
const normalized = normalizeText(text) ?? "ACP background task";
if (normalized.length <= ACP_BACKGROUND_TASK_TEXT_MAX_LENGTH) {
return normalized;
}
return `${normalized.slice(0, ACP_BACKGROUND_TASK_TEXT_MAX_LENGTH - 1)}`;
}
function appendBackgroundTaskProgressSummary(current: string, chunk: string): string {
const normalizedChunk = normalizeText(chunk)?.replace(/\s+/g, " ");
if (!normalizedChunk) {
return current;
}
const combined = current ? `${current} ${normalizedChunk}` : normalizedChunk;
if (combined.length <= ACP_BACKGROUND_TASK_PROGRESS_MAX_LENGTH) {
return combined;
}
return `${combined.slice(0, ACP_BACKGROUND_TASK_PROGRESS_MAX_LENGTH - 1)}`;
}
function resolveBackgroundTaskFailureStatus(error: AcpRuntimeError): "failed" | "timed_out" {
return /\btimed out\b/i.test(error.message) ? "timed_out" : "failed";
}
type BackgroundTaskContext = {
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
childSessionKey: string;
runId: string;
label?: string;
task: string;
};
type BackgroundTaskStatePatch = Omit<Parameters<typeof updateTaskStateByRunId>[0], "runId">;
export class AcpSessionManager {
private readonly actorQueue = new SessionActorQueue();
@@ -614,6 +653,19 @@ export class AcpSessionManager {
async () => {
const turnStartedAt = Date.now();
const actorKey = normalizeActorKey(sessionKey);
const taskContext =
input.mode === "prompt"
? this.resolveBackgroundTaskContext({
cfg: input.cfg,
sessionKey,
requestId: input.requestId,
text: input.text,
})
: null;
if (taskContext) {
this.createBackgroundTaskRecord(taskContext, turnStartedAt);
}
let taskProgressSummary = "";
for (let attempt = 0; attempt < 2; attempt += 1) {
const resolution = this.resolveSession({
cfg: input.cfg,
@@ -696,6 +748,19 @@ export class AcpSessionManager {
);
} else if (event.type === "text_delta" || event.type === "tool_call") {
sawTurnOutput = true;
if (event.type === "text_delta" && event.stream !== "thought" && event.text) {
taskProgressSummary = appendBackgroundTaskProgressSummary(
taskProgressSummary,
event.text,
);
}
if (taskContext) {
this.updateBackgroundTaskState(taskContext.runId, {
status: "running",
lastEventAt: Date.now(),
progressSummary: taskProgressSummary || null,
});
}
}
if (input.onEvent) {
await input.onEvent(event);
@@ -734,6 +799,16 @@ export class AcpSessionManager {
this.recordTurnCompletion({
startedAt: turnStartedAt,
});
if (taskContext) {
this.updateBackgroundTaskState(taskContext.runId, {
status: "done",
endedAt: Date.now(),
lastEventAt: Date.now(),
error: undefined,
progressSummary: taskProgressSummary || null,
terminalSummary: null,
});
}
await this.setSessionState({
cfg: input.cfg,
sessionKey,
@@ -762,6 +837,16 @@ export class AcpSessionManager {
startedAt: turnStartedAt,
errorCode: acpError.code,
});
if (taskContext) {
this.updateBackgroundTaskState(taskContext.runId, {
status: resolveBackgroundTaskFailureStatus(acpError),
endedAt: Date.now(),
lastEventAt: Date.now(),
error: acpError.message,
progressSummary: taskProgressSummary || null,
terminalSummary: null,
});
}
await this.setSessionState({
cfg: input.cfg,
sessionKey,
@@ -1729,4 +1814,66 @@ export class AcpSessionManager {
(a.agentSessionId ?? "") === (b.agentSessionId ?? "")
);
}
private resolveBackgroundTaskContext(params: {
cfg: OpenClawConfig;
sessionKey: string;
requestId: string;
text: string;
}): BackgroundTaskContext | null {
const childEntry = this.deps.readSessionEntry({
cfg: params.cfg,
sessionKey: params.sessionKey,
})?.entry;
const requesterSessionKey =
normalizeText(childEntry?.spawnedBy) ?? normalizeText(childEntry?.parentSessionKey);
if (!requesterSessionKey) {
return null;
}
const parentEntry = this.deps.readSessionEntry({
cfg: params.cfg,
sessionKey: requesterSessionKey,
})?.entry;
return {
requesterSessionKey,
requesterOrigin: parentEntry?.deliveryContext ?? childEntry?.deliveryContext,
childSessionKey: params.sessionKey,
runId: params.requestId,
label: normalizeText(childEntry?.label),
task: summarizeBackgroundTaskText(params.text),
};
}
private createBackgroundTaskRecord(context: BackgroundTaskContext, startedAt: number): void {
try {
createTaskRecord({
source: "unknown",
runtime: "acp",
requesterSessionKey: context.requesterSessionKey,
requesterOrigin: context.requesterOrigin,
childSessionKey: context.childSessionKey,
runId: context.runId,
bindingTargetKind: "session",
label: context.label,
task: context.task,
status: "running",
startedAt,
});
} catch (error) {
logVerbose(
`acp-manager: failed creating background task for ${context.runId}: ${String(error)}`,
);
}
}
private updateBackgroundTaskState(runId: string, patch: BackgroundTaskStatePatch): void {
try {
updateTaskStateByRunId({
...patch,
runId,
});
} catch (error) {
logVerbose(`acp-manager: failed updating background task for ${runId}: ${String(error)}`);
}
}
}

View File

@@ -1,8 +1,10 @@
import { setTimeout as scheduleNativeTimeout } from "node:timers";
import { setTimeout as sleep } from "node:timers/promises";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
import type { AcpSessionRuntimeOptions, SessionAcpMeta } from "../../config/sessions/types.js";
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { withTempDir } from "../../test-helpers/temp-dir.js";
import type { AcpRuntime, AcpRuntimeCapabilities } from "../runtime/types.js";
const hoisted = vi.hoisted(() => {
@@ -44,6 +46,7 @@ const baseCfg = {
dispatch: { enabled: true },
},
} as const;
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
function createRuntime(): {
runtime: AcpRuntime;
@@ -168,6 +171,15 @@ describe("AcpSessionManager", () => {
hoisted.requireAcpRuntimeBackendMock.mockReset();
});
afterEach(() => {
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetTaskRegistryForTests();
});
it("marks ACP-shaped sessions without metadata as stale", () => {
hoisted.readAcpSessionEntryMock.mockReturnValue(null);
const manager = new AcpSessionManager();
@@ -236,6 +248,74 @@ describe("AcpSessionManager", () => {
);
});
it("tracks parented direct ACP turns in the task registry", async () => {
await withTempDir({ prefix: "openclaw-acp-manager-task-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const runtimeState = createRuntime();
runtimeState.runTurn.mockImplementation(async function* () {
yield {
type: "text_delta" as const,
stream: "output" as const,
text: "Write failed: permission denied for /root/oc-acp-write-should-fail.txt.",
};
yield { type: "done" as const };
});
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
hoisted.readAcpSessionEntryMock.mockImplementation((paramsUnknown: unknown) => {
const sessionKey = (paramsUnknown as { sessionKey?: string }).sessionKey;
if (sessionKey === "agent:codex:acp:child-1") {
return {
sessionKey,
storeSessionKey: sessionKey,
entry: {
sessionId: "child-1",
updatedAt: Date.now(),
spawnedBy: "agent:quant:telegram:quant:direct:822430204",
label: "Quant patch",
},
acp: readySessionMeta(),
};
}
if (sessionKey === "agent:quant:telegram:quant:direct:822430204") {
return {
sessionKey,
storeSessionKey: sessionKey,
entry: {
sessionId: "parent-1",
updatedAt: Date.now(),
},
};
}
return null;
});
const manager = new AcpSessionManager();
await manager.runTurn({
cfg: baseCfg,
sessionKey: "agent:codex:acp:child-1",
text: "Implement the feature and report back",
mode: "prompt",
requestId: "direct-parented-run",
});
expect(findTaskByRunId("direct-parented-run")).toMatchObject({
source: "unknown",
runtime: "acp",
requesterSessionKey: "agent:quant:telegram:quant:direct:822430204",
childSessionKey: "agent:codex:acp:child-1",
label: "Quant patch",
task: "Implement the feature and report back",
status: "done",
progressSummary: "Write failed: permission denied for /root/oc-acp-write-should-fail.txt.",
});
});
});
it("serializes concurrent turns for the same ACP session", async () => {
const runtimeState = createRuntime();
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({

View File

@@ -0,0 +1,29 @@
import type { SessionEntry } from "../config/sessions/types.js";
export type AcpSessionInteractionMode = "interactive" | "parent-owned-background";
type SessionInteractionEntry = Pick<SessionEntry, "spawnedBy" | "parentSessionKey" | "acp">;
function normalizeText(value: string | undefined): string | undefined {
const trimmed = value?.trim();
return trimmed ? trimmed : undefined;
}
export function resolveAcpSessionInteractionMode(
entry?: SessionInteractionEntry | null,
): AcpSessionInteractionMode {
// Parent-owned oneshot ACP sessions are background work delegated from another session.
// They should report back through the parent task notifier instead of speaking directly
// on the user-facing channel themselves.
if (entry?.acp?.mode !== "oneshot") {
return "interactive";
}
if (normalizeText(entry.spawnedBy) || normalizeText(entry.parentSessionKey)) {
return "parent-owned-background";
}
return "interactive";
}
export function isParentOwnedBackgroundAcpSession(entry?: SessionInteractionEntry | null): boolean {
return resolveAcpSessionInteractionMode(entry) === "parent-owned-background";
}

View File

@@ -20,14 +20,25 @@ vi.mock("../infra/heartbeat-wake.js", async (importOriginal) => {
);
});
vi.mock("../acp/runtime/session-meta.js", () => ({
readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args),
}));
vi.mock("../acp/runtime/session-meta.js", async (importOriginal) => {
return await mergeMockedModule(
await importOriginal<typeof import("../acp/runtime/session-meta.js")>(),
() => ({
readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args),
}),
);
});
vi.mock("../config/sessions/paths.js", () => ({
resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args),
resolveSessionFilePathOptions: (...args: unknown[]) => resolveSessionFilePathOptionsMock(...args),
}));
vi.mock("../config/sessions/paths.js", async (importOriginal) => {
return await mergeMockedModule(
await importOriginal<typeof import("../config/sessions/paths.js")>(),
() => ({
resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args),
resolveSessionFilePathOptions: (...args: unknown[]) =>
resolveSessionFilePathOptionsMock(...args),
}),
);
});
let emitAgentEvent: typeof import("../infra/agent-events.js").emitAgentEvent;
let resolveAcpSpawnStreamLogPath: typeof import("./acp-spawn-parent-stream.js").resolveAcpSpawnStreamLogPath;
@@ -48,14 +59,28 @@ async function loadFreshAcpSpawnParentStreamModulesForTest() {
}),
);
});
vi.doMock("../acp/runtime/session-meta.js", () => ({
readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args),
}));
vi.doMock("../config/sessions/paths.js", () => ({
resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args),
resolveSessionFilePathOptions: (...args: unknown[]) =>
resolveSessionFilePathOptionsMock(...args),
}));
vi.doMock("../acp/runtime/session-meta.js", async () => {
return await mergeMockedModule(
await vi.importActual<typeof import("../acp/runtime/session-meta.js")>(
"../acp/runtime/session-meta.js",
),
() => ({
readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args),
}),
);
});
vi.doMock("../config/sessions/paths.js", async () => {
return await mergeMockedModule(
await vi.importActual<typeof import("../config/sessions/paths.js")>(
"../config/sessions/paths.js",
),
() => ({
resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args),
resolveSessionFilePathOptions: (...args: unknown[]) =>
resolveSessionFilePathOptionsMock(...args),
}),
);
});
const [agentEvents, relayModule] = await Promise.all([
import("../infra/agent-events.js"),
import("./acp-spawn-parent-stream.js"),
@@ -219,6 +244,39 @@ describe("startAcpSpawnParentStreamRelay", () => {
relay.dispose();
});
it("can keep background relays out of the parent session while still logging", () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-quiet",
parentSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child-quiet",
agentId: "codex",
surfaceUpdates: false,
streamFlushMs: 10,
noOutputNoticeMs: 120_000,
});
relay.notifyStarted();
emitAgentEvent({
runId: "run-quiet",
stream: "assistant",
data: {
delta: "hello from child",
},
});
vi.advanceTimersByTime(15);
emitAgentEvent({
runId: "run-quiet",
stream: "lifecycle",
data: {
phase: "end",
},
});
expect(collectedTexts()).toEqual([]);
expect(requestHeartbeatNowMock).not.toHaveBeenCalled();
relay.dispose();
});
it("preserves delta whitespace boundaries in progress relays", () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-5",

View File

@@ -6,6 +6,7 @@ import { onAgentEvent } from "../infra/agent-events.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { scopedHeartbeatWakeOptions } from "../routing/session-key.js";
import { updateTaskStateByRunId } from "../tasks/task-registry.js";
const DEFAULT_STREAM_FLUSH_MS = 2_500;
const DEFAULT_NO_OUTPUT_NOTICE_MS = 60_000;
@@ -79,6 +80,7 @@ export function startAcpSpawnParentStreamRelay(params: {
childSessionKey: string;
agentId: string;
logPath?: string;
surfaceUpdates?: boolean;
streamFlushMs?: number;
noOutputNoticeMs?: number;
noOutputPollMs?: number;
@@ -178,7 +180,11 @@ export function startAcpSpawnParentStreamRelay(params: {
...fields,
});
};
const shouldSurfaceUpdates = params.surfaceUpdates !== false;
const wake = () => {
if (!shouldSurfaceUpdates) {
return;
}
requestHeartbeatNow(
scopedHeartbeatWakeOptions(parentSessionKey, {
reason: "acp:spawn:stream",
@@ -191,10 +197,18 @@ export function startAcpSpawnParentStreamRelay(params: {
return;
}
logEvent("system_event", { contextKey, text: cleaned });
if (!shouldSurfaceUpdates) {
return;
}
enqueueSystemEvent(cleaned, { sessionKey: parentSessionKey, contextKey });
wake();
};
const emitStartNotice = () => {
updateTaskStateByRunId({
runId,
lastEventAt: Date.now(),
eventSummary: "Started.",
});
emit(
`Started ${relayLabel} session ${params.childSessionKey}. Streaming progress updates to parent session.`,
`${contextPrefix}:start`,
@@ -257,6 +271,11 @@ export function startAcpSpawnParentStreamRelay(params: {
return;
}
stallNotified = true;
updateTaskStateByRunId({
runId,
lastEventAt: Date.now(),
eventSummary: `No output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for input.`,
});
emit(
`${relayLabel} has produced no output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for interactive input.`,
`${contextPrefix}:stall`,
@@ -298,6 +317,11 @@ export function startAcpSpawnParentStreamRelay(params: {
if (stallNotified) {
stallNotified = false;
updateTaskStateByRunId({
runId,
lastEventAt: Date.now(),
eventSummary: "Resumed output.",
});
emit(`${relayLabel} resumed output.`, `${contextPrefix}:resumed`);
}

View File

@@ -6,7 +6,8 @@ import {
setRuntimeConfigSnapshot,
type OpenClawConfig,
} from "../config/config.js";
import * as sessionConfig from "../config/sessions.js";
import * as sessionPaths from "../config/sessions/paths.js";
import * as sessionStore from "../config/sessions/store.js";
import * as sessionTranscript from "../config/sessions/transcript.js";
import * as gatewayCall from "../gateway/call.js";
import * as heartbeatWake from "../infra/heartbeat-wake.js";
@@ -17,6 +18,7 @@ import {
type SessionBindingPlacement,
type SessionBindingRecord,
} from "../infra/outbound/session-binding-service.js";
import { resetTaskRegistryForTests } from "../tasks/task-registry.js";
import * as acpSpawnParentStream from "./acp-spawn-parent-stream.js";
function createDefaultSpawnConfig(): OpenClawConfig {
@@ -78,8 +80,8 @@ const hoisted = vi.hoisted(() => {
const callGatewaySpy = vi.spyOn(gatewayCall, "callGateway");
const getAcpSessionManagerSpy = vi.spyOn(acpSessionManager, "getAcpSessionManager");
const loadSessionStoreSpy = vi.spyOn(sessionConfig, "loadSessionStore");
const resolveStorePathSpy = vi.spyOn(sessionConfig, "resolveStorePath");
const loadSessionStoreSpy = vi.spyOn(sessionStore, "loadSessionStore");
const resolveStorePathSpy = vi.spyOn(sessionPaths, "resolveStorePath");
const resolveSessionTranscriptFileSpy = vi.spyOn(sessionTranscript, "resolveSessionTranscriptFile");
const areHeartbeatsEnabledSpy = vi.spyOn(heartbeatWake, "areHeartbeatsEnabled");
const startAcpSpawnParentStreamRelaySpy = vi.spyOn(
@@ -250,6 +252,7 @@ function enableLineCurrentConversationBindings(): void {
describe("spawnAcpDirect", () => {
beforeEach(() => {
replaceSpawnConfig(createDefaultSpawnConfig());
resetTaskRegistryForTests();
hoisted.areHeartbeatsEnabledMock.mockReset().mockReturnValue(true);
hoisted.callGatewayMock.mockReset();
@@ -414,6 +417,7 @@ describe("spawnAcpDirect", () => {
});
afterEach(() => {
resetTaskRegistryForTests();
sessionBindingServiceTesting.resetSessionBindingAdaptersForTests();
clearRuntimeConfigSnapshot();
});
@@ -673,15 +677,15 @@ describe("spawnAcpDirect", () => {
it.each([
{
name: "inlines delivery for run-mode spawns from non-subagent requester sessions",
name: "does not inline delivery for run-mode spawns from non-subagent requester sessions",
ctx: createRequesterContext(),
expectedAgentCall: {
deliver: true,
channel: "telegram",
to: "telegram:6098642967",
threadId: "1",
deliver: false,
channel: undefined,
to: undefined,
threadId: undefined,
} satisfies AgentCallParams,
expectTranscriptPersistence: true,
expectTranscriptPersistence: false,
},
{
name: "does not inline delivery for run-mode spawns from subagent requester sessions",

View File

@@ -26,8 +26,10 @@ import {
import { parseDurationMs } from "../cli/parse-duration.js";
import { loadConfig } from "../config/config.js";
import type { OpenClawConfig } from "../config/config.js";
import { loadSessionStore, resolveStorePath, type SessionEntry } from "../config/sessions.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import { loadSessionStore } from "../config/sessions/store.js";
import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js";
import type { SessionEntry } from "../config/sessions/types.js";
import { callGateway } from "../gateway/call.js";
import { areHeartbeatsEnabled } from "../infra/heartbeat-wake.js";
import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";
@@ -42,6 +44,7 @@ import {
normalizeAgentId,
parseAgentSessionKey,
} from "../routing/session-key.js";
import { createTaskRecord } from "../tasks/task-registry.js";
import {
deliveryContextFromSession,
formatConversationTarget,
@@ -710,15 +713,11 @@ function resolveAcpSpawnBootstrapDeliveryPlan(params: {
const hasDeliveryTarget = Boolean(params.requester.origin?.channel && inferredDeliveryTo);
// Thread-bound session spawns always deliver inline to their bound thread.
// Run-mode spawns use stream-to-parent when the requester is a subagent
// orchestrator with an active heartbeat relay route. For all other run-mode
// spawns from non-subagent requester sessions, fall back to inline delivery
// so the result reaches the originating channel.
// Background run-mode spawns should stay internal and report back through
// the parent task lifecycle notifier instead of letting the child ACP
// session write raw output directly into the originating channel.
const useInlineDelivery =
hasDeliveryTarget &&
!params.effectiveStreamToParent &&
(params.spawnMode === "session" ||
(!params.requester.isSubagentSession && !params.requestThreadBinding));
hasDeliveryTarget && !params.effectiveStreamToParent && params.spawnMode === "session";
return {
useInlineDelivery,
@@ -953,6 +952,29 @@ export async function spawnAcpDirect(
});
}
parentRelay?.notifyStarted();
try {
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: requesterInternalKey,
requesterOrigin: requesterState.origin,
childSessionKey: sessionKey,
runId: childRunId,
bindingTargetKind: "session",
label: params.label,
task: params.task,
status: "running",
deliveryStatus: requesterInternalKey.trim() ? "pending" : "parent_missing",
startedAt: Date.now(),
streamLogPath,
});
} catch (error) {
log.warn("Failed to create background task for ACP spawn", {
sessionKey,
runId: childRunId,
error,
});
}
return {
status: "accepted",
childSessionKey: sessionKey,
@@ -963,6 +985,29 @@ export async function spawnAcpDirect(
};
}
try {
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: requesterInternalKey,
requesterOrigin: requesterState.origin,
childSessionKey: sessionKey,
runId: childRunId,
bindingTargetKind: "session",
label: params.label,
task: params.task,
status: "running",
deliveryStatus: requesterInternalKey.trim() ? "pending" : "parent_missing",
startedAt: Date.now(),
});
} catch (error) {
log.warn("Failed to create background task for ACP spawn", {
sessionKey,
runId: childRunId,
error,
});
}
return {
status: "accepted",
childSessionKey: sessionKey,

View File

@@ -1,6 +1,7 @@
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { defaultRuntime } from "../runtime.js";
import { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
import { updateTaskDeliveryByRunId, updateTaskStateByRunId } from "../tasks/task-registry.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import {
captureSubagentCompletionReply,
@@ -151,6 +152,10 @@ export function createSubagentRegistryLifecycleController(params: {
entry: SubagentRunRecord;
reason: "retry-limit" | "expiry";
}) => {
updateTaskDeliveryByRunId({
runId: giveUpParams.runId,
deliveryStatus: "failed",
});
giveUpParams.entry.wakeOnDescendantSettle = undefined;
giveUpParams.entry.fallbackFrozenResultText = undefined;
giveUpParams.entry.fallbackFrozenResultCapturedAt = undefined;
@@ -263,6 +268,10 @@ export function createSubagentRegistryLifecycleController(params: {
return;
}
if (didAnnounce) {
updateTaskDeliveryByRunId({
runId,
deliveryStatus: "delivered",
});
entry.wakeOnDescendantSettle = undefined;
entry.fallbackFrozenResultText = undefined;
entry.fallbackFrozenResultCapturedAt = undefined;
@@ -315,6 +324,10 @@ export function createSubagentRegistryLifecycleController(params: {
}
if (deferredDecision.kind === "give-up") {
updateTaskDeliveryByRunId({
runId,
deliveryStatus: "failed",
});
entry.wakeOnDescendantSettle = undefined;
entry.fallbackFrozenResultText = undefined;
entry.fallbackFrozenResultCapturedAt = undefined;
@@ -443,6 +456,21 @@ export function createSubagentRegistryLifecycleController(params: {
if (mutated) {
params.persist();
}
updateTaskStateByRunId({
runId: entry.runId,
status:
completeParams.outcome.status === "ok"
? "done"
: completeParams.outcome.status === "timeout"
? "timed_out"
: "failed",
startedAt: entry.startedAt,
endedAt: entry.endedAt,
lastEventAt: entry.endedAt ?? Date.now(),
error: completeParams.outcome.status === "error" ? completeParams.outcome.error : undefined,
progressSummary: entry.frozenResultText ?? undefined,
terminalSummary: null,
});
try {
await persistSubagentSessionTiming(entry);

View File

@@ -1,6 +1,7 @@
import { loadConfig } from "../config/config.js";
import { callGateway } from "../gateway/call.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { createTaskRecord } from "../tasks/task-registry.js";
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
import { ensureRuntimePluginsLoaded } from "./runtime-plugins.js";
import type { SubagentRunOutcome } from "./subagent-announce.js";
@@ -315,6 +316,29 @@ export function createSubagentRunManager(params: {
attachmentsRootDir: registerParams.attachmentsRootDir,
retainAttachmentsOnKeep: registerParams.retainAttachmentsOnKeep,
});
try {
createTaskRecord({
source: "sessions_spawn",
runtime: "subagent",
requesterSessionKey: registerParams.requesterSessionKey,
requesterOrigin,
childSessionKey: registerParams.childSessionKey,
runId: registerParams.runId,
bindingTargetKind: "subagent",
label: registerParams.label,
task: registerParams.task,
status: "running",
deliveryStatus:
registerParams.expectsCompletionMessage === false ? "not_applicable" : "pending",
startedAt: now,
lastEventAt: now,
});
} catch (error) {
log.warn("Failed to create background task for subagent run", {
runId: registerParams.runId,
error,
});
}
params.ensureListener();
params.persist();
if (archiveAtMs) {

View File

@@ -5,6 +5,7 @@ import type { SubagentEndReason } from "../context-engine/types.js";
import { callGateway } from "../gateway/call.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { resetTaskRegistryForTests } from "../tasks/task-registry.js";
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
import { ensureRuntimePluginsLoaded } from "./runtime-plugins.js";
import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
@@ -555,6 +556,7 @@ export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
endedHookInFlightRunIds.clear();
clearAllPendingLifecycleErrors();
resetAnnounceQueuesForTests();
resetTaskRegistryForTests({ persist: opts?.persist });
stopSweeper();
restoreAttempted = false;
if (listenerStop) {

View File

@@ -3,7 +3,7 @@ import { loadConfig } from "../../config/config.js";
import { callGateway } from "../../gateway/call.js";
import { normalizeDeliveryContext } from "../../utils/delivery-context.js";
import type { GatewayMessageChannel } from "../../utils/message-channel.js";
import { ACP_SPAWN_MODES, ACP_SPAWN_STREAM_TARGETS, spawnAcpDirect } from "../acp-spawn.js";
import { spawnAcpDirect } from "../acp-spawn.js";
import { optionalStringEnum } from "../schema/typebox.js";
import type { SpawnedToolContext } from "../spawned-context.js";
import { registerSubagentRun } from "../subagent-registry.js";
@@ -18,6 +18,8 @@ import {
const SESSIONS_SPAWN_RUNTIMES = ["subagent", "acp"] as const;
const SESSIONS_SPAWN_SANDBOX_MODES = ["inherit", "require"] as const;
// Keep the schema local to avoid a circular import through acp-spawn/openclaw-tools.
const SESSIONS_SPAWN_ACP_STREAM_TARGETS = ["parent"] as const;
const UNSUPPORTED_SESSIONS_SPAWN_PARAM_KEYS = [
"target",
"transport",
@@ -90,7 +92,7 @@ const SessionsSpawnToolSchema = Type.Object({
mode: optionalStringEnum(SUBAGENT_SPAWN_MODES),
cleanup: optionalStringEnum(["delete", "keep"] as const),
sandbox: optionalStringEnum(SESSIONS_SPAWN_SANDBOX_MODES),
streamTo: optionalStringEnum(ACP_SPAWN_STREAM_TARGETS),
streamTo: optionalStringEnum(SESSIONS_SPAWN_ACP_STREAM_TARGETS),
// Inline attachments (snapshot-by-value).
// NOTE: Attachment contents are redacted from transcript persistence by sanitizeToolCallInputs.
@@ -205,7 +207,7 @@ export function createSessionsSpawnTool(
agentId: requestedAgentId,
resumeSessionId,
cwd,
mode: mode && ACP_SPAWN_MODES.includes(mode) ? mode : undefined,
mode: mode === "run" || mode === "session" ? mode : undefined,
thread,
sandbox,
streamTo,

View File

@@ -8,6 +8,7 @@ import {
validateRuntimePermissionProfileInput,
} from "../../../acp/control-plane/runtime-options.js";
import { resolveAcpSessionIdentifierLinesFromIdentity } from "../../../acp/runtime/session-identifiers.js";
import { findLatestTaskForSessionKey } from "../../../tasks/task-registry.js";
import type { CommandHandlerResult, HandleCommandsParams } from "../commands-types.js";
import {
ACP_CWD_USAGE,
@@ -122,6 +123,7 @@ export async function handleAcpStatusAction(
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "Could not read ACP session status.",
onSuccess: (status) => {
const linkedTask = findLatestTaskForSessionKey(status.sessionKey);
const sessionIdentifierLines = resolveAcpSessionIdentifierLinesFromIdentity({
backend: status.backend,
identity: status.identity,
@@ -135,6 +137,13 @@ export async function handleAcpStatusAction(
...sessionIdentifierLines,
`sessionMode: ${status.mode}`,
`state: ${status.state}`,
...(linkedTask
? [
`taskId: ${linkedTask.taskId}`,
`taskStatus: ${linkedTask.status}`,
`delivery: ${linkedTask.deliveryStatus}`,
]
: []),
`runtimeOptions: ${formatRuntimeOptionsText(status.runtimeOptions)}`,
`capabilities: ${formatAcpCapabilitiesText(status.capabilities.controls)}`,
`lastActivityAt: ${new Date(status.lastActivityAt).toISOString()}`,

View File

@@ -1,6 +1,7 @@
import { countPendingDescendantRuns } from "../../../agents/subagent-registry.js";
import { loadSessionStore, resolveStorePath } from "../../../config/sessions.js";
import { formatDurationCompact } from "../../../shared/subagents-format.js";
import { findTaskByRunId } from "../../../tasks/task-registry.js";
import type { CommandHandlerResult } from "../commands-types.js";
import { formatRunLabel } from "../subagents-utils.js";
import {
@@ -36,6 +37,7 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command
const outcome = run.outcome
? `${run.outcome.status}${run.outcome.error ? ` (${run.outcome.error})` : ""}`
: "n/a";
const linkedTask = findTaskByRunId(run.runId);
const lines = [
" Subagent info",
@@ -43,6 +45,7 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command
`Label: ${formatRunLabel(run)}`,
`Task: ${run.task}`,
`Run: ${run.runId}`,
linkedTask ? `TaskId: ${linkedTask.taskId}` : undefined,
`Session: ${run.childSessionKey}`,
`SessionId: ${sessionEntry?.sessionId ?? "n/a"}`,
`Transcript: ${sessionEntry?.sessionFile ?? "n/a"}`,
@@ -54,6 +57,7 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command
run.archiveAtMs ? `Archive: ${formatTimestampWithAge(run.archiveAtMs)}` : undefined,
run.cleanupHandled ? "Cleanup handled: yes" : undefined,
`Outcome: ${outcome}`,
linkedTask ? `Delivery: ${linkedTask.deliveryStatus}` : undefined,
].filter(Boolean);
return stopWithText(lines.join("\n"));

View File

@@ -192,4 +192,33 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
expect(onReplyStart).not.toHaveBeenCalled();
});
it("keeps parent-owned background ACP child delivery silent while preserving accumulated output", async () => {
const dispatcher = createDispatcher();
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),
ctx: buildTestCtx({
Provider: "telegram",
Surface: "telegram",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher,
inboundAudio: false,
suppressUserDelivery: true,
shouldRouteToOriginating: true,
originatingChannel: "telegram",
originatingTo: "telegram:123",
});
const blockDelivered = await coordinator.deliver("block", { text: "working on it" });
const finalDelivered = await coordinator.deliver("final", { text: "done" });
await coordinator.settleVisibleText();
expect(blockDelivered).toBe(false);
expect(finalDelivered).toBe(false);
expect(dispatcher.sendBlockReply).not.toHaveBeenCalled();
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
expect(coordinator.getAccumulatedBlockText()).toBe("working on it");
expect(coordinator.hasDeliveredVisibleText()).toBe(false);
});
});

View File

@@ -79,6 +79,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
inboundAudio: boolean;
sessionTtsAuto?: TtsAutoMode;
ttsChannel?: string;
suppressUserDelivery?: boolean;
shouldRouteToOriginating: boolean;
originatingChannel?: string;
originatingTo?: string;
@@ -184,6 +185,10 @@ export function createAcpDispatchDeliveryCoordinator(params: {
await startReplyLifecycleOnce();
}
if (params.suppressUserDelivery) {
return false;
}
const ttsPayload = meta?.skipTts
? payload
: await maybeApplyTtsToPayload({

View File

@@ -315,6 +315,7 @@ export async function tryDispatchAcpReply(params: {
inboundAudio: boolean;
sessionTtsAuto?: TtsAutoMode;
ttsChannel?: string;
suppressUserDelivery?: boolean;
shouldRouteToOriginating: boolean;
originatingChannel?: string;
originatingTo?: string;
@@ -347,6 +348,7 @@ export async function tryDispatchAcpReply(params: {
inboundAudio: params.inboundAudio,
sessionTtsAuto: params.sessionTtsAuto,
ttsChannel: params.ttsChannel,
suppressUserDelivery: params.suppressUserDelivery,
shouldRouteToOriginating: params.shouldRouteToOriginating,
originatingChannel: params.originatingChannel,
originatingTo: params.originatingTo,
@@ -357,6 +359,7 @@ export async function tryDispatchAcpReply(params: {
resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined),
);
const shouldEmitResolvedIdentityNotice =
!params.suppressUserDelivery &&
identityPendingBeforeTurn &&
(Boolean(params.ctx.MessageThreadId != null && String(params.ctx.MessageThreadId).trim()) ||
hasBoundConversationForSession({

View File

@@ -1,4 +1,5 @@
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import { isParentOwnedBackgroundAcpSession } from "../../acp/session-interaction-mode.js";
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import {
resolveConversationBindingRecord,
@@ -39,11 +40,12 @@ import {
import { getGlobalHookRunner, getGlobalPluginRegistry } from "../../plugins/hook-runner-global.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { normalizeTtsAutoMode, resolveConfiguredTtsMode } from "../../tts/tts-config.js";
import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js";
import { normalizeMessageChannel } from "../../utils/message-channel.js";
import type { FinalizedMsgContext } from "../templating.js";
import type { BlockReplyContext, GetReplyOptions, ReplyPayload } from "../types.js";
import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
import { resolveReplyRoutingDecision } from "./routing-policy.js";
import { resolveRunTypingPolicy } from "./typing-policy.js";
let routeReplyRuntimePromise: Promise<typeof import("./route-reply.runtime.js")> | null = null;
@@ -247,25 +249,19 @@ export async function dispatchReplyFromConfig(params: {
// flow when the provider handles its own messages.
//
// Debug: `pnpm test src/auto-reply/reply/dispatch-from-config.test.ts`
const originatingChannel = normalizeMessageChannel(ctx.OriginatingChannel);
const originatingTo = ctx.OriginatingTo;
const providerChannel = normalizeMessageChannel(ctx.Provider);
const surfaceChannel = normalizeMessageChannel(ctx.Surface);
// Prefer provider channel because surface may carry origin metadata in relayed flows.
const currentSurface = providerChannel ?? surfaceChannel;
const isInternalWebchatTurn =
currentSurface === INTERNAL_MESSAGE_CHANNEL &&
(surfaceChannel === INTERNAL_MESSAGE_CHANNEL || !surfaceChannel) &&
ctx.ExplicitDeliverRoute !== true;
const suppressAcpChildUserDelivery = isParentOwnedBackgroundAcpSession(sessionStoreEntry.entry);
const routeReplyRuntime = await loadRouteReplyRuntime();
const shouldRouteToOriginating = Boolean(
!isInternalWebchatTurn &&
routeReplyRuntime.isRoutableChannel(originatingChannel) &&
originatingTo &&
originatingChannel !== currentSurface,
);
const shouldSuppressTyping =
shouldRouteToOriginating || originatingChannel === INTERNAL_MESSAGE_CHANNEL;
const { originatingChannel, currentSurface, shouldRouteToOriginating, shouldSuppressTyping } =
resolveReplyRoutingDecision({
provider: ctx.Provider,
surface: ctx.Surface,
explicitDeliverRoute: ctx.ExplicitDeliverRoute,
originatingChannel: ctx.OriginatingChannel,
originatingTo: ctx.OriginatingTo,
suppressDirectUserDelivery: suppressAcpChildUserDelivery,
isRoutableChannel: routeReplyRuntime.isRoutableChannel,
});
const originatingTo = ctx.OriginatingTo;
const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface;
/**
@@ -601,6 +597,7 @@ export async function dispatchReplyFromConfig(params: {
inboundAudio,
sessionTtsAuto,
ttsChannel,
suppressUserDelivery: suppressAcpChildUserDelivery,
shouldRouteToOriginating,
originatingChannel,
originatingTo,

View File

@@ -0,0 +1,62 @@
import { describe, expect, it } from "vitest";
import { resolveReplyRoutingDecision } from "./routing-policy.js";
function isRoutableChannel(channel: string | undefined) {
return Boolean(
channel &&
["telegram", "slack", "discord", "signal", "imessage", "whatsapp", "feishu"].includes(channel),
);
}
describe("resolveReplyRoutingDecision", () => {
it("routes replies to the originating channel when the current provider differs", () => {
expect(
resolveReplyRoutingDecision({
provider: "slack",
surface: "slack",
originatingChannel: "telegram",
originatingTo: "telegram:123",
isRoutableChannel,
}),
).toMatchObject({
originatingChannel: "telegram",
currentSurface: "slack",
shouldRouteToOriginating: true,
shouldSuppressTyping: true,
});
});
it("does not route external replies from internal webchat without explicit delivery", () => {
expect(
resolveReplyRoutingDecision({
provider: "webchat",
surface: "webchat",
explicitDeliverRoute: false,
originatingChannel: "telegram",
originatingTo: "telegram:123",
isRoutableChannel,
}),
).toMatchObject({
currentSurface: "webchat",
isInternalWebchatTurn: true,
shouldRouteToOriginating: false,
});
});
it("suppresses direct user delivery for parent-owned background ACP children", () => {
expect(
resolveReplyRoutingDecision({
provider: "discord",
surface: "discord",
originatingChannel: "telegram",
originatingTo: "telegram:123",
suppressDirectUserDelivery: true,
isRoutableChannel,
}),
).toMatchObject({
currentSurface: "discord",
shouldRouteToOriginating: false,
shouldSuppressTyping: true,
});
});
});

View File

@@ -0,0 +1,37 @@
import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js";
export function resolveReplyRoutingDecision(params: {
provider?: string;
surface?: string;
explicitDeliverRoute?: boolean;
originatingChannel?: string;
originatingTo?: string;
suppressDirectUserDelivery?: boolean;
isRoutableChannel: (channel: string | undefined) => boolean;
}) {
const originatingChannel = normalizeMessageChannel(params.originatingChannel);
const providerChannel = normalizeMessageChannel(params.provider);
const surfaceChannel = normalizeMessageChannel(params.surface);
const currentSurface = providerChannel ?? surfaceChannel;
const isInternalWebchatTurn =
currentSurface === INTERNAL_MESSAGE_CHANNEL &&
(surfaceChannel === INTERNAL_MESSAGE_CHANNEL || !surfaceChannel) &&
params.explicitDeliverRoute !== true;
const shouldRouteToOriginating = Boolean(
!params.suppressDirectUserDelivery &&
!isInternalWebchatTurn &&
params.isRoutableChannel(originatingChannel) &&
params.originatingTo &&
originatingChannel !== currentSurface,
);
return {
originatingChannel,
currentSurface,
isInternalWebchatTurn,
shouldRouteToOriginating,
shouldSuppressTyping:
params.suppressDirectUserDelivery === true ||
shouldRouteToOriginating ||
originatingChannel === INTERNAL_MESSAGE_CHANNEL,
};
}

View File

@@ -39,6 +39,8 @@ vi.mock("./register.status-health-sessions.js", () => ({
program.command("status");
program.command("health");
program.command("sessions");
const tasks = program.command("tasks");
tasks.command("show");
},
}));
@@ -75,6 +77,7 @@ describe("command-registry", () => {
expect(names).toContain("agents");
expect(names).toContain("backup");
expect(names).toContain("sessions");
expect(names).toContain("tasks");
expect(names).not.toContain("agent");
expect(names).not.toContain("status");
expect(names).not.toContain("doctor");
@@ -139,6 +142,7 @@ describe("command-registry", () => {
expect(names).toContain("status");
expect(names).toContain("health");
expect(names).toContain("sessions");
expect(names).toContain("tasks");
});
it("replaces placeholders when loading a grouped entry by secondary command name", async () => {

View File

@@ -197,6 +197,11 @@ const coreEntries: CoreCliEntry[] = [
description: "List stored conversation sessions",
hasSubcommands: true,
},
{
name: "tasks",
description: "Inspect durable background task state",
hasSubcommands: true,
},
],
register: async ({ program }) => {
const mod = await import("./register.status-health-sessions.js");

View File

@@ -81,6 +81,11 @@ export const CORE_CLI_COMMAND_DESCRIPTORS = [
description: "List stored conversation sessions",
hasSubcommands: true,
},
{
name: "tasks",
description: "Inspect durable background task state",
hasSubcommands: true,
},
] as const satisfies ReadonlyArray<CoreCliCommandDescriptor>;
export function getCoreCliCommandDescriptors(): ReadonlyArray<CoreCliCommandDescriptor> {

View File

@@ -6,6 +6,10 @@ const statusCommand = vi.fn();
const healthCommand = vi.fn();
const sessionsCommand = vi.fn();
const sessionsCleanupCommand = vi.fn();
const tasksListCommand = vi.fn();
const tasksShowCommand = vi.fn();
const tasksNotifyCommand = vi.fn();
const tasksCancelCommand = vi.fn();
const setVerbose = vi.fn();
const { defaultRuntime: runtime, resetRuntimeCapture } = createCliRuntimeCapture();
@@ -26,6 +30,13 @@ vi.mock("../../commands/sessions-cleanup.js", () => ({
sessionsCleanupCommand,
}));
vi.mock("../../commands/tasks.js", () => ({
tasksListCommand,
tasksShowCommand,
tasksNotifyCommand,
tasksCancelCommand,
}));
vi.mock("../../globals.js", () => ({
setVerbose,
}));
@@ -55,6 +66,10 @@ describe("registerStatusHealthSessionsCommands", () => {
healthCommand.mockResolvedValue(undefined);
sessionsCommand.mockResolvedValue(undefined);
sessionsCleanupCommand.mockResolvedValue(undefined);
tasksListCommand.mockResolvedValue(undefined);
tasksShowCommand.mockResolvedValue(undefined);
tasksNotifyCommand.mockResolvedValue(undefined);
tasksCancelCommand.mockResolvedValue(undefined);
});
it("runs status command with timeout and debug-derived verbose", async () => {
@@ -201,4 +216,52 @@ describe("registerStatusHealthSessionsCommands", () => {
runtime,
);
});
it("runs tasks list from the parent command", async () => {
await runCli(["tasks", "--json", "--runtime", "acp", "--status", "running"]);
expect(tasksListCommand).toHaveBeenCalledWith(
expect.objectContaining({
json: true,
runtime: "acp",
status: "running",
}),
runtime,
);
});
it("runs tasks show subcommand with lookup forwarding", async () => {
await runCli(["tasks", "show", "run-123", "--json"]);
expect(tasksShowCommand).toHaveBeenCalledWith(
expect.objectContaining({
lookup: "run-123",
json: true,
}),
runtime,
);
});
it("runs tasks notify subcommand with lookup and policy forwarding", async () => {
await runCli(["tasks", "notify", "run-123", "state_changes"]);
expect(tasksNotifyCommand).toHaveBeenCalledWith(
expect.objectContaining({
lookup: "run-123",
notify: "state_changes",
}),
runtime,
);
});
it("runs tasks cancel subcommand with lookup forwarding", async () => {
await runCli(["tasks", "cancel", "run-123"]);
expect(tasksCancelCommand).toHaveBeenCalledWith(
expect.objectContaining({
lookup: "run-123",
}),
runtime,
);
});
});

View File

@@ -3,6 +3,12 @@ import { healthCommand } from "../../commands/health.js";
import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js";
import { sessionsCommand } from "../../commands/sessions.js";
import { statusCommand } from "../../commands/status.js";
import {
tasksCancelCommand,
tasksListCommand,
tasksNotifyCommand,
tasksShowCommand,
} from "../../commands/tasks.js";
import { setVerbose } from "../../globals.js";
import { defaultRuntime } from "../../runtime.js";
import { formatDocsLink } from "../../terminal/links.js";
@@ -213,4 +219,106 @@ export function registerStatusHealthSessionsCommands(program: Command) {
);
});
});
const tasksCmd = program
.command("tasks")
.description("Inspect durable background task state")
.option("--json", "Output as JSON", false)
.option("--runtime <name>", "Filter by runtime (subagent, acp, cli)")
.option(
"--status <name>",
"Filter by status (accepted, running, done, failed, timed_out, cancelled, lost)",
)
.action(async (opts) => {
await runCommandWithRuntime(defaultRuntime, async () => {
await tasksListCommand(
{
json: Boolean(opts.json),
runtime: opts.runtime as string | undefined,
status: opts.status as string | undefined,
},
defaultRuntime,
);
});
});
tasksCmd.enablePositionalOptions();
tasksCmd
.command("list")
.description("List tracked background tasks")
.option("--json", "Output as JSON", false)
.option("--runtime <name>", "Filter by runtime (subagent, acp, cli)")
.option(
"--status <name>",
"Filter by status (accepted, running, done, failed, timed_out, cancelled, lost)",
)
.action(async (opts, command) => {
const parentOpts = command.parent?.opts() as
| {
json?: boolean;
runtime?: string;
status?: string;
}
| undefined;
await runCommandWithRuntime(defaultRuntime, async () => {
await tasksListCommand(
{
json: Boolean(opts.json || parentOpts?.json),
runtime: (opts.runtime as string | undefined) ?? parentOpts?.runtime,
status: (opts.status as string | undefined) ?? parentOpts?.status,
},
defaultRuntime,
);
});
});
tasksCmd
.command("show")
.description("Show one background task by task id, run id, or session key")
.argument("<lookup>", "Task id, run id, or session key")
.option("--json", "Output as JSON", false)
.action(async (lookup, opts, command) => {
const parentOpts = command.parent?.opts() as { json?: boolean } | undefined;
await runCommandWithRuntime(defaultRuntime, async () => {
await tasksShowCommand(
{
lookup,
json: Boolean(opts.json || parentOpts?.json),
},
defaultRuntime,
);
});
});
tasksCmd
.command("notify")
.description("Set task notify policy")
.argument("<lookup>", "Task id, run id, or session key")
.argument("<notify>", "Notify policy (done_only, state_changes, silent)")
.action(async (lookup, notify) => {
await runCommandWithRuntime(defaultRuntime, async () => {
await tasksNotifyCommand(
{
lookup,
notify: notify as "done_only" | "state_changes" | "silent",
},
defaultRuntime,
);
});
});
tasksCmd
.command("cancel")
.description("Cancel a running background task")
.argument("<lookup>", "Task id, run id, or session key")
.action(async (lookup) => {
await runCommandWithRuntime(defaultRuntime, async () => {
await tasksCancelCommand(
{
lookup,
},
defaultRuntime,
);
});
});
}

View File

@@ -8,6 +8,8 @@ const ensurePathMock = vi.hoisted(() => vi.fn());
const assertRuntimeMock = vi.hoisted(() => vi.fn());
const closeActiveMemorySearchManagersMock = vi.hoisted(() => vi.fn(async () => {}));
const hasMemoryRuntimeMock = vi.hoisted(() => vi.fn(() => false));
const ensureTaskRegistryReadyMock = vi.hoisted(() => vi.fn());
const startTaskRegistryMaintenanceMock = vi.hoisted(() => vi.fn());
const outputRootHelpMock = vi.hoisted(() => vi.fn());
const buildProgramMock = vi.hoisted(() => vi.fn());
const maybeRunCliInContainerMock = vi.hoisted(() =>
@@ -49,6 +51,14 @@ vi.mock("../plugins/memory-state.js", () => ({
hasMemoryRuntime: hasMemoryRuntimeMock,
}));
vi.mock("../tasks/task-registry.js", () => ({
ensureTaskRegistryReady: ensureTaskRegistryReadyMock,
}));
vi.mock("../tasks/task-registry.maintenance.js", () => ({
startTaskRegistryMaintenance: startTaskRegistryMaintenanceMock,
}));
vi.mock("./program/root-help.js", () => ({
outputRootHelp: outputRootHelpMock,
}));
@@ -76,6 +86,8 @@ describe("runCli exit behavior", () => {
expect(maybeRunCliInContainerMock).toHaveBeenCalledWith(["node", "openclaw", "status"]);
expect(tryRouteCliMock).toHaveBeenCalledWith(["node", "openclaw", "status"]);
expect(closeActiveMemorySearchManagersMock).not.toHaveBeenCalled();
expect(ensureTaskRegistryReadyMock).not.toHaveBeenCalled();
expect(startTaskRegistryMaintenanceMock).not.toHaveBeenCalled();
expect(exitSpy).not.toHaveBeenCalled();
exitSpy.mockRestore();
});

139
src/commands/tasks.test.ts Normal file
View File

@@ -0,0 +1,139 @@
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { createCliRuntimeCapture } from "../cli/test-runtime-capture.js";
const reconcileInspectableTasksMock = vi.fn();
const reconcileTaskLookupTokenMock = vi.fn();
const updateTaskNotifyPolicyByIdMock = vi.fn();
const cancelTaskByIdMock = vi.fn();
const getTaskByIdMock = vi.fn();
const loadConfigMock = vi.fn(() => ({ loaded: true }));
vi.mock("../tasks/task-registry.reconcile.js", () => ({
reconcileInspectableTasks: (...args: unknown[]) => reconcileInspectableTasksMock(...args),
reconcileTaskLookupToken: (...args: unknown[]) => reconcileTaskLookupTokenMock(...args),
}));
vi.mock("../tasks/task-registry.js", () => ({
updateTaskNotifyPolicyById: (...args: unknown[]) => updateTaskNotifyPolicyByIdMock(...args),
cancelTaskById: (...args: unknown[]) => cancelTaskByIdMock(...args),
getTaskById: (...args: unknown[]) => getTaskByIdMock(...args),
}));
vi.mock("../config/config.js", () => ({
loadConfig: () => loadConfigMock(),
}));
const {
defaultRuntime: runtime,
runtimeLogs,
runtimeErrors,
resetRuntimeCapture,
} = createCliRuntimeCapture();
let tasksListCommand: typeof import("./tasks.js").tasksListCommand;
let tasksShowCommand: typeof import("./tasks.js").tasksShowCommand;
let tasksNotifyCommand: typeof import("./tasks.js").tasksNotifyCommand;
let tasksCancelCommand: typeof import("./tasks.js").tasksCancelCommand;
const taskFixture = {
taskId: "task-12345678",
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child",
runId: "run-12345678",
task: "Create a file",
status: "running",
deliveryStatus: "pending",
notifyPolicy: "state_changes",
createdAt: Date.parse("2026-03-29T10:00:00.000Z"),
lastEventAt: Date.parse("2026-03-29T10:00:10.000Z"),
progressSummary: "No output for 60s. It may be waiting for input.",
recentEvents: [
{
at: Date.parse("2026-03-29T10:00:10.000Z"),
kind: "progress",
summary: "No output for 60s. It may be waiting for input.",
},
],
} as const;
beforeAll(async () => {
({ tasksListCommand, tasksShowCommand, tasksNotifyCommand, tasksCancelCommand } =
await import("./tasks.js"));
});
describe("tasks commands", () => {
beforeEach(() => {
vi.clearAllMocks();
resetRuntimeCapture();
reconcileInspectableTasksMock.mockReturnValue([]);
reconcileTaskLookupTokenMock.mockReturnValue(undefined);
updateTaskNotifyPolicyByIdMock.mockReturnValue(undefined);
cancelTaskByIdMock.mockResolvedValue({ found: false, cancelled: false, reason: "missing" });
getTaskByIdMock.mockReturnValue(undefined);
});
it("lists task rows with progress summary fallback", async () => {
reconcileInspectableTasksMock.mockReturnValue([taskFixture]);
await tasksListCommand({ runtime: "acp", status: "running" }, runtime);
expect(runtimeLogs[0]).toContain("Background tasks: 1");
expect(runtimeLogs.join("\n")).toContain("No output for 60s. It may be waiting for input.");
});
it("shows detailed task fields including notify and recent events", async () => {
reconcileTaskLookupTokenMock.mockReturnValue(taskFixture);
await tasksShowCommand({ lookup: "run-12345678" }, runtime);
expect(runtimeLogs.join("\n")).toContain("notify: state_changes");
expect(runtimeLogs.join("\n")).toContain(
"progressSummary: No output for 60s. It may be waiting for input.",
);
expect(runtimeLogs.join("\n")).toContain("recentEvent[0]: 2026-03-29T10:00:10.000Z progress");
});
it("updates notify policy for an existing task", async () => {
reconcileTaskLookupTokenMock.mockReturnValue(taskFixture);
updateTaskNotifyPolicyByIdMock.mockReturnValue({
...taskFixture,
notifyPolicy: "silent",
});
await tasksNotifyCommand({ lookup: "run-12345678", notify: "silent" }, runtime);
expect(updateTaskNotifyPolicyByIdMock).toHaveBeenCalledWith({
taskId: "task-12345678",
notifyPolicy: "silent",
});
expect(runtimeLogs[0]).toContain("Updated task-12345678 notify policy to silent.");
});
it("cancels a running task and reports the updated runtime", async () => {
reconcileTaskLookupTokenMock.mockReturnValue(taskFixture);
cancelTaskByIdMock.mockResolvedValue({
found: true,
cancelled: true,
task: {
...taskFixture,
status: "cancelled",
},
});
getTaskByIdMock.mockReturnValue({
...taskFixture,
status: "cancelled",
});
await tasksCancelCommand({ lookup: "run-12345678" }, runtime);
expect(loadConfigMock).toHaveBeenCalled();
expect(cancelTaskByIdMock).toHaveBeenCalledWith({
cfg: { loaded: true },
taskId: "task-12345678",
});
expect(runtimeLogs[0]).toContain("Cancelled task-12345678 (acp) run run-12345678.");
expect(runtimeErrors).toEqual([]);
});
});

237
src/commands/tasks.ts Normal file
View File

@@ -0,0 +1,237 @@
import { loadConfig } from "../config/config.js";
import { info } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js";
import { cancelTaskById, getTaskById, updateTaskNotifyPolicyById } from "../tasks/task-registry.js";
import {
reconcileInspectableTasks,
reconcileTaskLookupToken,
} from "../tasks/task-registry.reconcile.js";
import type { TaskNotifyPolicy, TaskRecord } from "../tasks/task-registry.types.js";
import { isRich, theme } from "../terminal/theme.js";
const RUNTIME_PAD = 8;
const STATUS_PAD = 10;
const DELIVERY_PAD = 14;
const ID_PAD = 10;
const RUN_PAD = 10;
function truncate(value: string, maxChars: number) {
if (value.length <= maxChars) {
return value;
}
if (maxChars <= 1) {
return value.slice(0, maxChars);
}
return `${value.slice(0, maxChars - 1)}`;
}
function shortToken(value: string | undefined, maxChars = ID_PAD): string {
const trimmed = value?.trim();
if (!trimmed) {
return "n/a";
}
return truncate(trimmed, maxChars);
}
function formatTaskStatusCell(status: string, rich: boolean) {
const padded = status.padEnd(STATUS_PAD);
if (!rich) {
return padded;
}
if (status === "done") {
return theme.success(padded);
}
if (status === "failed" || status === "lost" || status === "timed_out") {
return theme.error(padded);
}
if (status === "running") {
return theme.accentBright(padded);
}
return theme.muted(padded);
}
function formatTaskRows(tasks: TaskRecord[], rich: boolean) {
const header = [
"Task".padEnd(ID_PAD),
"Runtime".padEnd(RUNTIME_PAD),
"Status".padEnd(STATUS_PAD),
"Delivery".padEnd(DELIVERY_PAD),
"Run".padEnd(RUN_PAD),
"Child Session",
"Summary",
].join(" ");
const lines = [rich ? theme.heading(header) : header];
for (const task of tasks) {
const summary = truncate(
task.terminalSummary?.trim() ||
task.progressSummary?.trim() ||
task.label?.trim() ||
task.task.trim(),
80,
);
const line = [
shortToken(task.taskId).padEnd(ID_PAD),
task.runtime.padEnd(RUNTIME_PAD),
formatTaskStatusCell(task.status, rich),
task.deliveryStatus.padEnd(DELIVERY_PAD),
shortToken(task.runId, RUN_PAD).padEnd(RUN_PAD),
truncate(task.childSessionKey?.trim() || "n/a", 36).padEnd(36),
summary,
].join(" ");
lines.push(line.trimEnd());
}
return lines;
}
export async function tasksListCommand(
opts: { json?: boolean; runtime?: string; status?: string },
runtime: RuntimeEnv,
) {
const runtimeFilter = opts.runtime?.trim();
const statusFilter = opts.status?.trim();
const tasks = reconcileInspectableTasks().filter((task) => {
if (runtimeFilter && task.runtime !== runtimeFilter) {
return false;
}
if (statusFilter && task.status !== statusFilter) {
return false;
}
return true;
});
if (opts.json) {
runtime.log(
JSON.stringify(
{
count: tasks.length,
runtime: runtimeFilter ?? null,
status: statusFilter ?? null,
tasks,
},
null,
2,
),
);
return;
}
runtime.log(info(`Background tasks: ${tasks.length}`));
if (runtimeFilter) {
runtime.log(info(`Runtime filter: ${runtimeFilter}`));
}
if (statusFilter) {
runtime.log(info(`Status filter: ${statusFilter}`));
}
if (tasks.length === 0) {
runtime.log("No background tasks found.");
return;
}
const rich = isRich();
for (const line of formatTaskRows(tasks, rich)) {
runtime.log(line);
}
}
export async function tasksShowCommand(
opts: { json?: boolean; lookup: string },
runtime: RuntimeEnv,
) {
const task = reconcileTaskLookupToken(opts.lookup);
if (!task) {
runtime.error(`Task not found: ${opts.lookup}`);
runtime.exit(1);
return;
}
if (opts.json) {
runtime.log(JSON.stringify(task, null, 2));
return;
}
const lines = [
"Background task:",
`taskId: ${task.taskId}`,
`runtime: ${task.runtime}`,
`status: ${task.status}`,
`delivery: ${task.deliveryStatus}`,
`notify: ${task.notifyPolicy}`,
`source: ${task.source}`,
`requesterSessionKey: ${task.requesterSessionKey}`,
`childSessionKey: ${task.childSessionKey ?? "n/a"}`,
`runId: ${task.runId ?? "n/a"}`,
`bindingTargetKind: ${task.bindingTargetKind ?? "n/a"}`,
`label: ${task.label ?? "n/a"}`,
`task: ${task.task}`,
`createdAt: ${new Date(task.createdAt).toISOString()}`,
`startedAt: ${task.startedAt ? new Date(task.startedAt).toISOString() : "n/a"}`,
`endedAt: ${task.endedAt ? new Date(task.endedAt).toISOString() : "n/a"}`,
`lastEventAt: ${task.lastEventAt ? new Date(task.lastEventAt).toISOString() : "n/a"}`,
...(task.error ? [`error: ${task.error}`] : []),
...(task.progressSummary ? [`progressSummary: ${task.progressSummary}`] : []),
...(task.terminalSummary ? [`terminalSummary: ${task.terminalSummary}`] : []),
...(task.recentEvents?.length
? task.recentEvents.map(
(event, index) =>
`recentEvent[${index}]: ${new Date(event.at).toISOString()} ${event.kind}${
event.summary ? ` ${event.summary}` : ""
}`,
)
: []),
...(task.streamLogPath ? [`streamLogPath: ${task.streamLogPath}`] : []),
...(task.transcriptPath ? [`transcriptPath: ${task.transcriptPath}`] : []),
...(task.agentSessionId ? [`agentSessionId: ${task.agentSessionId}`] : []),
...(task.backendSessionId ? [`backendSessionId: ${task.backendSessionId}`] : []),
];
for (const line of lines) {
runtime.log(line);
}
}
export async function tasksNotifyCommand(
opts: { lookup: string; notify: TaskNotifyPolicy },
runtime: RuntimeEnv,
) {
const task = reconcileTaskLookupToken(opts.lookup);
if (!task) {
runtime.error(`Task not found: ${opts.lookup}`);
runtime.exit(1);
return;
}
const updated = updateTaskNotifyPolicyById({
taskId: task.taskId,
notifyPolicy: opts.notify,
});
if (!updated) {
runtime.error(`Task not found: ${opts.lookup}`);
runtime.exit(1);
return;
}
runtime.log(`Updated ${updated.taskId} notify policy to ${updated.notifyPolicy}.`);
}
export async function tasksCancelCommand(opts: { lookup: string }, runtime: RuntimeEnv) {
const task = reconcileTaskLookupToken(opts.lookup);
if (!task) {
runtime.error(`Task not found: ${opts.lookup}`);
runtime.exit(1);
return;
}
const result = await cancelTaskById({
cfg: loadConfig(),
taskId: task.taskId,
});
if (!result.found) {
runtime.error(result.reason ?? `Task not found: ${opts.lookup}`);
runtime.exit(1);
return;
}
if (!result.cancelled) {
runtime.error(result.reason ?? `Could not cancel task: ${opts.lookup}`);
runtime.exit(1);
return;
}
const updated = getTaskById(task.taskId);
runtime.log(
`Cancelled ${updated?.taskId ?? task.taskId} (${updated?.runtime ?? task.runtime})${updated?.runId ? ` run ${updated.runId}` : ""}.`,
);
}

View File

@@ -1,9 +1,13 @@
import { describe, expect, it, vi } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import { BARE_SESSION_RESET_PROMPT } from "../../auto-reply/reply/session-reset-prompt.js";
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { withTempDir } from "../../test-helpers/temp-dir.js";
import { agentHandlers } from "./agent.js";
import { expectSubagentFollowupReactivation } from "./subagent-followup.test-helpers.js";
import type { GatewayRequestContext } from "./types.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
const mocks = vi.hoisted(() => ({
loadSessionEntry: vi.fn(),
loadGatewaySessionRow: vi.fn(),
@@ -302,6 +306,15 @@ async function invokeAgentIdentityGet(
}
describe("gateway agent handler", () => {
afterEach(() => {
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetTaskRegistryForTests();
});
it("preserves ACP metadata from the current stored session entry", async () => {
const existingAcpMeta = {
backend: "acpx",
@@ -817,6 +830,30 @@ describe("gateway agent handler", () => {
expect(callArgs.runContext?.messageChannel).toBe("webchat");
});
it("tracks async gateway agent runs in the shared task registry", async () => {
await withTempDir({ prefix: "openclaw-gateway-agent-task-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
primeMainAgentRun();
await invokeAgent(
{
message: "background cli task",
sessionKey: "agent:main:main",
idempotencyKey: "task-registry-agent-run",
},
{ reqId: "task-registry-agent-run" },
);
expect(findTaskByRunId("task-registry-agent-run")).toMatchObject({
source: "background_cli",
runtime: "cli",
childSessionKey: "agent:main:main",
status: "running",
});
});
});
it("handles missing cliSessionIds gracefully", async () => {
mockMainSessionEntry({});

View File

@@ -26,7 +26,11 @@ import { classifySessionKeyShape, normalizeAgentId } from "../../routing/session
import { defaultRuntime } from "../../runtime.js";
import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js";
import { createTaskRecord } from "../../tasks/task-registry.js";
import {
normalizeDeliveryContext,
normalizeSessionDeliveryFields,
} from "../../utils/delivery-context.js";
import {
INTERNAL_MESSAGE_CHANNEL,
isDeliverableMessageChannel,
@@ -184,6 +188,30 @@ function dispatchAgentRunFromGateway(params: {
respond: GatewayRequestHandlerOptions["respond"];
context: GatewayRequestHandlerOptions["context"];
}) {
if (params.ingressOpts.sessionKey?.trim()) {
try {
createTaskRecord({
source: "background_cli",
runtime: "cli",
requesterSessionKey: params.ingressOpts.sessionKey,
requesterOrigin: normalizeDeliveryContext({
channel: params.ingressOpts.channel,
to: params.ingressOpts.to,
accountId: params.ingressOpts.accountId,
threadId: params.ingressOpts.threadId,
}),
childSessionKey: params.ingressOpts.sessionKey,
runId: params.runId,
bindingTargetKind: "session",
task: params.ingressOpts.message,
status: "running",
deliveryStatus: "not_applicable",
startedAt: Date.now(),
});
} catch {
// Best-effort only: background task tracking must not block agent runs.
}
}
void agentCommandFromIngress(params.ingressOpts, defaultRuntime, params.context.deps)
.then((result) => {
const payload = {

View File

@@ -73,6 +73,7 @@ import {
} from "../secrets/runtime.js";
import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import { startTaskRegistryMaintenance } from "../tasks/task-registry.maintenance.js";
import { runSetupWizard } from "../wizard/setup.js";
import { createAuthRateLimiter, type AuthRateLimiter } from "./auth-rate-limit.js";
import { startChannelHealthMonitor } from "./channel-health-monitor.js";
@@ -880,6 +881,7 @@ export async function startGatewayServer(
});
if (!minimalTestGateway) {
startTaskRegistryMaintenance();
({ tickInterval, healthInterval, dedupeCleanup, mediaCleanup } =
startGatewayMaintenanceTimers({
broadcast,

View File

@@ -0,0 +1 @@
export { sendMessage } from "../infra/outbound/message.js";

View File

@@ -0,0 +1,173 @@
import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import {
deleteTaskRecordById,
ensureTaskRegistryReady,
getTaskById,
listTaskRecords,
maybeDeliverTaskTerminalUpdate,
resolveTaskForLookupToken,
updateTaskRecordById,
} from "./task-registry.js";
import type { TaskRecord } from "./task-registry.types.js";
const TASK_RECONCILE_GRACE_MS = 5 * 60_000;
const TASK_RETENTION_MS = 7 * 24 * 60 * 60_000;
const TASK_SWEEP_INTERVAL_MS = 60_000;
let sweeper: NodeJS.Timeout | null = null;
function findSessionEntryByKey(store: Record<string, unknown>, sessionKey: string): unknown {
const direct = store[sessionKey];
if (direct) {
return direct;
}
const normalized = sessionKey.toLowerCase();
for (const [key, entry] of Object.entries(store)) {
if (key.toLowerCase() === normalized) {
return entry;
}
}
return undefined;
}
function isActiveTask(task: TaskRecord): boolean {
return task.status === "accepted" || task.status === "running";
}
function isTerminalTask(task: TaskRecord): boolean {
return !isActiveTask(task);
}
function hasLostGraceExpired(task: TaskRecord, now: number): boolean {
const referenceAt = task.lastEventAt ?? task.startedAt ?? task.createdAt;
return now - referenceAt >= TASK_RECONCILE_GRACE_MS;
}
function hasBackingSession(task: TaskRecord): boolean {
const childSessionKey = task.childSessionKey?.trim();
if (!childSessionKey) {
return true;
}
if (task.runtime === "acp") {
const acpEntry = readAcpSessionEntry({
sessionKey: childSessionKey,
});
if (!acpEntry || acpEntry.storeReadFailed) {
return true;
}
return Boolean(acpEntry.entry);
}
if (task.runtime === "subagent" || task.runtime === "cli") {
const agentId = parseAgentSessionKey(childSessionKey)?.agentId;
const storePath = resolveStorePath(undefined, { agentId });
const store = loadSessionStore(storePath);
return Boolean(findSessionEntryByKey(store, childSessionKey));
}
return true;
}
function shouldMarkLost(task: TaskRecord, now: number): boolean {
if (!isActiveTask(task)) {
return false;
}
if (!hasLostGraceExpired(task, now)) {
return false;
}
return !hasBackingSession(task);
}
function shouldPruneTerminalTask(task: TaskRecord, now: number): boolean {
if (!isTerminalTask(task)) {
return false;
}
const terminalAt = task.endedAt ?? task.lastEventAt ?? task.createdAt;
return now - terminalAt >= TASK_RETENTION_MS;
}
function markTaskLost(task: TaskRecord, now: number): TaskRecord {
const updated =
updateTaskRecordById(task.taskId, {
status: "lost",
endedAt: task.endedAt ?? now,
lastEventAt: now,
error: task.error ?? "backing session missing",
}) ?? task;
void maybeDeliverTaskTerminalUpdate(updated.taskId);
return updated;
}
function projectTaskLost(task: TaskRecord, now: number): TaskRecord {
return {
...task,
status: "lost",
endedAt: task.endedAt ?? now,
lastEventAt: now,
error: task.error ?? "backing session missing",
};
}
export function reconcileTaskRecordForOperatorInspection(task: TaskRecord): TaskRecord {
const now = Date.now();
if (!shouldMarkLost(task, now)) {
return task;
}
return projectTaskLost(task, now);
}
export function reconcileInspectableTasks(): TaskRecord[] {
ensureTaskRegistryReady();
return listTaskRecords().map((task) => reconcileTaskRecordForOperatorInspection(task));
}
export function reconcileTaskLookupToken(token: string): TaskRecord | undefined {
ensureTaskRegistryReady();
const task = resolveTaskForLookupToken(token);
return task ? reconcileTaskRecordForOperatorInspection(task) : undefined;
}
export function sweepTaskRegistry(): { reconciled: number; pruned: number } {
ensureTaskRegistryReady();
const now = Date.now();
let reconciled = 0;
let pruned = 0;
for (const task of listTaskRecords()) {
if (shouldMarkLost(task, now)) {
const next = markTaskLost(task, now);
if (next.status === "lost") {
reconciled += 1;
}
continue;
}
if (shouldPruneTerminalTask(task, now) && deleteTaskRecordById(task.taskId)) {
pruned += 1;
}
}
return { reconciled, pruned };
}
export function startTaskRegistryMaintenance() {
ensureTaskRegistryReady();
void sweepTaskRegistry();
if (sweeper) {
return;
}
sweeper = setInterval(() => {
void sweepTaskRegistry();
}, TASK_SWEEP_INTERVAL_MS);
sweeper.unref?.();
}
export function stopTaskRegistryMaintenanceForTests() {
if (!sweeper) {
return;
}
clearInterval(sweeper);
sweeper = null;
}
export function getReconciledTaskById(taskId: string): TaskRecord | undefined {
const task = getTaskById(taskId);
return task ? reconcileTaskRecordForOperatorInspection(task) : undefined;
}

View File

@@ -0,0 +1,5 @@
export {
reconcileInspectableTasks,
reconcileTaskLookupToken,
reconcileTaskRecordForOperatorInspection,
} from "./task-registry.maintenance.js";

View File

@@ -0,0 +1,68 @@
import os from "node:os";
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
import { loadJsonFile, saveJsonFile } from "../infra/json-file.js";
import type { TaskRecord } from "./task-registry.types.js";
type PersistedTaskRegistry = {
version: 1;
tasks: Record<string, TaskRecord>;
};
const TASK_REGISTRY_VERSION = 1 as const;
function resolveTaskStateDir(env: NodeJS.ProcessEnv = process.env): string {
const explicit = env.OPENCLAW_STATE_DIR?.trim();
if (explicit) {
return resolveStateDir(env);
}
if (env.VITEST || env.NODE_ENV === "test") {
return path.join(os.tmpdir(), "openclaw-test-state", String(process.pid));
}
return resolveStateDir(env);
}
export function resolveTaskRegistryPath(): string {
return path.join(resolveTaskStateDir(process.env), "tasks", "runs.json");
}
export function loadTaskRegistryFromDisk(): Map<string, TaskRecord> {
const pathname = resolveTaskRegistryPath();
const raw = loadJsonFile(pathname);
if (!raw || typeof raw !== "object") {
return new Map();
}
const record = raw as Partial<PersistedTaskRegistry>;
if (record.version !== TASK_REGISTRY_VERSION) {
return new Map();
}
const tasksRaw = record.tasks;
if (!tasksRaw || typeof tasksRaw !== "object") {
return new Map();
}
const out = new Map<string, TaskRecord>();
for (const [taskId, entry] of Object.entries(tasksRaw)) {
if (!entry || typeof entry !== "object") {
continue;
}
const typed = entry;
if (!typed.taskId || typeof typed.taskId !== "string") {
continue;
}
out.set(taskId, typed);
}
return out;
}
export function saveTaskRegistryToDisk(tasks: Map<string, TaskRecord>) {
const pathname = resolveTaskRegistryPath();
const serialized: Record<string, TaskRecord> = {};
for (const [taskId, entry] of tasks.entries()) {
serialized[taskId] = entry;
}
const out: PersistedTaskRegistry = {
version: TASK_REGISTRY_VERSION,
tasks: serialized,
};
saveJsonFile(pathname, out);
}

View File

@@ -0,0 +1,969 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { resetHeartbeatWakeStateForTests } from "../infra/heartbeat-wake.js";
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
createTaskRecord,
findTaskByRunId,
getTaskById,
listTaskRecords,
maybeDeliverTaskStateChangeUpdate,
maybeDeliverTaskTerminalUpdate,
resetTaskRegistryForTests,
resolveTaskForLookupToken,
updateTaskNotifyPolicyById,
updateTaskRecordById,
updateTaskStateByRunId,
} from "./task-registry.js";
import { reconcileInspectableTasks, sweepTaskRegistry } from "./task-registry.maintenance.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
const hoisted = vi.hoisted(() => {
const sendMessageMock = vi.fn();
const cancelSessionMock = vi.fn();
const killSubagentRunAdminMock = vi.fn();
return {
sendMessageMock,
cancelSessionMock,
killSubagentRunAdminMock,
};
});
vi.mock("./task-registry-delivery-runtime.js", () => ({
sendMessage: hoisted.sendMessageMock,
}));
vi.mock("../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
}));
vi.mock("../agents/subagent-control.js", () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
async function loadFreshTaskRegistryModulesForControlTest() {
vi.resetModules();
vi.doMock("./task-registry-delivery-runtime.js", () => ({
sendMessage: hoisted.sendMessageMock,
}));
vi.doMock("../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
}));
vi.doMock("../agents/subagent-control.js", () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
return await import("./task-registry.js");
}
async function waitForAssertion(assertion: () => void, timeoutMs = 2_000, stepMs = 5) {
const startedAt = Date.now();
for (;;) {
try {
assertion();
return;
} catch (error) {
if (Date.now() - startedAt >= timeoutMs) {
throw error;
}
await new Promise((resolve) => setTimeout(resolve, stepMs));
}
}
}
async function flushAsyncWork(times = 4) {
for (let index = 0; index < times; index += 1) {
await Promise.resolve();
}
}
describe("task-registry", () => {
afterEach(() => {
vi.useRealTimers();
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetSystemEventsForTest();
resetHeartbeatWakeStateForTests();
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockReset();
hoisted.cancelSessionMock.mockReset();
hoisted.killSubagentRunAdminMock.mockReset();
});
it("updates task status from lifecycle events", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:acp:child",
runId: "run-1",
task: "Do the thing",
status: "running",
deliveryStatus: "not_applicable",
startedAt: 100,
});
emitAgentEvent({
runId: "run-1",
stream: "assistant",
data: {
text: "working",
},
});
emitAgentEvent({
runId: "run-1",
stream: "lifecycle",
data: {
phase: "end",
endedAt: 250,
},
});
expect(findTaskByRunId("run-1")).toMatchObject({
runtime: "acp",
status: "done",
endedAt: 250,
});
});
});
it("delivers ACP completion to the requester channel when a delivery origin exists", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "telegram",
to: "telegram:123",
via: "direct",
});
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
threadId: "321",
},
childSessionKey: "agent:main:acp:child",
runId: "run-delivery",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
startedAt: 100,
});
emitAgentEvent({
runId: "run-delivery",
stream: "lifecycle",
data: {
phase: "end",
endedAt: 250,
},
});
await waitForAssertion(() =>
expect(findTaskByRunId("run-delivery")).toMatchObject({
status: "done",
deliveryStatus: "delivered",
}),
);
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:123",
threadId: "321",
content: expect.stringContaining("Background task done: ACP background task"),
mirror: expect.objectContaining({
sessionKey: "agent:main:main",
}),
}),
),
);
expect(peekSystemEvents("agent:main:main")).toEqual([]);
});
});
it("records delivery failure and queues a session fallback when direct delivery misses", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockRejectedValueOnce(new Error("telegram unavailable"));
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:main:acp:child",
runId: "run-delivery-fail",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
startedAt: 100,
});
emitAgentEvent({
runId: "run-delivery-fail",
stream: "lifecycle",
data: {
phase: "error",
endedAt: 250,
error: "Permission denied by ACP runtime",
},
});
await waitForAssertion(() =>
expect(findTaskByRunId("run-delivery-fail")).toMatchObject({
status: "failed",
deliveryStatus: "failed",
error: "Permission denied by ACP runtime",
}),
);
await waitForAssertion(() =>
expect(peekSystemEvents("agent:main:main")).toEqual([
expect.stringContaining("Background task failed: ACP background task"),
]),
);
});
});
it("marks internal fallback delivery as session queued instead of delivered", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:acp:child",
runId: "run-session-queued",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
startedAt: 100,
});
emitAgentEvent({
runId: "run-session-queued",
stream: "lifecycle",
data: {
phase: "end",
endedAt: 250,
},
});
await waitForAssertion(() =>
expect(findTaskByRunId("run-session-queued")).toMatchObject({
status: "done",
deliveryStatus: "session_queued",
}),
);
expect(peekSystemEvents("agent:main:main")).toEqual([
expect.stringContaining("Background task done: ACP background task"),
]);
expect(hoisted.sendMessageMock).not.toHaveBeenCalled();
});
});
it("does not include internal progress detail in the terminal channel message", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "telegram",
to: "telegram:123",
via: "direct",
});
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
threadId: "321",
},
childSessionKey: "agent:main:acp:child",
runId: "run-detail-leak",
task: "Create the file and verify it",
status: "running",
deliveryStatus: "pending",
startedAt: 100,
});
updateTaskRecordById(findTaskByRunId("run-detail-leak")!.taskId, {
progressSummary:
"I am loading the local session context and checking helper command availability before writing the file.",
});
emitAgentEvent({
runId: "run-detail-leak",
stream: "lifecycle",
data: {
phase: "end",
endedAt: 250,
},
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
content: "Background task done: ACP background task (run run-deta).",
}),
),
);
});
});
it("keeps distinct task records when different producers share a runId", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
createTaskRecord({
source: "background_cli",
runtime: "cli",
requesterSessionKey: "agent:codex:acp:child",
childSessionKey: "agent:codex:acp:child",
runId: "run-shared",
task: "Child ACP execution",
status: "running",
deliveryStatus: "not_applicable",
});
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child",
runId: "run-shared",
task: "Spawn ACP child",
status: "running",
deliveryStatus: "pending",
});
expect(listTaskRecords().filter((task) => task.runId === "run-shared")).toHaveLength(2);
expect(findTaskByRunId("run-shared")).toMatchObject({
source: "sessions_spawn",
runtime: "acp",
task: "Spawn ACP child",
});
});
});
it("suppresses duplicate ACP delivery when a preferred spawned task shares the runId", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "telegram",
to: "telegram:123",
via: "direct",
});
const directTask = createTaskRecord({
source: "unknown",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:main:acp:child",
runId: "run-shared-delivery",
task: "Direct ACP child",
status: "done",
deliveryStatus: "pending",
});
const spawnedTask = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:main:acp:child",
runId: "run-shared-delivery",
task: "Spawn ACP child",
status: "done",
deliveryStatus: "pending",
});
await maybeDeliverTaskTerminalUpdate(directTask.taskId);
await maybeDeliverTaskTerminalUpdate(spawnedTask.taskId);
expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1);
expect(listTaskRecords().filter((task) => task.runId === "run-shared-delivery")).toHaveLength(
1,
);
expect(findTaskByRunId("run-shared-delivery")).toMatchObject({
taskId: directTask.taskId,
source: "sessions_spawn",
deliveryStatus: "delivered",
});
});
});
it("collapses ACP run-owned task creation onto the existing spawned task", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const spawnedTask = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:main:acp:child",
runId: "run-collapse",
task: "Spawn ACP child",
status: "running",
deliveryStatus: "pending",
streamLogPath: "/tmp/stream.jsonl",
});
const directTask = createTaskRecord({
source: "unknown",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:main:acp:child",
runId: "run-collapse",
task: "Direct ACP child",
status: "running",
});
expect(directTask.taskId).toBe(spawnedTask.taskId);
expect(listTaskRecords().filter((task) => task.runId === "run-collapse")).toHaveLength(1);
expect(findTaskByRunId("run-collapse")).toMatchObject({
source: "sessions_spawn",
task: "Spawn ACP child",
streamLogPath: "/tmp/stream.jsonl",
});
});
});
it("delivers a terminal ACP update only once when multiple notifiers race", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "telegram",
to: "telegram:123",
via: "direct",
});
const task = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:main:acp:child",
runId: "run-racing-delivery",
task: "Investigate issue",
status: "done",
deliveryStatus: "pending",
});
const first = maybeDeliverTaskTerminalUpdate(task.taskId);
const second = maybeDeliverTaskTerminalUpdate(task.taskId);
await Promise.all([first, second]);
expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1);
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
idempotencyKey: `task-terminal:${task.taskId}:done`,
mirror: expect.objectContaining({
idempotencyKey: `task-terminal:${task.taskId}:done`,
}),
}),
);
expect(findTaskByRunId("run-racing-delivery")).toMatchObject({
deliveryStatus: "delivered",
});
});
});
it("restores persisted tasks from disk on the next lookup", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const task = createTaskRecord({
source: "sessions_spawn",
runtime: "subagent",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:subagent:child",
runId: "run-restore",
task: "Restore me",
status: "running",
deliveryStatus: "pending",
});
resetTaskRegistryForTests({
persist: false,
});
expect(resolveTaskForLookupToken(task.taskId)).toMatchObject({
taskId: task.taskId,
runId: "run-restore",
task: "Restore me",
});
});
});
it("projects inspection-time orphaned tasks as lost without mutating the registry", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const task = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:acp:missing",
runId: "run-lost",
task: "Missing child",
status: "running",
deliveryStatus: "pending",
});
updateTaskRecordById(task.taskId, {
lastEventAt: Date.now() - 10 * 60_000,
});
const tasks = reconcileInspectableTasks();
expect(tasks[0]).toMatchObject({
runId: "run-lost",
status: "lost",
error: "backing session missing",
});
expect(getTaskById(task.taskId)).toMatchObject({
status: "running",
});
expect(peekSystemEvents("agent:main:main")).toEqual([]);
});
});
it("prunes old terminal tasks during maintenance sweeps", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const task = createTaskRecord({
source: "background_cli",
runtime: "cli",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:main",
runId: "run-prune",
task: "Old completed task",
status: "done",
deliveryStatus: "not_applicable",
startedAt: Date.now() - 9 * 24 * 60 * 60_000,
});
updateTaskRecordById(task.taskId, {
endedAt: Date.now() - 8 * 24 * 60 * 60_000,
lastEventAt: Date.now() - 8 * 24 * 60 * 60_000,
});
expect(sweepTaskRegistry()).toEqual({
reconciled: 0,
pruned: 1,
});
expect(listTaskRecords()).toEqual([]);
});
});
it("delivers concise state-change updates only when notify policy requests them", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "discord",
to: "discord:123",
via: "direct",
});
const task = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "discord",
to: "discord:123",
},
childSessionKey: "agent:codex:acp:child",
runId: "run-state-change",
task: "Investigate issue",
status: "accepted",
notifyPolicy: "done_only",
});
updateTaskStateByRunId({
runId: "run-state-change",
status: "running",
eventSummary: "Started.",
});
await waitForAssertion(() => expect(hoisted.sendMessageMock).not.toHaveBeenCalled());
updateTaskNotifyPolicyById({
taskId: task.taskId,
notifyPolicy: "state_changes",
});
updateTaskStateByRunId({
runId: "run-state-change",
eventSummary: "No output for 60s. It may be waiting for input.",
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
content:
"Background task update: ACP background task. No output for 60s. It may be waiting for input.",
}),
),
);
expect(findTaskByRunId("run-state-change")).toMatchObject({
notifyPolicy: "state_changes",
lastNotifiedEventAt: expect.any(Number),
recentEvents: expect.arrayContaining([
expect.objectContaining({
kind: "progress",
summary: "No output for 60s. It may be waiting for input.",
}),
]),
});
await maybeDeliverTaskStateChangeUpdate(task.taskId);
expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1);
});
});
it("keeps background ACP progress off the foreground lane and only sends a terminal notify", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetSystemEventsForTest();
hoisted.sendMessageMock.mockResolvedValue({
channel: "discord",
to: "discord:123",
via: "direct",
});
vi.useFakeTimers();
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "discord",
to: "discord:123",
},
childSessionKey: "agent:codex:acp:child",
runId: "run-quiet-terminal",
task: "Create the file",
status: "running",
deliveryStatus: "pending",
});
const relay = startAcpSpawnParentStreamRelay({
runId: "run-quiet-terminal",
parentSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child",
agentId: "codex",
surfaceUpdates: false,
streamFlushMs: 1,
noOutputNoticeMs: 1_000,
noOutputPollMs: 250,
});
relay.notifyStarted();
emitAgentEvent({
runId: "run-quiet-terminal",
stream: "assistant",
data: {
delta: "working on it",
},
});
vi.advanceTimersByTime(10);
expect(peekSystemEvents("agent:main:main")).toEqual([]);
expect(hoisted.sendMessageMock).not.toHaveBeenCalled();
emitAgentEvent({
runId: "run-quiet-terminal",
stream: "lifecycle",
data: {
phase: "end",
endedAt: 250,
},
});
await flushAsyncWork();
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "discord",
to: "discord:123",
content: "Background task done: ACP background task (run run-quie).",
}),
);
expect(peekSystemEvents("agent:main:main")).toEqual([]);
relay.dispose();
vi.useRealTimers();
});
});
it("delivers a concise terminal failure message without internal ACP chatter", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetSystemEventsForTest();
hoisted.sendMessageMock.mockResolvedValue({
channel: "discord",
to: "discord:123",
via: "direct",
});
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "discord",
to: "discord:123",
},
childSessionKey: "agent:codex:acp:child",
runId: "run-failure-terminal",
task: "Write the file",
status: "running",
deliveryStatus: "pending",
progressSummary:
"I am loading session context and checking helper availability before writing the file.",
});
emitAgentEvent({
runId: "run-failure-terminal",
stream: "lifecycle",
data: {
phase: "error",
endedAt: 250,
error: "Permission denied by ACP runtime",
},
});
await flushAsyncWork();
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "discord",
to: "discord:123",
content:
"Background task failed: ACP background task (run run-fail). Permission denied by ACP runtime",
}),
);
expect(peekSystemEvents("agent:main:main")).toEqual([]);
});
});
it("emits concise state-change updates without surfacing raw ACP chatter", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetSystemEventsForTest();
hoisted.sendMessageMock.mockResolvedValue({
channel: "discord",
to: "discord:123",
via: "direct",
});
vi.useFakeTimers();
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "discord",
to: "discord:123",
},
childSessionKey: "agent:codex:acp:child",
runId: "run-state-stream",
task: "Create the file",
status: "running",
deliveryStatus: "pending",
notifyPolicy: "state_changes",
});
const relay = startAcpSpawnParentStreamRelay({
runId: "run-state-stream",
parentSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child",
agentId: "codex",
surfaceUpdates: false,
streamFlushMs: 1,
noOutputNoticeMs: 1_000,
noOutputPollMs: 250,
});
relay.notifyStarted();
await flushAsyncWork();
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
content: "Background task update: ACP background task. Started.",
}),
);
hoisted.sendMessageMock.mockClear();
vi.advanceTimersByTime(1_500);
await flushAsyncWork();
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
content:
"Background task update: ACP background task. No output for 1s. It may be waiting for input.",
}),
);
expect(peekSystemEvents("agent:main:main")).toEqual([]);
relay.dispose();
vi.useRealTimers();
});
});
it("cancels ACP-backed tasks through the ACP session manager", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
const registry = await loadFreshTaskRegistryModulesForControlTest();
process.env.OPENCLAW_STATE_DIR = root;
registry.resetTaskRegistryForTests();
hoisted.cancelSessionMock.mockResolvedValue(undefined);
const task = registry.createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:codex:acp:child",
runId: "run-cancel-acp",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
});
const result = await registry.cancelTaskById({
cfg: {} as never,
taskId: task.taskId,
});
expect(hoisted.cancelSessionMock).toHaveBeenCalledWith(
expect.objectContaining({
cfg: {},
sessionKey: "agent:codex:acp:child",
reason: "task-cancel",
}),
);
expect(result).toMatchObject({
found: true,
cancelled: true,
task: expect.objectContaining({
taskId: task.taskId,
status: "cancelled",
error: "Cancelled by operator.",
}),
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:123",
content: "Background task cancelled: ACP background task (run run-canc).",
}),
),
);
});
});
it("cancels subagent-backed tasks through subagent control", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
const registry = await loadFreshTaskRegistryModulesForControlTest();
process.env.OPENCLAW_STATE_DIR = root;
registry.resetTaskRegistryForTests();
hoisted.killSubagentRunAdminMock.mockResolvedValue({
found: true,
killed: true,
});
const task = registry.createTaskRecord({
source: "sessions_spawn",
runtime: "subagent",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:worker:subagent:child",
runId: "run-cancel-subagent",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
});
const result = await registry.cancelTaskById({
cfg: {} as never,
taskId: task.taskId,
});
expect(hoisted.killSubagentRunAdminMock).toHaveBeenCalledWith(
expect.objectContaining({
cfg: {},
sessionKey: "agent:worker:subagent:child",
}),
);
expect(result).toMatchObject({
found: true,
cancelled: true,
task: expect.objectContaining({
taskId: task.taskId,
status: "cancelled",
error: "Cancelled by operator.",
}),
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:123",
content: "Background task cancelled: Subagent task (run run-canc).",
}),
),
);
});
});
});

976
src/tasks/task-registry.ts Normal file
View File

@@ -0,0 +1,976 @@
import crypto from "node:crypto";
import { getAcpSessionManager } from "../acp/control-plane/manager.js";
import { killSubagentRunAdmin } from "../agents/subagent-control.js";
import type { OpenClawConfig } from "../config/config.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import { loadTaskRegistryFromDisk, saveTaskRegistryToDisk } from "./task-registry.store.js";
import type {
TaskBindingTargetKind,
TaskDeliveryStatus,
TaskEventKind,
TaskEventRecord,
TaskNotifyPolicy,
TaskRecord,
TaskRegistrySnapshot,
TaskRuntime,
TaskSource,
TaskStatus,
} from "./task-registry.types.js";
const log = createSubsystemLogger("tasks/registry");
const tasks = new Map<string, TaskRecord>();
const taskIdsByRunId = new Map<string, Set<string>>();
const tasksWithPendingDelivery = new Set<string>();
let listenerStarted = false;
let listenerStop: (() => void) | null = null;
let restoreAttempted = false;
let deliveryRuntimePromise: Promise<typeof import("./task-registry-delivery-runtime.js")> | null =
null;
function cloneTaskRecord(record: TaskRecord): TaskRecord {
return {
...record,
...(record.requesterOrigin ? { requesterOrigin: { ...record.requesterOrigin } } : {}),
...(record.recentEvents
? { recentEvents: record.recentEvents.map((event) => ({ ...event })) }
: {}),
};
}
function persistTaskRegistry() {
saveTaskRegistryToDisk(tasks);
}
function ensureDeliveryStatus(requesterSessionKey: string): TaskDeliveryStatus {
return requesterSessionKey.trim() ? "pending" : "parent_missing";
}
function ensureNotifyPolicy(params: {
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
requesterSessionKey: string;
}): TaskNotifyPolicy {
if (params.notifyPolicy) {
return params.notifyPolicy;
}
const deliveryStatus = params.deliveryStatus ?? ensureDeliveryStatus(params.requesterSessionKey);
return deliveryStatus === "not_applicable" ? "silent" : "done_only";
}
function normalizeTaskSummary(value: string | null | undefined): string | undefined {
const normalized = value?.replace(/\s+/g, " ").trim();
return normalized || undefined;
}
const TASK_RECENT_EVENT_LIMIT = 12;
function appendTaskEvent(
current: TaskRecord,
event: {
at: number;
kind: TaskEventKind;
summary?: string | null;
},
): TaskEventRecord[] {
const summary = normalizeTaskSummary(event.summary);
const nextEvent: TaskEventRecord = {
at: event.at,
kind: event.kind,
...(summary ? { summary } : {}),
};
const previous = current.recentEvents ?? [];
const merged = [...previous, nextEvent];
return merged.slice(-TASK_RECENT_EVENT_LIMIT);
}
function loadTaskRegistryDeliveryRuntime() {
deliveryRuntimePromise ??= import("./task-registry-delivery-runtime.js");
return deliveryRuntimePromise;
}
function addRunIdIndex(taskId: string, runId?: string) {
const trimmed = runId?.trim();
if (!trimmed) {
return;
}
let ids = taskIdsByRunId.get(trimmed);
if (!ids) {
ids = new Set<string>();
taskIdsByRunId.set(trimmed, ids);
}
ids.add(taskId);
}
function rebuildRunIdIndex() {
taskIdsByRunId.clear();
for (const [taskId, task] of tasks.entries()) {
addRunIdIndex(taskId, task.runId);
}
}
function getTasksByRunId(runId: string): TaskRecord[] {
const ids = taskIdsByRunId.get(runId.trim());
if (!ids || ids.size === 0) {
return [];
}
return [...ids]
.map((taskId) => tasks.get(taskId))
.filter((task): task is TaskRecord => Boolean(task));
}
function taskLookupPriority(task: TaskRecord): number {
const sourcePriority =
task.source === "sessions_spawn" ? 0 : task.source === "background_cli" ? 1 : 2;
const runtimePriority = task.runtime === "cli" ? 1 : 0;
return sourcePriority * 10 + runtimePriority;
}
function pickPreferredRunIdTask(matches: TaskRecord[]): TaskRecord | undefined {
return [...matches].toSorted((left, right) => {
const priorityDiff = taskLookupPriority(left) - taskLookupPriority(right);
if (priorityDiff !== 0) {
return priorityDiff;
}
return left.createdAt - right.createdAt;
})[0];
}
function normalizeComparableText(value: string | undefined): string {
return value?.trim() ?? "";
}
function findExistingTaskForCreate(params: {
source: TaskSource;
runtime: TaskRuntime;
requesterSessionKey: string;
childSessionKey?: string;
runId?: string;
bindingTargetKind?: TaskBindingTargetKind;
label?: string;
task: string;
}): TaskRecord | undefined {
const runId = params.runId?.trim();
const exact = runId
? getTasksByRunId(runId).find(
(task) =>
task.source === params.source &&
task.runtime === params.runtime &&
normalizeComparableText(task.requesterSessionKey) ===
normalizeComparableText(params.requesterSessionKey) &&
normalizeComparableText(task.childSessionKey) ===
normalizeComparableText(params.childSessionKey) &&
normalizeComparableText(task.bindingTargetKind) ===
normalizeComparableText(params.bindingTargetKind) &&
normalizeComparableText(task.label) === normalizeComparableText(params.label) &&
normalizeComparableText(task.task) === normalizeComparableText(params.task),
)
: undefined;
if (exact) {
return exact;
}
if (!runId || params.runtime !== "acp") {
return undefined;
}
const siblingMatches = getTasksByRunId(runId).filter(
(task) =>
task.runtime === params.runtime &&
normalizeComparableText(task.requesterSessionKey) ===
normalizeComparableText(params.requesterSessionKey) &&
normalizeComparableText(task.childSessionKey) ===
normalizeComparableText(params.childSessionKey),
);
if (siblingMatches.length === 0) {
return undefined;
}
return pickPreferredRunIdTask(siblingMatches);
}
function sourceUpgradePriority(source: TaskSource): number {
return source === "sessions_spawn" ? 0 : source === "background_cli" ? 1 : 2;
}
function mergeExistingTaskForCreate(
existing: TaskRecord,
params: {
source: TaskSource;
requesterOrigin?: TaskRecord["requesterOrigin"];
bindingTargetKind?: TaskBindingTargetKind;
label?: string;
task: string;
deliveryStatus?: TaskDeliveryStatus;
notifyPolicy?: TaskNotifyPolicy;
streamLogPath?: string;
},
): TaskRecord {
const patch: Partial<TaskRecord> = {};
if (sourceUpgradePriority(params.source) < sourceUpgradePriority(existing.source)) {
patch.source = params.source;
}
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
if (requesterOrigin && !existing.requesterOrigin) {
patch.requesterOrigin = requesterOrigin;
}
if (params.bindingTargetKind && !existing.bindingTargetKind) {
patch.bindingTargetKind = params.bindingTargetKind;
}
if (params.label?.trim() && !existing.label?.trim()) {
patch.label = params.label.trim();
}
if (params.streamLogPath?.trim() && !existing.streamLogPath?.trim()) {
patch.streamLogPath = params.streamLogPath.trim();
}
if (params.source === "sessions_spawn" && existing.source !== "sessions_spawn") {
patch.task = params.task;
}
if (params.deliveryStatus === "pending" && existing.deliveryStatus !== "delivered") {
patch.deliveryStatus = "pending";
}
const notifyPolicy = ensureNotifyPolicy({
notifyPolicy: params.notifyPolicy,
deliveryStatus: params.deliveryStatus,
requesterSessionKey: existing.requesterSessionKey,
});
if (notifyPolicy !== existing.notifyPolicy && existing.notifyPolicy === "silent") {
patch.notifyPolicy = notifyPolicy;
}
if (Object.keys(patch).length === 0) {
return cloneTaskRecord(existing);
}
return updateTask(existing.taskId, patch) ?? cloneTaskRecord(existing);
}
function taskTerminalDeliveryIdempotencyKey(task: TaskRecord): string {
return `task-terminal:${task.taskId}:${task.status}`;
}
function restoreTaskRegistryOnce() {
if (restoreAttempted) {
return;
}
restoreAttempted = true;
try {
const restored = loadTaskRegistryFromDisk();
if (restored.size === 0) {
return;
}
for (const [taskId, task] of restored.entries()) {
tasks.set(taskId, task);
}
rebuildRunIdIndex();
} catch (error) {
log.warn("Failed to restore task registry", { error });
}
}
export function ensureTaskRegistryReady() {
restoreTaskRegistryOnce();
ensureListener();
}
function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | null {
const current = tasks.get(taskId);
if (!current) {
return null;
}
const next = { ...current, ...patch };
tasks.set(taskId, next);
if (patch.runId && patch.runId !== current.runId) {
rebuildRunIdIndex();
}
persistTaskRegistry();
return cloneTaskRecord(next);
}
function formatTaskTerminalEvent(task: TaskRecord): string {
// User-facing task notifications stay intentionally terse. Detailed runtime chatter lives
// in task metadata for inspection, not in the default channel ping.
const title =
task.label?.trim() ||
(task.runtime === "acp"
? "ACP background task"
: task.runtime === "subagent"
? "Subagent task"
: task.task.trim() || "Background task");
const runLabel = task.runId ? ` (run ${task.runId.slice(0, 8)})` : "";
const summary = task.terminalSummary?.trim();
if (task.status === "done") {
return summary
? `Background task done: ${title}${runLabel}. ${summary}`
: `Background task done: ${title}${runLabel}.`;
}
if (task.status === "timed_out") {
return `Background task timed out: ${title}${runLabel}.`;
}
if (task.status === "lost") {
return `Background task lost: ${title}${runLabel}. ${task.error ?? "Backing session disappeared."}`;
}
if (task.status === "cancelled") {
return `Background task cancelled: ${title}${runLabel}.`;
}
const error = task.error?.trim();
return error
? `Background task failed: ${title}${runLabel}. ${error}`
: `Background task failed: ${title}${runLabel}.`;
}
function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean {
const origin = normalizeDeliveryContext(task.requesterOrigin);
const channel = origin?.channel?.trim();
const to = origin?.to?.trim();
return Boolean(channel && to && isDeliverableMessageChannel(channel));
}
function queueTaskSystemEvent(task: TaskRecord, text: string) {
const requesterSessionKey = task.requesterSessionKey.trim();
if (!requesterSessionKey) {
return false;
}
enqueueSystemEvent(text, {
sessionKey: requesterSessionKey,
contextKey: `task:${task.taskId}`,
deliveryContext: task.requesterOrigin,
});
requestHeartbeatNow({
reason: "background-task",
sessionKey: requesterSessionKey,
});
return true;
}
function formatTaskStateChangeEvent(task: TaskRecord, event: TaskEventRecord): string | null {
const title =
task.label?.trim() ||
(task.runtime === "acp"
? "ACP background task"
: task.runtime === "subagent"
? "Subagent task"
: task.task.trim() || "Background task");
if (event.kind === "running") {
return `Background task started: ${title}.`;
}
if (event.kind === "progress") {
return event.summary ? `Background task update: ${title}. ${event.summary}` : null;
}
return null;
}
function shouldAutoDeliverTaskUpdate(task: TaskRecord): boolean {
if (task.notifyPolicy === "silent") {
return false;
}
if (task.runtime === "subagent" && task.status !== "cancelled") {
return false;
}
if (
task.status !== "done" &&
task.status !== "failed" &&
task.status !== "timed_out" &&
task.status !== "lost" &&
task.status !== "cancelled"
) {
return false;
}
return task.deliveryStatus === "pending";
}
function shouldAutoDeliverTaskStateChange(task: TaskRecord): boolean {
return (
task.notifyPolicy === "state_changes" &&
task.deliveryStatus === "pending" &&
task.status !== "done" &&
task.status !== "failed" &&
task.status !== "timed_out" &&
task.status !== "lost" &&
task.status !== "cancelled"
);
}
function shouldSuppressDuplicateTerminalDelivery(task: TaskRecord): boolean {
if (task.runtime !== "acp" || !task.runId?.trim()) {
return false;
}
const preferred = pickPreferredRunIdTask(getTasksByRunId(task.runId));
return Boolean(preferred && preferred.taskId !== task.taskId);
}
export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<TaskRecord | null> {
ensureTaskRegistryReady();
const current = tasks.get(taskId);
if (!current || !shouldAutoDeliverTaskUpdate(current)) {
return current ? cloneTaskRecord(current) : null;
}
if (tasksWithPendingDelivery.has(taskId)) {
return cloneTaskRecord(current);
}
tasksWithPendingDelivery.add(taskId);
try {
const latest = tasks.get(taskId);
if (!latest || !shouldAutoDeliverTaskUpdate(latest)) {
return latest ? cloneTaskRecord(latest) : null;
}
if (shouldSuppressDuplicateTerminalDelivery(latest)) {
return updateTask(taskId, {
deliveryStatus: "not_applicable",
lastEventAt: Date.now(),
});
}
if (!latest.requesterSessionKey.trim()) {
return updateTask(taskId, {
deliveryStatus: "parent_missing",
lastEventAt: Date.now(),
});
}
const eventText = formatTaskTerminalEvent(latest);
if (!canDeliverTaskToRequesterOrigin(latest)) {
try {
queueTaskSystemEvent(latest, eventText);
return updateTask(taskId, {
deliveryStatus: "session_queued",
lastEventAt: Date.now(),
});
} catch (error) {
log.warn("Failed to queue background task session delivery", {
taskId,
requesterSessionKey: latest.requesterSessionKey,
error,
});
return updateTask(taskId, {
deliveryStatus: "failed",
lastEventAt: Date.now(),
});
}
}
try {
const { sendMessage } = await loadTaskRegistryDeliveryRuntime();
const origin = normalizeDeliveryContext(latest.requesterOrigin);
const requesterAgentId = parseAgentSessionKey(latest.requesterSessionKey)?.agentId;
await sendMessage({
channel: origin?.channel,
to: origin?.to ?? "",
accountId: origin?.accountId,
threadId: origin?.threadId,
content: eventText,
agentId: requesterAgentId,
idempotencyKey: taskTerminalDeliveryIdempotencyKey(latest),
mirror: {
sessionKey: latest.requesterSessionKey,
agentId: requesterAgentId,
idempotencyKey: taskTerminalDeliveryIdempotencyKey(latest),
},
});
return updateTask(taskId, {
deliveryStatus: "delivered",
lastEventAt: Date.now(),
});
} catch (error) {
log.warn("Failed to deliver background task update", {
taskId,
requesterSessionKey: latest.requesterSessionKey,
requesterOrigin: latest.requesterOrigin,
error,
});
try {
queueTaskSystemEvent(latest, eventText);
} catch (fallbackError) {
log.warn("Failed to queue background task fallback event", {
taskId,
requesterSessionKey: latest.requesterSessionKey,
error: fallbackError,
});
}
return updateTask(taskId, {
deliveryStatus: "failed",
lastEventAt: Date.now(),
});
}
} finally {
tasksWithPendingDelivery.delete(taskId);
}
}
export async function maybeDeliverTaskStateChangeUpdate(
taskId: string,
): Promise<TaskRecord | null> {
ensureTaskRegistryReady();
const current = tasks.get(taskId);
if (!current || !shouldAutoDeliverTaskStateChange(current)) {
return current ? cloneTaskRecord(current) : null;
}
const latestEvent = current.recentEvents?.at(-1);
if (!latestEvent || (current.lastNotifiedEventAt ?? 0) >= latestEvent.at) {
return cloneTaskRecord(current);
}
const eventText = formatTaskStateChangeEvent(current, latestEvent);
if (!eventText) {
return cloneTaskRecord(current);
}
try {
if (!canDeliverTaskToRequesterOrigin(current)) {
queueTaskSystemEvent(current, eventText);
return updateTask(taskId, {
lastNotifiedEventAt: latestEvent.at,
lastEventAt: Date.now(),
});
}
const { sendMessage } = await loadTaskRegistryDeliveryRuntime();
const origin = normalizeDeliveryContext(current.requesterOrigin);
const requesterAgentId = parseAgentSessionKey(current.requesterSessionKey)?.agentId;
await sendMessage({
channel: origin?.channel,
to: origin?.to ?? "",
accountId: origin?.accountId,
threadId: origin?.threadId,
content: eventText,
agentId: requesterAgentId,
idempotencyKey: `task-event:${current.taskId}:${latestEvent.at}:${latestEvent.kind}`,
mirror: {
sessionKey: current.requesterSessionKey,
agentId: requesterAgentId,
idempotencyKey: `task-event:${current.taskId}:${latestEvent.at}:${latestEvent.kind}`,
},
});
return updateTask(taskId, {
lastNotifiedEventAt: latestEvent.at,
lastEventAt: Date.now(),
});
} catch (error) {
log.warn("Failed to deliver background task state change", {
taskId,
requesterSessionKey: current.requesterSessionKey,
error,
});
return cloneTaskRecord(current);
}
}
export function updateTaskRecordById(
taskId: string,
patch: Partial<TaskRecord>,
): TaskRecord | null {
ensureTaskRegistryReady();
return updateTask(taskId, patch);
}
function updateTasksByRunId(runId: string, patch: Partial<TaskRecord>): TaskRecord[] {
const ids = taskIdsByRunId.get(runId.trim());
if (!ids || ids.size === 0) {
return [];
}
const updated: TaskRecord[] = [];
for (const taskId of ids) {
const task = updateTask(taskId, patch);
if (task) {
updated.push(task);
}
}
return updated;
}
function ensureListener() {
if (listenerStarted) {
return;
}
listenerStarted = true;
listenerStop = onAgentEvent((evt) => {
restoreTaskRegistryOnce();
const ids = taskIdsByRunId.get(evt.runId);
if (!ids || ids.size === 0) {
return;
}
const now = evt.ts || Date.now();
for (const taskId of ids) {
const current = tasks.get(taskId);
if (!current) {
continue;
}
const patch: Partial<TaskRecord> = {
lastEventAt: now,
};
if (evt.stream === "lifecycle") {
const phase = typeof evt.data?.phase === "string" ? evt.data.phase : undefined;
const startedAt =
typeof evt.data?.startedAt === "number" ? evt.data.startedAt : current.startedAt;
const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : undefined;
if (startedAt) {
patch.startedAt = startedAt;
}
if (phase === "start") {
patch.status = "running";
} else if (phase === "end") {
patch.status = evt.data?.aborted === true ? "timed_out" : "done";
patch.endedAt = endedAt ?? now;
} else if (phase === "error") {
patch.status = "failed";
patch.endedAt = endedAt ?? now;
patch.error = typeof evt.data?.error === "string" ? evt.data.error : current.error;
}
} else if (evt.stream === "error") {
patch.error = typeof evt.data?.error === "string" ? evt.data.error : current.error;
}
if (patch.status && patch.status !== current.status) {
patch.recentEvents = appendTaskEvent(current, {
at: now,
kind: patch.status,
summary:
patch.status === "failed"
? (patch.error ?? current.error)
: patch.status === "done"
? current.terminalSummary
: undefined,
});
}
const updated = updateTask(taskId, patch);
if (updated) {
void maybeDeliverTaskStateChangeUpdate(taskId);
void maybeDeliverTaskTerminalUpdate(taskId);
}
}
});
}
export function createTaskRecord(params: {
source: TaskSource;
runtime: TaskRuntime;
requesterSessionKey: string;
requesterOrigin?: TaskRecord["requesterOrigin"];
childSessionKey?: string;
runId?: string;
bindingTargetKind?: TaskBindingTargetKind;
label?: string;
task: string;
status?: TaskStatus;
deliveryStatus?: TaskDeliveryStatus;
notifyPolicy?: TaskNotifyPolicy;
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
terminalSummary?: string | null;
transcriptPath?: string;
streamLogPath?: string;
backend?: string;
agentSessionId?: string;
backendSessionId?: string;
}): TaskRecord {
ensureTaskRegistryReady();
const existing = findExistingTaskForCreate(params);
if (existing) {
return mergeExistingTaskForCreate(existing, params);
}
const now = Date.now();
const taskId = crypto.randomUUID();
const status = params.status ?? "accepted";
const deliveryStatus = params.deliveryStatus ?? ensureDeliveryStatus(params.requesterSessionKey);
const notifyPolicy = ensureNotifyPolicy({
notifyPolicy: params.notifyPolicy,
deliveryStatus,
requesterSessionKey: params.requesterSessionKey,
});
const lastEventAt = params.lastEventAt ?? params.startedAt ?? now;
const record: TaskRecord = {
taskId,
source: params.source,
runtime: params.runtime,
requesterSessionKey: params.requesterSessionKey,
requesterOrigin: normalizeDeliveryContext(params.requesterOrigin),
childSessionKey: params.childSessionKey,
runId: params.runId?.trim() || undefined,
bindingTargetKind: params.bindingTargetKind,
label: params.label?.trim() || undefined,
task: params.task,
status,
deliveryStatus,
notifyPolicy,
createdAt: now,
startedAt: params.startedAt,
lastEventAt,
progressSummary: normalizeTaskSummary(params.progressSummary),
terminalSummary: normalizeTaskSummary(params.terminalSummary),
recentEvents: appendTaskEvent(
{
taskId,
source: params.source,
runtime: params.runtime,
requesterSessionKey: params.requesterSessionKey,
task: params.task,
status,
deliveryStatus,
notifyPolicy,
createdAt: now,
} as TaskRecord,
{
at: lastEventAt,
kind: status,
},
),
transcriptPath: params.transcriptPath,
streamLogPath: params.streamLogPath,
backend: params.backend,
agentSessionId: params.agentSessionId,
backendSessionId: params.backendSessionId,
};
tasks.set(taskId, record);
addRunIdIndex(taskId, record.runId);
persistTaskRegistry();
return cloneTaskRecord(record);
}
export function updateTaskStateByRunId(params: {
runId: string;
status?: TaskStatus;
startedAt?: number;
endedAt?: number;
lastEventAt?: number;
error?: string;
progressSummary?: string | null;
terminalSummary?: string | null;
eventSummary?: string | null;
}) {
ensureTaskRegistryReady();
const ids = taskIdsByRunId.get(params.runId.trim());
if (!ids || ids.size === 0) {
return [];
}
const updated: TaskRecord[] = [];
for (const taskId of ids) {
const current = tasks.get(taskId);
if (!current) {
continue;
}
const patch: Partial<TaskRecord> = {};
const nextStatus = params.status ?? current.status;
const eventAt = params.lastEventAt ?? params.endedAt ?? Date.now();
if (params.status) {
patch.status = params.status;
}
if (params.startedAt != null) {
patch.startedAt = params.startedAt;
}
if (params.endedAt != null) {
patch.endedAt = params.endedAt;
}
if (params.lastEventAt != null) {
patch.lastEventAt = params.lastEventAt;
}
if (params.error !== undefined) {
patch.error = params.error;
}
if (params.progressSummary !== undefined) {
patch.progressSummary = normalizeTaskSummary(params.progressSummary);
}
if (params.terminalSummary !== undefined) {
patch.terminalSummary = normalizeTaskSummary(params.terminalSummary);
}
const eventSummary =
normalizeTaskSummary(params.eventSummary) ??
(nextStatus === "failed"
? normalizeTaskSummary(params.error ?? current.error)
: nextStatus === "done"
? normalizeTaskSummary(params.terminalSummary ?? current.terminalSummary)
: undefined);
const shouldAppendEvent =
(params.status && params.status !== current.status) ||
Boolean(normalizeTaskSummary(params.eventSummary));
if (shouldAppendEvent) {
patch.recentEvents = appendTaskEvent(current, {
at: eventAt,
kind: params.status && params.status !== current.status ? params.status : "progress",
summary: eventSummary,
});
}
const task = updateTask(taskId, patch);
if (task) {
updated.push(task);
}
}
for (const task of updated) {
void maybeDeliverTaskStateChangeUpdate(task.taskId);
void maybeDeliverTaskTerminalUpdate(task.taskId);
}
return updated;
}
export function updateTaskDeliveryByRunId(params: {
runId: string;
deliveryStatus: TaskDeliveryStatus;
}) {
ensureTaskRegistryReady();
return updateTasksByRunId(params.runId, {
deliveryStatus: params.deliveryStatus,
});
}
export function updateTaskNotifyPolicyById(params: {
taskId: string;
notifyPolicy: TaskNotifyPolicy;
}): TaskRecord | null {
ensureTaskRegistryReady();
return updateTask(params.taskId, {
notifyPolicy: params.notifyPolicy,
lastEventAt: Date.now(),
});
}
export async function cancelTaskById(params: {
cfg: OpenClawConfig;
taskId: string;
}): Promise<{ found: boolean; cancelled: boolean; reason?: string; task?: TaskRecord }> {
ensureTaskRegistryReady();
const task = tasks.get(params.taskId.trim());
if (!task) {
return { found: false, cancelled: false, reason: "Task not found." };
}
if (
task.status === "done" ||
task.status === "failed" ||
task.status === "timed_out" ||
task.status === "lost" ||
task.status === "cancelled"
) {
return {
found: true,
cancelled: false,
reason: "Task is already terminal.",
task: cloneTaskRecord(task),
};
}
const childSessionKey = task.childSessionKey?.trim();
if (!childSessionKey) {
return {
found: true,
cancelled: false,
reason: "Task has no cancellable child session.",
task: cloneTaskRecord(task),
};
}
try {
if (task.runtime === "acp") {
await getAcpSessionManager().cancelSession({
cfg: params.cfg,
sessionKey: childSessionKey,
reason: "task-cancel",
});
} else if (task.runtime === "subagent") {
const result = await killSubagentRunAdmin({
cfg: params.cfg,
sessionKey: childSessionKey,
});
if (!result.found || !result.killed) {
return {
found: true,
cancelled: false,
reason: result.found ? "Subagent was not running." : "Subagent task not found.",
task: cloneTaskRecord(task),
};
}
} else {
return {
found: true,
cancelled: false,
reason: "Task runtime does not support cancellation yet.",
task: cloneTaskRecord(task),
};
}
const updated = updateTask(task.taskId, {
status: "cancelled",
endedAt: Date.now(),
lastEventAt: Date.now(),
error: "Cancelled by operator.",
recentEvents: appendTaskEvent(task, {
at: Date.now(),
kind: "cancelled",
summary: "Cancelled by operator.",
}),
});
if (updated) {
void maybeDeliverTaskTerminalUpdate(updated.taskId);
}
return {
found: true,
cancelled: true,
task: updated ?? cloneTaskRecord(task),
};
} catch (error) {
return {
found: true,
cancelled: false,
reason: error instanceof Error ? error.message : String(error),
task: cloneTaskRecord(task),
};
}
}
export function listTaskRecords(): TaskRecord[] {
ensureTaskRegistryReady();
return [...tasks.values()]
.map((task) => cloneTaskRecord(task))
.toSorted((a, b) => b.createdAt - a.createdAt);
}
export function getTaskRegistrySnapshot(): TaskRegistrySnapshot {
return {
tasks: listTaskRecords(),
};
}
export function getTaskById(taskId: string): TaskRecord | undefined {
ensureTaskRegistryReady();
const task = tasks.get(taskId.trim());
return task ? cloneTaskRecord(task) : undefined;
}
export function findTaskByRunId(runId: string): TaskRecord | undefined {
ensureTaskRegistryReady();
const task = pickPreferredRunIdTask(getTasksByRunId(runId));
return task ? cloneTaskRecord(task) : undefined;
}
export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | undefined {
const key = sessionKey.trim();
if (!key) {
return undefined;
}
return listTaskRecords().find(
(task) => task.childSessionKey === key || task.requesterSessionKey === key,
);
}
export function resolveTaskForLookupToken(token: string): TaskRecord | undefined {
const lookup = token.trim();
if (!lookup) {
return undefined;
}
return getTaskById(lookup) ?? findTaskByRunId(lookup) ?? findLatestTaskForSessionKey(lookup);
}
export function deleteTaskRecordById(taskId: string): boolean {
ensureTaskRegistryReady();
const current = tasks.get(taskId);
if (!current) {
return false;
}
tasks.delete(taskId);
rebuildRunIdIndex();
persistTaskRegistry();
return true;
}
export function resetTaskRegistryForTests(opts?: { persist?: boolean }) {
tasks.clear();
taskIdsByRunId.clear();
restoreAttempted = false;
if (listenerStop) {
listenerStop();
listenerStop = null;
}
listenerStarted = false;
if (opts?.persist !== false) {
persistTaskRegistry();
}
}

View File

@@ -0,0 +1,68 @@
import type { DeliveryContext } from "../utils/delivery-context.js";
export type TaskRuntime = "subagent" | "acp" | "cli";
export type TaskStatus =
| "accepted"
| "running"
| "done"
| "failed"
| "timed_out"
| "cancelled"
| "lost";
export type TaskDeliveryStatus =
| "pending"
| "delivered"
| "session_queued"
| "failed"
| "parent_missing"
| "not_applicable";
export type TaskNotifyPolicy = "done_only" | "state_changes" | "silent";
export type TaskBindingTargetKind = "subagent" | "session";
export type TaskSource = "sessions_spawn" | "background_cli" | "unknown";
export type TaskEventKind = TaskStatus | "progress";
export type TaskEventRecord = {
at: number;
kind: TaskEventKind;
summary?: string;
};
export type TaskRecord = {
taskId: string;
source: TaskSource;
runtime: TaskRuntime;
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
childSessionKey?: string;
runId?: string;
bindingTargetKind?: TaskBindingTargetKind;
label?: string;
task: string;
status: TaskStatus;
deliveryStatus: TaskDeliveryStatus;
notifyPolicy: TaskNotifyPolicy;
createdAt: number;
startedAt?: number;
endedAt?: number;
lastEventAt?: number;
error?: string;
progressSummary?: string;
terminalSummary?: string;
recentEvents?: TaskEventRecord[];
lastNotifiedEventAt?: number;
transcriptPath?: string;
streamLogPath?: string;
backend?: string;
agentSessionId?: string;
backendSessionId?: string;
};
export type TaskRegistrySnapshot = {
tasks: TaskRecord[];
};