/**
 * Tests for `SchemaDrivenExecutionEngine`.
 *
 * Each test builds a manifest (personas, relationships, pipeline nodes/edges),
 * wires in-memory actor executors, runs a session against freshly created
 * temporary workspace/state directories, and asserts on the resulting
 * records, state, persisted project context, or thrown error.
 */
import test from "node:test";
import assert from "node:assert/strict";
import { mkdtemp, readFile, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { resolve } from "node:path";
import { SchemaDrivenExecutionEngine } from "../src/agents/orchestration.js";
import type { ActorExecutionResult } from "../src/agents/pipeline.js";
import { loadConfig } from "../src/config.js";
import { createDefaultMcpRegistry, createMcpHandlerShell } from "../src/mcp.js";
import { SecurityViolationError } from "../src/security/index.js";

/**
 * Builds the four-persona manifest (product, task, coder, qa) used by the
 * routing/retry test.
 *
 * The pipeline starts at `project-gate` and branches on the `needs_bootstrap`
 * state flag: `true` routes through `task-plan`, `false` goes straight to
 * `coder-1`. The `coder-1` -> `qa-1` edge is conditional on a
 * `validation_failed` history event.
 *
 * @returns The manifest as `unknown`, so the engine receives it untyped.
 */
function createManifest(): unknown {
  return {
    schemaVersion: "1",
    topologies: ["hierarchical", "retry-unrolled", "sequential"],
    personas: [
      {
        id: "product",
        displayName: "Product",
        systemPromptTemplate: "Product planning for {{repo}}",
        toolClearance: {
          allowlist: ["read_file"],
          banlist: ["delete_file"],
        },
      },
      {
        id: "task",
        displayName: "Task",
        systemPromptTemplate: "Task planning for {{repo}}",
        toolClearance: {
          allowlist: ["read_file", "write_file"],
          banlist: ["git_reset"],
        },
      },
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder implements {{ticket}}",
        toolClearance: {
          allowlist: ["read_file", "write_file"],
          banlist: ["rm"],
        },
      },
      {
        id: "qa",
        displayName: "QA",
        systemPromptTemplate: "QA validates {{ticket}}",
        toolClearance: {
          allowlist: ["read_file"],
          banlist: ["write_file"],
        },
      },
    ],
    relationships: [
      {
        parentPersonaId: "product",
        childPersonaId: "task",
        constraints: {
          maxChildren: 2,
          maxDepth: 2,
        },
      },
      {
        parentPersonaId: "task",
        childPersonaId: "coder",
        constraints: {
          maxChildren: 3,
          maxDepth: 3,
        },
      },
    ],
    topologyConstraints: {
      maxDepth: 6,
      maxRetries: 2,
    },
    pipeline: {
      entryNodeId: "project-gate",
      nodes: [
        {
          id: "project-gate",
          actorId: "project_gate",
          personaId: "product",
        },
        {
          id: "task-plan",
          actorId: "task_plan",
          personaId: "task",
        },
        {
          id: "coder-1",
          actorId: "coder",
          personaId: "coder",
          constraints: {
            maxRetries: 1,
          },
        },
        {
          id: "qa-1",
          actorId: "qa",
          personaId: "qa",
        },
      ],
      edges: [
        {
          from: "project-gate",
          to: "task-plan",
          on: "success",
          when: [
            {
              type: "state_flag",
              key: "needs_bootstrap",
              equals: true,
            },
          ],
        },
        {
          from: "project-gate",
          to: "coder-1",
          on: "success",
          when: [
            {
              type: "state_flag",
              key: "needs_bootstrap",
              equals: false,
            },
          ],
        },
        {
          from: "task-plan",
          to: "coder-1",
          on: "success",
        },
        {
          from: "coder-1",
          to: "qa-1",
          on: "success",
          when: [
            {
              type: "history_has_event",
              event: "validation_failed",
            },
          ],
        },
      ],
    },
  };
}

