diff --git a/.env.example b/.env.example index bf55cb4..f646b77 100644 --- a/.env.example +++ b/.env.example @@ -17,6 +17,7 @@ AGENT_MAX_RECURSIVE_DEPTH=3 # Schema-driven orchestration limits AGENT_STATE_ROOT=.ai_ops/state +AGENT_PROJECT_CONTEXT_PATH=.ai_ops/project-context.json AGENT_TOPOLOGY_MAX_DEPTH=4 AGENT_TOPOLOGY_MAX_RETRIES=2 AGENT_RELATIONSHIP_MAX_CHILDREN=4 diff --git a/AGENTS.md b/AGENTS.md index 217451f..32142fa 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -25,6 +25,7 @@ - `AGENT_MAX_RECURSIVE_DEPTH` - Orchestration/context limits: - `AGENT_STATE_ROOT` + - `AGENT_PROJECT_CONTEXT_PATH` - `AGENT_TOPOLOGY_MAX_DEPTH` - `AGENT_TOPOLOGY_MAX_RETRIES` - `AGENT_RELATIONSHIP_MAX_CHILDREN` diff --git a/README.md b/README.md index 327d1b6..df32c47 100644 --- a/README.md +++ b/README.md @@ -2,40 +2,41 @@ TypeScript runtime for deterministic multi-agent execution with: -- OpenAI Codex SDK integration (`@openai/codex-sdk`) -- Anthropic Claude Agent SDK integration (`@anthropic-ai/claude-agent-sdk`) +- OpenAI Codex SDK (`@openai/codex-sdk`) +- Anthropic Claude Agent SDK (`@anthropic-ai/claude-agent-sdk`) - Schema-validated orchestration (`AgentManifest`) -- Stateless node handoffs via persisted state/context payloads +- DAG execution with topology-aware fan-out (`parallel`, `hierarchical`, `retry-unrolled`) +- Project-scoped persistent context store +- Typed domain events for edge-triggered routing - Resource provisioning (git worktrees + deterministic port ranges) -- MCP configuration layer with handler-based policy hooks +- MCP configuration layer with handler policy hooks -## Current Status +## Architecture Summary -- Provider entrypoints (`codex`, `claude`) run with session limits and resource provisioning. -- Schema-driven orchestration is implemented as reusable modules under `src/agents`. -- Recursive `AgentManager.runRecursiveAgent(...)` supports fanout/fan-in orchestration with abort propagation. -- Tool clearance allowlist/banlist is modeled, but hard security enforcement is still a TODO at tool execution boundaries. +- `SchemaDrivenExecutionEngine.runSession(...)` is the single execution entrypoint. +- `PipelineExecutor` owns runtime control flow and topology dispatch. +- `AgentManager` is an internal utility used by the pipeline when fan-out/retry-unrolled behavior is required. +- Session state is persisted under `AGENT_STATE_ROOT`. +- Project state is persisted under `AGENT_PROJECT_CONTEXT_PATH` with domains: + - `globalFlags` + - `artifactPointers` + - `taskQueue` ## Repository Layout -- `src/agents`: - - `manager.ts`: queue-based concurrency limits + recursive fanout/fan-in orchestration. - - `runtime.ts`: env-driven runtime singletons and defaults. - - `manifest.ts`: `AgentManifest` schema parsing + validation (strict DAG). - - `persona-registry.ts`: prompt templating + persona behavior events. - - `pipeline.ts`: actor-oriented DAG runner with retries and state-dependent routing. - - `state-context.ts`: persisted state + stateless handoff reconstruction. - - `provisioning.ts`: extensible resource orchestration + child suballocation support. - - `orchestration.ts`: `SchemaDrivenExecutionEngine` facade. -- `src/mcp`: MCP config types, conversions, and handler resolution. -- `src/examples`: provider entrypoints (`codex.ts`, `claude.ts`). -- `tests`: unit coverage for manager, manifest, pipeline/orchestration, state context, MCP, and provisioning behavior. -- `docs/orchestration-engine.md`: design notes for the orchestration architecture. - -## Prerequisites - -- Node.js 18+ -- npm +- `src/agents` + - `orchestration.ts`: engine facade and runtime wiring + - `pipeline.ts`: DAG runner, retry matrix, abort propagation, domain-event routing + - `manifest.ts`: schema parsing/validation for personas/topologies/edges + - `manager.ts`: recursive fan-out utility used by pipeline + - `state-context.ts`: persisted node handoffs + session state + - `project-context.ts`: project-scoped store + - `domain-events.ts`: typed domain event schema + bus + - `runtime.ts`: env-driven defaults/singletons + - `provisioning.ts`: resource provisioning and child suballocation helpers +- `src/mcp`: MCP config types/conversion/handlers +- `src/examples`: provider entrypoints (`codex.ts`, `claude.ts`) +- `tests`: manager, manifest, pipeline/orchestration, state, provisioning, MCP ## Setup @@ -45,207 +46,53 @@ cp .env.example .env cp mcp.config.example.json mcp.config.json ``` -Fill in any values you need in `.env`. - ## Run -Run Codex example: - ```bash -npm run codex -- "Summarize what this repository does." +npm run codex -- "Summarize this repository." +npm run claude -- "Summarize this repository." ``` -Run Claude example: - -```bash -npm run claude -- "Summarize what this repository does." -``` - -Run via unified entrypoint: +Or via unified entrypoint: ```bash npm run dev -- codex "List potential improvements." npm run dev -- claude "List potential improvements." ``` -## Schema-Driven Orchestration +## Manifest Semantics -The orchestration engine is exposed as library modules (not yet wired into `src/index.ts` by default). +`AgentManifest` (schema `"1"`) validates: -Core pieces: +- supported topologies (`sequential`, `parallel`, `hierarchical`, `retry-unrolled`) +- persona definitions and tool-clearance metadata +- relationship DAG and unknown persona references +- strict pipeline DAG +- topology constraints (`maxDepth`, `maxRetries`) -- `parseAgentManifest(...)` validates the full orchestration schema. -- `PersonaRegistry` injects runtime context into templated system prompts. -- `PipelineExecutor` executes a strict DAG of actor nodes. -- `FileSystemStateContextManager` enforces stateless handoffs. -- `SchemaDrivenExecutionEngine` composes all of the above with env-driven limits. +Pipeline edges can route via: -### AgentManifest Overview +- legacy status triggers (`on`: `success`, `validation_fail`, `failure`, `always`, ...) +- domain event triggers (`event`: typed domain events) +- conditions (`state_flag`, `history_has_event`, `file_exists`, `always`) -`AgentManifest` (schema version `"1"`) includes: +## Domain Events -- `topologies`: any of `hierarchical`, `retry-unrolled`, `sequential` -- `personas`: identity, prompt template, tool clearance metadata -- `relationships`: parent-child persona edges and constraints -- `pipeline`: strict DAG with entry node, nodes, and edges -- `topologyConstraints`: max depth and retry ceilings +Domain events are typed and can trigger edges directly: -Edge routing supports: +- planning: `requirements_defined`, `tasks_planned` +- execution: `code_committed`, `task_blocked` +- validation: `validation_passed`, `validation_failed` +- integration: `branch_merged` -- Event gates: `success`, `validation_fail`, `failure`, `always`, `onTaskComplete`, `onValidationFail` -- Conditions: - - `state_flag` - - `history_has_event` - - `file_exists` - - `always` +Actors can emit events in `ActorExecutionResult.events`. Pipeline status also emits default validation/execution events. -Example manifest: +## Retry Matrix and Cancellation -```json -{ - "schemaVersion": "1", - "topologies": ["hierarchical", "retry-unrolled", "sequential"], - "personas": [ - { - "id": "coder", - "displayName": "Coder", - "systemPromptTemplate": "Implement ticket {{ticket}} in repo {{repo}}", - "toolClearance": { - "allowlist": ["read_file", "write_file"], - "banlist": ["rm"] - } - } - ], - "relationships": [], - "pipeline": { - "entryNodeId": "coder-1", - "nodes": [ - { - "id": "coder-1", - "actorId": "coder_actor", - "personaId": "coder", - "constraints": { "maxRetries": 1 } - } - ], - "edges": [] - }, - "topologyConstraints": { - "maxDepth": 4, - "maxRetries": 2 - } -} -``` - -### Minimal Engine Usage - -```ts -import { SchemaDrivenExecutionEngine } from "./src/agents/orchestration.js"; - -const engine = new SchemaDrivenExecutionEngine({ - manifest, - actorExecutors: { - coder_actor: async ({ prompt, context, toolClearance }) => { - // execute actor logic here - return { - status: "success", - payload: { - summary: "done" - }, - stateFlags: { - implemented: true - } - }; - } - }, - settings: { - workspaceRoot: process.cwd(), - runtimeContext: { - repo: "ai_ops", - ticket: "AIOPS-123" - } - } -}); - -const result = await engine.runSession({ - sessionId: "session-1", - initialPayload: { - task: "Implement feature" - } -}); - -console.log(result.records); -``` - -## Stateless Handoffs and Context - -The engine does not depend on conversational memory between nodes. - -- Node inputs are written as handoff payloads to storage. -- Each node execution reads a fresh context snapshot from disk. -- Session state persists: - - flags - - metadata - - history events - -Default state root is controlled by `AGENT_STATE_ROOT`. - -## Recursive Orchestration Contract - -`AgentManager.runRecursiveAgent(...)` uses a strict two-phase fanout/fan-in model: - -- Phase 1 (planner): agent execution returns either a terminal result or a fanout plan (`intents[]` + `aggregate(...)`). -- Parent tokens are released before children are scheduled, avoiding deadlocks even when `AGENT_MAX_CONCURRENT=1`. -- Children run in isolated deterministic session IDs (`_child_`), each with their own `AbortSignal`. -- Phase 2 (aggregator): once all children complete, the aggregate phase runs as a fresh invocation. - -Optional child middleware hooks (`allocateForChild`, `releaseForChild`) let callers integrate provisioning/suballocation without coupling `AgentManager` to filesystem or git operations. - -## Resource Provisioning - -The provisioning layer separates: - -- Hard constraints: actual resource allocation enforced before run. -- Soft constraints: injected env vars, prompt sections, metadata, and discovery snapshot. - -Built-in providers: - -- `git-worktree` -- `port-range` - -Runtime injection includes: - -- Working directory override -- Injected env vars such as `AGENT_WORKTREE_PATH`, `AGENT_PORT_RANGE_START`, `AGENT_PORT_RANGE_END`, `AGENT_PORT_PRIMARY` -- Discovery file path via `AGENT_DISCOVERY_FILE` - -### Hierarchical Suballocation - -Parent sessions can suballocate resources for child sessions using: - -- `ResourceProvisioningOrchestrator.provisionChildSession(...)` -- `buildChildResourceRequests(...)` - -Behavior: - -- Child worktrees are placed under a deterministic parent-scoped root. -- Child port blocks are deterministically carved from the parent assigned range. - -## MCP Configuration - -Use `mcp.config.json` to configure shared and provider-specific MCP servers. - -- `MCP_CONFIG_PATH` controls config location (default `./mcp.config.json`). -- Shared server definitions are in `servers`. -- Provider overrides: - - `codex.mcp_servers` - - `claude.mcpServers` -- Handlers: - - built-in `context7` - - built-in `claude-task-master` - - built-in `generic` - - custom handlers via `registerMcpHandler(...)` - -See `mcp.config.example.json` for a complete template. +- `validation_fail`: routed through retry-unrolled execution (new child manager session) +- hard failures: timeout/network/403-like failures tracked sequentially; at 2 consecutive hard failures the pipeline aborts fast +- `AbortSignal` is passed into every actor execution input +- session closure aborts child recursive work ## Environment Variables @@ -266,14 +113,15 @@ See `mcp.config.example.json` for a complete template. - `AGENT_MAX_SESSION` - `AGENT_MAX_RECURSIVE_DEPTH` -### Orchestration Limits +### Orchestration / Context - `AGENT_STATE_ROOT` +- `AGENT_PROJECT_CONTEXT_PATH` - `AGENT_TOPOLOGY_MAX_DEPTH` - `AGENT_TOPOLOGY_MAX_RETRIES` - `AGENT_RELATIONSHIP_MAX_CHILDREN` -### Provisioning +### Provisioning / Resource Controls - `AGENT_WORKTREE_ROOT` - `AGENT_WORKTREE_BASE_REF` @@ -288,13 +136,11 @@ Defaults are documented in `.env.example`. ## Quality Gate -Run the full pre-PR gate: - ```bash npm run verify ``` -Equivalent individual commands: +Equivalent: ```bash npm run check @@ -303,20 +149,7 @@ npm run test npm run build ``` -## Build and Start +## Notes -```bash -npm run build -npm run start -- codex "Hello from built JS" -``` - -## Known Limitations - -- Tool clearance allowlist/banlist is currently metadata; enforcement is not yet wired into an execution sandbox. - -## References - -- `docs/orchestration-engine.md` -- OpenAI Codex SDK docs: https://developers.openai.com/codex/sdk/ -- Codex MCP config docs: https://developers.openai.com/codex/config#model-context-protocol-mcp_servers -- Claude Agent SDK docs: https://platform.claude.com/docs/en/agent-sdk/overview +- Tool clearance allowlist/banlist is currently metadata only; hard enforcement must happen at the tool execution boundary. +- `AgentManager.runRecursiveAgent(...)` remains available for low-level testing, but pipeline execution should use `SchemaDrivenExecutionEngine.runSession(...)`. diff --git a/docs/orchestration-engine.md b/docs/orchestration-engine.md index 339c73d..3188f9f 100644 --- a/docs/orchestration-engine.md +++ b/docs/orchestration-engine.md @@ -9,27 +9,39 @@ The orchestration runtime introduces explicit schema validation and deterministi - `AgentManifest` schema (`src/agents/manifest.ts`): validates personas, relationships, topology constraints, and a strict DAG pipeline. - Persona registry (`src/agents/persona-registry.ts`): renders templated prompts with runtime context and routes behavioral events. - Stateful storage for stateless execution (`src/agents/state-context.ts`): each node execution reads payload + state from storage to get fresh context. -- DAG pipeline runner (`src/agents/pipeline.ts`): executes actor nodes, evaluates state/history/repo conditions, enforces retry/depth limits. -- Orchestration facade (`src/agents/orchestration.ts`): wires manifest + registry + pipeline + state manager with env-driven limits. +- DAG pipeline runner (`src/agents/pipeline.ts`): executes topology blocks, emits typed domain events, evaluates route conditions, and enforces retry/depth/failure limits. +- Project context store (`src/agents/project-context.ts`): project-scoped global flags, artifact pointers, and task queue persisted across sessions. +- Orchestration facade (`src/agents/orchestration.ts`): wires manifest + registry + pipeline + state manager + project context with env-driven limits. - Hierarchical resource suballocation (`src/agents/provisioning.ts`): builds child `git-worktree` and child `port-range` requests from parent allocation data. -- Recursive manager runtime (`src/agents/manager.ts`): queue-aware fanout/fan-in execution with fail-fast child cancellation and session-level abort propagation. +- Recursive manager runtime (`src/agents/manager.ts`): utility invoked by the pipeline engine for fan-out/retry-unrolled execution. ## Constraint model - Relationship constraints: per-edge limits (`maxDepth`, `maxChildren`) and process-level cap (`AGENT_RELATIONSHIP_MAX_CHILDREN`). -- Pipeline constraints: per-node retry limits and process-level cap (`AGENT_TOPOLOGY_MAX_RETRIES`). +- Pipeline constraints: per-node retry limits, retry-unrolled topology, and process-level cap (`AGENT_TOPOLOGY_MAX_RETRIES`). - Topology constraints: max depth and retries from manifest + env caps. ## Stateless handoffs -Node payloads are persisted under the state root. Nodes do not inherit in-memory conversational context from previous node runs. Fresh context is reconstructed from the handoff and persisted state each execution. +Node payloads are persisted under the state root. Nodes do not inherit in-memory conversational context from previous node runs. Fresh context is reconstructed from the handoff and persisted state each execution. Sessions load project context from `AGENT_PROJECT_CONTEXT_PATH` at initialization, and orchestration writes project updates on each node completion. -## Recursive execution model +## Execution topology model -- Recursive planning is schema-driven: a node returns child intents rather than imperatively spawning children. -- Parent execution ends before child runs begin; the parent token is released and reacquired only for aggregate phase execution. -- Child sessions use deterministic hierarchical IDs (`_child_`) and are cancellable through parent session closure. -- Resource orchestration remains external to `AgentManager` via middleware hooks for child allocation/release. +- Pipeline graph execution is DAG-based with ready-node frontiers. +- Nodes tagged with topology blocks `parallel`/`hierarchical` are dispatched concurrently (`Promise.all`) through `AgentManager`. +- Validation failures follow retry-unrolled behavior and are executed as new manager child sessions. +- Sequential hard failures (timeout/network/403-like) trigger fail-fast abort. +- `AbortSignal` is passed through actor execution input for immediate cancellation propagation. + +## Domain events + +- Domain event schema is strongly typed (`src/agents/domain-events.ts`). +- Standard event domains: + - planning: `requirements_defined`, `tasks_planned` + - execution: `code_committed`, `task_blocked` + - validation: `validation_passed`, `validation_failed` + - integration: `branch_merged` +- Pipeline edges can trigger on domain events (`edge.event`) in addition to legacy status triggers (`edge.on`). ## Security note diff --git a/src/agents/domain-events.ts b/src/agents/domain-events.ts new file mode 100644 index 0000000..63290f3 --- /dev/null +++ b/src/agents/domain-events.ts @@ -0,0 +1,112 @@ +import { randomUUID } from "node:crypto"; +import type { JsonObject } from "./types.js"; + +export type PlanningDomainEventType = "requirements_defined" | "tasks_planned"; +export type ExecutionDomainEventType = "code_committed" | "task_blocked"; +export type ValidationDomainEventType = "validation_passed" | "validation_failed"; +export type IntegrationDomainEventType = "branch_merged"; + +export type DomainEventType = + | PlanningDomainEventType + | ExecutionDomainEventType + | ValidationDomainEventType + | IntegrationDomainEventType; + +export type DomainEventSource = "pipeline" | "actor"; + +export type DomainEventPayload = { + summary?: string; + details?: JsonObject; + errorCode?: string; + artifactPointer?: string; +}; + +export type DomainEvent = { + id: string; + type: TType; + source: DomainEventSource; + sessionId: string; + nodeId: string; + attempt: number; + timestamp: string; + payload: DomainEventPayload; +}; + +export type DomainEventEmission = { + type: TType; + payload?: DomainEventPayload; +}; + +export type DomainEventHandler = ( + event: DomainEvent, +) => Promise | void; + +const DOMAIN_EVENT_TYPES = new Set([ + "requirements_defined", + "tasks_planned", + "code_committed", + "task_blocked", + "validation_passed", + "validation_failed", + "branch_merged", +]); + +export function isDomainEventType(value: string): value is DomainEventType { + return DOMAIN_EVENT_TYPES.has(value as DomainEventType); +} + +export function createDomainEvent(input: { + type: DomainEventType; + source: DomainEventSource; + sessionId: string; + nodeId: string; + attempt: number; + payload?: DomainEventPayload; +}): DomainEvent { + return { + id: randomUUID(), + type: input.type, + source: input.source, + sessionId: input.sessionId, + nodeId: input.nodeId, + attempt: input.attempt, + timestamp: new Date().toISOString(), + payload: input.payload ?? {}, + }; +} + +export class DomainEventBus { + private readonly handlers = new Map>(); + + subscribe( + type: TType, + handler: DomainEventHandler, + ): () => void { + const typedHandler = handler as DomainEventHandler; + const set = this.handlers.get(type); + if (set) { + set.add(typedHandler); + } else { + this.handlers.set(type, new Set([typedHandler])); + } + + return () => { + const current = this.handlers.get(type); + current?.delete(typedHandler); + if (current && current.size === 0) { + this.handlers.delete(type); + } + }; + } + + async publish(event: DomainEvent): Promise { + const set = this.handlers.get(event.type); + if (!set || set.size === 0) { + return; + } + + for (const handler of set) { + await handler(event); + } + } +} diff --git a/src/agents/manager.ts b/src/agents/manager.ts index 3fcc4a7..6b267db 100644 --- a/src/agents/manager.ts +++ b/src/agents/manager.ts @@ -297,6 +297,10 @@ export class AgentManager { } } + /** + * @deprecated Prefer running recursive topologies through SchemaDrivenExecutionEngine.runSession. + * This method remains available for internal orchestration and low-level tests. + */ async runRecursiveAgent(input: { sessionId: string; depth: number; diff --git a/src/agents/manifest.ts b/src/agents/manifest.ts index 9892533..4beec7a 100644 --- a/src/agents/manifest.ts +++ b/src/agents/manifest.ts @@ -1,4 +1,5 @@ import { isRecord } from "./types.js"; +import { isDomainEventType, type DomainEventType } from "./domain-events.js"; export type ToolClearancePolicy = { allowlist: string[]; @@ -45,23 +46,32 @@ export type PipelineConstraint = { maxRetries?: number; }; +export type NodeTopologyKind = "sequential" | "parallel" | "hierarchical" | "retry-unrolled"; + +export type PipelineNodeTopology = { + kind: NodeTopologyKind; + blockId?: string; +}; + export type PipelineNode = { id: string; actorId: string; personaId: string; constraints?: PipelineConstraint; + topology?: PipelineNodeTopology; }; export type PipelineEdge = { from: string; to: string; - on: + on?: | "success" | "validation_fail" | "failure" | "always" | "onTaskComplete" | "onValidationFail"; + event?: DomainEventType; when?: RouteCondition[]; }; @@ -71,7 +81,7 @@ export type PipelineGraph = { edges: PipelineEdge[]; }; -export type TopologyKind = "hierarchical" | "retry-unrolled" | "sequential"; +export type TopologyKind = "hierarchical" | "parallel" | "retry-unrolled" | "sequential"; export type TopologyConstraint = { maxDepth: number; @@ -216,6 +226,34 @@ function parsePipelineNode(value: unknown): PipelineNode { throw new Error("Pipeline node must be an object."); } + const topology = value.topology; + let parsedTopology: PipelineNodeTopology | undefined; + if (topology !== undefined) { + if (!isRecord(topology)) { + throw new Error("Pipeline node topology must be an object when provided."); + } + + const kind = readString(topology, "kind"); + if ( + kind !== "sequential" && + kind !== "parallel" && + kind !== "hierarchical" && + kind !== "retry-unrolled" + ) { + throw new Error(`Pipeline node topology kind "${kind}" is not supported.`); + } + + const blockIdRaw = topology.blockId; + if (blockIdRaw !== undefined && (typeof blockIdRaw !== "string" || blockIdRaw.trim().length === 0)) { + throw new Error("Pipeline node topology blockId must be a non-empty string when provided."); + } + + parsedTopology = { + kind, + ...(typeof blockIdRaw === "string" ? { blockId: blockIdRaw.trim() } : {}), + }; + } + const constraints = isRecord(value.constraints) ? { maxRetries: readOptionalInteger(value.constraints, "maxRetries", { min: 0 }), @@ -227,6 +265,7 @@ function parsePipelineNode(value: unknown): PipelineNode { actorId: readString(value, "actorId"), personaId: readString(value, "personaId"), constraints, + ...(parsedTopology ? { topology: parsedTopology } : {}), }; } @@ -235,8 +274,7 @@ function parsePipelineEdge(value: unknown): PipelineEdge { throw new Error("Pipeline edge must be an object."); } - const on = readString(value, "on"); - const validEvents: PipelineEdge["on"][] = [ + const validEvents: NonNullable[] = [ "success", "validation_fail", "failure", @@ -245,8 +283,29 @@ function parsePipelineEdge(value: unknown): PipelineEdge { "onValidationFail", ]; - if (!validEvents.includes(on as PipelineEdge["on"])) { - throw new Error(`Pipeline edge field \"on\" has unsupported event \"${on}\".`); + const rawOn = value.on; + let on: PipelineEdge["on"]; + if (rawOn !== undefined) { + if (typeof rawOn !== "string" || !validEvents.includes(rawOn as NonNullable)) { + throw new Error(`Pipeline edge field "on" has unsupported event "${String(rawOn)}".`); + } + on = rawOn as NonNullable; + } + + const rawDomainEvent = value.event; + let event: DomainEventType | undefined; + if (rawDomainEvent !== undefined) { + if (typeof rawDomainEvent !== "string" || !isDomainEventType(rawDomainEvent)) { + throw new Error(`Pipeline edge field "event" has unsupported domain event "${String(rawDomainEvent)}".`); + } + event = rawDomainEvent; + } + + if (!on && !event) { + throw new Error('Pipeline edge must provide either an "on" trigger or an "event" trigger.'); + } + if (on && event) { + throw new Error('Pipeline edge cannot define both "on" and "event" triggers simultaneously.'); } const rawWhen = value.when; @@ -263,7 +322,8 @@ function parsePipelineEdge(value: unknown): PipelineEdge { return { from: readString(value, "from"), to: readString(value, "to"), - on: on as PipelineEdge["on"], + ...(on ? { on } : {}), + ...(event ? { event } : {}), ...(when.length > 0 ? { when } : {}), }; } @@ -298,7 +358,7 @@ function parseTopologies(value: unknown): TopologyKind[] { throw new Error("Manifest topologies must be a non-empty array."); } - const valid = new Set(["hierarchical", "retry-unrolled", "sequential"]); + const valid = new Set(["hierarchical", "parallel", "retry-unrolled", "sequential"]); const result: TopologyKind[] = []; for (const item of value) { @@ -498,6 +558,12 @@ export function parseAgentManifest(input: unknown): AgentManifest { if (!personaIds.has(node.personaId)) { throw new Error(`Pipeline node \"${node.id}\" references unknown persona \"${node.personaId}\".`); } + + if (node.topology && !manifest.topologies.includes(node.topology.kind as TopologyKind)) { + throw new Error( + `Pipeline node "${node.id}" topology "${node.topology.kind}" is not listed in manifest topologies.`, + ); + } } assertPipelineDag(manifest.pipeline); diff --git a/src/agents/orchestration.ts b/src/agents/orchestration.ts index 7b83784..2657f80 100644 --- a/src/agents/orchestration.ts +++ b/src/agents/orchestration.ts @@ -1,17 +1,21 @@ import { resolve } from "node:path"; import { parseAgentManifest, type AgentManifest } from "./manifest.js"; +import { AgentManager } from "./manager.js"; import { PersonaRegistry, type PersonaBehaviorEvent, type PersonaBehaviorHandler, } from "./persona-registry.js"; import { PipelineExecutor, type ActorExecutor, type PipelineRunSummary } from "./pipeline.js"; +import { FileSystemProjectContextStore } from "./project-context.js"; +import { loadAgentManagerLimitsFromEnv } from "./runtime.js"; import { FileSystemStateContextManager, type StoredSessionState } from "./state-context.js"; import type { JsonObject } from "./types.js"; export type OrchestrationSettings = { workspaceRoot: string; stateRoot: string; + projectContextPath: string; maxDepth: number; maxRetries: number; maxChildren: number; @@ -51,12 +55,27 @@ function readOptionalStringEnv(key: "AGENT_STATE_ROOT", fallback: string): strin return raw; } +function readOptionalProjectContextPathEnv( + key: "AGENT_PROJECT_CONTEXT_PATH", + fallback: string, +): string { + const raw = process.env[key]?.trim(); + if (!raw) { + return fallback; + } + return raw; +} + export function loadOrchestrationSettingsFromEnv(): Omit< OrchestrationSettings, "workspaceRoot" | "runtimeContext" > { return { stateRoot: readOptionalStringEnv("AGENT_STATE_ROOT", ".ai_ops/state"), + projectContextPath: readOptionalProjectContextPathEnv( + "AGENT_PROJECT_CONTEXT_PATH", + ".ai_ops/project-context.json", + ), maxDepth: readOptionalIntegerEnv("AGENT_TOPOLOGY_MAX_DEPTH", 4, 1), maxRetries: readOptionalIntegerEnv("AGENT_TOPOLOGY_MAX_RETRIES", 2, 0), maxChildren: readOptionalIntegerEnv("AGENT_RELATIONSHIP_MAX_CHILDREN", 4, 1), @@ -91,9 +110,11 @@ export class SchemaDrivenExecutionEngine { private readonly manifest: AgentManifest; private readonly personaRegistry = new PersonaRegistry(); private readonly stateManager: FileSystemStateContextManager; + private readonly projectContextStore: FileSystemProjectContextStore; private readonly actorExecutors: ReadonlyMap; private readonly settings: OrchestrationSettings; private readonly childrenByParent: Map; + private readonly manager: AgentManager; constructor(input: { manifest: AgentManifest | unknown; @@ -103,6 +124,7 @@ export class SchemaDrivenExecutionEngine { workspaceRoot?: string; runtimeContext?: Record; }; + manager?: AgentManager; }) { this.manifest = parseAgentManifest(input.manifest); @@ -110,6 +132,7 @@ export class SchemaDrivenExecutionEngine { this.settings = { workspaceRoot: resolve(input.settings?.workspaceRoot ?? process.cwd()), stateRoot: resolve(input.settings?.stateRoot ?? defaults.stateRoot), + projectContextPath: resolve(input.settings?.projectContextPath ?? defaults.projectContextPath), maxDepth: input.settings?.maxDepth ?? defaults.maxDepth, maxRetries: input.settings?.maxRetries ?? defaults.maxRetries, maxChildren: input.settings?.maxChildren ?? defaults.maxChildren, @@ -121,8 +144,12 @@ export class SchemaDrivenExecutionEngine { this.stateManager = new FileSystemStateContextManager({ rootDirectory: this.settings.stateRoot, }); + this.projectContextStore = new FileSystemProjectContextStore({ + filePath: this.settings.projectContextPath, + }); this.actorExecutors = toExecutorMap(input.actorExecutors); + this.manager = input.manager ?? new AgentManager(loadAgentManagerLimitsFromEnv()); for (const persona of this.manifest.personas) { this.personaRegistry.register({ @@ -182,7 +209,11 @@ export class SchemaDrivenExecutionEngine { sessionId: string; initialPayload: JsonObject; initialState?: Partial; + signal?: AbortSignal; }): Promise { + const managerSessionId = `${input.sessionId}__pipeline`; + const managerSession = this.manager.createSession(managerSessionId); + const executor = new PipelineExecutor( this.manifest, this.personaRegistry, @@ -193,14 +224,21 @@ export class SchemaDrivenExecutionEngine { runtimeContext: this.settings.runtimeContext, maxDepth: Math.min(this.settings.maxDepth, this.manifest.topologyConstraints.maxDepth), maxRetries: Math.min(this.settings.maxRetries, this.manifest.topologyConstraints.maxRetries), + manager: this.manager, + managerSessionId, + projectContextStore: this.projectContextStore, }, ); - - return executor.run({ - sessionId: input.sessionId, - initialPayload: input.initialPayload, - initialState: input.initialState, - }); + try { + return await executor.run({ + sessionId: input.sessionId, + initialPayload: input.initialPayload, + initialState: input.initialState, + signal: input.signal, + }); + } finally { + managerSession.close(); + } } private assertRelationshipConstraints(): void { diff --git a/src/agents/pipeline.ts b/src/agents/pipeline.ts index 798dc5d..f78b792 100644 --- a/src/agents/pipeline.ts +++ b/src/agents/pipeline.ts @@ -1,8 +1,18 @@ -import { access } from "node:fs/promises"; import { constants as fsConstants } from "node:fs"; +import { access } from "node:fs/promises"; import { resolve } from "node:path"; +import { + createDomainEvent, + DomainEventBus, + type DomainEvent, + type DomainEventEmission, + type DomainEventPayload, + type DomainEventType, +} from "./domain-events.js"; import type { AgentManifest, PipelineEdge, PipelineNode, RouteCondition } from "./manifest.js"; +import type { AgentManager, RecursiveChildIntent } from "./manager.js"; import { PersonaRegistry } from "./persona-registry.js"; +import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js"; import { FileSystemStateContextManager, type NodeExecutionContext, @@ -12,12 +22,17 @@ import { import type { JsonObject } from "./types.js"; export type ActorResultStatus = "success" | "validation_fail" | "failure"; +export type ActorFailureKind = "soft" | "hard"; export type ActorExecutionResult = { status: ActorResultStatus; payload?: JsonObject; stateFlags?: Record; stateMetadata?: JsonObject; + events?: DomainEventEmission[]; + projectContextPatch?: ProjectContextPatch; + failureKind?: ActorFailureKind; + failureCode?: string; }; export type ActorExecutionInput = { @@ -25,6 +40,7 @@ export type ActorExecutionInput = { node: PipelineNode; prompt: string; context: NodeExecutionContext; + signal: AbortSignal; toolClearance: { allowlist: string[]; banlist: string[]; @@ -38,11 +54,13 @@ export type PipelineExecutionRecord = { depth: number; attempt: number; status: ActorResultStatus; + emittedEvents: DomainEventType[]; }; export type PipelineRunSummary = { sessionId: string; records: PipelineExecutionRecord[]; + events: DomainEvent[]; finalState: StoredSessionState; }; @@ -51,6 +69,9 @@ export type PipelineExecutorOptions = { runtimeContext: Record; maxDepth: number; maxRetries: number; + manager: AgentManager; + managerSessionId: string; + projectContextStore: FileSystemProjectContextStore; }; type QueueItem = { @@ -58,6 +79,37 @@ type QueueItem = { depth: number; }; +type ExecutionGroup = { + concurrent: boolean; + items: QueueItem[]; +}; + +type RetryIntent = RecursiveChildIntent & { + context: { + attempt: number; + }; +}; + +type NodeAttemptResult = { + attempt: number; + depth: number; + result: ActorExecutionResult; + payloadForNext: JsonObject; + domainEvents: DomainEvent[]; + hardFailure: boolean; +}; + +type NodeExecutionOutcome = { + queueItem: QueueItem; + finalResult: ActorExecutionResult; + finalPayload: JsonObject; + finalAttempt: number; + finalEventTypes: DomainEventType[]; + records: PipelineExecutionRecord[]; + events: DomainEvent[]; + hardFailureAttempts: boolean[]; +}; + function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValidationFail" | undefined { if (status === "success") { return "onTaskComplete"; @@ -68,7 +120,15 @@ function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValid return undefined; } -function shouldEdgeRun(edge: PipelineEdge, status: ActorResultStatus): boolean { +function shouldEdgeRun( + edge: PipelineEdge, + status: ActorResultStatus, + emittedEventTypes: ReadonlySet, +): boolean { + if (edge.event) { + return emittedEventTypes.has(edge.event); + } + if (edge.on === "always") { return true; } @@ -136,9 +196,134 @@ async function edgeConditionsSatisfied( return true; } +function throwIfAborted(signal?: AbortSignal): void { + if (!signal?.aborted) { + return; + } + + if (signal.reason instanceof Error) { + throw signal.reason; + } + + const error = new Error( + signal.reason === undefined + ? "The pipeline run was aborted." + : `The pipeline run was aborted: ${String(signal.reason)}.`, + ); + error.name = "AbortError"; + throw error; +} + +function toAbortError(signal: AbortSignal): Error { + if (signal.reason instanceof Error) { + return signal.reason; + } + + const error = new Error( + signal.reason === undefined + ? "The pipeline run was aborted." + : `The pipeline run was aborted: ${String(signal.reason)}.`, + ); + error.name = "AbortError"; + return error; +} + +function toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + return String(error); +} + +function toErrorPayload(error: unknown): JsonObject { + const errorMessage = toErrorMessage(error); + return { + error: errorMessage, + }; +} + +function toFailureCodeFromError(error: unknown): string | undefined { + if (!(error instanceof Error)) { + return undefined; + } + + const maybeCode = (error as NodeJS.ErrnoException).code; + return typeof maybeCode === "string" ? maybeCode : undefined; +} + +function containsHardFailureSignal(value: string): boolean { + return /(timeout|timed out|network|econnreset|econnrefused|enotfound|403|forbidden)/i.test(value); +} + +function inferHardFailure(result: ActorExecutionResult): boolean { + if (result.failureKind === "hard") { + return true; + } + + if (result.status !== "failure") { + return false; + } + + const payloadText = (() => { + const message = result.payload?.error; + return typeof message === "string" ? message : ""; + })(); + + const codeText = result.failureCode ?? ""; + return containsHardFailureSignal(`${codeText} ${payloadText}`); +} + +function defaultEventPayloadForStatus(status: ActorResultStatus): DomainEventPayload { + if (status === "success") { + return { + summary: "Node completed successfully.", + }; + } + if (status === "validation_fail") { + return { + summary: "Node failed validation and requested retry/unrolled remediation.", + }; + } + return { + summary: "Node execution failed and was marked blocked.", + }; +} + +function defaultEventsForStatus(status: ActorResultStatus): DomainEventType[] { + if (status === "success") { + return ["validation_passed"]; + } + if (status === "validation_fail") { + return ["validation_failed"]; + } + return ["task_blocked"]; +} + +function getExecutionMode(node: PipelineNode): "sequential" | "parallel" | "hierarchical" { + if (node.topology?.kind === "parallel") { + return "parallel"; + } + if (node.topology?.kind === "hierarchical") { + return "hierarchical"; + } + return "sequential"; +} + +function toBlockKey(item: QueueItem, node: PipelineNode): string { + const mode = getExecutionMode(node); + if (mode === "sequential") { + return `sequential:${item.nodeId}`; + } + + const blockId = node.topology?.blockId; + return blockId ? `${mode}:${blockId}` : `${mode}:${item.nodeId}`; +} + export class PipelineExecutor { private readonly nodeById = new Map(); private readonly edgesBySource = new Map(); + private readonly domainEventBus = new DomainEventBus(); + private managerRunCounter = 0; constructor( private readonly manifest: AgentManifest, @@ -165,150 +350,481 @@ export class PipelineExecutor { sessionId: string; initialPayload: JsonObject; initialState?: Partial; + signal?: AbortSignal; }): Promise { - await this.stateManager.initializeSession(input.sessionId, input.initialState); + const projectContext = await this.options.projectContextStore.readState(); + + await this.stateManager.initializeSession(input.sessionId, { + ...(input.initialState ?? {}), + flags: { + ...projectContext.globalFlags, + ...(input.initialState?.flags ?? {}), + }, + metadata: { + project_context: { + globalFlags: { ...projectContext.globalFlags }, + artifactPointers: { ...projectContext.artifactPointers }, + taskQueue: projectContext.taskQueue.map((task) => ({ + id: task.id, + title: task.title, + status: task.status, + ...(task.assignee ? { assignee: task.assignee } : {}), + ...(task.metadata ? { metadata: task.metadata } : {}), + })), + }, + ...((input.initialState?.metadata ?? {}) as JsonObject), + }, + }); + await this.stateManager.writeHandoff(input.sessionId, { nodeId: this.manifest.pipeline.entryNodeId, payload: input.initialPayload, }); const records: PipelineExecutionRecord[] = []; - const queue: QueueItem[] = [{ nodeId: this.manifest.pipeline.entryNodeId, depth: 0 }]; - const attemptsByNode = new Map(); + const events: DomainEvent[] = []; + const ready = new Map([[this.manifest.pipeline.entryNodeId, 0]]); + const completedNodes = new Set(); - const maxExecutions = this.manifest.pipeline.nodes.length * (this.options.maxRetries + 3); + const maxExecutions = this.manifest.pipeline.nodes.length * (this.options.maxRetries + 4); let executionCount = 0; + let sequentialHardFailures = 0; - while (queue.length > 0) { - const item = queue.shift(); - if (!item) { - continue; + while (ready.size > 0) { + throwIfAborted(input.signal); + const frontier: QueueItem[] = [...ready.entries()].map(([nodeId, depth]) => ({ nodeId, depth })); + ready.clear(); + + for (const group of this.buildExecutionGroups(frontier)) { + const groupResults = group.concurrent + ? await Promise.all( + group.items.map((queueItem) => this.executeNode({ ...queueItem, sessionId: input.sessionId, signal: input.signal })), + ) + : await this.executeSequentialGroup(input.sessionId, group.items, input.signal); + + for (const nodeResult of groupResults) { + records.push(...nodeResult.records); + events.push(...nodeResult.events); + + executionCount += nodeResult.records.length; + if (executionCount > maxExecutions) { + throw new Error("Pipeline execution exceeded the configured safe execution bound."); + } + + for (const wasHardFailure of nodeResult.hardFailureAttempts) { + if (wasHardFailure) { + sequentialHardFailures += 1; + } else { + sequentialHardFailures = 0; + } + + if (sequentialHardFailures >= 2) { + throw new Error( + "Hard failure threshold reached (>=2 sequential API/network/403 failures). Pipeline aborted.", + ); + } + } + + completedNodes.add(nodeResult.queueItem.nodeId); + + const state = await this.stateManager.readState(input.sessionId); + const candidateEdges = this.edgesBySource.get(nodeResult.queueItem.nodeId) ?? []; + const eventTypes = new Set(nodeResult.finalEventTypes); + + for (const edge of candidateEdges) { + if (!shouldEdgeRun(edge, nodeResult.finalResult.status, eventTypes)) { + continue; + } + + if (!(await edgeConditionsSatisfied(edge, state, this.options.workspaceRoot))) { + continue; + } + + if (completedNodes.has(edge.to)) { + continue; + } + + await this.stateManager.writeHandoff(input.sessionId, { + nodeId: edge.to, + fromNodeId: nodeResult.queueItem.nodeId, + payload: nodeResult.finalPayload, + }); + + const nextDepth = nodeResult.queueItem.depth + 1; + const existingDepth = ready.get(edge.to); + if (existingDepth === undefined || nextDepth < existingDepth) { + ready.set(edge.to, nextDepth); + } + } + } } + } - executionCount += 1; - if (executionCount > maxExecutions) { - throw new Error("Pipeline execution exceeded the configured safe execution bound."); - } - - if (item.depth > this.options.maxDepth) { - throw new Error( - `Pipeline depth ${String(item.depth)} exceeds configured maxDepth ${String(this.options.maxDepth)}.`, - ); - } - - const node = this.nodeById.get(item.nodeId); - if (!node) { - throw new Error(`Pipeline node \"${item.nodeId}\" is not defined.`); - } - - const executor = this.actorExecutors.get(node.actorId); - if (!executor) { - throw new Error(`No actor executor registered for actor \"${node.actorId}\".`); - } - - const context = await this.stateManager.buildFreshNodeContext(input.sessionId, node.id); - const prompt = this.personaRegistry.renderSystemPrompt({ - personaId: node.personaId, - runtimeContext: { - ...this.options.runtimeContext, - session_id: input.sessionId, - node_id: node.id, - depth: item.depth, + const finalState = await this.stateManager.readState(input.sessionId); + if (records.length > 0 && records[records.length - 1]?.status === "success") { + await this.options.projectContextStore.patchState({ + artifactPointers: { + [`sessions/${input.sessionId}/final_state`]: this.stateManager.getSessionStatePath(input.sessionId), }, }); - - const result = await executor({ - sessionId: input.sessionId, - node, - prompt, - context, - toolClearance: this.personaRegistry.getToolClearance(node.personaId), - }); - - const attempt = (attemptsByNode.get(node.id) ?? 0) + 1; - attemptsByNode.set(node.id, attempt); - records.push({ - nodeId: node.id, - depth: item.depth, - attempt, - status: result.status, - }); - - const behaviorEvent = toBehaviorEvent(result.status); - const behaviorPatch = behaviorEvent - ? await this.personaRegistry.emitBehaviorEvent({ - personaId: node.personaId, - event: behaviorEvent, - sessionId: input.sessionId, - nodeId: node.id, - payload: result.payload ?? {}, - }) - : {}; - - const historyEvent: SessionHistoryEntry = { - nodeId: node.id, - event: result.status, - timestamp: new Date().toISOString(), - ...(result.payload ? { data: result.payload } : {}), - }; - - await this.stateManager.patchState(input.sessionId, { - ...(result.stateFlags ? { flags: result.stateFlags } : {}), - metadata: { - ...(result.stateMetadata ?? {}), - ...behaviorPatch, - }, - historyEvent, - }); - - const maxRetriesForNode = Math.min( - node.constraints?.maxRetries ?? this.manifest.topologyConstraints.maxRetries, - this.options.maxRetries, - ); - - if (result.status !== "success" && attempt <= maxRetriesForNode + 1) { - await this.stateManager.writeHandoff(input.sessionId, { - nodeId: node.id, - fromNodeId: node.id, - payload: result.payload ?? context.handoff.payload, - }); - - queue.push({ - nodeId: node.id, - depth: item.depth, - }); - continue; - } - - const state = await this.stateManager.readState(input.sessionId); - const candidateEdges = this.edgesBySource.get(node.id) ?? []; - - for (const edge of candidateEdges) { - if (!shouldEdgeRun(edge, result.status)) { - continue; - } - - if (!(await edgeConditionsSatisfied(edge, state, this.options.workspaceRoot))) { - continue; - } - - await this.stateManager.writeHandoff(input.sessionId, { - nodeId: edge.to, - fromNodeId: node.id, - payload: result.payload ?? context.handoff.payload, - }); - - queue.push({ - nodeId: edge.to, - depth: item.depth + 1, - }); - } } return { sessionId: input.sessionId, records, - finalState: await this.stateManager.readState(input.sessionId), + events, + finalState, }; } + + private buildExecutionGroups(frontier: QueueItem[]): ExecutionGroup[] { + const groupsByKey = new Map(); + + for (const item of frontier) { + const node = this.nodeById.get(item.nodeId); + if (!node) { + throw new Error(`Pipeline node "${item.nodeId}" is not defined.`); + } + + const mode = getExecutionMode(node); + const key = toBlockKey(item, node); + const existing = groupsByKey.get(key); + if (existing) { + existing.items.push(item); + } else { + groupsByKey.set(key, { + concurrent: mode === "parallel" || mode === "hierarchical", + items: [item], + }); + } + } + + return [...groupsByKey.values()]; + } + + private async executeSequentialGroup( + sessionId: string, + items: QueueItem[], + signal?: AbortSignal, + ): Promise { + const results: NodeExecutionOutcome[] = []; + for (const item of items) { + results.push(await this.executeNode({ ...item, sessionId, signal })); + } + return results; + } + + private shouldRetryValidation(node: PipelineNode): boolean { + if (node.topology?.kind === "retry-unrolled") { + return true; + } + return this.manifest.topologies.includes("retry-unrolled"); + } + + private getMaxRetriesForNode(node: PipelineNode): number { + return Math.min( + node.constraints?.maxRetries ?? this.manifest.topologyConstraints.maxRetries, + this.options.maxRetries, + ); + } + + private buildManagerRunSessionId(nodeId: string): string { + this.managerRunCounter += 1; + const safeNodeId = nodeId.replace(/[^a-zA-Z0-9_-]/g, "_"); + return `${this.options.managerSessionId}__${safeNodeId}__run_${String(this.managerRunCounter)}`; + } + + private async executeNode(input: { + sessionId: string; + nodeId: string; + depth: number; + signal?: AbortSignal; + }): Promise { + const { sessionId, nodeId, depth, signal } = input; + + if (depth > this.options.maxDepth) { + throw new Error( + `Pipeline depth ${String(depth)} exceeds configured maxDepth ${String(this.options.maxDepth)}.`, + ); + } + + const node = this.nodeById.get(nodeId); + if (!node) { + throw new Error(`Pipeline node "${nodeId}" is not defined.`); + } + + const executor = this.actorExecutors.get(node.actorId); + if (!executor) { + throw new Error(`No actor executor registered for actor "${node.actorId}".`); + } + + const managerRunSessionId = this.buildManagerRunSessionId(node.id); + const managerRunSession = this.options.manager.createSession(managerRunSessionId, { + parentSessionId: this.options.managerSessionId, + }); + + const nodeRecords: PipelineExecutionRecord[] = []; + const nodeEvents: DomainEvent[] = []; + const hardFailureAttempts: boolean[] = []; + const maxRetriesForNode = this.getMaxRetriesForNode(node); + + try { + const output = await this.options.manager.runRecursiveAgent({ + sessionId: managerRunSessionId, + depth, + signal, + run: async ({ intent, depth: recursiveDepth, signal: recursiveSignal }) => { + const attempt = (() => { + const intentAttempt = intent?.context?.attempt; + if (typeof intentAttempt === "number" && Number.isInteger(intentAttempt) && intentAttempt >= 1) { + return intentAttempt; + } + return 1; + })(); + + const context = await this.stateManager.buildFreshNodeContext(sessionId, node.id); + const prompt = this.personaRegistry.renderSystemPrompt({ + personaId: node.personaId, + runtimeContext: { + ...this.options.runtimeContext, + session_id: sessionId, + node_id: node.id, + depth: recursiveDepth, + attempt, + }, + }); + + const result = await this.invokeActorExecutor({ + sessionId, + node, + prompt, + context, + signal: recursiveSignal, + executor, + }); + + const domainEvents = this.createAttemptDomainEvents({ + sessionId, + nodeId: node.id, + attempt, + status: result.status, + customEvents: result.events, + }); + + await this.persistNodeAttempt({ + sessionId, + node, + attempt, + result, + domainEvents, + }); + + const emittedEventTypes = domainEvents.map((event) => event.type); + nodeRecords.push({ + nodeId: node.id, + depth: recursiveDepth, + attempt, + status: result.status, + emittedEvents: emittedEventTypes, + }); + nodeEvents.push(...domainEvents); + + const hardFailure = inferHardFailure(result); + hardFailureAttempts.push(hardFailure); + + const payloadForNext = result.payload ?? context.handoff.payload; + const shouldRetry = + result.status === "validation_fail" && + this.shouldRetryValidation(node) && + attempt <= maxRetriesForNode; + + if (!shouldRetry) { + return { + type: "complete" as const, + output: { + attempt, + depth: recursiveDepth, + result, + payloadForNext, + domainEvents, + hardFailure, + }, + }; + } + + await this.stateManager.writeHandoff(sessionId, { + nodeId: node.id, + fromNodeId: node.id, + payload: payloadForNext, + }); + + return { + type: "fanout" as const, + intents: [ + { + persona: node.personaId, + task: `retry-${node.id}-attempt-${String(attempt + 1)}`, + context: { + attempt: attempt + 1, + }, + }, + ], + aggregate: ({ childResults }) => { + const first = childResults[0]; + if (!first) { + throw new Error(`Retry aggregation for node "${node.id}" did not receive child output.`); + } + return first.output; + }, + }; + }, + }); + + return { + queueItem: { + nodeId: node.id, + depth, + }, + finalResult: output.result, + finalPayload: output.payloadForNext, + finalAttempt: output.attempt, + finalEventTypes: output.domainEvents.map((event) => event.type), + records: nodeRecords, + events: nodeEvents, + hardFailureAttempts, + }; + } finally { + managerRunSession.close(); + } + } + + private async invokeActorExecutor(input: { + sessionId: string; + node: PipelineNode; + prompt: string; + context: NodeExecutionContext; + signal: AbortSignal; + executor: ActorExecutor; + }): Promise { + try { + throwIfAborted(input.signal); + return await input.executor({ + sessionId: input.sessionId, + node: input.node, + prompt: input.prompt, + context: input.context, + signal: input.signal, + toolClearance: this.personaRegistry.getToolClearance(input.node.personaId), + }); + } catch (error) { + if (input.signal.aborted) { + throw toAbortError(input.signal); + } + + const failureCode = toFailureCodeFromError(error); + const failureKind = containsHardFailureSignal(`${failureCode ?? ""} ${toErrorMessage(error)}`) + ? "hard" + : "soft"; + + return { + status: "failure", + payload: toErrorPayload(error), + failureCode, + failureKind, + }; + } + } + + private createAttemptDomainEvents(input: { + sessionId: string; + nodeId: string; + attempt: number; + status: ActorResultStatus; + customEvents?: DomainEventEmission[]; + }): DomainEvent[] { + const eventPayloadByType = new Map(); + + for (const type of defaultEventsForStatus(input.status)) { + eventPayloadByType.set(type, defaultEventPayloadForStatus(input.status)); + } + + for (const customEvent of input.customEvents ?? []) { + eventPayloadByType.set(customEvent.type, customEvent.payload ?? {}); + } + + return [...eventPayloadByType.entries()].map(([type, payload]) => + createDomainEvent({ + type, + source: "actor", + sessionId: input.sessionId, + nodeId: input.nodeId, + attempt: input.attempt, + payload, + }), + ); + } + + private async persistNodeAttempt(input: { + sessionId: string; + node: PipelineNode; + attempt: number; + result: ActorExecutionResult; + domainEvents: DomainEvent[]; + }): Promise { + const behaviorEvent = toBehaviorEvent(input.result.status); + const behaviorPatch = behaviorEvent + ? await this.personaRegistry.emitBehaviorEvent({ + personaId: input.node.personaId, + event: behaviorEvent, + sessionId: input.sessionId, + nodeId: input.node.id, + payload: input.result.payload ?? {}, + }) + : {}; + + const legacyHistoryEvent: SessionHistoryEntry = { + nodeId: input.node.id, + event: input.result.status, + timestamp: new Date().toISOString(), + ...(input.result.payload ? { data: input.result.payload } : {}), + }; + + const domainHistoryEvents: SessionHistoryEntry[] = input.domainEvents.map((event) => ({ + nodeId: input.node.id, + event: event.type, + timestamp: event.timestamp, + data: { + source: event.source, + attempt: event.attempt, + ...(event.payload.summary ? { summary: event.payload.summary } : {}), + ...(event.payload.errorCode ? { errorCode: event.payload.errorCode } : {}), + ...(event.payload.artifactPointer ? { artifactPointer: event.payload.artifactPointer } : {}), + ...(event.payload.details ? { details: event.payload.details } : {}), + }, + })); + + await this.stateManager.patchState(input.sessionId, { + ...(input.result.stateFlags ? { flags: input.result.stateFlags } : {}), + metadata: { + ...(input.result.stateMetadata ?? {}), + ...behaviorPatch, + }, + historyEvent: legacyHistoryEvent, + historyEvents: domainHistoryEvents, + }); + + for (const event of input.domainEvents) { + await this.domainEventBus.publish(event); + } + + const patch: ProjectContextPatch = { + ...(input.result.projectContextPatch ?? {}), + artifactPointers: { + [`sessions/${input.sessionId}/last_completed_node`]: input.node.id, + [`sessions/${input.sessionId}/last_attempt`]: String(input.attempt), + ...(input.result.projectContextPatch?.artifactPointers ?? {}), + }, + }; + await this.options.projectContextStore.patchState(patch); + } } diff --git a/src/agents/project-context.ts b/src/agents/project-context.ts new file mode 100644 index 0000000..e96e07c --- /dev/null +++ b/src/agents/project-context.ts @@ -0,0 +1,218 @@ +import { mkdir, readFile, writeFile } from "node:fs/promises"; +import { dirname, resolve } from "node:path"; +import { deepCloneJson, isRecord, type JsonObject, type JsonValue } from "./types.js"; + +export type ProjectTaskStatus = "pending" | "in_progress" | "blocked" | "done"; + +export type ProjectTask = { + id: string; + title: string; + status: ProjectTaskStatus; + assignee?: string; + metadata?: JsonObject; +}; + +export type ProjectContextState = { + globalFlags: Record; + artifactPointers: Record; + taskQueue: ProjectTask[]; +}; + +export type ProjectContextPatch = { + globalFlags?: Record; + artifactPointers?: Record; + taskQueue?: ProjectTask[]; + enqueueTasks?: ProjectTask[]; + upsertTasks?: ProjectTask[]; +}; + +const DEFAULT_PROJECT_CONTEXT: ProjectContextState = { + globalFlags: {}, + artifactPointers: {}, + taskQueue: [], +}; + +function assertNonEmptyString(value: unknown, label: string): string { + if (typeof value !== "string" || value.trim().length === 0) { + throw new Error(`${label} must be a non-empty string.`); + } + return value.trim(); +} + +function toJsonObject(value: unknown, label: string): JsonObject { + if (!isRecord(value)) { + throw new Error(`${label} is malformed.`); + } + return value as JsonObject; +} + +function toTaskStatus(value: unknown, label: string): ProjectTaskStatus { + if (value === "pending" || value === "in_progress" || value === "blocked" || value === "done") { + return value; + } + throw new Error(`${label} has unsupported status "${String(value)}".`); +} + +function toProjectTask(value: unknown, label: string): ProjectTask { + if (!isRecord(value)) { + throw new Error(`${label} is malformed.`); + } + + const assignee = value.assignee; + if (assignee !== undefined && (typeof assignee !== "string" || assignee.trim().length === 0)) { + throw new Error(`${label}.assignee must be a non-empty string when provided.`); + } + + return { + id: assertNonEmptyString(value.id, `${label}.id`), + title: assertNonEmptyString(value.title, `${label}.title`), + status: toTaskStatus(value.status, `${label}.status`), + ...(typeof assignee === "string" ? { assignee: assignee.trim() } : {}), + ...(value.metadata !== undefined + ? { metadata: toJsonObject(value.metadata, `${label}.metadata`) } + : {}), + }; +} + +function toBooleanRecord(value: unknown, label: string): Record { + if (!isRecord(value)) { + throw new Error(`${label} is malformed.`); + } + + const out: Record = {}; + for (const [key, raw] of Object.entries(value)) { + if (typeof raw !== "boolean") { + throw new Error(`${label}.${key} must be a boolean.`); + } + out[key] = raw; + } + + return out; +} + +function toStringRecord(value: unknown, label: string): Record { + if (!isRecord(value)) { + throw new Error(`${label} is malformed.`); + } + + const out: Record = {}; + for (const [key, raw] of Object.entries(value)) { + out[key] = assertNonEmptyString(raw, `${label}.${key}`); + } + + return out; +} + +function toProjectContextState(value: unknown): ProjectContextState { + if (!isRecord(value)) { + throw new Error("Project context store is malformed."); + } + + const tasksRaw = value.taskQueue; + if (!Array.isArray(tasksRaw)) { + throw new Error("Project context taskQueue is malformed."); + } + + return { + globalFlags: toBooleanRecord(value.globalFlags, "Project context globalFlags"), + artifactPointers: toStringRecord(value.artifactPointers, "Project context artifactPointers"), + taskQueue: tasksRaw.map((task, index) => toProjectTask(task, `Project context taskQueue[${String(index)}]`)), + }; +} + +function cloneState(state: ProjectContextState): ProjectContextState { + return deepCloneJson(state as JsonValue) as ProjectContextState; +} + +function mergeUpsertTasks(current: ProjectTask[], upserts: ProjectTask[]): ProjectTask[] { + if (upserts.length === 0) { + return current; + } + + const byId = new Map(); + for (const task of current) { + byId.set(task.id, task); + } + for (const task of upserts) { + byId.set(task.id, task); + } + + return [...byId.values()]; +} + +export class FileSystemProjectContextStore { + private readonly filePath: string; + private queue: Promise = Promise.resolve(); + + constructor(input: { filePath: string }) { + this.filePath = resolve(input.filePath); + } + + getFilePath(): string { + return this.filePath; + } + + async readState(): Promise { + try { + const content = await readFile(this.filePath, "utf8"); + const parsed = JSON.parse(content) as unknown; + return toProjectContextState(parsed); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return cloneState(DEFAULT_PROJECT_CONTEXT); + } + throw error; + } + } + + async writeState(state: ProjectContextState): Promise { + await this.runSerialized(async () => { + await mkdir(dirname(this.filePath), { recursive: true }); + await writeFile(this.filePath, `${JSON.stringify(state, null, 2)}\n`, "utf8"); + }); + } + + async patchState(patch: ProjectContextPatch): Promise { + return this.runSerialized(async () => { + const current = await this.readState(); + + if (patch.globalFlags) { + Object.assign(current.globalFlags, patch.globalFlags); + } + if (patch.artifactPointers) { + Object.assign(current.artifactPointers, patch.artifactPointers); + } + if (patch.taskQueue) { + current.taskQueue = patch.taskQueue.map((task, index) => + toProjectTask(task, `Project context patch taskQueue[${String(index)}]`), + ); + } + if (patch.enqueueTasks && patch.enqueueTasks.length > 0) { + current.taskQueue.push( + ...patch.enqueueTasks.map((task, index) => + toProjectTask(task, `Project context patch enqueueTasks[${String(index)}]`), + ), + ); + } + if (patch.upsertTasks && patch.upsertTasks.length > 0) { + const upsertTasks = patch.upsertTasks.map((task, index) => + toProjectTask(task, `Project context patch upsertTasks[${String(index)}]`), + ); + current.taskQueue = mergeUpsertTasks(current.taskQueue, upsertTasks); + } + + await mkdir(dirname(this.filePath), { recursive: true }); + await writeFile(this.filePath, `${JSON.stringify(current, null, 2)}\n`, "utf8"); + return current; + }); + } + + private runSerialized(operation: () => Promise): Promise { + const run = this.queue.then(operation, operation); + this.queue = run.then( + () => undefined, + () => undefined, + ); + return run; + } +} diff --git a/src/agents/state-context.ts b/src/agents/state-context.ts index f68cc33..6a8ae97 100644 --- a/src/agents/state-context.ts +++ b/src/agents/state-context.ts @@ -161,6 +161,10 @@ export class FileSystemStateContextManager { return this.rootDirectory; } + getSessionStatePath(sessionId: string): string { + return toStatePath(this.rootDirectory, sessionId); + } + async initializeSession( sessionId: string, initialState: Partial = {}, @@ -205,6 +209,7 @@ export class FileSystemStateContextManager { flags?: Record; metadata?: JsonObject; historyEvent?: SessionHistoryEntry; + historyEvents?: SessionHistoryEntry[]; }, ): Promise { const current = await this.readState(sessionId); @@ -218,6 +223,9 @@ export class FileSystemStateContextManager { if (patch.historyEvent) { current.history.push(patch.historyEvent); } + if (patch.historyEvents && patch.historyEvents.length > 0) { + current.history.push(...patch.historyEvents); + } await this.writeState(sessionId, current); return current; diff --git a/tests/orchestration-engine.test.ts b/tests/orchestration-engine.test.ts index 01950c3..0768199 100644 --- a/tests/orchestration-engine.test.ts +++ b/tests/orchestration-engine.test.ts @@ -1,6 +1,6 @@ import test from "node:test"; import assert from "node:assert/strict"; -import { mkdtemp, writeFile } from "node:fs/promises"; +import { mkdtemp, readFile, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { resolve } from "node:path"; import { SchemaDrivenExecutionEngine } from "../src/agents/orchestration.js"; @@ -146,6 +146,7 @@ function createManifest(): unknown { test("runs DAG pipeline with state-dependent routing and retry behavior", async () => { const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-")); const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-")); + const projectContextPath = resolve(stateRoot, "project-context.json"); await writeFile(resolve(workspaceRoot, "PRD.md"), "# PRD\n", "utf8"); @@ -156,6 +157,7 @@ test("runs DAG pipeline with state-dependent routing and retry behavior", async settings: { workspaceRoot, stateRoot, + projectContextPath, runtimeContext: { repo: "ai_ops", ticket: "AIOPS-123", @@ -246,3 +248,422 @@ test("runs DAG pipeline with state-dependent routing and retry behavior", async assert.deepEqual(engine.planChildPersonas({ parentPersonaId: "task", depth: 1 }), ["coder"]); }); + +test("runs parallel topology blocks concurrently and routes via domain-event edges", async () => { + const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-")); + const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-")); + const projectContextPath = resolve(stateRoot, "project-context.json"); + + const manifest = { + schemaVersion: "1", + topologies: ["parallel", "retry-unrolled", "sequential"], + personas: [ + { + id: "planner", + displayName: "Planner", + systemPromptTemplate: "Planner {{repo}}", + toolClearance: { + allowlist: ["read_file"], + banlist: [], + }, + }, + { + id: "coder", + displayName: "Coder", + systemPromptTemplate: "Coder {{repo}}", + toolClearance: { + allowlist: ["read_file", "write_file"], + banlist: [], + }, + }, + { + id: "integrator", + displayName: "Integrator", + systemPromptTemplate: "Integrator {{repo}}", + toolClearance: { + allowlist: ["read_file"], + banlist: [], + }, + }, + ], + relationships: [], + topologyConstraints: { + maxDepth: 5, + maxRetries: 2, + }, + pipeline: { + entryNodeId: "plan", + nodes: [ + { + id: "plan", + actorId: "plan_actor", + personaId: "planner", + }, + { + id: "code-a", + actorId: "code_a", + personaId: "coder", + topology: { + kind: "parallel", + blockId: "implementation", + }, + }, + { + id: "code-b", + actorId: "code_b", + personaId: "coder", + topology: { + kind: "parallel", + blockId: "implementation", + }, + }, + { + id: "integrate", + actorId: "integrate_actor", + personaId: "integrator", + }, + ], + edges: [ + { + from: "plan", + to: "code-a", + on: "success", + }, + { + from: "plan", + to: "code-b", + on: "success", + }, + { + from: "code-a", + to: "integrate", + event: "code_committed", + }, + { + from: "code-b", + to: "integrate", + event: "code_committed", + }, + ], + }, + } as const; + + let activeCoders = 0; + let maxConcurrentCoders = 0; + let releaseCoders: (() => void) | undefined; + const codersReleased = new Promise((resolve) => { + releaseCoders = resolve; + }); + let coderStarts = 0; + let notifyBothCodersStarted: (() => void) | undefined; + const bothCodersStarted = new Promise((resolve) => { + notifyBothCodersStarted = resolve; + }); + + const engine = new SchemaDrivenExecutionEngine({ + manifest, + settings: { + workspaceRoot, + stateRoot, + projectContextPath, + runtimeContext: { + repo: "ai_ops", + }, + maxDepth: 5, + maxRetries: 2, + maxChildren: 4, + }, + actorExecutors: { + plan_actor: async () => ({ + status: "success", + payload: { + phase: "plan", + }, + }), + code_a: async () => { + activeCoders += 1; + maxConcurrentCoders = Math.max(maxConcurrentCoders, activeCoders); + coderStarts += 1; + if (coderStarts === 2) { + notifyBothCodersStarted?.(); + } + await codersReleased; + activeCoders = Math.max(activeCoders - 1, 0); + return { + status: "success", + payload: { + branch: "feature/a", + }, + events: [ + { + type: "code_committed", + payload: { + summary: "Feature A committed", + }, + }, + ], + projectContextPatch: { + artifactPointers: { + feature_a_commit: "feature/a@abc123", + }, + }, + }; + }, + code_b: async () => { + activeCoders += 1; + maxConcurrentCoders = Math.max(maxConcurrentCoders, activeCoders); + coderStarts += 1; + if (coderStarts === 2) { + notifyBothCodersStarted?.(); + } + await codersReleased; + activeCoders = Math.max(activeCoders - 1, 0); + return { + status: "success", + payload: { + branch: "feature/b", + }, + events: [ + { + type: "code_committed", + payload: { + summary: "Feature B committed", + }, + }, + ], + projectContextPatch: { + enqueueTasks: [ + { + id: "task-integrate", + title: "Integrate feature branches", + status: "pending", + }, + ], + }, + }; + }, + integrate_actor: async () => ({ + status: "success", + payload: { + merged: true, + }, + events: [ + { + type: "branch_merged", + payload: { + summary: "Branches merged", + }, + }, + ], + }), + }, + }); + + const runPromise = engine.runSession({ + sessionId: "session-parallel-domain-events", + initialPayload: { + task: "Parallel implementation", + }, + }); + + await bothCodersStarted; + releaseCoders?.(); + const result = await runPromise; + + assert.equal(maxConcurrentCoders, 2); + assert.deepEqual( + result.records.map((record) => `${record.nodeId}:${record.status}`), + ["plan:success", "code-a:success", "code-b:success", "integrate:success"], + ); + + const storedContextRaw = await readFile(projectContextPath, "utf8"); + const storedContext = JSON.parse(storedContextRaw) as { + artifactPointers: Record; + taskQueue: Array<{ id: string }>; + }; + assert.equal(storedContext.artifactPointers.feature_a_commit, "feature/a@abc123"); + assert.equal(storedContext.taskQueue[0]?.id, "task-integrate"); + const finalStatePointer = storedContext.artifactPointers["sessions/session-parallel-domain-events/final_state"]; + assert.ok(finalStatePointer); + assert.match(finalStatePointer, /state\.json$/); +}); + +test("fails fast after two sequential hard failures", async () => { + const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-")); + const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-")); + const projectContextPath = resolve(stateRoot, "project-context.json"); + + const manifest = { + schemaVersion: "1", + topologies: ["sequential"], + personas: [ + { + id: "coder", + displayName: "Coder", + systemPromptTemplate: "Coder", + toolClearance: { + allowlist: [], + banlist: [], + }, + }, + ], + relationships: [], + topologyConstraints: { + maxDepth: 4, + maxRetries: 0, + }, + pipeline: { + entryNodeId: "first", + nodes: [ + { + id: "first", + actorId: "first_actor", + personaId: "coder", + }, + { + id: "second", + actorId: "second_actor", + personaId: "coder", + }, + ], + edges: [ + { + from: "first", + to: "second", + on: "failure", + }, + ], + }, + } as const; + + const engine = new SchemaDrivenExecutionEngine({ + manifest, + settings: { + workspaceRoot, + stateRoot, + projectContextPath, + maxDepth: 4, + maxRetries: 0, + maxChildren: 2, + runtimeContext: {}, + }, + actorExecutors: { + first_actor: async () => ({ + status: "failure", + payload: { + error: "network timeout while reaching upstream API", + }, + failureKind: "hard", + }), + second_actor: async () => ({ + status: "failure", + payload: { + error: "HTTP 403 from provider", + }, + failureKind: "hard", + }), + }, + }); + + await assert.rejects( + () => + engine.runSession({ + sessionId: "session-hard-failure", + initialPayload: { + task: "Trigger hard failures", + }, + }), + /Hard failure threshold reached/, + ); +}); + +test("propagates abort signal into actor execution and stops the run", async () => { + const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-")); + const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-")); + const projectContextPath = resolve(stateRoot, "project-context.json"); + + const manifest = { + schemaVersion: "1", + topologies: ["sequential"], + personas: [ + { + id: "coder", + displayName: "Coder", + systemPromptTemplate: "Coder", + toolClearance: { + allowlist: [], + banlist: [], + }, + }, + ], + relationships: [], + topologyConstraints: { + maxDepth: 2, + maxRetries: 0, + }, + pipeline: { + entryNodeId: "long-run", + nodes: [ + { + id: "long-run", + actorId: "long_actor", + personaId: "coder", + }, + ], + edges: [], + }, + } as const; + + let observedAbort = false; + const engine = new SchemaDrivenExecutionEngine({ + manifest, + settings: { + workspaceRoot, + stateRoot, + projectContextPath, + maxDepth: 2, + maxRetries: 0, + maxChildren: 2, + runtimeContext: {}, + }, + actorExecutors: { + long_actor: async (input) => { + await new Promise((resolve, reject) => { + const timeout = setTimeout(resolve, 5000); + input.signal.addEventListener( + "abort", + () => { + observedAbort = true; + clearTimeout(timeout); + reject(input.signal.reason ?? new Error("aborted")); + }, + { once: true }, + ); + }); + + return { + status: "success", + payload: { + unreachable: true, + }, + }; + }, + }, + }); + + const controller = new AbortController(); + const runPromise = engine.runSession({ + sessionId: "session-abort", + initialPayload: { + task: "Abort test", + }, + signal: controller.signal, + }); + + setTimeout(() => { + controller.abort(new Error("manual-abort")); + }, 20); + + await assert.rejects(() => runPromise, /(AbortError|manual-abort|aborted)/i); + assert.equal(observedAbort, true); +}); diff --git a/tests/project-context.test.ts b/tests/project-context.test.ts new file mode 100644 index 0000000..d7de7b8 --- /dev/null +++ b/tests/project-context.test.ts @@ -0,0 +1,58 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { mkdtemp } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { resolve } from "node:path"; +import { FileSystemProjectContextStore } from "../src/agents/project-context.js"; + +test("project context store reads defaults and applies domain patches", async () => { + const root = await mkdtemp(resolve(tmpdir(), "ai-ops-project-context-")); + const store = new FileSystemProjectContextStore({ + filePath: resolve(root, "project-context.json"), + }); + + const initial = await store.readState(); + assert.deepEqual(initial, { + globalFlags: {}, + artifactPointers: {}, + taskQueue: [], + }); + + await store.patchState({ + globalFlags: { + requirements_defined: true, + }, + artifactPointers: { + prd: "docs/PRD.md", + }, + enqueueTasks: [ + { + id: "task-1", + title: "Build parser", + status: "pending", + }, + ], + }); + + const updated = await store.patchState({ + upsertTasks: [ + { + id: "task-1", + title: "Build parser", + status: "in_progress", + }, + { + id: "task-2", + title: "Add tests", + status: "pending", + }, + ], + }); + + assert.equal(updated.globalFlags.requirements_defined, true); + assert.equal(updated.artifactPointers.prd, "docs/PRD.md"); + assert.deepEqual( + updated.taskQueue.map((task) => `${task.id}:${task.status}`), + ["task-1:in_progress", "task-2:pending"], + ); +});