diff --git a/CHANGELOG.md b/CHANGELOG.md index 20098144c4f..c2f3f459f12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ Docs: https://docs.openclaw.ai - Memory/plugins: move the pre-compaction memory flush plan behind the active memory plugin contract so `memory-core` owns flush prompts and target-path policy instead of hardcoded core logic. - MiniMax: trim model catalog to M2.7 only, removing legacy M2, M2.1, M2.5, and VL-01 models. (#54487) Thanks @liyuan97. - Plugins/runtime: expose `runHeartbeatOnce` in the plugin runtime `system` namespace so plugins can trigger a single heartbeat cycle with an explicit delivery target override (e.g. `heartbeat: { target: "last" }`). (#40299) Thanks @loveyana. +- Background tasks: keep durable lifecycle records for ACP/subagent spawned work and deliver ACP completion/failure updates through the real requester chat path instead of session-only stream events. - Agents/compaction: preserve the post-compaction AGENTS refresh on stale-usage preflight compaction for both immediate replies and queued followups. (#49479) Thanks @jared596. - Agents/compaction: surface safeguard-specific cancel reasons and relabel benign manual `/compact` no-op cases as skipped instead of failed. (#51072) Thanks @afurm. - Docs: add `pnpm docs:check-links:anchors` for Mintlify anchor validation while keeping `scripts/docs-link-audit.mjs` as the stable link-audit entrypoint. (#55912) Thanks @velvet-shark. diff --git a/src/acp/control-plane/manager.core.ts b/src/acp/control-plane/manager.core.ts index 2974c01151e..5ac48a357f9 100644 --- a/src/acp/control-plane/manager.core.ts +++ b/src/acp/control-plane/manager.core.ts @@ -3,6 +3,8 @@ import type { OpenClawConfig } from "../../config/config.js"; import { logVerbose } from "../../globals.js"; import { normalizeAgentId } from "../../routing/session-key.js"; import { isAcpSessionKey } from "../../sessions/session-key-utils.js"; +import { createTaskRecord, updateTaskStateByRunId } from "../../tasks/task-registry.js"; +import type { DeliveryContext } from "../../utils/delivery-context.js"; import { AcpRuntimeError, toAcpRuntimeError, @@ -75,6 +77,43 @@ import { SessionActorQueue } from "./session-actor-queue.js"; const ACP_TURN_TIMEOUT_GRACE_MS = 1_000; const ACP_TURN_TIMEOUT_CLEANUP_GRACE_MS = 2_000; const ACP_TURN_TIMEOUT_REASON = "turn-timeout"; +const ACP_BACKGROUND_TASK_TEXT_MAX_LENGTH = 160; +const ACP_BACKGROUND_TASK_PROGRESS_MAX_LENGTH = 240; + +function summarizeBackgroundTaskText(text: string): string { + const normalized = normalizeText(text) ?? "ACP background task"; + if (normalized.length <= ACP_BACKGROUND_TASK_TEXT_MAX_LENGTH) { + return normalized; + } + return `${normalized.slice(0, ACP_BACKGROUND_TASK_TEXT_MAX_LENGTH - 1)}…`; +} + +function appendBackgroundTaskProgressSummary(current: string, chunk: string): string { + const normalizedChunk = normalizeText(chunk)?.replace(/\s+/g, " "); + if (!normalizedChunk) { + return current; + } + const combined = current ? `${current} ${normalizedChunk}` : normalizedChunk; + if (combined.length <= ACP_BACKGROUND_TASK_PROGRESS_MAX_LENGTH) { + return combined; + } + return `${combined.slice(0, ACP_BACKGROUND_TASK_PROGRESS_MAX_LENGTH - 1)}…`; +} + +function resolveBackgroundTaskFailureStatus(error: AcpRuntimeError): "failed" | "timed_out" { + return /\btimed out\b/i.test(error.message) ? "timed_out" : "failed"; +} + +type BackgroundTaskContext = { + requesterSessionKey: string; + requesterOrigin?: DeliveryContext; + childSessionKey: string; + runId: string; + label?: string; + task: string; +}; + +type BackgroundTaskStatePatch = Omit[0], "runId">; export class AcpSessionManager { private readonly actorQueue = new SessionActorQueue(); @@ -614,6 +653,19 @@ export class AcpSessionManager { async () => { const turnStartedAt = Date.now(); const actorKey = normalizeActorKey(sessionKey); + const taskContext = + input.mode === "prompt" + ? this.resolveBackgroundTaskContext({ + cfg: input.cfg, + sessionKey, + requestId: input.requestId, + text: input.text, + }) + : null; + if (taskContext) { + this.createBackgroundTaskRecord(taskContext, turnStartedAt); + } + let taskProgressSummary = ""; for (let attempt = 0; attempt < 2; attempt += 1) { const resolution = this.resolveSession({ cfg: input.cfg, @@ -696,6 +748,19 @@ export class AcpSessionManager { ); } else if (event.type === "text_delta" || event.type === "tool_call") { sawTurnOutput = true; + if (event.type === "text_delta" && event.stream !== "thought" && event.text) { + taskProgressSummary = appendBackgroundTaskProgressSummary( + taskProgressSummary, + event.text, + ); + } + if (taskContext) { + this.updateBackgroundTaskState(taskContext.runId, { + status: "running", + lastEventAt: Date.now(), + progressSummary: taskProgressSummary || null, + }); + } } if (input.onEvent) { await input.onEvent(event); @@ -734,6 +799,16 @@ export class AcpSessionManager { this.recordTurnCompletion({ startedAt: turnStartedAt, }); + if (taskContext) { + this.updateBackgroundTaskState(taskContext.runId, { + status: "done", + endedAt: Date.now(), + lastEventAt: Date.now(), + error: undefined, + progressSummary: taskProgressSummary || null, + terminalSummary: null, + }); + } await this.setSessionState({ cfg: input.cfg, sessionKey, @@ -762,6 +837,16 @@ export class AcpSessionManager { startedAt: turnStartedAt, errorCode: acpError.code, }); + if (taskContext) { + this.updateBackgroundTaskState(taskContext.runId, { + status: resolveBackgroundTaskFailureStatus(acpError), + endedAt: Date.now(), + lastEventAt: Date.now(), + error: acpError.message, + progressSummary: taskProgressSummary || null, + terminalSummary: null, + }); + } await this.setSessionState({ cfg: input.cfg, sessionKey, @@ -1729,4 +1814,66 @@ export class AcpSessionManager { (a.agentSessionId ?? "") === (b.agentSessionId ?? "") ); } + + private resolveBackgroundTaskContext(params: { + cfg: OpenClawConfig; + sessionKey: string; + requestId: string; + text: string; + }): BackgroundTaskContext | null { + const childEntry = this.deps.readSessionEntry({ + cfg: params.cfg, + sessionKey: params.sessionKey, + })?.entry; + const requesterSessionKey = + normalizeText(childEntry?.spawnedBy) ?? normalizeText(childEntry?.parentSessionKey); + if (!requesterSessionKey) { + return null; + } + const parentEntry = this.deps.readSessionEntry({ + cfg: params.cfg, + sessionKey: requesterSessionKey, + })?.entry; + return { + requesterSessionKey, + requesterOrigin: parentEntry?.deliveryContext ?? childEntry?.deliveryContext, + childSessionKey: params.sessionKey, + runId: params.requestId, + label: normalizeText(childEntry?.label), + task: summarizeBackgroundTaskText(params.text), + }; + } + + private createBackgroundTaskRecord(context: BackgroundTaskContext, startedAt: number): void { + try { + createTaskRecord({ + source: "unknown", + runtime: "acp", + requesterSessionKey: context.requesterSessionKey, + requesterOrigin: context.requesterOrigin, + childSessionKey: context.childSessionKey, + runId: context.runId, + bindingTargetKind: "session", + label: context.label, + task: context.task, + status: "running", + startedAt, + }); + } catch (error) { + logVerbose( + `acp-manager: failed creating background task for ${context.runId}: ${String(error)}`, + ); + } + } + + private updateBackgroundTaskState(runId: string, patch: BackgroundTaskStatePatch): void { + try { + updateTaskStateByRunId({ + ...patch, + runId, + }); + } catch (error) { + logVerbose(`acp-manager: failed updating background task for ${runId}: ${String(error)}`); + } + } } diff --git a/src/acp/control-plane/manager.test.ts b/src/acp/control-plane/manager.test.ts index 7dc1bcb0058..b9ca4348079 100644 --- a/src/acp/control-plane/manager.test.ts +++ b/src/acp/control-plane/manager.test.ts @@ -1,8 +1,10 @@ import { setTimeout as scheduleNativeTimeout } from "node:timers"; import { setTimeout as sleep } from "node:timers/promises"; -import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; import type { AcpSessionRuntimeOptions, SessionAcpMeta } from "../../config/sessions/types.js"; +import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; +import { withTempDir } from "../../test-helpers/temp-dir.js"; import type { AcpRuntime, AcpRuntimeCapabilities } from "../runtime/types.js"; const hoisted = vi.hoisted(() => { @@ -44,6 +46,7 @@ const baseCfg = { dispatch: { enabled: true }, }, } as const; +const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; function createRuntime(): { runtime: AcpRuntime; @@ -168,6 +171,15 @@ describe("AcpSessionManager", () => { hoisted.requireAcpRuntimeBackendMock.mockReset(); }); + afterEach(() => { + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } + resetTaskRegistryForTests(); + }); + it("marks ACP-shaped sessions without metadata as stale", () => { hoisted.readAcpSessionEntryMock.mockReturnValue(null); const manager = new AcpSessionManager(); @@ -236,6 +248,74 @@ describe("AcpSessionManager", () => { ); }); + it("tracks parented direct ACP turns in the task registry", async () => { + await withTempDir({ prefix: "openclaw-acp-manager-task-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + const runtimeState = createRuntime(); + runtimeState.runTurn.mockImplementation(async function* () { + yield { + type: "text_delta" as const, + stream: "output" as const, + text: "Write failed: permission denied for /root/oc-acp-write-should-fail.txt.", + }; + yield { type: "done" as const }; + }); + hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ + id: "acpx", + runtime: runtimeState.runtime, + }); + hoisted.readAcpSessionEntryMock.mockImplementation((paramsUnknown: unknown) => { + const sessionKey = (paramsUnknown as { sessionKey?: string }).sessionKey; + if (sessionKey === "agent:codex:acp:child-1") { + return { + sessionKey, + storeSessionKey: sessionKey, + entry: { + sessionId: "child-1", + updatedAt: Date.now(), + spawnedBy: "agent:quant:telegram:quant:direct:822430204", + label: "Quant patch", + }, + acp: readySessionMeta(), + }; + } + if (sessionKey === "agent:quant:telegram:quant:direct:822430204") { + return { + sessionKey, + storeSessionKey: sessionKey, + entry: { + sessionId: "parent-1", + updatedAt: Date.now(), + }, + }; + } + return null; + }); + + const manager = new AcpSessionManager(); + await manager.runTurn({ + cfg: baseCfg, + sessionKey: "agent:codex:acp:child-1", + text: "Implement the feature and report back", + mode: "prompt", + requestId: "direct-parented-run", + }); + + expect(findTaskByRunId("direct-parented-run")).toMatchObject({ + source: "unknown", + runtime: "acp", + requesterSessionKey: "agent:quant:telegram:quant:direct:822430204", + childSessionKey: "agent:codex:acp:child-1", + label: "Quant patch", + task: "Implement the feature and report back", + status: "done", + progressSummary: "Write failed: permission denied for /root/oc-acp-write-should-fail.txt.", + }); + }); + }); + it("serializes concurrent turns for the same ACP session", async () => { const runtimeState = createRuntime(); hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ diff --git a/src/acp/session-interaction-mode.ts b/src/acp/session-interaction-mode.ts new file mode 100644 index 00000000000..3ceb82090c1 --- /dev/null +++ b/src/acp/session-interaction-mode.ts @@ -0,0 +1,29 @@ +import type { SessionEntry } from "../config/sessions/types.js"; + +export type AcpSessionInteractionMode = "interactive" | "parent-owned-background"; + +type SessionInteractionEntry = Pick; + +function normalizeText(value: string | undefined): string | undefined { + const trimmed = value?.trim(); + return trimmed ? trimmed : undefined; +} + +export function resolveAcpSessionInteractionMode( + entry?: SessionInteractionEntry | null, +): AcpSessionInteractionMode { + // Parent-owned oneshot ACP sessions are background work delegated from another session. + // They should report back through the parent task notifier instead of speaking directly + // on the user-facing channel themselves. + if (entry?.acp?.mode !== "oneshot") { + return "interactive"; + } + if (normalizeText(entry.spawnedBy) || normalizeText(entry.parentSessionKey)) { + return "parent-owned-background"; + } + return "interactive"; +} + +export function isParentOwnedBackgroundAcpSession(entry?: SessionInteractionEntry | null): boolean { + return resolveAcpSessionInteractionMode(entry) === "parent-owned-background"; +} diff --git a/src/agents/acp-spawn-parent-stream.test.ts b/src/agents/acp-spawn-parent-stream.test.ts index 9592760d470..cc98436cddd 100644 --- a/src/agents/acp-spawn-parent-stream.test.ts +++ b/src/agents/acp-spawn-parent-stream.test.ts @@ -20,14 +20,25 @@ vi.mock("../infra/heartbeat-wake.js", async (importOriginal) => { ); }); -vi.mock("../acp/runtime/session-meta.js", () => ({ - readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args), -})); +vi.mock("../acp/runtime/session-meta.js", async (importOriginal) => { + return await mergeMockedModule( + await importOriginal(), + () => ({ + readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args), + }), + ); +}); -vi.mock("../config/sessions/paths.js", () => ({ - resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args), - resolveSessionFilePathOptions: (...args: unknown[]) => resolveSessionFilePathOptionsMock(...args), -})); +vi.mock("../config/sessions/paths.js", async (importOriginal) => { + return await mergeMockedModule( + await importOriginal(), + () => ({ + resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args), + resolveSessionFilePathOptions: (...args: unknown[]) => + resolveSessionFilePathOptionsMock(...args), + }), + ); +}); let emitAgentEvent: typeof import("../infra/agent-events.js").emitAgentEvent; let resolveAcpSpawnStreamLogPath: typeof import("./acp-spawn-parent-stream.js").resolveAcpSpawnStreamLogPath; @@ -48,14 +59,28 @@ async function loadFreshAcpSpawnParentStreamModulesForTest() { }), ); }); - vi.doMock("../acp/runtime/session-meta.js", () => ({ - readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args), - })); - vi.doMock("../config/sessions/paths.js", () => ({ - resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args), - resolveSessionFilePathOptions: (...args: unknown[]) => - resolveSessionFilePathOptionsMock(...args), - })); + vi.doMock("../acp/runtime/session-meta.js", async () => { + return await mergeMockedModule( + await vi.importActual( + "../acp/runtime/session-meta.js", + ), + () => ({ + readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args), + }), + ); + }); + vi.doMock("../config/sessions/paths.js", async () => { + return await mergeMockedModule( + await vi.importActual( + "../config/sessions/paths.js", + ), + () => ({ + resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args), + resolveSessionFilePathOptions: (...args: unknown[]) => + resolveSessionFilePathOptionsMock(...args), + }), + ); + }); const [agentEvents, relayModule] = await Promise.all([ import("../infra/agent-events.js"), import("./acp-spawn-parent-stream.js"), @@ -219,6 +244,39 @@ describe("startAcpSpawnParentStreamRelay", () => { relay.dispose(); }); + it("can keep background relays out of the parent session while still logging", () => { + const relay = startAcpSpawnParentStreamRelay({ + runId: "run-quiet", + parentSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child-quiet", + agentId: "codex", + surfaceUpdates: false, + streamFlushMs: 10, + noOutputNoticeMs: 120_000, + }); + + relay.notifyStarted(); + emitAgentEvent({ + runId: "run-quiet", + stream: "assistant", + data: { + delta: "hello from child", + }, + }); + vi.advanceTimersByTime(15); + emitAgentEvent({ + runId: "run-quiet", + stream: "lifecycle", + data: { + phase: "end", + }, + }); + + expect(collectedTexts()).toEqual([]); + expect(requestHeartbeatNowMock).not.toHaveBeenCalled(); + relay.dispose(); + }); + it("preserves delta whitespace boundaries in progress relays", () => { const relay = startAcpSpawnParentStreamRelay({ runId: "run-5", diff --git a/src/agents/acp-spawn-parent-stream.ts b/src/agents/acp-spawn-parent-stream.ts index 36b113386c2..ee4deeb1f28 100644 --- a/src/agents/acp-spawn-parent-stream.ts +++ b/src/agents/acp-spawn-parent-stream.ts @@ -6,6 +6,7 @@ import { onAgentEvent } from "../infra/agent-events.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { scopedHeartbeatWakeOptions } from "../routing/session-key.js"; +import { updateTaskStateByRunId } from "../tasks/task-registry.js"; const DEFAULT_STREAM_FLUSH_MS = 2_500; const DEFAULT_NO_OUTPUT_NOTICE_MS = 60_000; @@ -79,6 +80,7 @@ export function startAcpSpawnParentStreamRelay(params: { childSessionKey: string; agentId: string; logPath?: string; + surfaceUpdates?: boolean; streamFlushMs?: number; noOutputNoticeMs?: number; noOutputPollMs?: number; @@ -178,7 +180,11 @@ export function startAcpSpawnParentStreamRelay(params: { ...fields, }); }; + const shouldSurfaceUpdates = params.surfaceUpdates !== false; const wake = () => { + if (!shouldSurfaceUpdates) { + return; + } requestHeartbeatNow( scopedHeartbeatWakeOptions(parentSessionKey, { reason: "acp:spawn:stream", @@ -191,10 +197,18 @@ export function startAcpSpawnParentStreamRelay(params: { return; } logEvent("system_event", { contextKey, text: cleaned }); + if (!shouldSurfaceUpdates) { + return; + } enqueueSystemEvent(cleaned, { sessionKey: parentSessionKey, contextKey }); wake(); }; const emitStartNotice = () => { + updateTaskStateByRunId({ + runId, + lastEventAt: Date.now(), + eventSummary: "Started.", + }); emit( `Started ${relayLabel} session ${params.childSessionKey}. Streaming progress updates to parent session.`, `${contextPrefix}:start`, @@ -257,6 +271,11 @@ export function startAcpSpawnParentStreamRelay(params: { return; } stallNotified = true; + updateTaskStateByRunId({ + runId, + lastEventAt: Date.now(), + eventSummary: `No output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for input.`, + }); emit( `${relayLabel} has produced no output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for interactive input.`, `${contextPrefix}:stall`, @@ -298,6 +317,11 @@ export function startAcpSpawnParentStreamRelay(params: { if (stallNotified) { stallNotified = false; + updateTaskStateByRunId({ + runId, + lastEventAt: Date.now(), + eventSummary: "Resumed output.", + }); emit(`${relayLabel} resumed output.`, `${contextPrefix}:resumed`); } diff --git a/src/agents/acp-spawn.test.ts b/src/agents/acp-spawn.test.ts index 1bee7537ad5..ad289b5c251 100644 --- a/src/agents/acp-spawn.test.ts +++ b/src/agents/acp-spawn.test.ts @@ -6,7 +6,8 @@ import { setRuntimeConfigSnapshot, type OpenClawConfig, } from "../config/config.js"; -import * as sessionConfig from "../config/sessions.js"; +import * as sessionPaths from "../config/sessions/paths.js"; +import * as sessionStore from "../config/sessions/store.js"; import * as sessionTranscript from "../config/sessions/transcript.js"; import * as gatewayCall from "../gateway/call.js"; import * as heartbeatWake from "../infra/heartbeat-wake.js"; @@ -17,6 +18,7 @@ import { type SessionBindingPlacement, type SessionBindingRecord, } from "../infra/outbound/session-binding-service.js"; +import { resetTaskRegistryForTests } from "../tasks/task-registry.js"; import * as acpSpawnParentStream from "./acp-spawn-parent-stream.js"; function createDefaultSpawnConfig(): OpenClawConfig { @@ -78,8 +80,8 @@ const hoisted = vi.hoisted(() => { const callGatewaySpy = vi.spyOn(gatewayCall, "callGateway"); const getAcpSessionManagerSpy = vi.spyOn(acpSessionManager, "getAcpSessionManager"); -const loadSessionStoreSpy = vi.spyOn(sessionConfig, "loadSessionStore"); -const resolveStorePathSpy = vi.spyOn(sessionConfig, "resolveStorePath"); +const loadSessionStoreSpy = vi.spyOn(sessionStore, "loadSessionStore"); +const resolveStorePathSpy = vi.spyOn(sessionPaths, "resolveStorePath"); const resolveSessionTranscriptFileSpy = vi.spyOn(sessionTranscript, "resolveSessionTranscriptFile"); const areHeartbeatsEnabledSpy = vi.spyOn(heartbeatWake, "areHeartbeatsEnabled"); const startAcpSpawnParentStreamRelaySpy = vi.spyOn( @@ -250,6 +252,7 @@ function enableLineCurrentConversationBindings(): void { describe("spawnAcpDirect", () => { beforeEach(() => { replaceSpawnConfig(createDefaultSpawnConfig()); + resetTaskRegistryForTests(); hoisted.areHeartbeatsEnabledMock.mockReset().mockReturnValue(true); hoisted.callGatewayMock.mockReset(); @@ -414,6 +417,7 @@ describe("spawnAcpDirect", () => { }); afterEach(() => { + resetTaskRegistryForTests(); sessionBindingServiceTesting.resetSessionBindingAdaptersForTests(); clearRuntimeConfigSnapshot(); }); @@ -673,15 +677,15 @@ describe("spawnAcpDirect", () => { it.each([ { - name: "inlines delivery for run-mode spawns from non-subagent requester sessions", + name: "does not inline delivery for run-mode spawns from non-subagent requester sessions", ctx: createRequesterContext(), expectedAgentCall: { - deliver: true, - channel: "telegram", - to: "telegram:6098642967", - threadId: "1", + deliver: false, + channel: undefined, + to: undefined, + threadId: undefined, } satisfies AgentCallParams, - expectTranscriptPersistence: true, + expectTranscriptPersistence: false, }, { name: "does not inline delivery for run-mode spawns from subagent requester sessions", diff --git a/src/agents/acp-spawn.ts b/src/agents/acp-spawn.ts index b3c5698dd0f..e0230e17ab2 100644 --- a/src/agents/acp-spawn.ts +++ b/src/agents/acp-spawn.ts @@ -26,8 +26,10 @@ import { import { parseDurationMs } from "../cli/parse-duration.js"; import { loadConfig } from "../config/config.js"; import type { OpenClawConfig } from "../config/config.js"; -import { loadSessionStore, resolveStorePath, type SessionEntry } from "../config/sessions.js"; +import { resolveStorePath } from "../config/sessions/paths.js"; +import { loadSessionStore } from "../config/sessions/store.js"; import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js"; +import type { SessionEntry } from "../config/sessions/types.js"; import { callGateway } from "../gateway/call.js"; import { areHeartbeatsEnabled } from "../infra/heartbeat-wake.js"; import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js"; @@ -42,6 +44,7 @@ import { normalizeAgentId, parseAgentSessionKey, } from "../routing/session-key.js"; +import { createTaskRecord } from "../tasks/task-registry.js"; import { deliveryContextFromSession, formatConversationTarget, @@ -710,15 +713,11 @@ function resolveAcpSpawnBootstrapDeliveryPlan(params: { const hasDeliveryTarget = Boolean(params.requester.origin?.channel && inferredDeliveryTo); // Thread-bound session spawns always deliver inline to their bound thread. - // Run-mode spawns use stream-to-parent when the requester is a subagent - // orchestrator with an active heartbeat relay route. For all other run-mode - // spawns from non-subagent requester sessions, fall back to inline delivery - // so the result reaches the originating channel. + // Background run-mode spawns should stay internal and report back through + // the parent task lifecycle notifier instead of letting the child ACP + // session write raw output directly into the originating channel. const useInlineDelivery = - hasDeliveryTarget && - !params.effectiveStreamToParent && - (params.spawnMode === "session" || - (!params.requester.isSubagentSession && !params.requestThreadBinding)); + hasDeliveryTarget && !params.effectiveStreamToParent && params.spawnMode === "session"; return { useInlineDelivery, @@ -953,6 +952,29 @@ export async function spawnAcpDirect( }); } parentRelay?.notifyStarted(); + try { + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: requesterInternalKey, + requesterOrigin: requesterState.origin, + childSessionKey: sessionKey, + runId: childRunId, + bindingTargetKind: "session", + label: params.label, + task: params.task, + status: "running", + deliveryStatus: requesterInternalKey.trim() ? "pending" : "parent_missing", + startedAt: Date.now(), + streamLogPath, + }); + } catch (error) { + log.warn("Failed to create background task for ACP spawn", { + sessionKey, + runId: childRunId, + error, + }); + } return { status: "accepted", childSessionKey: sessionKey, @@ -963,6 +985,29 @@ export async function spawnAcpDirect( }; } + try { + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: requesterInternalKey, + requesterOrigin: requesterState.origin, + childSessionKey: sessionKey, + runId: childRunId, + bindingTargetKind: "session", + label: params.label, + task: params.task, + status: "running", + deliveryStatus: requesterInternalKey.trim() ? "pending" : "parent_missing", + startedAt: Date.now(), + }); + } catch (error) { + log.warn("Failed to create background task for ACP spawn", { + sessionKey, + runId: childRunId, + error, + }); + } + return { status: "accepted", childSessionKey: sessionKey, diff --git a/src/agents/subagent-registry-lifecycle.ts b/src/agents/subagent-registry-lifecycle.ts index 3a02cdb4dbf..ae146141662 100644 --- a/src/agents/subagent-registry-lifecycle.ts +++ b/src/agents/subagent-registry-lifecycle.ts @@ -1,6 +1,7 @@ import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import { defaultRuntime } from "../runtime.js"; import { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; +import { updateTaskDeliveryByRunId, updateTaskStateByRunId } from "../tasks/task-registry.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.js"; import { captureSubagentCompletionReply, @@ -151,6 +152,10 @@ export function createSubagentRegistryLifecycleController(params: { entry: SubagentRunRecord; reason: "retry-limit" | "expiry"; }) => { + updateTaskDeliveryByRunId({ + runId: giveUpParams.runId, + deliveryStatus: "failed", + }); giveUpParams.entry.wakeOnDescendantSettle = undefined; giveUpParams.entry.fallbackFrozenResultText = undefined; giveUpParams.entry.fallbackFrozenResultCapturedAt = undefined; @@ -263,6 +268,10 @@ export function createSubagentRegistryLifecycleController(params: { return; } if (didAnnounce) { + updateTaskDeliveryByRunId({ + runId, + deliveryStatus: "delivered", + }); entry.wakeOnDescendantSettle = undefined; entry.fallbackFrozenResultText = undefined; entry.fallbackFrozenResultCapturedAt = undefined; @@ -315,6 +324,10 @@ export function createSubagentRegistryLifecycleController(params: { } if (deferredDecision.kind === "give-up") { + updateTaskDeliveryByRunId({ + runId, + deliveryStatus: "failed", + }); entry.wakeOnDescendantSettle = undefined; entry.fallbackFrozenResultText = undefined; entry.fallbackFrozenResultCapturedAt = undefined; @@ -443,6 +456,21 @@ export function createSubagentRegistryLifecycleController(params: { if (mutated) { params.persist(); } + updateTaskStateByRunId({ + runId: entry.runId, + status: + completeParams.outcome.status === "ok" + ? "done" + : completeParams.outcome.status === "timeout" + ? "timed_out" + : "failed", + startedAt: entry.startedAt, + endedAt: entry.endedAt, + lastEventAt: entry.endedAt ?? Date.now(), + error: completeParams.outcome.status === "error" ? completeParams.outcome.error : undefined, + progressSummary: entry.frozenResultText ?? undefined, + terminalSummary: null, + }); try { await persistSubagentSessionTiming(entry); diff --git a/src/agents/subagent-registry-run-manager.ts b/src/agents/subagent-registry-run-manager.ts index fbdd03fd9e4..4085ae60562 100644 --- a/src/agents/subagent-registry-run-manager.ts +++ b/src/agents/subagent-registry-run-manager.ts @@ -1,6 +1,7 @@ import { loadConfig } from "../config/config.js"; import { callGateway } from "../gateway/call.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { createTaskRecord } from "../tasks/task-registry.js"; import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js"; import { ensureRuntimePluginsLoaded } from "./runtime-plugins.js"; import type { SubagentRunOutcome } from "./subagent-announce.js"; @@ -315,6 +316,29 @@ export function createSubagentRunManager(params: { attachmentsRootDir: registerParams.attachmentsRootDir, retainAttachmentsOnKeep: registerParams.retainAttachmentsOnKeep, }); + try { + createTaskRecord({ + source: "sessions_spawn", + runtime: "subagent", + requesterSessionKey: registerParams.requesterSessionKey, + requesterOrigin, + childSessionKey: registerParams.childSessionKey, + runId: registerParams.runId, + bindingTargetKind: "subagent", + label: registerParams.label, + task: registerParams.task, + status: "running", + deliveryStatus: + registerParams.expectsCompletionMessage === false ? "not_applicable" : "pending", + startedAt: now, + lastEventAt: now, + }); + } catch (error) { + log.warn("Failed to create background task for subagent run", { + runId: registerParams.runId, + error, + }); + } params.ensureListener(); params.persist(); if (archiveAtMs) { diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index c5d1308af8b..82dda015e82 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -5,6 +5,7 @@ import type { SubagentEndReason } from "../context-engine/types.js"; import { callGateway } from "../gateway/call.js"; import { onAgentEvent } from "../infra/agent-events.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { resetTaskRegistryForTests } from "../tasks/task-registry.js"; import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js"; import { ensureRuntimePluginsLoaded } from "./runtime-plugins.js"; import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js"; @@ -555,6 +556,7 @@ export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) { endedHookInFlightRunIds.clear(); clearAllPendingLifecycleErrors(); resetAnnounceQueuesForTests(); + resetTaskRegistryForTests({ persist: opts?.persist }); stopSweeper(); restoreAttempted = false; if (listenerStop) { diff --git a/src/agents/tools/sessions-spawn-tool.ts b/src/agents/tools/sessions-spawn-tool.ts index b5672b64756..795ef895b9f 100644 --- a/src/agents/tools/sessions-spawn-tool.ts +++ b/src/agents/tools/sessions-spawn-tool.ts @@ -3,7 +3,7 @@ import { loadConfig } from "../../config/config.js"; import { callGateway } from "../../gateway/call.js"; import { normalizeDeliveryContext } from "../../utils/delivery-context.js"; import type { GatewayMessageChannel } from "../../utils/message-channel.js"; -import { ACP_SPAWN_MODES, ACP_SPAWN_STREAM_TARGETS, spawnAcpDirect } from "../acp-spawn.js"; +import { spawnAcpDirect } from "../acp-spawn.js"; import { optionalStringEnum } from "../schema/typebox.js"; import type { SpawnedToolContext } from "../spawned-context.js"; import { registerSubagentRun } from "../subagent-registry.js"; @@ -18,6 +18,8 @@ import { const SESSIONS_SPAWN_RUNTIMES = ["subagent", "acp"] as const; const SESSIONS_SPAWN_SANDBOX_MODES = ["inherit", "require"] as const; +// Keep the schema local to avoid a circular import through acp-spawn/openclaw-tools. +const SESSIONS_SPAWN_ACP_STREAM_TARGETS = ["parent"] as const; const UNSUPPORTED_SESSIONS_SPAWN_PARAM_KEYS = [ "target", "transport", @@ -90,7 +92,7 @@ const SessionsSpawnToolSchema = Type.Object({ mode: optionalStringEnum(SUBAGENT_SPAWN_MODES), cleanup: optionalStringEnum(["delete", "keep"] as const), sandbox: optionalStringEnum(SESSIONS_SPAWN_SANDBOX_MODES), - streamTo: optionalStringEnum(ACP_SPAWN_STREAM_TARGETS), + streamTo: optionalStringEnum(SESSIONS_SPAWN_ACP_STREAM_TARGETS), // Inline attachments (snapshot-by-value). // NOTE: Attachment contents are redacted from transcript persistence by sanitizeToolCallInputs. @@ -205,7 +207,7 @@ export function createSessionsSpawnTool( agentId: requestedAgentId, resumeSessionId, cwd, - mode: mode && ACP_SPAWN_MODES.includes(mode) ? mode : undefined, + mode: mode === "run" || mode === "session" ? mode : undefined, thread, sandbox, streamTo, diff --git a/src/auto-reply/reply/commands-acp/runtime-options.ts b/src/auto-reply/reply/commands-acp/runtime-options.ts index 341b78f0360..6d0dc7da7a4 100644 --- a/src/auto-reply/reply/commands-acp/runtime-options.ts +++ b/src/auto-reply/reply/commands-acp/runtime-options.ts @@ -8,6 +8,7 @@ import { validateRuntimePermissionProfileInput, } from "../../../acp/control-plane/runtime-options.js"; import { resolveAcpSessionIdentifierLinesFromIdentity } from "../../../acp/runtime/session-identifiers.js"; +import { findLatestTaskForSessionKey } from "../../../tasks/task-registry.js"; import type { CommandHandlerResult, HandleCommandsParams } from "../commands-types.js"; import { ACP_CWD_USAGE, @@ -122,6 +123,7 @@ export async function handleAcpStatusAction( fallbackCode: "ACP_TURN_FAILED", fallbackMessage: "Could not read ACP session status.", onSuccess: (status) => { + const linkedTask = findLatestTaskForSessionKey(status.sessionKey); const sessionIdentifierLines = resolveAcpSessionIdentifierLinesFromIdentity({ backend: status.backend, identity: status.identity, @@ -135,6 +137,13 @@ export async function handleAcpStatusAction( ...sessionIdentifierLines, `sessionMode: ${status.mode}`, `state: ${status.state}`, + ...(linkedTask + ? [ + `taskId: ${linkedTask.taskId}`, + `taskStatus: ${linkedTask.status}`, + `delivery: ${linkedTask.deliveryStatus}`, + ] + : []), `runtimeOptions: ${formatRuntimeOptionsText(status.runtimeOptions)}`, `capabilities: ${formatAcpCapabilitiesText(status.capabilities.controls)}`, `lastActivityAt: ${new Date(status.lastActivityAt).toISOString()}`, diff --git a/src/auto-reply/reply/commands-subagents/action-info.ts b/src/auto-reply/reply/commands-subagents/action-info.ts index ef20c668198..aa97b891b01 100644 --- a/src/auto-reply/reply/commands-subagents/action-info.ts +++ b/src/auto-reply/reply/commands-subagents/action-info.ts @@ -1,6 +1,7 @@ import { countPendingDescendantRuns } from "../../../agents/subagent-registry.js"; import { loadSessionStore, resolveStorePath } from "../../../config/sessions.js"; import { formatDurationCompact } from "../../../shared/subagents-format.js"; +import { findTaskByRunId } from "../../../tasks/task-registry.js"; import type { CommandHandlerResult } from "../commands-types.js"; import { formatRunLabel } from "../subagents-utils.js"; import { @@ -36,6 +37,7 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command const outcome = run.outcome ? `${run.outcome.status}${run.outcome.error ? ` (${run.outcome.error})` : ""}` : "n/a"; + const linkedTask = findTaskByRunId(run.runId); const lines = [ "ℹ️ Subagent info", @@ -43,6 +45,7 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command `Label: ${formatRunLabel(run)}`, `Task: ${run.task}`, `Run: ${run.runId}`, + linkedTask ? `TaskId: ${linkedTask.taskId}` : undefined, `Session: ${run.childSessionKey}`, `SessionId: ${sessionEntry?.sessionId ?? "n/a"}`, `Transcript: ${sessionEntry?.sessionFile ?? "n/a"}`, @@ -54,6 +57,7 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command run.archiveAtMs ? `Archive: ${formatTimestampWithAge(run.archiveAtMs)}` : undefined, run.cleanupHandled ? "Cleanup handled: yes" : undefined, `Outcome: ${outcome}`, + linkedTask ? `Delivery: ${linkedTask.deliveryStatus}` : undefined, ].filter(Boolean); return stopWithText(lines.join("\n")); diff --git a/src/auto-reply/reply/dispatch-acp-delivery.test.ts b/src/auto-reply/reply/dispatch-acp-delivery.test.ts index fd0008f5602..bfca2a4ba27 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.test.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.test.ts @@ -192,4 +192,33 @@ describe("createAcpDispatchDeliveryCoordinator", () => { expect(onReplyStart).not.toHaveBeenCalled(); }); + + it("keeps parent-owned background ACP child delivery silent while preserving accumulated output", async () => { + const dispatcher = createDispatcher(); + const coordinator = createAcpDispatchDeliveryCoordinator({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "telegram", + Surface: "telegram", + SessionKey: "agent:codex-acp:session-1", + }), + dispatcher, + inboundAudio: false, + suppressUserDelivery: true, + shouldRouteToOriginating: true, + originatingChannel: "telegram", + originatingTo: "telegram:123", + }); + + const blockDelivered = await coordinator.deliver("block", { text: "working on it" }); + const finalDelivered = await coordinator.deliver("final", { text: "done" }); + await coordinator.settleVisibleText(); + + expect(blockDelivered).toBe(false); + expect(finalDelivered).toBe(false); + expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + expect(coordinator.getAccumulatedBlockText()).toBe("working on it"); + expect(coordinator.hasDeliveredVisibleText()).toBe(false); + }); }); diff --git a/src/auto-reply/reply/dispatch-acp-delivery.ts b/src/auto-reply/reply/dispatch-acp-delivery.ts index fe74a5093ba..67a12370176 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.ts @@ -79,6 +79,7 @@ export function createAcpDispatchDeliveryCoordinator(params: { inboundAudio: boolean; sessionTtsAuto?: TtsAutoMode; ttsChannel?: string; + suppressUserDelivery?: boolean; shouldRouteToOriginating: boolean; originatingChannel?: string; originatingTo?: string; @@ -184,6 +185,10 @@ export function createAcpDispatchDeliveryCoordinator(params: { await startReplyLifecycleOnce(); } + if (params.suppressUserDelivery) { + return false; + } + const ttsPayload = meta?.skipTts ? payload : await maybeApplyTtsToPayload({ diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 006b63ed211..2be7d1913a9 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -315,6 +315,7 @@ export async function tryDispatchAcpReply(params: { inboundAudio: boolean; sessionTtsAuto?: TtsAutoMode; ttsChannel?: string; + suppressUserDelivery?: boolean; shouldRouteToOriginating: boolean; originatingChannel?: string; originatingTo?: string; @@ -347,6 +348,7 @@ export async function tryDispatchAcpReply(params: { inboundAudio: params.inboundAudio, sessionTtsAuto: params.sessionTtsAuto, ttsChannel: params.ttsChannel, + suppressUserDelivery: params.suppressUserDelivery, shouldRouteToOriginating: params.shouldRouteToOriginating, originatingChannel: params.originatingChannel, originatingTo: params.originatingTo, @@ -357,6 +359,7 @@ export async function tryDispatchAcpReply(params: { resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined), ); const shouldEmitResolvedIdentityNotice = + !params.suppressUserDelivery && identityPendingBeforeTurn && (Boolean(params.ctx.MessageThreadId != null && String(params.ctx.MessageThreadId).trim()) || hasBoundConversationForSession({ diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index b963633ca29..a6f3ee3834b 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -1,4 +1,5 @@ import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; +import { isParentOwnedBackgroundAcpSession } from "../../acp/session-interaction-mode.js"; import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { resolveConversationBindingRecord, @@ -39,11 +40,12 @@ import { import { getGlobalHookRunner, getGlobalPluginRegistry } from "../../plugins/hook-runner-global.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { normalizeTtsAutoMode, resolveConfiguredTtsMode } from "../../tts/tts-config.js"; -import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js"; +import { normalizeMessageChannel } from "../../utils/message-channel.js"; import type { FinalizedMsgContext } from "../templating.js"; import type { BlockReplyContext, GetReplyOptions, ReplyPayload } from "../types.js"; import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; +import { resolveReplyRoutingDecision } from "./routing-policy.js"; import { resolveRunTypingPolicy } from "./typing-policy.js"; let routeReplyRuntimePromise: Promise | null = null; @@ -247,25 +249,19 @@ export async function dispatchReplyFromConfig(params: { // flow when the provider handles its own messages. // // Debug: `pnpm test src/auto-reply/reply/dispatch-from-config.test.ts` - const originatingChannel = normalizeMessageChannel(ctx.OriginatingChannel); - const originatingTo = ctx.OriginatingTo; - const providerChannel = normalizeMessageChannel(ctx.Provider); - const surfaceChannel = normalizeMessageChannel(ctx.Surface); - // Prefer provider channel because surface may carry origin metadata in relayed flows. - const currentSurface = providerChannel ?? surfaceChannel; - const isInternalWebchatTurn = - currentSurface === INTERNAL_MESSAGE_CHANNEL && - (surfaceChannel === INTERNAL_MESSAGE_CHANNEL || !surfaceChannel) && - ctx.ExplicitDeliverRoute !== true; + const suppressAcpChildUserDelivery = isParentOwnedBackgroundAcpSession(sessionStoreEntry.entry); const routeReplyRuntime = await loadRouteReplyRuntime(); - const shouldRouteToOriginating = Boolean( - !isInternalWebchatTurn && - routeReplyRuntime.isRoutableChannel(originatingChannel) && - originatingTo && - originatingChannel !== currentSurface, - ); - const shouldSuppressTyping = - shouldRouteToOriginating || originatingChannel === INTERNAL_MESSAGE_CHANNEL; + const { originatingChannel, currentSurface, shouldRouteToOriginating, shouldSuppressTyping } = + resolveReplyRoutingDecision({ + provider: ctx.Provider, + surface: ctx.Surface, + explicitDeliverRoute: ctx.ExplicitDeliverRoute, + originatingChannel: ctx.OriginatingChannel, + originatingTo: ctx.OriginatingTo, + suppressDirectUserDelivery: suppressAcpChildUserDelivery, + isRoutableChannel: routeReplyRuntime.isRoutableChannel, + }); + const originatingTo = ctx.OriginatingTo; const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface; /** @@ -601,6 +597,7 @@ export async function dispatchReplyFromConfig(params: { inboundAudio, sessionTtsAuto, ttsChannel, + suppressUserDelivery: suppressAcpChildUserDelivery, shouldRouteToOriginating, originatingChannel, originatingTo, diff --git a/src/auto-reply/reply/routing-policy.test.ts b/src/auto-reply/reply/routing-policy.test.ts new file mode 100644 index 00000000000..f5f9d5004b5 --- /dev/null +++ b/src/auto-reply/reply/routing-policy.test.ts @@ -0,0 +1,62 @@ +import { describe, expect, it } from "vitest"; +import { resolveReplyRoutingDecision } from "./routing-policy.js"; + +function isRoutableChannel(channel: string | undefined) { + return Boolean( + channel && + ["telegram", "slack", "discord", "signal", "imessage", "whatsapp", "feishu"].includes(channel), + ); +} + +describe("resolveReplyRoutingDecision", () => { + it("routes replies to the originating channel when the current provider differs", () => { + expect( + resolveReplyRoutingDecision({ + provider: "slack", + surface: "slack", + originatingChannel: "telegram", + originatingTo: "telegram:123", + isRoutableChannel, + }), + ).toMatchObject({ + originatingChannel: "telegram", + currentSurface: "slack", + shouldRouteToOriginating: true, + shouldSuppressTyping: true, + }); + }); + + it("does not route external replies from internal webchat without explicit delivery", () => { + expect( + resolveReplyRoutingDecision({ + provider: "webchat", + surface: "webchat", + explicitDeliverRoute: false, + originatingChannel: "telegram", + originatingTo: "telegram:123", + isRoutableChannel, + }), + ).toMatchObject({ + currentSurface: "webchat", + isInternalWebchatTurn: true, + shouldRouteToOriginating: false, + }); + }); + + it("suppresses direct user delivery for parent-owned background ACP children", () => { + expect( + resolveReplyRoutingDecision({ + provider: "discord", + surface: "discord", + originatingChannel: "telegram", + originatingTo: "telegram:123", + suppressDirectUserDelivery: true, + isRoutableChannel, + }), + ).toMatchObject({ + currentSurface: "discord", + shouldRouteToOriginating: false, + shouldSuppressTyping: true, + }); + }); +}); diff --git a/src/auto-reply/reply/routing-policy.ts b/src/auto-reply/reply/routing-policy.ts new file mode 100644 index 00000000000..cf3ab17013d --- /dev/null +++ b/src/auto-reply/reply/routing-policy.ts @@ -0,0 +1,37 @@ +import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js"; + +export function resolveReplyRoutingDecision(params: { + provider?: string; + surface?: string; + explicitDeliverRoute?: boolean; + originatingChannel?: string; + originatingTo?: string; + suppressDirectUserDelivery?: boolean; + isRoutableChannel: (channel: string | undefined) => boolean; +}) { + const originatingChannel = normalizeMessageChannel(params.originatingChannel); + const providerChannel = normalizeMessageChannel(params.provider); + const surfaceChannel = normalizeMessageChannel(params.surface); + const currentSurface = providerChannel ?? surfaceChannel; + const isInternalWebchatTurn = + currentSurface === INTERNAL_MESSAGE_CHANNEL && + (surfaceChannel === INTERNAL_MESSAGE_CHANNEL || !surfaceChannel) && + params.explicitDeliverRoute !== true; + const shouldRouteToOriginating = Boolean( + !params.suppressDirectUserDelivery && + !isInternalWebchatTurn && + params.isRoutableChannel(originatingChannel) && + params.originatingTo && + originatingChannel !== currentSurface, + ); + return { + originatingChannel, + currentSurface, + isInternalWebchatTurn, + shouldRouteToOriginating, + shouldSuppressTyping: + params.suppressDirectUserDelivery === true || + shouldRouteToOriginating || + originatingChannel === INTERNAL_MESSAGE_CHANNEL, + }; +} diff --git a/src/cli/program/command-registry.test.ts b/src/cli/program/command-registry.test.ts index 3451ce64907..7736f67fcfe 100644 --- a/src/cli/program/command-registry.test.ts +++ b/src/cli/program/command-registry.test.ts @@ -39,6 +39,8 @@ vi.mock("./register.status-health-sessions.js", () => ({ program.command("status"); program.command("health"); program.command("sessions"); + const tasks = program.command("tasks"); + tasks.command("show"); }, })); @@ -75,6 +77,7 @@ describe("command-registry", () => { expect(names).toContain("agents"); expect(names).toContain("backup"); expect(names).toContain("sessions"); + expect(names).toContain("tasks"); expect(names).not.toContain("agent"); expect(names).not.toContain("status"); expect(names).not.toContain("doctor"); @@ -139,6 +142,7 @@ describe("command-registry", () => { expect(names).toContain("status"); expect(names).toContain("health"); expect(names).toContain("sessions"); + expect(names).toContain("tasks"); }); it("replaces placeholders when loading a grouped entry by secondary command name", async () => { diff --git a/src/cli/program/command-registry.ts b/src/cli/program/command-registry.ts index 11980d3e3d9..d016380d1ef 100644 --- a/src/cli/program/command-registry.ts +++ b/src/cli/program/command-registry.ts @@ -197,6 +197,11 @@ const coreEntries: CoreCliEntry[] = [ description: "List stored conversation sessions", hasSubcommands: true, }, + { + name: "tasks", + description: "Inspect durable background task state", + hasSubcommands: true, + }, ], register: async ({ program }) => { const mod = await import("./register.status-health-sessions.js"); diff --git a/src/cli/program/core-command-descriptors.ts b/src/cli/program/core-command-descriptors.ts index e36fca0ad24..302c086fc4e 100644 --- a/src/cli/program/core-command-descriptors.ts +++ b/src/cli/program/core-command-descriptors.ts @@ -81,6 +81,11 @@ export const CORE_CLI_COMMAND_DESCRIPTORS = [ description: "List stored conversation sessions", hasSubcommands: true, }, + { + name: "tasks", + description: "Inspect durable background task state", + hasSubcommands: true, + }, ] as const satisfies ReadonlyArray; export function getCoreCliCommandDescriptors(): ReadonlyArray { diff --git a/src/cli/program/register.status-health-sessions.test.ts b/src/cli/program/register.status-health-sessions.test.ts index 4d8e1d4d7ed..84661ecc76b 100644 --- a/src/cli/program/register.status-health-sessions.test.ts +++ b/src/cli/program/register.status-health-sessions.test.ts @@ -6,6 +6,10 @@ const statusCommand = vi.fn(); const healthCommand = vi.fn(); const sessionsCommand = vi.fn(); const sessionsCleanupCommand = vi.fn(); +const tasksListCommand = vi.fn(); +const tasksShowCommand = vi.fn(); +const tasksNotifyCommand = vi.fn(); +const tasksCancelCommand = vi.fn(); const setVerbose = vi.fn(); const { defaultRuntime: runtime, resetRuntimeCapture } = createCliRuntimeCapture(); @@ -26,6 +30,13 @@ vi.mock("../../commands/sessions-cleanup.js", () => ({ sessionsCleanupCommand, })); +vi.mock("../../commands/tasks.js", () => ({ + tasksListCommand, + tasksShowCommand, + tasksNotifyCommand, + tasksCancelCommand, +})); + vi.mock("../../globals.js", () => ({ setVerbose, })); @@ -55,6 +66,10 @@ describe("registerStatusHealthSessionsCommands", () => { healthCommand.mockResolvedValue(undefined); sessionsCommand.mockResolvedValue(undefined); sessionsCleanupCommand.mockResolvedValue(undefined); + tasksListCommand.mockResolvedValue(undefined); + tasksShowCommand.mockResolvedValue(undefined); + tasksNotifyCommand.mockResolvedValue(undefined); + tasksCancelCommand.mockResolvedValue(undefined); }); it("runs status command with timeout and debug-derived verbose", async () => { @@ -201,4 +216,52 @@ describe("registerStatusHealthSessionsCommands", () => { runtime, ); }); + + it("runs tasks list from the parent command", async () => { + await runCli(["tasks", "--json", "--runtime", "acp", "--status", "running"]); + + expect(tasksListCommand).toHaveBeenCalledWith( + expect.objectContaining({ + json: true, + runtime: "acp", + status: "running", + }), + runtime, + ); + }); + + it("runs tasks show subcommand with lookup forwarding", async () => { + await runCli(["tasks", "show", "run-123", "--json"]); + + expect(tasksShowCommand).toHaveBeenCalledWith( + expect.objectContaining({ + lookup: "run-123", + json: true, + }), + runtime, + ); + }); + + it("runs tasks notify subcommand with lookup and policy forwarding", async () => { + await runCli(["tasks", "notify", "run-123", "state_changes"]); + + expect(tasksNotifyCommand).toHaveBeenCalledWith( + expect.objectContaining({ + lookup: "run-123", + notify: "state_changes", + }), + runtime, + ); + }); + + it("runs tasks cancel subcommand with lookup forwarding", async () => { + await runCli(["tasks", "cancel", "run-123"]); + + expect(tasksCancelCommand).toHaveBeenCalledWith( + expect.objectContaining({ + lookup: "run-123", + }), + runtime, + ); + }); }); diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index 3a3d81abcf3..4ffc8cb9eac 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -3,6 +3,12 @@ import { healthCommand } from "../../commands/health.js"; import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js"; import { sessionsCommand } from "../../commands/sessions.js"; import { statusCommand } from "../../commands/status.js"; +import { + tasksCancelCommand, + tasksListCommand, + tasksNotifyCommand, + tasksShowCommand, +} from "../../commands/tasks.js"; import { setVerbose } from "../../globals.js"; import { defaultRuntime } from "../../runtime.js"; import { formatDocsLink } from "../../terminal/links.js"; @@ -213,4 +219,106 @@ export function registerStatusHealthSessionsCommands(program: Command) { ); }); }); + + const tasksCmd = program + .command("tasks") + .description("Inspect durable background task state") + .option("--json", "Output as JSON", false) + .option("--runtime ", "Filter by runtime (subagent, acp, cli)") + .option( + "--status ", + "Filter by status (accepted, running, done, failed, timed_out, cancelled, lost)", + ) + .action(async (opts) => { + await runCommandWithRuntime(defaultRuntime, async () => { + await tasksListCommand( + { + json: Boolean(opts.json), + runtime: opts.runtime as string | undefined, + status: opts.status as string | undefined, + }, + defaultRuntime, + ); + }); + }); + tasksCmd.enablePositionalOptions(); + + tasksCmd + .command("list") + .description("List tracked background tasks") + .option("--json", "Output as JSON", false) + .option("--runtime ", "Filter by runtime (subagent, acp, cli)") + .option( + "--status ", + "Filter by status (accepted, running, done, failed, timed_out, cancelled, lost)", + ) + .action(async (opts, command) => { + const parentOpts = command.parent?.opts() as + | { + json?: boolean; + runtime?: string; + status?: string; + } + | undefined; + await runCommandWithRuntime(defaultRuntime, async () => { + await tasksListCommand( + { + json: Boolean(opts.json || parentOpts?.json), + runtime: (opts.runtime as string | undefined) ?? parentOpts?.runtime, + status: (opts.status as string | undefined) ?? parentOpts?.status, + }, + defaultRuntime, + ); + }); + }); + + tasksCmd + .command("show") + .description("Show one background task by task id, run id, or session key") + .argument("", "Task id, run id, or session key") + .option("--json", "Output as JSON", false) + .action(async (lookup, opts, command) => { + const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; + await runCommandWithRuntime(defaultRuntime, async () => { + await tasksShowCommand( + { + lookup, + json: Boolean(opts.json || parentOpts?.json), + }, + defaultRuntime, + ); + }); + }); + + tasksCmd + .command("notify") + .description("Set task notify policy") + .argument("", "Task id, run id, or session key") + .argument("", "Notify policy (done_only, state_changes, silent)") + .action(async (lookup, notify) => { + await runCommandWithRuntime(defaultRuntime, async () => { + await tasksNotifyCommand( + { + lookup, + notify: notify as "done_only" | "state_changes" | "silent", + }, + defaultRuntime, + ); + }); + }); + + tasksCmd + .command("cancel") + .description("Cancel a running background task") + .argument("", "Task id, run id, or session key") + .action(async (lookup) => { + await runCommandWithRuntime(defaultRuntime, async () => { + await tasksCancelCommand( + { + lookup, + }, + defaultRuntime, + ); + }); + }); } diff --git a/src/cli/run-main.exit.test.ts b/src/cli/run-main.exit.test.ts index 11d8ef7d5ad..12a2c72ff1e 100644 --- a/src/cli/run-main.exit.test.ts +++ b/src/cli/run-main.exit.test.ts @@ -8,6 +8,8 @@ const ensurePathMock = vi.hoisted(() => vi.fn()); const assertRuntimeMock = vi.hoisted(() => vi.fn()); const closeActiveMemorySearchManagersMock = vi.hoisted(() => vi.fn(async () => {})); const hasMemoryRuntimeMock = vi.hoisted(() => vi.fn(() => false)); +const ensureTaskRegistryReadyMock = vi.hoisted(() => vi.fn()); +const startTaskRegistryMaintenanceMock = vi.hoisted(() => vi.fn()); const outputRootHelpMock = vi.hoisted(() => vi.fn()); const buildProgramMock = vi.hoisted(() => vi.fn()); const maybeRunCliInContainerMock = vi.hoisted(() => @@ -49,6 +51,14 @@ vi.mock("../plugins/memory-state.js", () => ({ hasMemoryRuntime: hasMemoryRuntimeMock, })); +vi.mock("../tasks/task-registry.js", () => ({ + ensureTaskRegistryReady: ensureTaskRegistryReadyMock, +})); + +vi.mock("../tasks/task-registry.maintenance.js", () => ({ + startTaskRegistryMaintenance: startTaskRegistryMaintenanceMock, +})); + vi.mock("./program/root-help.js", () => ({ outputRootHelp: outputRootHelpMock, })); @@ -76,6 +86,8 @@ describe("runCli exit behavior", () => { expect(maybeRunCliInContainerMock).toHaveBeenCalledWith(["node", "openclaw", "status"]); expect(tryRouteCliMock).toHaveBeenCalledWith(["node", "openclaw", "status"]); expect(closeActiveMemorySearchManagersMock).not.toHaveBeenCalled(); + expect(ensureTaskRegistryReadyMock).not.toHaveBeenCalled(); + expect(startTaskRegistryMaintenanceMock).not.toHaveBeenCalled(); expect(exitSpy).not.toHaveBeenCalled(); exitSpy.mockRestore(); }); diff --git a/src/commands/tasks.test.ts b/src/commands/tasks.test.ts new file mode 100644 index 00000000000..58fad7080f0 --- /dev/null +++ b/src/commands/tasks.test.ts @@ -0,0 +1,139 @@ +import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { createCliRuntimeCapture } from "../cli/test-runtime-capture.js"; + +const reconcileInspectableTasksMock = vi.fn(); +const reconcileTaskLookupTokenMock = vi.fn(); +const updateTaskNotifyPolicyByIdMock = vi.fn(); +const cancelTaskByIdMock = vi.fn(); +const getTaskByIdMock = vi.fn(); +const loadConfigMock = vi.fn(() => ({ loaded: true })); + +vi.mock("../tasks/task-registry.reconcile.js", () => ({ + reconcileInspectableTasks: (...args: unknown[]) => reconcileInspectableTasksMock(...args), + reconcileTaskLookupToken: (...args: unknown[]) => reconcileTaskLookupTokenMock(...args), +})); + +vi.mock("../tasks/task-registry.js", () => ({ + updateTaskNotifyPolicyById: (...args: unknown[]) => updateTaskNotifyPolicyByIdMock(...args), + cancelTaskById: (...args: unknown[]) => cancelTaskByIdMock(...args), + getTaskById: (...args: unknown[]) => getTaskByIdMock(...args), +})); + +vi.mock("../config/config.js", () => ({ + loadConfig: () => loadConfigMock(), +})); + +const { + defaultRuntime: runtime, + runtimeLogs, + runtimeErrors, + resetRuntimeCapture, +} = createCliRuntimeCapture(); + +let tasksListCommand: typeof import("./tasks.js").tasksListCommand; +let tasksShowCommand: typeof import("./tasks.js").tasksShowCommand; +let tasksNotifyCommand: typeof import("./tasks.js").tasksNotifyCommand; +let tasksCancelCommand: typeof import("./tasks.js").tasksCancelCommand; + +const taskFixture = { + taskId: "task-12345678", + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child", + runId: "run-12345678", + task: "Create a file", + status: "running", + deliveryStatus: "pending", + notifyPolicy: "state_changes", + createdAt: Date.parse("2026-03-29T10:00:00.000Z"), + lastEventAt: Date.parse("2026-03-29T10:00:10.000Z"), + progressSummary: "No output for 60s. It may be waiting for input.", + recentEvents: [ + { + at: Date.parse("2026-03-29T10:00:10.000Z"), + kind: "progress", + summary: "No output for 60s. It may be waiting for input.", + }, + ], +} as const; + +beforeAll(async () => { + ({ tasksListCommand, tasksShowCommand, tasksNotifyCommand, tasksCancelCommand } = + await import("./tasks.js")); +}); + +describe("tasks commands", () => { + beforeEach(() => { + vi.clearAllMocks(); + resetRuntimeCapture(); + reconcileInspectableTasksMock.mockReturnValue([]); + reconcileTaskLookupTokenMock.mockReturnValue(undefined); + updateTaskNotifyPolicyByIdMock.mockReturnValue(undefined); + cancelTaskByIdMock.mockResolvedValue({ found: false, cancelled: false, reason: "missing" }); + getTaskByIdMock.mockReturnValue(undefined); + }); + + it("lists task rows with progress summary fallback", async () => { + reconcileInspectableTasksMock.mockReturnValue([taskFixture]); + + await tasksListCommand({ runtime: "acp", status: "running" }, runtime); + + expect(runtimeLogs[0]).toContain("Background tasks: 1"); + expect(runtimeLogs.join("\n")).toContain("No output for 60s. It may be waiting for input."); + }); + + it("shows detailed task fields including notify and recent events", async () => { + reconcileTaskLookupTokenMock.mockReturnValue(taskFixture); + + await tasksShowCommand({ lookup: "run-12345678" }, runtime); + + expect(runtimeLogs.join("\n")).toContain("notify: state_changes"); + expect(runtimeLogs.join("\n")).toContain( + "progressSummary: No output for 60s. It may be waiting for input.", + ); + expect(runtimeLogs.join("\n")).toContain("recentEvent[0]: 2026-03-29T10:00:10.000Z progress"); + }); + + it("updates notify policy for an existing task", async () => { + reconcileTaskLookupTokenMock.mockReturnValue(taskFixture); + updateTaskNotifyPolicyByIdMock.mockReturnValue({ + ...taskFixture, + notifyPolicy: "silent", + }); + + await tasksNotifyCommand({ lookup: "run-12345678", notify: "silent" }, runtime); + + expect(updateTaskNotifyPolicyByIdMock).toHaveBeenCalledWith({ + taskId: "task-12345678", + notifyPolicy: "silent", + }); + expect(runtimeLogs[0]).toContain("Updated task-12345678 notify policy to silent."); + }); + + it("cancels a running task and reports the updated runtime", async () => { + reconcileTaskLookupTokenMock.mockReturnValue(taskFixture); + cancelTaskByIdMock.mockResolvedValue({ + found: true, + cancelled: true, + task: { + ...taskFixture, + status: "cancelled", + }, + }); + getTaskByIdMock.mockReturnValue({ + ...taskFixture, + status: "cancelled", + }); + + await tasksCancelCommand({ lookup: "run-12345678" }, runtime); + + expect(loadConfigMock).toHaveBeenCalled(); + expect(cancelTaskByIdMock).toHaveBeenCalledWith({ + cfg: { loaded: true }, + taskId: "task-12345678", + }); + expect(runtimeLogs[0]).toContain("Cancelled task-12345678 (acp) run run-12345678."); + expect(runtimeErrors).toEqual([]); + }); +}); diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts new file mode 100644 index 00000000000..fb5877c250d --- /dev/null +++ b/src/commands/tasks.ts @@ -0,0 +1,237 @@ +import { loadConfig } from "../config/config.js"; +import { info } from "../globals.js"; +import type { RuntimeEnv } from "../runtime.js"; +import { cancelTaskById, getTaskById, updateTaskNotifyPolicyById } from "../tasks/task-registry.js"; +import { + reconcileInspectableTasks, + reconcileTaskLookupToken, +} from "../tasks/task-registry.reconcile.js"; +import type { TaskNotifyPolicy, TaskRecord } from "../tasks/task-registry.types.js"; +import { isRich, theme } from "../terminal/theme.js"; + +const RUNTIME_PAD = 8; +const STATUS_PAD = 10; +const DELIVERY_PAD = 14; +const ID_PAD = 10; +const RUN_PAD = 10; + +function truncate(value: string, maxChars: number) { + if (value.length <= maxChars) { + return value; + } + if (maxChars <= 1) { + return value.slice(0, maxChars); + } + return `${value.slice(0, maxChars - 1)}…`; +} + +function shortToken(value: string | undefined, maxChars = ID_PAD): string { + const trimmed = value?.trim(); + if (!trimmed) { + return "n/a"; + } + return truncate(trimmed, maxChars); +} + +function formatTaskStatusCell(status: string, rich: boolean) { + const padded = status.padEnd(STATUS_PAD); + if (!rich) { + return padded; + } + if (status === "done") { + return theme.success(padded); + } + if (status === "failed" || status === "lost" || status === "timed_out") { + return theme.error(padded); + } + if (status === "running") { + return theme.accentBright(padded); + } + return theme.muted(padded); +} + +function formatTaskRows(tasks: TaskRecord[], rich: boolean) { + const header = [ + "Task".padEnd(ID_PAD), + "Runtime".padEnd(RUNTIME_PAD), + "Status".padEnd(STATUS_PAD), + "Delivery".padEnd(DELIVERY_PAD), + "Run".padEnd(RUN_PAD), + "Child Session", + "Summary", + ].join(" "); + const lines = [rich ? theme.heading(header) : header]; + for (const task of tasks) { + const summary = truncate( + task.terminalSummary?.trim() || + task.progressSummary?.trim() || + task.label?.trim() || + task.task.trim(), + 80, + ); + const line = [ + shortToken(task.taskId).padEnd(ID_PAD), + task.runtime.padEnd(RUNTIME_PAD), + formatTaskStatusCell(task.status, rich), + task.deliveryStatus.padEnd(DELIVERY_PAD), + shortToken(task.runId, RUN_PAD).padEnd(RUN_PAD), + truncate(task.childSessionKey?.trim() || "n/a", 36).padEnd(36), + summary, + ].join(" "); + lines.push(line.trimEnd()); + } + return lines; +} + +export async function tasksListCommand( + opts: { json?: boolean; runtime?: string; status?: string }, + runtime: RuntimeEnv, +) { + const runtimeFilter = opts.runtime?.trim(); + const statusFilter = opts.status?.trim(); + const tasks = reconcileInspectableTasks().filter((task) => { + if (runtimeFilter && task.runtime !== runtimeFilter) { + return false; + } + if (statusFilter && task.status !== statusFilter) { + return false; + } + return true; + }); + + if (opts.json) { + runtime.log( + JSON.stringify( + { + count: tasks.length, + runtime: runtimeFilter ?? null, + status: statusFilter ?? null, + tasks, + }, + null, + 2, + ), + ); + return; + } + + runtime.log(info(`Background tasks: ${tasks.length}`)); + if (runtimeFilter) { + runtime.log(info(`Runtime filter: ${runtimeFilter}`)); + } + if (statusFilter) { + runtime.log(info(`Status filter: ${statusFilter}`)); + } + if (tasks.length === 0) { + runtime.log("No background tasks found."); + return; + } + const rich = isRich(); + for (const line of formatTaskRows(tasks, rich)) { + runtime.log(line); + } +} + +export async function tasksShowCommand( + opts: { json?: boolean; lookup: string }, + runtime: RuntimeEnv, +) { + const task = reconcileTaskLookupToken(opts.lookup); + if (!task) { + runtime.error(`Task not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + + if (opts.json) { + runtime.log(JSON.stringify(task, null, 2)); + return; + } + + const lines = [ + "Background task:", + `taskId: ${task.taskId}`, + `runtime: ${task.runtime}`, + `status: ${task.status}`, + `delivery: ${task.deliveryStatus}`, + `notify: ${task.notifyPolicy}`, + `source: ${task.source}`, + `requesterSessionKey: ${task.requesterSessionKey}`, + `childSessionKey: ${task.childSessionKey ?? "n/a"}`, + `runId: ${task.runId ?? "n/a"}`, + `bindingTargetKind: ${task.bindingTargetKind ?? "n/a"}`, + `label: ${task.label ?? "n/a"}`, + `task: ${task.task}`, + `createdAt: ${new Date(task.createdAt).toISOString()}`, + `startedAt: ${task.startedAt ? new Date(task.startedAt).toISOString() : "n/a"}`, + `endedAt: ${task.endedAt ? new Date(task.endedAt).toISOString() : "n/a"}`, + `lastEventAt: ${task.lastEventAt ? new Date(task.lastEventAt).toISOString() : "n/a"}`, + ...(task.error ? [`error: ${task.error}`] : []), + ...(task.progressSummary ? [`progressSummary: ${task.progressSummary}`] : []), + ...(task.terminalSummary ? [`terminalSummary: ${task.terminalSummary}`] : []), + ...(task.recentEvents?.length + ? task.recentEvents.map( + (event, index) => + `recentEvent[${index}]: ${new Date(event.at).toISOString()} ${event.kind}${ + event.summary ? ` ${event.summary}` : "" + }`, + ) + : []), + ...(task.streamLogPath ? [`streamLogPath: ${task.streamLogPath}`] : []), + ...(task.transcriptPath ? [`transcriptPath: ${task.transcriptPath}`] : []), + ...(task.agentSessionId ? [`agentSessionId: ${task.agentSessionId}`] : []), + ...(task.backendSessionId ? [`backendSessionId: ${task.backendSessionId}`] : []), + ]; + for (const line of lines) { + runtime.log(line); + } +} + +export async function tasksNotifyCommand( + opts: { lookup: string; notify: TaskNotifyPolicy }, + runtime: RuntimeEnv, +) { + const task = reconcileTaskLookupToken(opts.lookup); + if (!task) { + runtime.error(`Task not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + const updated = updateTaskNotifyPolicyById({ + taskId: task.taskId, + notifyPolicy: opts.notify, + }); + if (!updated) { + runtime.error(`Task not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + runtime.log(`Updated ${updated.taskId} notify policy to ${updated.notifyPolicy}.`); +} + +export async function tasksCancelCommand(opts: { lookup: string }, runtime: RuntimeEnv) { + const task = reconcileTaskLookupToken(opts.lookup); + if (!task) { + runtime.error(`Task not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + const result = await cancelTaskById({ + cfg: loadConfig(), + taskId: task.taskId, + }); + if (!result.found) { + runtime.error(result.reason ?? `Task not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + if (!result.cancelled) { + runtime.error(result.reason ?? `Could not cancel task: ${opts.lookup}`); + runtime.exit(1); + return; + } + const updated = getTaskById(task.taskId); + runtime.log( + `Cancelled ${updated?.taskId ?? task.taskId} (${updated?.runtime ?? task.runtime})${updated?.runId ? ` run ${updated.runId}` : ""}.`, + ); +} diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 504011a6e00..d7ab6d23956 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -1,9 +1,13 @@ -import { describe, expect, it, vi } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { BARE_SESSION_RESET_PROMPT } from "../../auto-reply/reply/session-reset-prompt.js"; +import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; +import { withTempDir } from "../../test-helpers/temp-dir.js"; import { agentHandlers } from "./agent.js"; import { expectSubagentFollowupReactivation } from "./subagent-followup.test-helpers.js"; import type { GatewayRequestContext } from "./types.js"; +const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; + const mocks = vi.hoisted(() => ({ loadSessionEntry: vi.fn(), loadGatewaySessionRow: vi.fn(), @@ -302,6 +306,15 @@ async function invokeAgentIdentityGet( } describe("gateway agent handler", () => { + afterEach(() => { + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } + resetTaskRegistryForTests(); + }); + it("preserves ACP metadata from the current stored session entry", async () => { const existingAcpMeta = { backend: "acpx", @@ -817,6 +830,30 @@ describe("gateway agent handler", () => { expect(callArgs.runContext?.messageChannel).toBe("webchat"); }); + it("tracks async gateway agent runs in the shared task registry", async () => { + await withTempDir({ prefix: "openclaw-gateway-agent-task-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + primeMainAgentRun(); + + await invokeAgent( + { + message: "background cli task", + sessionKey: "agent:main:main", + idempotencyKey: "task-registry-agent-run", + }, + { reqId: "task-registry-agent-run" }, + ); + + expect(findTaskByRunId("task-registry-agent-run")).toMatchObject({ + source: "background_cli", + runtime: "cli", + childSessionKey: "agent:main:main", + status: "running", + }); + }); + }); + it("handles missing cliSessionIds gracefully", async () => { mockMainSessionEntry({}); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 7f5c18f8006..99dd572b5cb 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -26,7 +26,11 @@ import { classifySessionKeyShape, normalizeAgentId } from "../../routing/session import { defaultRuntime } from "../../runtime.js"; import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; -import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js"; +import { createTaskRecord } from "../../tasks/task-registry.js"; +import { + normalizeDeliveryContext, + normalizeSessionDeliveryFields, +} from "../../utils/delivery-context.js"; import { INTERNAL_MESSAGE_CHANNEL, isDeliverableMessageChannel, @@ -184,6 +188,30 @@ function dispatchAgentRunFromGateway(params: { respond: GatewayRequestHandlerOptions["respond"]; context: GatewayRequestHandlerOptions["context"]; }) { + if (params.ingressOpts.sessionKey?.trim()) { + try { + createTaskRecord({ + source: "background_cli", + runtime: "cli", + requesterSessionKey: params.ingressOpts.sessionKey, + requesterOrigin: normalizeDeliveryContext({ + channel: params.ingressOpts.channel, + to: params.ingressOpts.to, + accountId: params.ingressOpts.accountId, + threadId: params.ingressOpts.threadId, + }), + childSessionKey: params.ingressOpts.sessionKey, + runId: params.runId, + bindingTargetKind: "session", + task: params.ingressOpts.message, + status: "running", + deliveryStatus: "not_applicable", + startedAt: Date.now(), + }); + } catch { + // Best-effort only: background task tracking must not block agent runs. + } + } void agentCommandFromIngress(params.ingressOpts, defaultRuntime, params.context.deps) .then((result) => { const payload = { diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 637f5fadf0b..895936aea98 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -73,6 +73,7 @@ import { } from "../secrets/runtime.js"; import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import { startTaskRegistryMaintenance } from "../tasks/task-registry.maintenance.js"; import { runSetupWizard } from "../wizard/setup.js"; import { createAuthRateLimiter, type AuthRateLimiter } from "./auth-rate-limit.js"; import { startChannelHealthMonitor } from "./channel-health-monitor.js"; @@ -880,6 +881,7 @@ export async function startGatewayServer( }); if (!minimalTestGateway) { + startTaskRegistryMaintenance(); ({ tickInterval, healthInterval, dedupeCleanup, mediaCleanup } = startGatewayMaintenanceTimers({ broadcast, diff --git a/src/tasks/task-registry-delivery-runtime.ts b/src/tasks/task-registry-delivery-runtime.ts new file mode 100644 index 00000000000..0ffc31c2431 --- /dev/null +++ b/src/tasks/task-registry-delivery-runtime.ts @@ -0,0 +1 @@ +export { sendMessage } from "../infra/outbound/message.js"; diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts new file mode 100644 index 00000000000..2685761e0f8 --- /dev/null +++ b/src/tasks/task-registry.maintenance.ts @@ -0,0 +1,173 @@ +import { readAcpSessionEntry } from "../acp/runtime/session-meta.js"; +import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; +import { parseAgentSessionKey } from "../routing/session-key.js"; +import { + deleteTaskRecordById, + ensureTaskRegistryReady, + getTaskById, + listTaskRecords, + maybeDeliverTaskTerminalUpdate, + resolveTaskForLookupToken, + updateTaskRecordById, +} from "./task-registry.js"; +import type { TaskRecord } from "./task-registry.types.js"; + +const TASK_RECONCILE_GRACE_MS = 5 * 60_000; +const TASK_RETENTION_MS = 7 * 24 * 60 * 60_000; +const TASK_SWEEP_INTERVAL_MS = 60_000; + +let sweeper: NodeJS.Timeout | null = null; + +function findSessionEntryByKey(store: Record, sessionKey: string): unknown { + const direct = store[sessionKey]; + if (direct) { + return direct; + } + const normalized = sessionKey.toLowerCase(); + for (const [key, entry] of Object.entries(store)) { + if (key.toLowerCase() === normalized) { + return entry; + } + } + return undefined; +} + +function isActiveTask(task: TaskRecord): boolean { + return task.status === "accepted" || task.status === "running"; +} + +function isTerminalTask(task: TaskRecord): boolean { + return !isActiveTask(task); +} + +function hasLostGraceExpired(task: TaskRecord, now: number): boolean { + const referenceAt = task.lastEventAt ?? task.startedAt ?? task.createdAt; + return now - referenceAt >= TASK_RECONCILE_GRACE_MS; +} + +function hasBackingSession(task: TaskRecord): boolean { + const childSessionKey = task.childSessionKey?.trim(); + if (!childSessionKey) { + return true; + } + if (task.runtime === "acp") { + const acpEntry = readAcpSessionEntry({ + sessionKey: childSessionKey, + }); + if (!acpEntry || acpEntry.storeReadFailed) { + return true; + } + return Boolean(acpEntry.entry); + } + if (task.runtime === "subagent" || task.runtime === "cli") { + const agentId = parseAgentSessionKey(childSessionKey)?.agentId; + const storePath = resolveStorePath(undefined, { agentId }); + const store = loadSessionStore(storePath); + return Boolean(findSessionEntryByKey(store, childSessionKey)); + } + return true; +} + +function shouldMarkLost(task: TaskRecord, now: number): boolean { + if (!isActiveTask(task)) { + return false; + } + if (!hasLostGraceExpired(task, now)) { + return false; + } + return !hasBackingSession(task); +} + +function shouldPruneTerminalTask(task: TaskRecord, now: number): boolean { + if (!isTerminalTask(task)) { + return false; + } + const terminalAt = task.endedAt ?? task.lastEventAt ?? task.createdAt; + return now - terminalAt >= TASK_RETENTION_MS; +} + +function markTaskLost(task: TaskRecord, now: number): TaskRecord { + const updated = + updateTaskRecordById(task.taskId, { + status: "lost", + endedAt: task.endedAt ?? now, + lastEventAt: now, + error: task.error ?? "backing session missing", + }) ?? task; + void maybeDeliverTaskTerminalUpdate(updated.taskId); + return updated; +} + +function projectTaskLost(task: TaskRecord, now: number): TaskRecord { + return { + ...task, + status: "lost", + endedAt: task.endedAt ?? now, + lastEventAt: now, + error: task.error ?? "backing session missing", + }; +} + +export function reconcileTaskRecordForOperatorInspection(task: TaskRecord): TaskRecord { + const now = Date.now(); + if (!shouldMarkLost(task, now)) { + return task; + } + return projectTaskLost(task, now); +} + +export function reconcileInspectableTasks(): TaskRecord[] { + ensureTaskRegistryReady(); + return listTaskRecords().map((task) => reconcileTaskRecordForOperatorInspection(task)); +} + +export function reconcileTaskLookupToken(token: string): TaskRecord | undefined { + ensureTaskRegistryReady(); + const task = resolveTaskForLookupToken(token); + return task ? reconcileTaskRecordForOperatorInspection(task) : undefined; +} + +export function sweepTaskRegistry(): { reconciled: number; pruned: number } { + ensureTaskRegistryReady(); + const now = Date.now(); + let reconciled = 0; + let pruned = 0; + for (const task of listTaskRecords()) { + if (shouldMarkLost(task, now)) { + const next = markTaskLost(task, now); + if (next.status === "lost") { + reconciled += 1; + } + continue; + } + if (shouldPruneTerminalTask(task, now) && deleteTaskRecordById(task.taskId)) { + pruned += 1; + } + } + return { reconciled, pruned }; +} + +export function startTaskRegistryMaintenance() { + ensureTaskRegistryReady(); + void sweepTaskRegistry(); + if (sweeper) { + return; + } + sweeper = setInterval(() => { + void sweepTaskRegistry(); + }, TASK_SWEEP_INTERVAL_MS); + sweeper.unref?.(); +} + +export function stopTaskRegistryMaintenanceForTests() { + if (!sweeper) { + return; + } + clearInterval(sweeper); + sweeper = null; +} + +export function getReconciledTaskById(taskId: string): TaskRecord | undefined { + const task = getTaskById(taskId); + return task ? reconcileTaskRecordForOperatorInspection(task) : undefined; +} diff --git a/src/tasks/task-registry.reconcile.ts b/src/tasks/task-registry.reconcile.ts new file mode 100644 index 00000000000..780cabe8558 --- /dev/null +++ b/src/tasks/task-registry.reconcile.ts @@ -0,0 +1,5 @@ +export { + reconcileInspectableTasks, + reconcileTaskLookupToken, + reconcileTaskRecordForOperatorInspection, +} from "./task-registry.maintenance.js"; diff --git a/src/tasks/task-registry.store.ts b/src/tasks/task-registry.store.ts new file mode 100644 index 00000000000..75c9d4b9aba --- /dev/null +++ b/src/tasks/task-registry.store.ts @@ -0,0 +1,68 @@ +import os from "node:os"; +import path from "node:path"; +import { resolveStateDir } from "../config/paths.js"; +import { loadJsonFile, saveJsonFile } from "../infra/json-file.js"; +import type { TaskRecord } from "./task-registry.types.js"; + +type PersistedTaskRegistry = { + version: 1; + tasks: Record; +}; + +const TASK_REGISTRY_VERSION = 1 as const; + +function resolveTaskStateDir(env: NodeJS.ProcessEnv = process.env): string { + const explicit = env.OPENCLAW_STATE_DIR?.trim(); + if (explicit) { + return resolveStateDir(env); + } + if (env.VITEST || env.NODE_ENV === "test") { + return path.join(os.tmpdir(), "openclaw-test-state", String(process.pid)); + } + return resolveStateDir(env); +} + +export function resolveTaskRegistryPath(): string { + return path.join(resolveTaskStateDir(process.env), "tasks", "runs.json"); +} + +export function loadTaskRegistryFromDisk(): Map { + const pathname = resolveTaskRegistryPath(); + const raw = loadJsonFile(pathname); + if (!raw || typeof raw !== "object") { + return new Map(); + } + const record = raw as Partial; + if (record.version !== TASK_REGISTRY_VERSION) { + return new Map(); + } + const tasksRaw = record.tasks; + if (!tasksRaw || typeof tasksRaw !== "object") { + return new Map(); + } + const out = new Map(); + for (const [taskId, entry] of Object.entries(tasksRaw)) { + if (!entry || typeof entry !== "object") { + continue; + } + const typed = entry; + if (!typed.taskId || typeof typed.taskId !== "string") { + continue; + } + out.set(taskId, typed); + } + return out; +} + +export function saveTaskRegistryToDisk(tasks: Map) { + const pathname = resolveTaskRegistryPath(); + const serialized: Record = {}; + for (const [taskId, entry] of tasks.entries()) { + serialized[taskId] = entry; + } + const out: PersistedTaskRegistry = { + version: TASK_REGISTRY_VERSION, + tasks: serialized, + }; + saveJsonFile(pathname, out); +} diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts new file mode 100644 index 00000000000..21caf29ad90 --- /dev/null +++ b/src/tasks/task-registry.test.ts @@ -0,0 +1,969 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js"; +import { emitAgentEvent } from "../infra/agent-events.js"; +import { resetHeartbeatWakeStateForTests } from "../infra/heartbeat-wake.js"; +import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { + createTaskRecord, + findTaskByRunId, + getTaskById, + listTaskRecords, + maybeDeliverTaskStateChangeUpdate, + maybeDeliverTaskTerminalUpdate, + resetTaskRegistryForTests, + resolveTaskForLookupToken, + updateTaskNotifyPolicyById, + updateTaskRecordById, + updateTaskStateByRunId, +} from "./task-registry.js"; +import { reconcileInspectableTasks, sweepTaskRegistry } from "./task-registry.maintenance.js"; + +const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; +const hoisted = vi.hoisted(() => { + const sendMessageMock = vi.fn(); + const cancelSessionMock = vi.fn(); + const killSubagentRunAdminMock = vi.fn(); + return { + sendMessageMock, + cancelSessionMock, + killSubagentRunAdminMock, + }; +}); + +vi.mock("./task-registry-delivery-runtime.js", () => ({ + sendMessage: hoisted.sendMessageMock, +})); + +vi.mock("../acp/control-plane/manager.js", () => ({ + getAcpSessionManager: () => ({ + cancelSession: hoisted.cancelSessionMock, + }), +})); + +vi.mock("../agents/subagent-control.js", () => ({ + killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params), +})); + +async function loadFreshTaskRegistryModulesForControlTest() { + vi.resetModules(); + vi.doMock("./task-registry-delivery-runtime.js", () => ({ + sendMessage: hoisted.sendMessageMock, + })); + vi.doMock("../acp/control-plane/manager.js", () => ({ + getAcpSessionManager: () => ({ + cancelSession: hoisted.cancelSessionMock, + }), + })); + vi.doMock("../agents/subagent-control.js", () => ({ + killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params), + })); + return await import("./task-registry.js"); +} + +async function waitForAssertion(assertion: () => void, timeoutMs = 2_000, stepMs = 5) { + const startedAt = Date.now(); + for (;;) { + try { + assertion(); + return; + } catch (error) { + if (Date.now() - startedAt >= timeoutMs) { + throw error; + } + await new Promise((resolve) => setTimeout(resolve, stepMs)); + } + } +} + +async function flushAsyncWork(times = 4) { + for (let index = 0; index < times; index += 1) { + await Promise.resolve(); + } +} + +describe("task-registry", () => { + afterEach(() => { + vi.useRealTimers(); + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } + resetSystemEventsForTest(); + resetHeartbeatWakeStateForTests(); + resetTaskRegistryForTests(); + hoisted.sendMessageMock.mockReset(); + hoisted.cancelSessionMock.mockReset(); + hoisted.killSubagentRunAdminMock.mockReset(); + }); + + it("updates task status from lifecycle events", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:main:acp:child", + runId: "run-1", + task: "Do the thing", + status: "running", + deliveryStatus: "not_applicable", + startedAt: 100, + }); + + emitAgentEvent({ + runId: "run-1", + stream: "assistant", + data: { + text: "working", + }, + }); + emitAgentEvent({ + runId: "run-1", + stream: "lifecycle", + data: { + phase: "end", + endedAt: 250, + }, + }); + + expect(findTaskByRunId("run-1")).toMatchObject({ + runtime: "acp", + status: "done", + endedAt: 250, + }); + }); + }); + + it("delivers ACP completion to the requester channel when a delivery origin exists", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + hoisted.sendMessageMock.mockResolvedValue({ + channel: "telegram", + to: "telegram:123", + via: "direct", + }); + + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + threadId: "321", + }, + childSessionKey: "agent:main:acp:child", + runId: "run-delivery", + task: "Investigate issue", + status: "running", + deliveryStatus: "pending", + startedAt: 100, + }); + + emitAgentEvent({ + runId: "run-delivery", + stream: "lifecycle", + data: { + phase: "end", + endedAt: 250, + }, + }); + + await waitForAssertion(() => + expect(findTaskByRunId("run-delivery")).toMatchObject({ + status: "done", + deliveryStatus: "delivered", + }), + ); + await waitForAssertion(() => + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + to: "telegram:123", + threadId: "321", + content: expect.stringContaining("Background task done: ACP background task"), + mirror: expect.objectContaining({ + sessionKey: "agent:main:main", + }), + }), + ), + ); + expect(peekSystemEvents("agent:main:main")).toEqual([]); + }); + }); + + it("records delivery failure and queues a session fallback when direct delivery misses", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + hoisted.sendMessageMock.mockRejectedValueOnce(new Error("telegram unavailable")); + + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:main:acp:child", + runId: "run-delivery-fail", + task: "Investigate issue", + status: "running", + deliveryStatus: "pending", + startedAt: 100, + }); + + emitAgentEvent({ + runId: "run-delivery-fail", + stream: "lifecycle", + data: { + phase: "error", + endedAt: 250, + error: "Permission denied by ACP runtime", + }, + }); + + await waitForAssertion(() => + expect(findTaskByRunId("run-delivery-fail")).toMatchObject({ + status: "failed", + deliveryStatus: "failed", + error: "Permission denied by ACP runtime", + }), + ); + await waitForAssertion(() => + expect(peekSystemEvents("agent:main:main")).toEqual([ + expect.stringContaining("Background task failed: ACP background task"), + ]), + ); + }); + }); + + it("marks internal fallback delivery as session queued instead of delivered", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:main:acp:child", + runId: "run-session-queued", + task: "Investigate issue", + status: "running", + deliveryStatus: "pending", + startedAt: 100, + }); + + emitAgentEvent({ + runId: "run-session-queued", + stream: "lifecycle", + data: { + phase: "end", + endedAt: 250, + }, + }); + + await waitForAssertion(() => + expect(findTaskByRunId("run-session-queued")).toMatchObject({ + status: "done", + deliveryStatus: "session_queued", + }), + ); + expect(peekSystemEvents("agent:main:main")).toEqual([ + expect.stringContaining("Background task done: ACP background task"), + ]); + expect(hoisted.sendMessageMock).not.toHaveBeenCalled(); + }); + }); + + it("does not include internal progress detail in the terminal channel message", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + hoisted.sendMessageMock.mockResolvedValue({ + channel: "telegram", + to: "telegram:123", + via: "direct", + }); + + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + threadId: "321", + }, + childSessionKey: "agent:main:acp:child", + runId: "run-detail-leak", + task: "Create the file and verify it", + status: "running", + deliveryStatus: "pending", + startedAt: 100, + }); + + updateTaskRecordById(findTaskByRunId("run-detail-leak")!.taskId, { + progressSummary: + "I am loading the local session context and checking helper command availability before writing the file.", + }); + + emitAgentEvent({ + runId: "run-detail-leak", + stream: "lifecycle", + data: { + phase: "end", + endedAt: 250, + }, + }); + + await waitForAssertion(() => + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + content: "Background task done: ACP background task (run run-deta).", + }), + ), + ); + }); + }); + + it("keeps distinct task records when different producers share a runId", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + createTaskRecord({ + source: "background_cli", + runtime: "cli", + requesterSessionKey: "agent:codex:acp:child", + childSessionKey: "agent:codex:acp:child", + runId: "run-shared", + task: "Child ACP execution", + status: "running", + deliveryStatus: "not_applicable", + }); + + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child", + runId: "run-shared", + task: "Spawn ACP child", + status: "running", + deliveryStatus: "pending", + }); + + expect(listTaskRecords().filter((task) => task.runId === "run-shared")).toHaveLength(2); + expect(findTaskByRunId("run-shared")).toMatchObject({ + source: "sessions_spawn", + runtime: "acp", + task: "Spawn ACP child", + }); + }); + }); + + it("suppresses duplicate ACP delivery when a preferred spawned task shares the runId", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + hoisted.sendMessageMock.mockResolvedValue({ + channel: "telegram", + to: "telegram:123", + via: "direct", + }); + + const directTask = createTaskRecord({ + source: "unknown", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:main:acp:child", + runId: "run-shared-delivery", + task: "Direct ACP child", + status: "done", + deliveryStatus: "pending", + }); + const spawnedTask = createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:main:acp:child", + runId: "run-shared-delivery", + task: "Spawn ACP child", + status: "done", + deliveryStatus: "pending", + }); + + await maybeDeliverTaskTerminalUpdate(directTask.taskId); + await maybeDeliverTaskTerminalUpdate(spawnedTask.taskId); + + expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1); + expect(listTaskRecords().filter((task) => task.runId === "run-shared-delivery")).toHaveLength( + 1, + ); + expect(findTaskByRunId("run-shared-delivery")).toMatchObject({ + taskId: directTask.taskId, + source: "sessions_spawn", + deliveryStatus: "delivered", + }); + }); + }); + + it("collapses ACP run-owned task creation onto the existing spawned task", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + const spawnedTask = createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:main:acp:child", + runId: "run-collapse", + task: "Spawn ACP child", + status: "running", + deliveryStatus: "pending", + streamLogPath: "/tmp/stream.jsonl", + }); + + const directTask = createTaskRecord({ + source: "unknown", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:main:acp:child", + runId: "run-collapse", + task: "Direct ACP child", + status: "running", + }); + + expect(directTask.taskId).toBe(spawnedTask.taskId); + expect(listTaskRecords().filter((task) => task.runId === "run-collapse")).toHaveLength(1); + expect(findTaskByRunId("run-collapse")).toMatchObject({ + source: "sessions_spawn", + task: "Spawn ACP child", + streamLogPath: "/tmp/stream.jsonl", + }); + }); + }); + + it("delivers a terminal ACP update only once when multiple notifiers race", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + hoisted.sendMessageMock.mockResolvedValue({ + channel: "telegram", + to: "telegram:123", + via: "direct", + }); + + const task = createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:main:acp:child", + runId: "run-racing-delivery", + task: "Investigate issue", + status: "done", + deliveryStatus: "pending", + }); + + const first = maybeDeliverTaskTerminalUpdate(task.taskId); + const second = maybeDeliverTaskTerminalUpdate(task.taskId); + await Promise.all([first, second]); + + expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1); + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + idempotencyKey: `task-terminal:${task.taskId}:done`, + mirror: expect.objectContaining({ + idempotencyKey: `task-terminal:${task.taskId}:done`, + }), + }), + ); + expect(findTaskByRunId("run-racing-delivery")).toMatchObject({ + deliveryStatus: "delivered", + }); + }); + }); + + it("restores persisted tasks from disk on the next lookup", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + const task = createTaskRecord({ + source: "sessions_spawn", + runtime: "subagent", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:main:subagent:child", + runId: "run-restore", + task: "Restore me", + status: "running", + deliveryStatus: "pending", + }); + + resetTaskRegistryForTests({ + persist: false, + }); + + expect(resolveTaskForLookupToken(task.taskId)).toMatchObject({ + taskId: task.taskId, + runId: "run-restore", + task: "Restore me", + }); + }); + }); + + it("projects inspection-time orphaned tasks as lost without mutating the registry", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + const task = createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:main:acp:missing", + runId: "run-lost", + task: "Missing child", + status: "running", + deliveryStatus: "pending", + }); + updateTaskRecordById(task.taskId, { + lastEventAt: Date.now() - 10 * 60_000, + }); + + const tasks = reconcileInspectableTasks(); + expect(tasks[0]).toMatchObject({ + runId: "run-lost", + status: "lost", + error: "backing session missing", + }); + expect(getTaskById(task.taskId)).toMatchObject({ + status: "running", + }); + expect(peekSystemEvents("agent:main:main")).toEqual([]); + }); + }); + + it("prunes old terminal tasks during maintenance sweeps", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + const task = createTaskRecord({ + source: "background_cli", + runtime: "cli", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:main:main", + runId: "run-prune", + task: "Old completed task", + status: "done", + deliveryStatus: "not_applicable", + startedAt: Date.now() - 9 * 24 * 60 * 60_000, + }); + updateTaskRecordById(task.taskId, { + endedAt: Date.now() - 8 * 24 * 60 * 60_000, + lastEventAt: Date.now() - 8 * 24 * 60 * 60_000, + }); + + expect(sweepTaskRegistry()).toEqual({ + reconciled: 0, + pruned: 1, + }); + expect(listTaskRecords()).toEqual([]); + }); + }); + + it("delivers concise state-change updates only when notify policy requests them", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + hoisted.sendMessageMock.mockResolvedValue({ + channel: "discord", + to: "discord:123", + via: "direct", + }); + + const task = createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "discord", + to: "discord:123", + }, + childSessionKey: "agent:codex:acp:child", + runId: "run-state-change", + task: "Investigate issue", + status: "accepted", + notifyPolicy: "done_only", + }); + + updateTaskStateByRunId({ + runId: "run-state-change", + status: "running", + eventSummary: "Started.", + }); + await waitForAssertion(() => expect(hoisted.sendMessageMock).not.toHaveBeenCalled()); + + updateTaskNotifyPolicyById({ + taskId: task.taskId, + notifyPolicy: "state_changes", + }); + updateTaskStateByRunId({ + runId: "run-state-change", + eventSummary: "No output for 60s. It may be waiting for input.", + }); + + await waitForAssertion(() => + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + content: + "Background task update: ACP background task. No output for 60s. It may be waiting for input.", + }), + ), + ); + expect(findTaskByRunId("run-state-change")).toMatchObject({ + notifyPolicy: "state_changes", + lastNotifiedEventAt: expect.any(Number), + recentEvents: expect.arrayContaining([ + expect.objectContaining({ + kind: "progress", + summary: "No output for 60s. It may be waiting for input.", + }), + ]), + }); + await maybeDeliverTaskStateChangeUpdate(task.taskId); + expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1); + }); + }); + + it("keeps background ACP progress off the foreground lane and only sends a terminal notify", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetSystemEventsForTest(); + hoisted.sendMessageMock.mockResolvedValue({ + channel: "discord", + to: "discord:123", + via: "direct", + }); + vi.useFakeTimers(); + + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "discord", + to: "discord:123", + }, + childSessionKey: "agent:codex:acp:child", + runId: "run-quiet-terminal", + task: "Create the file", + status: "running", + deliveryStatus: "pending", + }); + + const relay = startAcpSpawnParentStreamRelay({ + runId: "run-quiet-terminal", + parentSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child", + agentId: "codex", + surfaceUpdates: false, + streamFlushMs: 1, + noOutputNoticeMs: 1_000, + noOutputPollMs: 250, + }); + + relay.notifyStarted(); + emitAgentEvent({ + runId: "run-quiet-terminal", + stream: "assistant", + data: { + delta: "working on it", + }, + }); + vi.advanceTimersByTime(10); + + expect(peekSystemEvents("agent:main:main")).toEqual([]); + expect(hoisted.sendMessageMock).not.toHaveBeenCalled(); + + emitAgentEvent({ + runId: "run-quiet-terminal", + stream: "lifecycle", + data: { + phase: "end", + endedAt: 250, + }, + }); + await flushAsyncWork(); + + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "discord", + to: "discord:123", + content: "Background task done: ACP background task (run run-quie).", + }), + ); + expect(peekSystemEvents("agent:main:main")).toEqual([]); + relay.dispose(); + vi.useRealTimers(); + }); + }); + + it("delivers a concise terminal failure message without internal ACP chatter", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetSystemEventsForTest(); + hoisted.sendMessageMock.mockResolvedValue({ + channel: "discord", + to: "discord:123", + via: "direct", + }); + + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "discord", + to: "discord:123", + }, + childSessionKey: "agent:codex:acp:child", + runId: "run-failure-terminal", + task: "Write the file", + status: "running", + deliveryStatus: "pending", + progressSummary: + "I am loading session context and checking helper availability before writing the file.", + }); + + emitAgentEvent({ + runId: "run-failure-terminal", + stream: "lifecycle", + data: { + phase: "error", + endedAt: 250, + error: "Permission denied by ACP runtime", + }, + }); + await flushAsyncWork(); + + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "discord", + to: "discord:123", + content: + "Background task failed: ACP background task (run run-fail). Permission denied by ACP runtime", + }), + ); + expect(peekSystemEvents("agent:main:main")).toEqual([]); + }); + }); + + it("emits concise state-change updates without surfacing raw ACP chatter", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetSystemEventsForTest(); + hoisted.sendMessageMock.mockResolvedValue({ + channel: "discord", + to: "discord:123", + via: "direct", + }); + vi.useFakeTimers(); + + createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "discord", + to: "discord:123", + }, + childSessionKey: "agent:codex:acp:child", + runId: "run-state-stream", + task: "Create the file", + status: "running", + deliveryStatus: "pending", + notifyPolicy: "state_changes", + }); + + const relay = startAcpSpawnParentStreamRelay({ + runId: "run-state-stream", + parentSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child", + agentId: "codex", + surfaceUpdates: false, + streamFlushMs: 1, + noOutputNoticeMs: 1_000, + noOutputPollMs: 250, + }); + + relay.notifyStarted(); + await flushAsyncWork(); + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + content: "Background task update: ACP background task. Started.", + }), + ); + + hoisted.sendMessageMock.mockClear(); + vi.advanceTimersByTime(1_500); + await flushAsyncWork(); + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + content: + "Background task update: ACP background task. No output for 1s. It may be waiting for input.", + }), + ); + + expect(peekSystemEvents("agent:main:main")).toEqual([]); + relay.dispose(); + vi.useRealTimers(); + }); + }); + + it("cancels ACP-backed tasks through the ACP session manager", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + const registry = await loadFreshTaskRegistryModulesForControlTest(); + process.env.OPENCLAW_STATE_DIR = root; + registry.resetTaskRegistryForTests(); + hoisted.cancelSessionMock.mockResolvedValue(undefined); + + const task = registry.createTaskRecord({ + source: "sessions_spawn", + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:codex:acp:child", + runId: "run-cancel-acp", + task: "Investigate issue", + status: "running", + deliveryStatus: "pending", + }); + + const result = await registry.cancelTaskById({ + cfg: {} as never, + taskId: task.taskId, + }); + + expect(hoisted.cancelSessionMock).toHaveBeenCalledWith( + expect.objectContaining({ + cfg: {}, + sessionKey: "agent:codex:acp:child", + reason: "task-cancel", + }), + ); + expect(result).toMatchObject({ + found: true, + cancelled: true, + task: expect.objectContaining({ + taskId: task.taskId, + status: "cancelled", + error: "Cancelled by operator.", + }), + }); + await waitForAssertion(() => + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + to: "telegram:123", + content: "Background task cancelled: ACP background task (run run-canc).", + }), + ), + ); + }); + }); + + it("cancels subagent-backed tasks through subagent control", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + const registry = await loadFreshTaskRegistryModulesForControlTest(); + process.env.OPENCLAW_STATE_DIR = root; + registry.resetTaskRegistryForTests(); + hoisted.killSubagentRunAdminMock.mockResolvedValue({ + found: true, + killed: true, + }); + + const task = registry.createTaskRecord({ + source: "sessions_spawn", + runtime: "subagent", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:worker:subagent:child", + runId: "run-cancel-subagent", + task: "Investigate issue", + status: "running", + deliveryStatus: "pending", + }); + + const result = await registry.cancelTaskById({ + cfg: {} as never, + taskId: task.taskId, + }); + + expect(hoisted.killSubagentRunAdminMock).toHaveBeenCalledWith( + expect.objectContaining({ + cfg: {}, + sessionKey: "agent:worker:subagent:child", + }), + ); + expect(result).toMatchObject({ + found: true, + cancelled: true, + task: expect.objectContaining({ + taskId: task.taskId, + status: "cancelled", + error: "Cancelled by operator.", + }), + }); + await waitForAssertion(() => + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + to: "telegram:123", + content: "Background task cancelled: Subagent task (run run-canc).", + }), + ), + ); + }); + }); +}); diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts new file mode 100644 index 00000000000..b521050f8cd --- /dev/null +++ b/src/tasks/task-registry.ts @@ -0,0 +1,976 @@ +import crypto from "node:crypto"; +import { getAcpSessionManager } from "../acp/control-plane/manager.js"; +import { killSubagentRunAdmin } from "../agents/subagent-control.js"; +import type { OpenClawConfig } from "../config/config.js"; +import { onAgentEvent } from "../infra/agent-events.js"; +import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; +import { enqueueSystemEvent } from "../infra/system-events.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { parseAgentSessionKey } from "../routing/session-key.js"; +import { normalizeDeliveryContext } from "../utils/delivery-context.js"; +import { isDeliverableMessageChannel } from "../utils/message-channel.js"; +import { loadTaskRegistryFromDisk, saveTaskRegistryToDisk } from "./task-registry.store.js"; +import type { + TaskBindingTargetKind, + TaskDeliveryStatus, + TaskEventKind, + TaskEventRecord, + TaskNotifyPolicy, + TaskRecord, + TaskRegistrySnapshot, + TaskRuntime, + TaskSource, + TaskStatus, +} from "./task-registry.types.js"; + +const log = createSubsystemLogger("tasks/registry"); + +const tasks = new Map(); +const taskIdsByRunId = new Map>(); +const tasksWithPendingDelivery = new Set(); +let listenerStarted = false; +let listenerStop: (() => void) | null = null; +let restoreAttempted = false; +let deliveryRuntimePromise: Promise | null = + null; + +function cloneTaskRecord(record: TaskRecord): TaskRecord { + return { + ...record, + ...(record.requesterOrigin ? { requesterOrigin: { ...record.requesterOrigin } } : {}), + ...(record.recentEvents + ? { recentEvents: record.recentEvents.map((event) => ({ ...event })) } + : {}), + }; +} + +function persistTaskRegistry() { + saveTaskRegistryToDisk(tasks); +} + +function ensureDeliveryStatus(requesterSessionKey: string): TaskDeliveryStatus { + return requesterSessionKey.trim() ? "pending" : "parent_missing"; +} + +function ensureNotifyPolicy(params: { + notifyPolicy?: TaskNotifyPolicy; + deliveryStatus?: TaskDeliveryStatus; + requesterSessionKey: string; +}): TaskNotifyPolicy { + if (params.notifyPolicy) { + return params.notifyPolicy; + } + const deliveryStatus = params.deliveryStatus ?? ensureDeliveryStatus(params.requesterSessionKey); + return deliveryStatus === "not_applicable" ? "silent" : "done_only"; +} + +function normalizeTaskSummary(value: string | null | undefined): string | undefined { + const normalized = value?.replace(/\s+/g, " ").trim(); + return normalized || undefined; +} + +const TASK_RECENT_EVENT_LIMIT = 12; + +function appendTaskEvent( + current: TaskRecord, + event: { + at: number; + kind: TaskEventKind; + summary?: string | null; + }, +): TaskEventRecord[] { + const summary = normalizeTaskSummary(event.summary); + const nextEvent: TaskEventRecord = { + at: event.at, + kind: event.kind, + ...(summary ? { summary } : {}), + }; + const previous = current.recentEvents ?? []; + const merged = [...previous, nextEvent]; + return merged.slice(-TASK_RECENT_EVENT_LIMIT); +} + +function loadTaskRegistryDeliveryRuntime() { + deliveryRuntimePromise ??= import("./task-registry-delivery-runtime.js"); + return deliveryRuntimePromise; +} + +function addRunIdIndex(taskId: string, runId?: string) { + const trimmed = runId?.trim(); + if (!trimmed) { + return; + } + let ids = taskIdsByRunId.get(trimmed); + if (!ids) { + ids = new Set(); + taskIdsByRunId.set(trimmed, ids); + } + ids.add(taskId); +} + +function rebuildRunIdIndex() { + taskIdsByRunId.clear(); + for (const [taskId, task] of tasks.entries()) { + addRunIdIndex(taskId, task.runId); + } +} + +function getTasksByRunId(runId: string): TaskRecord[] { + const ids = taskIdsByRunId.get(runId.trim()); + if (!ids || ids.size === 0) { + return []; + } + return [...ids] + .map((taskId) => tasks.get(taskId)) + .filter((task): task is TaskRecord => Boolean(task)); +} + +function taskLookupPriority(task: TaskRecord): number { + const sourcePriority = + task.source === "sessions_spawn" ? 0 : task.source === "background_cli" ? 1 : 2; + const runtimePriority = task.runtime === "cli" ? 1 : 0; + return sourcePriority * 10 + runtimePriority; +} + +function pickPreferredRunIdTask(matches: TaskRecord[]): TaskRecord | undefined { + return [...matches].toSorted((left, right) => { + const priorityDiff = taskLookupPriority(left) - taskLookupPriority(right); + if (priorityDiff !== 0) { + return priorityDiff; + } + return left.createdAt - right.createdAt; + })[0]; +} + +function normalizeComparableText(value: string | undefined): string { + return value?.trim() ?? ""; +} + +function findExistingTaskForCreate(params: { + source: TaskSource; + runtime: TaskRuntime; + requesterSessionKey: string; + childSessionKey?: string; + runId?: string; + bindingTargetKind?: TaskBindingTargetKind; + label?: string; + task: string; +}): TaskRecord | undefined { + const runId = params.runId?.trim(); + const exact = runId + ? getTasksByRunId(runId).find( + (task) => + task.source === params.source && + task.runtime === params.runtime && + normalizeComparableText(task.requesterSessionKey) === + normalizeComparableText(params.requesterSessionKey) && + normalizeComparableText(task.childSessionKey) === + normalizeComparableText(params.childSessionKey) && + normalizeComparableText(task.bindingTargetKind) === + normalizeComparableText(params.bindingTargetKind) && + normalizeComparableText(task.label) === normalizeComparableText(params.label) && + normalizeComparableText(task.task) === normalizeComparableText(params.task), + ) + : undefined; + if (exact) { + return exact; + } + if (!runId || params.runtime !== "acp") { + return undefined; + } + const siblingMatches = getTasksByRunId(runId).filter( + (task) => + task.runtime === params.runtime && + normalizeComparableText(task.requesterSessionKey) === + normalizeComparableText(params.requesterSessionKey) && + normalizeComparableText(task.childSessionKey) === + normalizeComparableText(params.childSessionKey), + ); + if (siblingMatches.length === 0) { + return undefined; + } + return pickPreferredRunIdTask(siblingMatches); +} + +function sourceUpgradePriority(source: TaskSource): number { + return source === "sessions_spawn" ? 0 : source === "background_cli" ? 1 : 2; +} + +function mergeExistingTaskForCreate( + existing: TaskRecord, + params: { + source: TaskSource; + requesterOrigin?: TaskRecord["requesterOrigin"]; + bindingTargetKind?: TaskBindingTargetKind; + label?: string; + task: string; + deliveryStatus?: TaskDeliveryStatus; + notifyPolicy?: TaskNotifyPolicy; + streamLogPath?: string; + }, +): TaskRecord { + const patch: Partial = {}; + if (sourceUpgradePriority(params.source) < sourceUpgradePriority(existing.source)) { + patch.source = params.source; + } + const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin); + if (requesterOrigin && !existing.requesterOrigin) { + patch.requesterOrigin = requesterOrigin; + } + if (params.bindingTargetKind && !existing.bindingTargetKind) { + patch.bindingTargetKind = params.bindingTargetKind; + } + if (params.label?.trim() && !existing.label?.trim()) { + patch.label = params.label.trim(); + } + if (params.streamLogPath?.trim() && !existing.streamLogPath?.trim()) { + patch.streamLogPath = params.streamLogPath.trim(); + } + if (params.source === "sessions_spawn" && existing.source !== "sessions_spawn") { + patch.task = params.task; + } + if (params.deliveryStatus === "pending" && existing.deliveryStatus !== "delivered") { + patch.deliveryStatus = "pending"; + } + const notifyPolicy = ensureNotifyPolicy({ + notifyPolicy: params.notifyPolicy, + deliveryStatus: params.deliveryStatus, + requesterSessionKey: existing.requesterSessionKey, + }); + if (notifyPolicy !== existing.notifyPolicy && existing.notifyPolicy === "silent") { + patch.notifyPolicy = notifyPolicy; + } + if (Object.keys(patch).length === 0) { + return cloneTaskRecord(existing); + } + return updateTask(existing.taskId, patch) ?? cloneTaskRecord(existing); +} + +function taskTerminalDeliveryIdempotencyKey(task: TaskRecord): string { + return `task-terminal:${task.taskId}:${task.status}`; +} + +function restoreTaskRegistryOnce() { + if (restoreAttempted) { + return; + } + restoreAttempted = true; + try { + const restored = loadTaskRegistryFromDisk(); + if (restored.size === 0) { + return; + } + for (const [taskId, task] of restored.entries()) { + tasks.set(taskId, task); + } + rebuildRunIdIndex(); + } catch (error) { + log.warn("Failed to restore task registry", { error }); + } +} + +export function ensureTaskRegistryReady() { + restoreTaskRegistryOnce(); + ensureListener(); +} + +function updateTask(taskId: string, patch: Partial): TaskRecord | null { + const current = tasks.get(taskId); + if (!current) { + return null; + } + const next = { ...current, ...patch }; + tasks.set(taskId, next); + if (patch.runId && patch.runId !== current.runId) { + rebuildRunIdIndex(); + } + persistTaskRegistry(); + return cloneTaskRecord(next); +} + +function formatTaskTerminalEvent(task: TaskRecord): string { + // User-facing task notifications stay intentionally terse. Detailed runtime chatter lives + // in task metadata for inspection, not in the default channel ping. + const title = + task.label?.trim() || + (task.runtime === "acp" + ? "ACP background task" + : task.runtime === "subagent" + ? "Subagent task" + : task.task.trim() || "Background task"); + const runLabel = task.runId ? ` (run ${task.runId.slice(0, 8)})` : ""; + const summary = task.terminalSummary?.trim(); + if (task.status === "done") { + return summary + ? `Background task done: ${title}${runLabel}. ${summary}` + : `Background task done: ${title}${runLabel}.`; + } + if (task.status === "timed_out") { + return `Background task timed out: ${title}${runLabel}.`; + } + if (task.status === "lost") { + return `Background task lost: ${title}${runLabel}. ${task.error ?? "Backing session disappeared."}`; + } + if (task.status === "cancelled") { + return `Background task cancelled: ${title}${runLabel}.`; + } + const error = task.error?.trim(); + return error + ? `Background task failed: ${title}${runLabel}. ${error}` + : `Background task failed: ${title}${runLabel}.`; +} + +function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean { + const origin = normalizeDeliveryContext(task.requesterOrigin); + const channel = origin?.channel?.trim(); + const to = origin?.to?.trim(); + return Boolean(channel && to && isDeliverableMessageChannel(channel)); +} + +function queueTaskSystemEvent(task: TaskRecord, text: string) { + const requesterSessionKey = task.requesterSessionKey.trim(); + if (!requesterSessionKey) { + return false; + } + enqueueSystemEvent(text, { + sessionKey: requesterSessionKey, + contextKey: `task:${task.taskId}`, + deliveryContext: task.requesterOrigin, + }); + requestHeartbeatNow({ + reason: "background-task", + sessionKey: requesterSessionKey, + }); + return true; +} + +function formatTaskStateChangeEvent(task: TaskRecord, event: TaskEventRecord): string | null { + const title = + task.label?.trim() || + (task.runtime === "acp" + ? "ACP background task" + : task.runtime === "subagent" + ? "Subagent task" + : task.task.trim() || "Background task"); + if (event.kind === "running") { + return `Background task started: ${title}.`; + } + if (event.kind === "progress") { + return event.summary ? `Background task update: ${title}. ${event.summary}` : null; + } + return null; +} + +function shouldAutoDeliverTaskUpdate(task: TaskRecord): boolean { + if (task.notifyPolicy === "silent") { + return false; + } + if (task.runtime === "subagent" && task.status !== "cancelled") { + return false; + } + if ( + task.status !== "done" && + task.status !== "failed" && + task.status !== "timed_out" && + task.status !== "lost" && + task.status !== "cancelled" + ) { + return false; + } + return task.deliveryStatus === "pending"; +} + +function shouldAutoDeliverTaskStateChange(task: TaskRecord): boolean { + return ( + task.notifyPolicy === "state_changes" && + task.deliveryStatus === "pending" && + task.status !== "done" && + task.status !== "failed" && + task.status !== "timed_out" && + task.status !== "lost" && + task.status !== "cancelled" + ); +} + +function shouldSuppressDuplicateTerminalDelivery(task: TaskRecord): boolean { + if (task.runtime !== "acp" || !task.runId?.trim()) { + return false; + } + const preferred = pickPreferredRunIdTask(getTasksByRunId(task.runId)); + return Boolean(preferred && preferred.taskId !== task.taskId); +} + +export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise { + ensureTaskRegistryReady(); + const current = tasks.get(taskId); + if (!current || !shouldAutoDeliverTaskUpdate(current)) { + return current ? cloneTaskRecord(current) : null; + } + if (tasksWithPendingDelivery.has(taskId)) { + return cloneTaskRecord(current); + } + tasksWithPendingDelivery.add(taskId); + try { + const latest = tasks.get(taskId); + if (!latest || !shouldAutoDeliverTaskUpdate(latest)) { + return latest ? cloneTaskRecord(latest) : null; + } + if (shouldSuppressDuplicateTerminalDelivery(latest)) { + return updateTask(taskId, { + deliveryStatus: "not_applicable", + lastEventAt: Date.now(), + }); + } + if (!latest.requesterSessionKey.trim()) { + return updateTask(taskId, { + deliveryStatus: "parent_missing", + lastEventAt: Date.now(), + }); + } + const eventText = formatTaskTerminalEvent(latest); + if (!canDeliverTaskToRequesterOrigin(latest)) { + try { + queueTaskSystemEvent(latest, eventText); + return updateTask(taskId, { + deliveryStatus: "session_queued", + lastEventAt: Date.now(), + }); + } catch (error) { + log.warn("Failed to queue background task session delivery", { + taskId, + requesterSessionKey: latest.requesterSessionKey, + error, + }); + return updateTask(taskId, { + deliveryStatus: "failed", + lastEventAt: Date.now(), + }); + } + } + try { + const { sendMessage } = await loadTaskRegistryDeliveryRuntime(); + const origin = normalizeDeliveryContext(latest.requesterOrigin); + const requesterAgentId = parseAgentSessionKey(latest.requesterSessionKey)?.agentId; + await sendMessage({ + channel: origin?.channel, + to: origin?.to ?? "", + accountId: origin?.accountId, + threadId: origin?.threadId, + content: eventText, + agentId: requesterAgentId, + idempotencyKey: taskTerminalDeliveryIdempotencyKey(latest), + mirror: { + sessionKey: latest.requesterSessionKey, + agentId: requesterAgentId, + idempotencyKey: taskTerminalDeliveryIdempotencyKey(latest), + }, + }); + return updateTask(taskId, { + deliveryStatus: "delivered", + lastEventAt: Date.now(), + }); + } catch (error) { + log.warn("Failed to deliver background task update", { + taskId, + requesterSessionKey: latest.requesterSessionKey, + requesterOrigin: latest.requesterOrigin, + error, + }); + try { + queueTaskSystemEvent(latest, eventText); + } catch (fallbackError) { + log.warn("Failed to queue background task fallback event", { + taskId, + requesterSessionKey: latest.requesterSessionKey, + error: fallbackError, + }); + } + return updateTask(taskId, { + deliveryStatus: "failed", + lastEventAt: Date.now(), + }); + } + } finally { + tasksWithPendingDelivery.delete(taskId); + } +} + +export async function maybeDeliverTaskStateChangeUpdate( + taskId: string, +): Promise { + ensureTaskRegistryReady(); + const current = tasks.get(taskId); + if (!current || !shouldAutoDeliverTaskStateChange(current)) { + return current ? cloneTaskRecord(current) : null; + } + const latestEvent = current.recentEvents?.at(-1); + if (!latestEvent || (current.lastNotifiedEventAt ?? 0) >= latestEvent.at) { + return cloneTaskRecord(current); + } + const eventText = formatTaskStateChangeEvent(current, latestEvent); + if (!eventText) { + return cloneTaskRecord(current); + } + try { + if (!canDeliverTaskToRequesterOrigin(current)) { + queueTaskSystemEvent(current, eventText); + return updateTask(taskId, { + lastNotifiedEventAt: latestEvent.at, + lastEventAt: Date.now(), + }); + } + const { sendMessage } = await loadTaskRegistryDeliveryRuntime(); + const origin = normalizeDeliveryContext(current.requesterOrigin); + const requesterAgentId = parseAgentSessionKey(current.requesterSessionKey)?.agentId; + await sendMessage({ + channel: origin?.channel, + to: origin?.to ?? "", + accountId: origin?.accountId, + threadId: origin?.threadId, + content: eventText, + agentId: requesterAgentId, + idempotencyKey: `task-event:${current.taskId}:${latestEvent.at}:${latestEvent.kind}`, + mirror: { + sessionKey: current.requesterSessionKey, + agentId: requesterAgentId, + idempotencyKey: `task-event:${current.taskId}:${latestEvent.at}:${latestEvent.kind}`, + }, + }); + return updateTask(taskId, { + lastNotifiedEventAt: latestEvent.at, + lastEventAt: Date.now(), + }); + } catch (error) { + log.warn("Failed to deliver background task state change", { + taskId, + requesterSessionKey: current.requesterSessionKey, + error, + }); + return cloneTaskRecord(current); + } +} + +export function updateTaskRecordById( + taskId: string, + patch: Partial, +): TaskRecord | null { + ensureTaskRegistryReady(); + return updateTask(taskId, patch); +} + +function updateTasksByRunId(runId: string, patch: Partial): TaskRecord[] { + const ids = taskIdsByRunId.get(runId.trim()); + if (!ids || ids.size === 0) { + return []; + } + const updated: TaskRecord[] = []; + for (const taskId of ids) { + const task = updateTask(taskId, patch); + if (task) { + updated.push(task); + } + } + return updated; +} + +function ensureListener() { + if (listenerStarted) { + return; + } + listenerStarted = true; + listenerStop = onAgentEvent((evt) => { + restoreTaskRegistryOnce(); + const ids = taskIdsByRunId.get(evt.runId); + if (!ids || ids.size === 0) { + return; + } + const now = evt.ts || Date.now(); + for (const taskId of ids) { + const current = tasks.get(taskId); + if (!current) { + continue; + } + const patch: Partial = { + lastEventAt: now, + }; + if (evt.stream === "lifecycle") { + const phase = typeof evt.data?.phase === "string" ? evt.data.phase : undefined; + const startedAt = + typeof evt.data?.startedAt === "number" ? evt.data.startedAt : current.startedAt; + const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : undefined; + if (startedAt) { + patch.startedAt = startedAt; + } + if (phase === "start") { + patch.status = "running"; + } else if (phase === "end") { + patch.status = evt.data?.aborted === true ? "timed_out" : "done"; + patch.endedAt = endedAt ?? now; + } else if (phase === "error") { + patch.status = "failed"; + patch.endedAt = endedAt ?? now; + patch.error = typeof evt.data?.error === "string" ? evt.data.error : current.error; + } + } else if (evt.stream === "error") { + patch.error = typeof evt.data?.error === "string" ? evt.data.error : current.error; + } + if (patch.status && patch.status !== current.status) { + patch.recentEvents = appendTaskEvent(current, { + at: now, + kind: patch.status, + summary: + patch.status === "failed" + ? (patch.error ?? current.error) + : patch.status === "done" + ? current.terminalSummary + : undefined, + }); + } + const updated = updateTask(taskId, patch); + if (updated) { + void maybeDeliverTaskStateChangeUpdate(taskId); + void maybeDeliverTaskTerminalUpdate(taskId); + } + } + }); +} + +export function createTaskRecord(params: { + source: TaskSource; + runtime: TaskRuntime; + requesterSessionKey: string; + requesterOrigin?: TaskRecord["requesterOrigin"]; + childSessionKey?: string; + runId?: string; + bindingTargetKind?: TaskBindingTargetKind; + label?: string; + task: string; + status?: TaskStatus; + deliveryStatus?: TaskDeliveryStatus; + notifyPolicy?: TaskNotifyPolicy; + startedAt?: number; + lastEventAt?: number; + progressSummary?: string | null; + terminalSummary?: string | null; + transcriptPath?: string; + streamLogPath?: string; + backend?: string; + agentSessionId?: string; + backendSessionId?: string; +}): TaskRecord { + ensureTaskRegistryReady(); + const existing = findExistingTaskForCreate(params); + if (existing) { + return mergeExistingTaskForCreate(existing, params); + } + const now = Date.now(); + const taskId = crypto.randomUUID(); + const status = params.status ?? "accepted"; + const deliveryStatus = params.deliveryStatus ?? ensureDeliveryStatus(params.requesterSessionKey); + const notifyPolicy = ensureNotifyPolicy({ + notifyPolicy: params.notifyPolicy, + deliveryStatus, + requesterSessionKey: params.requesterSessionKey, + }); + const lastEventAt = params.lastEventAt ?? params.startedAt ?? now; + const record: TaskRecord = { + taskId, + source: params.source, + runtime: params.runtime, + requesterSessionKey: params.requesterSessionKey, + requesterOrigin: normalizeDeliveryContext(params.requesterOrigin), + childSessionKey: params.childSessionKey, + runId: params.runId?.trim() || undefined, + bindingTargetKind: params.bindingTargetKind, + label: params.label?.trim() || undefined, + task: params.task, + status, + deliveryStatus, + notifyPolicy, + createdAt: now, + startedAt: params.startedAt, + lastEventAt, + progressSummary: normalizeTaskSummary(params.progressSummary), + terminalSummary: normalizeTaskSummary(params.terminalSummary), + recentEvents: appendTaskEvent( + { + taskId, + source: params.source, + runtime: params.runtime, + requesterSessionKey: params.requesterSessionKey, + task: params.task, + status, + deliveryStatus, + notifyPolicy, + createdAt: now, + } as TaskRecord, + { + at: lastEventAt, + kind: status, + }, + ), + transcriptPath: params.transcriptPath, + streamLogPath: params.streamLogPath, + backend: params.backend, + agentSessionId: params.agentSessionId, + backendSessionId: params.backendSessionId, + }; + tasks.set(taskId, record); + addRunIdIndex(taskId, record.runId); + persistTaskRegistry(); + return cloneTaskRecord(record); +} + +export function updateTaskStateByRunId(params: { + runId: string; + status?: TaskStatus; + startedAt?: number; + endedAt?: number; + lastEventAt?: number; + error?: string; + progressSummary?: string | null; + terminalSummary?: string | null; + eventSummary?: string | null; +}) { + ensureTaskRegistryReady(); + const ids = taskIdsByRunId.get(params.runId.trim()); + if (!ids || ids.size === 0) { + return []; + } + const updated: TaskRecord[] = []; + for (const taskId of ids) { + const current = tasks.get(taskId); + if (!current) { + continue; + } + const patch: Partial = {}; + const nextStatus = params.status ?? current.status; + const eventAt = params.lastEventAt ?? params.endedAt ?? Date.now(); + if (params.status) { + patch.status = params.status; + } + if (params.startedAt != null) { + patch.startedAt = params.startedAt; + } + if (params.endedAt != null) { + patch.endedAt = params.endedAt; + } + if (params.lastEventAt != null) { + patch.lastEventAt = params.lastEventAt; + } + if (params.error !== undefined) { + patch.error = params.error; + } + if (params.progressSummary !== undefined) { + patch.progressSummary = normalizeTaskSummary(params.progressSummary); + } + if (params.terminalSummary !== undefined) { + patch.terminalSummary = normalizeTaskSummary(params.terminalSummary); + } + const eventSummary = + normalizeTaskSummary(params.eventSummary) ?? + (nextStatus === "failed" + ? normalizeTaskSummary(params.error ?? current.error) + : nextStatus === "done" + ? normalizeTaskSummary(params.terminalSummary ?? current.terminalSummary) + : undefined); + const shouldAppendEvent = + (params.status && params.status !== current.status) || + Boolean(normalizeTaskSummary(params.eventSummary)); + if (shouldAppendEvent) { + patch.recentEvents = appendTaskEvent(current, { + at: eventAt, + kind: params.status && params.status !== current.status ? params.status : "progress", + summary: eventSummary, + }); + } + const task = updateTask(taskId, patch); + if (task) { + updated.push(task); + } + } + for (const task of updated) { + void maybeDeliverTaskStateChangeUpdate(task.taskId); + void maybeDeliverTaskTerminalUpdate(task.taskId); + } + return updated; +} + +export function updateTaskDeliveryByRunId(params: { + runId: string; + deliveryStatus: TaskDeliveryStatus; +}) { + ensureTaskRegistryReady(); + return updateTasksByRunId(params.runId, { + deliveryStatus: params.deliveryStatus, + }); +} + +export function updateTaskNotifyPolicyById(params: { + taskId: string; + notifyPolicy: TaskNotifyPolicy; +}): TaskRecord | null { + ensureTaskRegistryReady(); + return updateTask(params.taskId, { + notifyPolicy: params.notifyPolicy, + lastEventAt: Date.now(), + }); +} + +export async function cancelTaskById(params: { + cfg: OpenClawConfig; + taskId: string; +}): Promise<{ found: boolean; cancelled: boolean; reason?: string; task?: TaskRecord }> { + ensureTaskRegistryReady(); + const task = tasks.get(params.taskId.trim()); + if (!task) { + return { found: false, cancelled: false, reason: "Task not found." }; + } + if ( + task.status === "done" || + task.status === "failed" || + task.status === "timed_out" || + task.status === "lost" || + task.status === "cancelled" + ) { + return { + found: true, + cancelled: false, + reason: "Task is already terminal.", + task: cloneTaskRecord(task), + }; + } + const childSessionKey = task.childSessionKey?.trim(); + if (!childSessionKey) { + return { + found: true, + cancelled: false, + reason: "Task has no cancellable child session.", + task: cloneTaskRecord(task), + }; + } + try { + if (task.runtime === "acp") { + await getAcpSessionManager().cancelSession({ + cfg: params.cfg, + sessionKey: childSessionKey, + reason: "task-cancel", + }); + } else if (task.runtime === "subagent") { + const result = await killSubagentRunAdmin({ + cfg: params.cfg, + sessionKey: childSessionKey, + }); + if (!result.found || !result.killed) { + return { + found: true, + cancelled: false, + reason: result.found ? "Subagent was not running." : "Subagent task not found.", + task: cloneTaskRecord(task), + }; + } + } else { + return { + found: true, + cancelled: false, + reason: "Task runtime does not support cancellation yet.", + task: cloneTaskRecord(task), + }; + } + const updated = updateTask(task.taskId, { + status: "cancelled", + endedAt: Date.now(), + lastEventAt: Date.now(), + error: "Cancelled by operator.", + recentEvents: appendTaskEvent(task, { + at: Date.now(), + kind: "cancelled", + summary: "Cancelled by operator.", + }), + }); + if (updated) { + void maybeDeliverTaskTerminalUpdate(updated.taskId); + } + return { + found: true, + cancelled: true, + task: updated ?? cloneTaskRecord(task), + }; + } catch (error) { + return { + found: true, + cancelled: false, + reason: error instanceof Error ? error.message : String(error), + task: cloneTaskRecord(task), + }; + } +} + +export function listTaskRecords(): TaskRecord[] { + ensureTaskRegistryReady(); + return [...tasks.values()] + .map((task) => cloneTaskRecord(task)) + .toSorted((a, b) => b.createdAt - a.createdAt); +} + +export function getTaskRegistrySnapshot(): TaskRegistrySnapshot { + return { + tasks: listTaskRecords(), + }; +} + +export function getTaskById(taskId: string): TaskRecord | undefined { + ensureTaskRegistryReady(); + const task = tasks.get(taskId.trim()); + return task ? cloneTaskRecord(task) : undefined; +} + +export function findTaskByRunId(runId: string): TaskRecord | undefined { + ensureTaskRegistryReady(); + const task = pickPreferredRunIdTask(getTasksByRunId(runId)); + return task ? cloneTaskRecord(task) : undefined; +} + +export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | undefined { + const key = sessionKey.trim(); + if (!key) { + return undefined; + } + return listTaskRecords().find( + (task) => task.childSessionKey === key || task.requesterSessionKey === key, + ); +} + +export function resolveTaskForLookupToken(token: string): TaskRecord | undefined { + const lookup = token.trim(); + if (!lookup) { + return undefined; + } + return getTaskById(lookup) ?? findTaskByRunId(lookup) ?? findLatestTaskForSessionKey(lookup); +} + +export function deleteTaskRecordById(taskId: string): boolean { + ensureTaskRegistryReady(); + const current = tasks.get(taskId); + if (!current) { + return false; + } + tasks.delete(taskId); + rebuildRunIdIndex(); + persistTaskRegistry(); + return true; +} + +export function resetTaskRegistryForTests(opts?: { persist?: boolean }) { + tasks.clear(); + taskIdsByRunId.clear(); + restoreAttempted = false; + if (listenerStop) { + listenerStop(); + listenerStop = null; + } + listenerStarted = false; + if (opts?.persist !== false) { + persistTaskRegistry(); + } +} diff --git a/src/tasks/task-registry.types.ts b/src/tasks/task-registry.types.ts new file mode 100644 index 00000000000..caaeb812d5c --- /dev/null +++ b/src/tasks/task-registry.types.ts @@ -0,0 +1,68 @@ +import type { DeliveryContext } from "../utils/delivery-context.js"; + +export type TaskRuntime = "subagent" | "acp" | "cli"; + +export type TaskStatus = + | "accepted" + | "running" + | "done" + | "failed" + | "timed_out" + | "cancelled" + | "lost"; + +export type TaskDeliveryStatus = + | "pending" + | "delivered" + | "session_queued" + | "failed" + | "parent_missing" + | "not_applicable"; + +export type TaskNotifyPolicy = "done_only" | "state_changes" | "silent"; + +export type TaskBindingTargetKind = "subagent" | "session"; + +export type TaskSource = "sessions_spawn" | "background_cli" | "unknown"; + +export type TaskEventKind = TaskStatus | "progress"; + +export type TaskEventRecord = { + at: number; + kind: TaskEventKind; + summary?: string; +}; + +export type TaskRecord = { + taskId: string; + source: TaskSource; + runtime: TaskRuntime; + requesterSessionKey: string; + requesterOrigin?: DeliveryContext; + childSessionKey?: string; + runId?: string; + bindingTargetKind?: TaskBindingTargetKind; + label?: string; + task: string; + status: TaskStatus; + deliveryStatus: TaskDeliveryStatus; + notifyPolicy: TaskNotifyPolicy; + createdAt: number; + startedAt?: number; + endedAt?: number; + lastEventAt?: number; + error?: string; + progressSummary?: string; + terminalSummary?: string; + recentEvents?: TaskEventRecord[]; + lastNotifiedEventAt?: number; + transcriptPath?: string; + streamLogPath?: string; + backend?: string; + agentSessionId?: string; + backendSessionId?: string; +}; + +export type TaskRegistrySnapshot = { + tasks: TaskRecord[]; +};