// The gate sets needs_bootstrap=true, so the run is expected to pass through
// task-plan; the coder fails validation once and succeeds on its second attempt.
test("runs DAG pipeline with state-dependent routing and retry behavior", async () => {
  const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const projectContextPath = resolve(stateRoot, "project-context.json");

  await writeFile(resolve(workspaceRoot, "PRD.md"), "# PRD\n", "utf8");

  let coderAttempts = 0;

  const engine = new SchemaDrivenExecutionEngine({
    manifest: createManifest(),
    settings: {
      workspaceRoot,
      stateRoot,
      projectContextPath,
      runtimeContext: {
        repo: "ai_ops",
        ticket: "AIOPS-123",
      },
      maxChildren: 4,
      maxDepth: 8,
      maxRetries: 3,
    },
    actorExecutors: {
      project_gate: async () => ({
        status: "success",
        payload: {
          phase: "gate",
        },
        stateFlags: {
          needs_bootstrap: true,
        },
      }),
      task_plan: async (input) => {
        // The {{repo}} placeholder must have been rendered from runtimeContext.
        assert.match(input.prompt, /ai_ops/);
        return {
          status: "success",
          payload: {
            plan: "roadmap",
          },
          stateFlags: {
            roadmap_ready: true,
          },
        };
      },
      coder: async (input): Promise<ActorExecutionResult> => {
        assert.match(input.prompt, /AIOPS-123/);
        assert.deepEqual(input.executionContext.allowedTools, ["read_file", "write_file"]);
        assert.equal(input.executionContext.phase, "coder-1");
        assert.equal(typeof input.executionContext.modelConstraint, "string");
        assert.ok(input.executionContext.modelConstraint.length > 0);
        assert.ok(input.security);

        coderAttempts += 1;
        if (coderAttempts === 1) {
          // First attempt reports a validation failure to trigger the retry path.
          return {
            status: "validation_fail",
            payload: {
              issue: "missing test",
            },
          };
        }

        return {
          status: "success",
          payload: {
            code: "done",
          },
        };
      },
      qa: async () => ({
        status: "success",
        payload: {
          qa: "ok",
        },
      }),
    },
    behaviorHandlers: {
      coder: {
        onValidationFail: () => ({
          lastValidationFailure: "coder-1",
        }),
      },
    },
  });

  const result = await engine.runSession({
    sessionId: "session-orchestration-1",
    initialPayload: {
      task: "Implement pipeline",
    },
  });

  assert.equal(result.status, "success");
  assert.deepEqual(
    result.records.map((record) => `${record.nodeId}:${record.status}:${String(record.attempt)}`),
    [
      "project-gate:success:1",
      "task-plan:success:1",
      "coder-1:validation_fail:1",
      "coder-1:success:2",
      "qa-1:success:1",
    ],
  );
  assert.equal(result.finalState.flags.needs_bootstrap, true);
  assert.equal(result.finalState.flags.roadmap_ready, true);
  assert.equal(result.finalState.metadata.lastValidationFailure, "coder-1");

  assert.deepEqual(engine.planChildPersonas({ parentPersonaId: "task", depth: 1 }), ["coder"]);
});

// The MCP server config enables read_file, write_file and search, but the
// persona allowlist only contains read_file and write_file; every assertion in
// the executor expects the narrower set.
test("injects resolved mcp/helpers and enforces Claude tool gate in actor executor", async () => {
  const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const projectContextPath = resolve(stateRoot, "project-context.json");
  const mcpConfigPath = resolve(workspaceRoot, "mcp.config.json");

  await writeFile(
    mcpConfigPath,
    JSON.stringify(
      {
        servers: {
          "task-master-tools": {
            handler: "claude-task-master",
            type: "stdio",
            command: "node",
            args: ["task-master-mcp.js"],
            enabled_tools: ["read_file", "write_file", "search"],
          },
        },
      },
      null,
      2,
    ),
    "utf8",
  );

  const config = loadConfig({
    ...process.env,
    MCP_CONFIG_PATH: mcpConfigPath,
  });

  const customRegistry = createDefaultMcpRegistry();
  customRegistry.register(
    createMcpHandlerShell({
      id: "custom-task-mcp-handler",
      description: "custom task handler",
      matches: () => false,
    }),
  );

  const manifest = {
    schemaVersion: "1" as const,
    topologies: ["sequential"],
    personas: [
      {
        id: "task",
        displayName: "Task",
        systemPromptTemplate: "Task executor",
        modelConstraint: "claude-3-haiku",
        toolClearance: {
          allowlist: ["read_file", "write_file"],
          banlist: ["rm"],
        },
      },
    ],
    relationships: [],
    topologyConstraints: {
      maxDepth: 2,
      maxRetries: 0,
    },
    pipeline: {
      entryNodeId: "task-node",
      nodes: [
        {
          id: "task-node",
          actorId: "task_actor",
          personaId: "task",
        },
      ],
      edges: [],
    },
  };

  const engine = new SchemaDrivenExecutionEngine({
    manifest,
    config,
    mcpRegistry: customRegistry,
    settings: {
      workspaceRoot,
      stateRoot,
      projectContextPath,
      maxChildren: 1,
      maxDepth: 2,
      maxRetries: 0,
    },
    actorExecutors: {
      task_actor: async (input) => {
        assert.deepEqual(input.executionContext.allowedTools, ["read_file", "write_file"]);
        assert.equal(input.executionContext.phase, "task-node");
        assert.equal(input.executionContext.modelConstraint, "claude-3-haiku");
        assert.equal(input.executionContext.security.worktreePath, workspaceRoot);
        assert.equal(input.executionContext.security.violationMode, "hard_abort");

        const codexConfig = input.mcp.resolveConfig({
          providerHint: "codex",
        });
        const codexServer = (
          codexConfig.codexConfig?.mcp_servers as
            | Record<string, Record<string, unknown>>
            | undefined
        )?.["task-master-tools"];
        assert.ok(codexServer);
        // "search" is enabled in the config file but must be filtered out here.
        assert.deepEqual(codexServer.enabled_tools, ["read_file", "write_file"]);
        assert.deepEqual(input.mcp.allowedTools, ["read_file", "write_file"]);
        assert.deepEqual(
          input.mcp.filterToolsForProvider(["read_file", "search", "write_file"]),
          ["read_file", "write_file"],
        );

        const claudeConfig = input.mcp.resolveConfig({
          providerHint: "claude",
        });
        assert.ok(claudeConfig.claudeMcpServers?.["task-master-tools"]);

        const canUseTool = input.mcp.createClaudeCanUseTool();
        const allow = await canUseTool(
          "mcp__claude-task-master__read_file",
          {},
          {
            signal: new AbortController().signal,
            toolUseID: "allow-1",
          },
        );
        assert.deepEqual(allow, {
          behavior: "allow",
          toolUseID: "allow-1",
        });

        // A banlisted tool is expected to be rejected.
        await assert.rejects(
          () =>
            canUseTool(
              "mcp__claude-task-master__rm",
              {},
              {
                signal: new AbortController().signal,
                toolUseID: "deny-1",
              },
            ),
          /Tool .* is not present in allowlist/,
        );

        // A tool that is simply absent from the allowlist is rejected the same way.
        await assert.rejects(
          () =>
            canUseTool(
              "mcp__claude-task-master__search",
              {},
              {
                signal: new AbortController().signal,
                toolUseID: "deny-2",
              },
            ),
          /Tool .* is not present in allowlist/,
        );

        return {
          status: "success",
          payload: {
            ok: true,
          },
        };
      },
    },
  });

  const result = await engine.runSession({
    sessionId: "session-mcp-gate-1",
    initialPayload: {
      task: "verify mcp gate",
    },
  });

  assert.equal(result.status, "success");
});

// Both coder executors block on one shared promise that is only released after
// both have started, so `bothCodersStarted` never resolves unless the engine
// starts the two nodes of the "implementation" block concurrently.
test("runs parallel topology blocks concurrently and routes via domain-event edges", async () => {
  const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const projectContextPath = resolve(stateRoot, "project-context.json");

  const manifest = {
    schemaVersion: "1",
    topologies: ["parallel", "retry-unrolled", "sequential"],
    personas: [
      {
        id: "planner",
        displayName: "Planner",
        systemPromptTemplate: "Planner {{repo}}",
        toolClearance: {
          allowlist: ["read_file"],
          banlist: [],
        },
      },
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder {{repo}}",
        toolClearance: {
          allowlist: ["read_file", "write_file"],
          banlist: [],
        },
      },
      {
        id: "integrator",
        displayName: "Integrator",
        systemPromptTemplate: "Integrator {{repo}}",
        toolClearance: {
          allowlist: ["read_file"],
          banlist: [],
        },
      },
    ],
    relationships: [],
    topologyConstraints: {
      maxDepth: 5,
      maxRetries: 2,
    },
    pipeline: {
      entryNodeId: "plan",
      nodes: [
        {
          id: "plan",
          actorId: "plan_actor",
          personaId: "planner",
        },
        {
          id: "code-a",
          actorId: "code_a",
          personaId: "coder",
          topology: {
            kind: "parallel",
            blockId: "implementation",
          },
        },
        {
          id: "code-b",
          actorId: "code_b",
          personaId: "coder",
          topology: {
            kind: "parallel",
            blockId: "implementation",
          },
        },
        {
          id: "integrate",
          actorId: "integrate_actor",
          personaId: "integrator",
        },
      ],
      edges: [
        {
          from: "plan",
          to: "code-a",
          on: "success",
        },
        {
          from: "plan",
          to: "code-b",
          on: "success",
        },
        {
          from: "code-a",
          to: "integrate",
          event: "code_committed",
        },
        {
          from: "code-b",
          to: "integrate",
          event: "code_committed",
        },
      ],
    },
  } as const;

  let activeCoders = 0;
  let maxConcurrentCoders = 0;

  // Gate that holds both coders open until the test releases them.
  let releaseCoders: (() => void) | undefined;
  const codersReleased = new Promise<void>((resolve) => {
    releaseCoders = resolve;
  });

  // Signal fired by whichever coder is the second to start.
  let coderStarts = 0;
  let notifyBothCodersStarted: (() => void) | undefined;
  const bothCodersStarted = new Promise<void>((resolve) => {
    notifyBothCodersStarted = resolve;
  });

  const engine = new SchemaDrivenExecutionEngine({
    manifest,
    settings: {
      workspaceRoot,
      stateRoot,
      projectContextPath,
      runtimeContext: {
        repo: "ai_ops",
      },
      maxDepth: 5,
      maxRetries: 2,
      maxChildren: 4,
    },
    actorExecutors: {
      plan_actor: async () => ({
        status: "success",
        payload: {
          phase: "plan",
        },
      }),
      code_a: async () => {
        activeCoders += 1;
        maxConcurrentCoders = Math.max(maxConcurrentCoders, activeCoders);
        coderStarts += 1;
        if (coderStarts === 2) {
          notifyBothCodersStarted?.();
        }
        await codersReleased;
        activeCoders = Math.max(activeCoders - 1, 0);
        return {
          status: "success",
          payload: {
            branch: "feature/a",
          },
          events: [
            {
              type: "code_committed",
              payload: {
                summary: "Feature A committed",
              },
            },
          ],
          projectContextPatch: {
            artifactPointers: {
              feature_a_commit: "feature/a@abc123",
            },
          },
        };
      },
      code_b: async () => {
        activeCoders += 1;
        maxConcurrentCoders = Math.max(maxConcurrentCoders, activeCoders);
        coderStarts += 1;
        if (coderStarts === 2) {
          notifyBothCodersStarted?.();
        }
        await codersReleased;
        activeCoders = Math.max(activeCoders - 1, 0);
        return {
          status: "success",
          payload: {
            branch: "feature/b",
          },
          events: [
            {
              type: "code_committed",
              payload: {
                summary: "Feature B committed",
              },
            },
          ],
          projectContextPatch: {
            enqueueTasks: [
              {
                id: "task-integrate",
                title: "Integrate feature branches",
                status: "pending",
              },
            ],
          },
        };
      },
      integrate_actor: async () => ({
        status: "success",
        payload: {
          merged: true,
        },
        events: [
          {
            type: "branch_merged",
            payload: {
              summary: "Branches merged",
            },
          },
        ],
      }),
    },
  });

  const runPromise = engine.runSession({
    sessionId: "session-parallel-domain-events",
    initialPayload: {
      task: "Parallel implementation",
    },
  });

  await bothCodersStarted;
  releaseCoders?.();

  const result = await runPromise;

  assert.equal(maxConcurrentCoders, 2);
  assert.equal(result.status, "success");
  assert.deepEqual(
    result.records.map((record) => `${record.nodeId}:${record.status}`),
    ["plan:success", "code-a:success", "code-b:success", "integrate:success"],
  );

  // The project-context patches returned by the coders must have been persisted.
  const storedContextRaw = await readFile(projectContextPath, "utf8");
  const storedContext = JSON.parse(storedContextRaw) as {
    artifactPointers: Record<string, string>;
    taskQueue: Array<{ id: string }>;
  };
  assert.equal(storedContext.artifactPointers.feature_a_commit, "feature/a@abc123");
  assert.equal(storedContext.taskQueue[0]?.id, "task-integrate");

  const finalStatePointer =
    storedContext.artifactPointers["sessions/session-parallel-domain-events/final_state"];
  assert.ok(finalStatePointer);
  assert.match(finalStatePointer, /state\.json$/);
});

// "first" fails hard and routes to "second" via an on:"failure" edge; "second"
// also fails hard, and the session is expected to reject.
test("fails fast after two sequential hard failures", async () => {
  const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const projectContextPath = resolve(stateRoot, "project-context.json");

  const manifest = {
    schemaVersion: "1",
    topologies: ["sequential"],
    personas: [
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder",
        toolClearance: {
          allowlist: [],
          banlist: [],
        },
      },
    ],
    relationships: [],
    topologyConstraints: {
      maxDepth: 4,
      maxRetries: 0,
    },
    pipeline: {
      entryNodeId: "first",
      nodes: [
        {
          id: "first",
          actorId: "first_actor",
          personaId: "coder",
        },
        {
          id: "second",
          actorId: "second_actor",
          personaId: "coder",
        },
      ],
      edges: [
        {
          from: "first",
          to: "second",
          on: "failure",
        },
      ],
    },
  } as const;

  const engine = new SchemaDrivenExecutionEngine({
    manifest,
    settings: {
      workspaceRoot,
      stateRoot,
      projectContextPath,
      maxDepth: 4,
      maxRetries: 0,
      maxChildren: 2,
      runtimeContext: {},
    },
    actorExecutors: {
      first_actor: async () => ({
        status: "failure",
        payload: {
          error: "network timeout while reaching upstream API",
        },
        failureKind: "hard",
      }),
      second_actor: async () => ({
        status: "failure",
        payload: {
          error: "HTTP 403 from provider",
        },
        failureKind: "hard",
      }),
    },
  });

  await assert.rejects(
    () =>
      engine.runSession({
        sessionId: "session-hard-failure",
        initialPayload: {
          task: "Trigger hard failures",
        },
      }),
    /Hard failure threshold reached/,
  );
});

// A soft failure on the last node resolves (does not reject) with an overall
// "failure" status.
test("marks aggregate status as failure when a terminal node fails", async () => {
  const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const projectContextPath = resolve(stateRoot, "project-context.json");

  const manifest = {
    schemaVersion: "1",
    topologies: ["sequential"],
    personas: [
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder",
        toolClearance: {
          allowlist: [],
          banlist: [],
        },
      },
    ],
    relationships: [],
    topologyConstraints: {
      maxDepth: 3,
      maxRetries: 0,
    },
    pipeline: {
      entryNodeId: "build",
      nodes: [
        {
          id: "build",
          actorId: "build_actor",
          personaId: "coder",
        },
        {
          id: "verify",
          actorId: "verify_actor",
          personaId: "coder",
        },
      ],
      edges: [
        {
          from: "build",
          to: "verify",
          on: "success",
        },
      ],
    },
  } as const;

  const engine = new SchemaDrivenExecutionEngine({
    manifest,
    settings: {
      workspaceRoot,
      stateRoot,
      projectContextPath,
      maxDepth: 3,
      maxRetries: 0,
      maxChildren: 2,
      runtimeContext: {},
    },
    actorExecutors: {
      build_actor: async () => ({
        status: "success",
        payload: {
          step: "build",
        },
      }),
      verify_actor: async () => ({
        status: "failure",
        payload: {
          error: "verification failed",
        },
        failureKind: "soft",
      }),
    },
  });

  const result = await engine.runSession({
    sessionId: "session-terminal-failure",
    initialPayload: {
      task: "Aggregate failure status",
    },
  });

  assert.equal(result.status, "failure");
  assert.deepEqual(
    result.records.map((record) => `${record.nodeId}:${record.status}`),
    ["build:success", "verify:failure"],
  );
});

// The actor waits up to 5 s but rejects as soon as its signal aborts; the test
// aborts after 20 ms and checks both the rejection and that the actor saw it.
test("propagates abort signal into actor execution and stops the run", async () => {
  const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const projectContextPath = resolve(stateRoot, "project-context.json");

  const manifest = {
    schemaVersion: "1",
    topologies: ["sequential"],
    personas: [
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder",
        toolClearance: {
          allowlist: [],
          banlist: [],
        },
      },
    ],
    relationships: [],
    topologyConstraints: {
      maxDepth: 2,
      maxRetries: 0,
    },
    pipeline: {
      entryNodeId: "long-run",
      nodes: [
        {
          id: "long-run",
          actorId: "long_actor",
          personaId: "coder",
        },
      ],
      edges: [],
    },
  } as const;

  let observedAbort = false;

  const engine = new SchemaDrivenExecutionEngine({
    manifest,
    settings: {
      workspaceRoot,
      stateRoot,
      projectContextPath,
      maxDepth: 2,
      maxRetries: 0,
      maxChildren: 2,
      runtimeContext: {},
    },
    actorExecutors: {
      long_actor: async (input) => {
        await new Promise<void>((resolve, reject) => {
          const timeout = setTimeout(resolve, 5000);
          input.signal.addEventListener(
            "abort",
            () => {
              observedAbort = true;
              // Clear the timer so it does not keep the process alive after abort.
              clearTimeout(timeout);
              reject(input.signal.reason ?? new Error("aborted"));
            },
            { once: true },
          );
        });
        return {
          status: "success",
          payload: {
            unreachable: true,
          },
        };
      },
    },
  });

  const controller = new AbortController();
  const runPromise = engine.runSession({
    sessionId: "session-abort",
    initialPayload: {
      task: "Abort test",
    },
    signal: controller.signal,
  });

  setTimeout(() => {
    controller.abort(new Error("manual-abort"));
  }, 20);

  await assert.rejects(() => runPromise, /(AbortError|manual-abort|aborted)/i);
  assert.equal(observedAbort, true);
});

// No securityViolationHandling setting is supplied here, so this exercises the
// engine's default reaction to a SecurityViolationError thrown by an actor.
test("hard-aborts pipeline on security violations by default", async () => {
  const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const projectContextPath = resolve(stateRoot, "project-context.json");

  const manifest = {
    schemaVersion: "1",
    topologies: ["retry-unrolled", "sequential"],
    personas: [
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder",
        toolClearance: {
          allowlist: ["git"],
          banlist: [],
        },
      },
    ],
    relationships: [],
    topologyConstraints: {
      maxDepth: 3,
      maxRetries: 2,
    },
    pipeline: {
      entryNodeId: "secure-node",
      nodes: [
        {
          id: "secure-node",
          actorId: "secure_actor",
          personaId: "coder",
        },
      ],
      edges: [],
    },
  } as const;

  const engine = new SchemaDrivenExecutionEngine({
    manifest,
    settings: {
      workspaceRoot,
      stateRoot,
      projectContextPath,
      maxDepth: 3,
      maxRetries: 2,
      maxChildren: 2,
      runtimeContext: {},
    },
    actorExecutors: {
      secure_actor: async () => {
        throw new SecurityViolationError("blocked by policy", {
          code: "TOOL_NOT_ALLOWED",
        });
      },
    },
  });

  await assert.rejects(
    () =>
      engine.runSession({
        sessionId: "session-security-hard-abort",
        initialPayload: {
          task: "Security hard abort",
        },
      }),
    /blocked by policy/,
  );
});

// With securityViolationHandling set to "validation_fail", the first attempt's
// security error is expected to be recorded as validation_fail and retried.
test("can map security violations to validation_fail for retry-unrolled remediation", async () => {
  const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const projectContextPath = resolve(stateRoot, "project-context.json");

  const manifest = {
    schemaVersion: "1",
    topologies: ["retry-unrolled", "sequential"],
    personas: [
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder",
        toolClearance: {
          allowlist: ["git"],
          banlist: [],
        },
      },
    ],
    relationships: [],
    topologyConstraints: {
      maxDepth: 3,
      maxRetries: 2,
    },
    pipeline: {
      entryNodeId: "secure-node",
      nodes: [
        {
          id: "secure-node",
          actorId: "secure_actor",
          personaId: "coder",
          constraints: {
            maxRetries: 1,
          },
        },
      ],
      edges: [],
    },
  } as const;

  let attempts = 0;

  const engine = new SchemaDrivenExecutionEngine({
    manifest,
    settings: {
      workspaceRoot,
      stateRoot,
      projectContextPath,
      maxDepth: 3,
      maxRetries: 2,
      maxChildren: 2,
      securityViolationHandling: "validation_fail",
      runtimeContext: {},
    },
    actorExecutors: {
      secure_actor: async () => {
        attempts += 1;
        if (attempts === 1) {
          throw new SecurityViolationError("first attempt blocked", {
            code: "PATH_TRAVERSAL_BLOCKED",
          });
        }
        return {
          status: "success",
          payload: {
            fixed: true,
          },
        };
      },
    },
  });

  const result = await engine.runSession({
    sessionId: "session-security-validation-retry",
    initialPayload: {
      task: "Security retry path",
    },
  });

  assert.equal(result.status, "success");
  assert.deepEqual(
    result.records.map((record) => `${record.nodeId}:${record.status}:${String(record.attempt)}`),
    ["secure-node:validation_fail:1", "secure-node:success:2"],
  );
});

// The event log path is configured relative to the workspace; after a
// successful run the NDJSON file is read back and checked for the expected
// event types and for usage fields mapped from the actor's payload.usage.
test("runtime event side-channel logs session and node lifecycle without changing pipeline behavior", async () => {
  const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-runtime-event-workspace-"));
  const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-runtime-event-state-"));
  const projectContextPath = resolve(stateRoot, "project-context.json");
  const runtimeEventLogRelativePath = ".ai_ops/events/test-runtime-events.ndjson";
  const runtimeEventLogPath = resolve(workspaceRoot, runtimeEventLogRelativePath);

  const manifest = {
    schemaVersion: "1",
    topologies: ["sequential"],
    personas: [
      {
        id: "runner",
        displayName: "Runner",
        systemPromptTemplate: "Runner",
        toolClearance: {
          allowlist: ["read_file"],
          banlist: [],
        },
      },
    ],
    relationships: [],
    topologyConstraints: {
      maxDepth: 2,
      maxRetries: 0,
    },
    pipeline: {
      entryNodeId: "node-1",
      nodes: [
        {
          id: "node-1",
          actorId: "runner_actor",
          personaId: "runner",
        },
      ],
      edges: [],
    },
  } as const;

  const config = loadConfig({
    AGENT_RUNTIME_EVENT_LOG_PATH: runtimeEventLogRelativePath,
  });

  const engine = new SchemaDrivenExecutionEngine({
    manifest,
    config,
    settings: {
      workspaceRoot,
      stateRoot,
      projectContextPath,
      maxDepth: 2,
      maxRetries: 0,
      maxChildren: 1,
      runtimeContext: {},
    },
    actorExecutors: {
      runner_actor: async () => ({
        status: "success",
        payload: {
          complete: true,
          usage: {
            input_tokens: 120,
            output_tokens: 80,
            tool_calls: 2,
            duration_ms: 450,
          },
        },
      }),
    },
  });

  const result = await engine.runSession({
    sessionId: "session-runtime-events",
    initialPayload: {
      task: "Emit runtime events",
    },
  });

  assert.equal(result.status, "success");

  const lines = (await readFile(runtimeEventLogPath, "utf8"))
    .trim()
    .split("\n")
    .filter((line) => line.length > 0);
  assert.ok(lines.length >= 4);

  const events = lines.map((line) => JSON.parse(line) as Record<string, unknown>);
  const eventTypes = new Set(events.map((event) => String(event.type)));
  assert.ok(eventTypes.has("session.started"));
  assert.ok(eventTypes.has("node.attempt.completed"));
  assert.ok(eventTypes.has("domain.validation_passed"));
  assert.ok(eventTypes.has("session.completed"));

  const nodeAttemptEvent = events.find((event) => event.type === "node.attempt.completed");
  assert.ok(nodeAttemptEvent);
  const usage = nodeAttemptEvent.usage as Record<string, unknown>;
  assert.equal(usage.tokenInput, 120);
  assert.equal(usage.tokenOutput, 80);
  assert.equal(usage.toolCalls, 2);
  assert.equal(usage.durationMs, 450);
});