// File: ai_ops/tests/orchestration-engine.test.ts
// Integration tests for the schema-driven orchestration engine.
import test from "node:test";
import assert from "node:assert/strict";
import { mkdtemp, readFile, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { resolve } from "node:path";
import { SchemaDrivenExecutionEngine } from "../src/agents/orchestration.js";
import type { ActorExecutionResult } from "../src/agents/pipeline.js";
import { SecurityViolationError } from "../src/security/index.js";
/**
 * Builds the manifest used by the DAG routing/retry test: four personas
 * (product → task → coder → qa) wired into a small pipeline whose edges
 * are gated by state flags and by a history event.
 *
 * The sections are assembled from named locals so each part of the
 * manifest can be read (and diffed) in isolation.
 */
function createManifest(): unknown {
  const personas = [
    {
      id: "product",
      displayName: "Product",
      systemPromptTemplate: "Product planning for {{repo}}",
      toolClearance: { allowlist: ["read_file"], banlist: ["delete_file"] },
    },
    {
      id: "task",
      displayName: "Task",
      systemPromptTemplate: "Task planning for {{repo}}",
      toolClearance: { allowlist: ["read_file", "write_file"], banlist: ["git_reset"] },
    },
    {
      id: "coder",
      displayName: "Coder",
      systemPromptTemplate: "Coder implements {{ticket}}",
      toolClearance: { allowlist: ["read_file", "write_file"], banlist: ["rm"] },
    },
    {
      id: "qa",
      displayName: "QA",
      systemPromptTemplate: "QA validates {{ticket}}",
      toolClearance: { allowlist: ["read_file"], banlist: ["write_file"] },
    },
  ];
  // Parent/child spawning rules consumed by planChildPersonas().
  const relationships = [
    {
      parentPersonaId: "product",
      childPersonaId: "task",
      constraints: { maxChildren: 2, maxDepth: 2 },
    },
    {
      parentPersonaId: "task",
      childPersonaId: "coder",
      constraints: { maxChildren: 3, maxDepth: 3 },
    },
  ];
  const nodes = [
    { id: "project-gate", actorId: "project_gate", personaId: "product" },
    { id: "task-plan", actorId: "task_plan", personaId: "task" },
    {
      id: "coder-1",
      actorId: "coder",
      personaId: "coder",
      // Per-node override: one retry is allowed for the coder node.
      constraints: { maxRetries: 1 },
    },
    { id: "qa-1", actorId: "qa", personaId: "qa" },
  ];
  const edges = [
    // needs_bootstrap=true routes through planning first.
    {
      from: "project-gate",
      to: "task-plan",
      on: "success",
      when: [{ type: "state_flag", key: "needs_bootstrap", equals: true }],
    },
    // needs_bootstrap=false skips planning and goes straight to coding.
    {
      from: "project-gate",
      to: "coder-1",
      on: "success",
      when: [{ type: "state_flag", key: "needs_bootstrap", equals: false }],
    },
    { from: "task-plan", to: "coder-1", on: "success" },
    // QA only runs when a validation_fail event was recorded earlier.
    {
      from: "coder-1",
      to: "qa-1",
      on: "success",
      when: [{ type: "history_has_event", event: "validation_fail" }],
    },
  ];
  return {
    schemaVersion: "1",
    topologies: ["hierarchical", "retry-unrolled", "sequential"],
    personas,
    relationships,
    topologyConstraints: { maxDepth: 6, maxRetries: 2 },
    pipeline: { entryNodeId: "project-gate", nodes, edges },
  };
}
// End-to-end check of state-flag routing, per-node retry, and behavior
// handlers: the gate routes to planning, the coder fails validation once
// and succeeds on retry, and QA runs because a validation_fail event exists.
test("runs DAG pipeline with state-dependent routing and retry behavior", async () => {
  // Fresh scratch directories so the run cannot touch real state.
  const workspaceDir = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const sessionStateDir = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const contextFilePath = resolve(sessionStateDir, "project-context.json");
  await writeFile(resolve(workspaceDir, "PRD.md"), "# PRD\n", "utf8");
  let coderCallCount = 0;
  const orchestrator = new SchemaDrivenExecutionEngine({
    manifest: createManifest(),
    settings: {
      workspaceRoot: workspaceDir,
      stateRoot: sessionStateDir,
      projectContextPath: contextFilePath,
      runtimeContext: { repo: "ai_ops", ticket: "AIOPS-123" },
      maxChildren: 4,
      maxDepth: 8,
      maxRetries: 3,
    },
    actorExecutors: {
      // Sets needs_bootstrap=true, which should route to task-plan.
      project_gate: async () => ({
        status: "success",
        payload: { phase: "gate" },
        stateFlags: { needs_bootstrap: true },
      }),
      task_plan: async (input) => {
        // Persona template "{{repo}}" must be expanded from runtimeContext.
        assert.match(input.prompt, /ai_ops/);
        return {
          status: "success",
          payload: { plan: "roadmap" },
          stateFlags: { roadmap_ready: true },
        };
      },
      // First attempt fails validation; the retry succeeds.
      coder: async (input): Promise<ActorExecutionResult> => {
        assert.match(input.prompt, /AIOPS-123/);
        assert.deepEqual(input.toolClearance.allowlist, ["read_file", "write_file"]);
        assert.ok(input.security);
        coderCallCount += 1;
        if (coderCallCount === 1) {
          return { status: "validation_fail", payload: { issue: "missing test" } };
        }
        return { status: "success", payload: { code: "done" } };
      },
      qa: async () => ({ status: "success", payload: { qa: "ok" } }),
    },
    behaviorHandlers: {
      // Records which node last failed validation into session metadata.
      coder: {
        onValidationFail: () => ({ lastValidationFailure: "coder-1" }),
      },
    },
  });
  const outcome = await orchestrator.runSession({
    sessionId: "session-orchestration-1",
    initialPayload: { task: "Implement pipeline" },
  });
  assert.equal(outcome.status, "success");
  // Exact node/status/attempt sequence pins both routing and retry behavior.
  assert.deepEqual(
    outcome.records.map((record) => `${record.nodeId}:${record.status}:${String(record.attempt)}`),
    [
      "project-gate:success:1",
      "task-plan:success:1",
      "coder-1:validation_fail:1",
      "coder-1:success:2",
      "qa-1:success:1",
    ],
  );
  assert.equal(outcome.finalState.flags.needs_bootstrap, true);
  assert.equal(outcome.finalState.flags.roadmap_ready, true);
  assert.equal(outcome.finalState.metadata.lastValidationFailure, "coder-1");
  // Relationship planning must honor the manifest's task->coder rule.
  assert.deepEqual(orchestrator.planChildPersonas({ parentPersonaId: "task", depth: 1 }), ["coder"]);
});
// Verifies three things at once:
//  1. Nodes sharing a parallel topology block ("implementation") actually run
//     concurrently — proven by instrumenting peak concurrent executor count.
//  2. Edges declared with a domain event ("code_committed") route to the
//     downstream node once the event is emitted.
//  3. projectContextPatch contributions from actors are merged and persisted
//     to the project-context JSON file on disk.
test("runs parallel topology blocks concurrently and routes via domain-event edges", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
// Manifest: plan fans out to code-a and code-b (same parallel blockId),
// both of which feed "integrate" via event-typed edges.
const manifest = {
schemaVersion: "1",
topologies: ["parallel", "retry-unrolled", "sequential"],
personas: [
{
id: "planner",
displayName: "Planner",
systemPromptTemplate: "Planner {{repo}}",
toolClearance: {
allowlist: ["read_file"],
banlist: [],
},
},
{
id: "coder",
displayName: "Coder",
systemPromptTemplate: "Coder {{repo}}",
toolClearance: {
allowlist: ["read_file", "write_file"],
banlist: [],
},
},
{
id: "integrator",
displayName: "Integrator",
systemPromptTemplate: "Integrator {{repo}}",
toolClearance: {
allowlist: ["read_file"],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 5,
maxRetries: 2,
},
pipeline: {
entryNodeId: "plan",
nodes: [
{
id: "plan",
actorId: "plan_actor",
personaId: "planner",
},
{
id: "code-a",
actorId: "code_a",
personaId: "coder",
// Same blockId as code-b: the engine should schedule both together.
topology: {
kind: "parallel",
blockId: "implementation",
},
},
{
id: "code-b",
actorId: "code_b",
personaId: "coder",
topology: {
kind: "parallel",
blockId: "implementation",
},
},
{
id: "integrate",
actorId: "integrate_actor",
personaId: "integrator",
},
],
edges: [
{
from: "plan",
to: "code-a",
on: "success",
},
{
from: "plan",
to: "code-b",
on: "success",
},
// Event-typed edges: integrate is reached via the "code_committed"
// domain event rather than a plain status transition.
{
from: "code-a",
to: "integrate",
event: "code_committed",
},
{
from: "code-b",
to: "integrate",
event: "code_committed",
},
],
},
} as const;
// Concurrency instrumentation: both coder executors increment activeCoders
// on entry and then block on `codersReleased`, a manually-resolved promise.
// If the engine ran them sequentially, the first would never return (it
// waits for release, which only happens after BOTH have started) and
// maxConcurrentCoders would stay at 1.
let activeCoders = 0;
let maxConcurrentCoders = 0;
let releaseCoders: (() => void) | undefined;
const codersReleased = new Promise<void>((resolve) => {
releaseCoders = resolve;
});
let coderStarts = 0;
let notifyBothCodersStarted: (() => void) | undefined;
const bothCodersStarted = new Promise<void>((resolve) => {
notifyBothCodersStarted = resolve;
});
const engine = new SchemaDrivenExecutionEngine({
manifest,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
runtimeContext: {
repo: "ai_ops",
},
maxDepth: 5,
maxRetries: 2,
maxChildren: 4,
},
actorExecutors: {
plan_actor: async () => ({
status: "success",
payload: {
phase: "plan",
},
}),
// code_a: records concurrency, signals once both coders have started,
// then blocks until the test releases it. Emits the "code_committed"
// event and patches the project context with an artifact pointer.
code_a: async () => {
activeCoders += 1;
maxConcurrentCoders = Math.max(maxConcurrentCoders, activeCoders);
coderStarts += 1;
if (coderStarts === 2) {
notifyBothCodersStarted?.();
}
await codersReleased;
activeCoders = Math.max(activeCoders - 1, 0);
return {
status: "success",
payload: {
branch: "feature/a",
},
events: [
{
type: "code_committed",
payload: {
summary: "Feature A committed",
},
},
],
projectContextPatch: {
artifactPointers: {
feature_a_commit: "feature/a@abc123",
},
},
};
},
// code_b: same instrumentation as code_a, but its context patch
// enqueues a follow-up task instead of adding an artifact pointer.
code_b: async () => {
activeCoders += 1;
maxConcurrentCoders = Math.max(maxConcurrentCoders, activeCoders);
coderStarts += 1;
if (coderStarts === 2) {
notifyBothCodersStarted?.();
}
await codersReleased;
activeCoders = Math.max(activeCoders - 1, 0);
return {
status: "success",
payload: {
branch: "feature/b",
},
events: [
{
type: "code_committed",
payload: {
summary: "Feature B committed",
},
},
],
projectContextPatch: {
enqueueTasks: [
{
id: "task-integrate",
title: "Integrate feature branches",
status: "pending",
},
],
},
};
},
integrate_actor: async () => ({
status: "success",
payload: {
merged: true,
},
events: [
{
type: "branch_merged",
payload: {
summary: "Branches merged",
},
},
],
}),
},
});
// Start the session, wait until both coders are provably in-flight at the
// same time, then release them so the run can finish.
const runPromise = engine.runSession({
sessionId: "session-parallel-domain-events",
initialPayload: {
task: "Parallel implementation",
},
});
await bothCodersStarted;
releaseCoders?.();
const result = await runPromise;
assert.equal(maxConcurrentCoders, 2);
assert.equal(result.status, "success");
assert.deepEqual(
result.records.map((record) => `${record.nodeId}:${record.status}`),
["plan:success", "code-a:success", "code-b:success", "integrate:success"],
);
// Persisted project context must contain both actors' patches plus the
// engine's own final-state pointer for this session.
const storedContextRaw = await readFile(projectContextPath, "utf8");
const storedContext = JSON.parse(storedContextRaw) as {
artifactPointers: Record<string, string>;
taskQueue: Array<{ id: string }>;
};
assert.equal(storedContext.artifactPointers.feature_a_commit, "feature/a@abc123");
assert.equal(storedContext.taskQueue[0]?.id, "task-integrate");
const finalStatePointer = storedContext.artifactPointers["sessions/session-parallel-domain-events/final_state"];
assert.ok(finalStatePointer);
assert.match(finalStatePointer, /state\.json$/);
});
// Two chained actors both report hard failures; the engine is expected to
// stop the session by rejecting once its hard-failure threshold is hit.
test("fails fast after two sequential hard failures", async () => {
  const workspaceDir = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const sessionStateDir = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const contextFilePath = resolve(sessionStateDir, "project-context.json");
  // Two-node chain linked by a failure edge, so the second hard failure
  // is reachable from the first.
  const failureManifest = {
    schemaVersion: "1",
    topologies: ["sequential"],
    personas: [
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder",
        toolClearance: { allowlist: [], banlist: [] },
      },
    ],
    relationships: [],
    topologyConstraints: { maxDepth: 4, maxRetries: 0 },
    pipeline: {
      entryNodeId: "first",
      nodes: [
        { id: "first", actorId: "first_actor", personaId: "coder" },
        { id: "second", actorId: "second_actor", personaId: "coder" },
      ],
      edges: [{ from: "first", to: "second", on: "failure" }],
    },
  } as const;
  const orchestrator = new SchemaDrivenExecutionEngine({
    manifest: failureManifest,
    settings: {
      workspaceRoot: workspaceDir,
      stateRoot: sessionStateDir,
      projectContextPath: contextFilePath,
      maxDepth: 4,
      maxRetries: 0,
      maxChildren: 2,
      runtimeContext: {},
    },
    actorExecutors: {
      first_actor: async () => ({
        status: "failure",
        payload: { error: "network timeout while reaching upstream API" },
        failureKind: "hard",
      }),
      second_actor: async () => ({
        status: "failure",
        payload: { error: "HTTP 403 from provider" },
        failureKind: "hard",
      }),
    },
  });
  // The session must reject (not resolve with a failure status) once the
  // hard-failure threshold is reached.
  await assert.rejects(
    () =>
      orchestrator.runSession({
        sessionId: "session-hard-failure",
        initialPayload: { task: "Trigger hard failures" },
      }),
    /Hard failure threshold reached/,
  );
});
// A soft failure on the last node of the DAG should not abort the run,
// but must flip the aggregate session status to "failure".
test("marks aggregate status as failure when a terminal node fails", async () => {
  const workspaceDir = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const sessionStateDir = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const contextFilePath = resolve(sessionStateDir, "project-context.json");
  // build -> verify chain; verify is terminal and reports a soft failure.
  const twoStepManifest = {
    schemaVersion: "1",
    topologies: ["sequential"],
    personas: [
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder",
        toolClearance: { allowlist: [], banlist: [] },
      },
    ],
    relationships: [],
    topologyConstraints: { maxDepth: 3, maxRetries: 0 },
    pipeline: {
      entryNodeId: "build",
      nodes: [
        { id: "build", actorId: "build_actor", personaId: "coder" },
        { id: "verify", actorId: "verify_actor", personaId: "coder" },
      ],
      edges: [{ from: "build", to: "verify", on: "success" }],
    },
  } as const;
  const orchestrator = new SchemaDrivenExecutionEngine({
    manifest: twoStepManifest,
    settings: {
      workspaceRoot: workspaceDir,
      stateRoot: sessionStateDir,
      projectContextPath: contextFilePath,
      maxDepth: 3,
      maxRetries: 0,
      maxChildren: 2,
      runtimeContext: {},
    },
    actorExecutors: {
      build_actor: async () => ({
        status: "success",
        payload: { step: "build" },
      }),
      verify_actor: async () => ({
        status: "failure",
        payload: { error: "verification failed" },
        failureKind: "soft",
      }),
    },
  });
  const outcome = await orchestrator.runSession({
    sessionId: "session-terminal-failure",
    initialPayload: { task: "Aggregate failure status" },
  });
  assert.equal(outcome.status, "failure");
  assert.deepEqual(
    outcome.records.map((record) => `${record.nodeId}:${record.status}`),
    ["build:success", "verify:failure"],
  );
});
// Ensures that an AbortController signal passed into runSession is forwarded
// to the actor via input.signal, interrupts the actor's work, and causes the
// session promise to reject.
test("propagates abort signal into actor execution and stops the run", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
// Single-node pipeline whose only actor would block for 5s unless aborted.
const manifest = {
schemaVersion: "1",
topologies: ["sequential"],
personas: [
{
id: "coder",
displayName: "Coder",
systemPromptTemplate: "Coder",
toolClearance: {
allowlist: [],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 2,
maxRetries: 0,
},
pipeline: {
entryNodeId: "long-run",
nodes: [
{
id: "long-run",
actorId: "long_actor",
personaId: "coder",
},
],
edges: [],
},
} as const;
let observedAbort = false;
const engine = new SchemaDrivenExecutionEngine({
manifest,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxDepth: 2,
maxRetries: 0,
maxChildren: 2,
runtimeContext: {},
},
actorExecutors: {
// Waits on a 5s timer but registers an abort listener that cancels the
// timer and rejects with the signal's reason. If abort never arrives,
// the promise resolves and the "unreachable" payload would be returned.
long_actor: async (input) => {
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(resolve, 5000);
input.signal.addEventListener(
"abort",
() => {
observedAbort = true;
clearTimeout(timeout);
reject(input.signal.reason ?? new Error("aborted"));
},
{ once: true },
);
});
return {
status: "success",
payload: {
unreachable: true,
},
};
},
},
});
const controller = new AbortController();
const runPromise = engine.runSession({
sessionId: "session-abort",
initialPayload: {
task: "Abort test",
},
signal: controller.signal,
});
// Abort shortly after the run starts, well before the actor's 5s timer.
setTimeout(() => {
controller.abort(new Error("manual-abort"));
}, 20);
// Accept any of the plausible rejection shapes (DOM-style AbortError, the
// custom reason, or the fallback "aborted" Error).
await assert.rejects(() => runPromise, /(AbortError|manual-abort|aborted)/i);
assert.equal(observedAbort, true);
});
// With no securityViolationHandling configured, a SecurityViolationError
// thrown by an actor must propagate out of runSession untouched — no retry,
// even though the node's topology allows retries.
test("hard-aborts pipeline on security violations by default", async () => {
  const workspaceDir = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const sessionStateDir = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const contextFilePath = resolve(sessionStateDir, "project-context.json");
  const singleNodeManifest = {
    schemaVersion: "1",
    topologies: ["retry-unrolled", "sequential"],
    personas: [
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder",
        toolClearance: { allowlist: ["git"], banlist: [] },
      },
    ],
    relationships: [],
    topologyConstraints: { maxDepth: 3, maxRetries: 2 },
    pipeline: {
      entryNodeId: "secure-node",
      nodes: [{ id: "secure-node", actorId: "secure_actor", personaId: "coder" }],
      edges: [],
    },
  } as const;
  const orchestrator = new SchemaDrivenExecutionEngine({
    manifest: singleNodeManifest,
    settings: {
      workspaceRoot: workspaceDir,
      stateRoot: sessionStateDir,
      projectContextPath: contextFilePath,
      maxDepth: 3,
      maxRetries: 2,
      maxChildren: 2,
      runtimeContext: {},
    },
    actorExecutors: {
      // Always raises a security violation; never returns a result.
      secure_actor: async () => {
        throw new SecurityViolationError("blocked by policy", {
          code: "TOOL_NOT_ALLOWED",
        });
      },
    },
  });
  await assert.rejects(
    () =>
      orchestrator.runSession({
        sessionId: "session-security-hard-abort",
        initialPayload: { task: "Security hard abort" },
      }),
    /blocked by policy/,
  );
});
// When securityViolationHandling is "validation_fail", a SecurityViolationError
// is downgraded to a validation_fail record and the node is retried under its
// maxRetries constraint instead of aborting the pipeline.
test("can map security violations to validation_fail for retry-unrolled remediation", async () => {
  const workspaceDir = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
  const sessionStateDir = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
  const contextFilePath = resolve(sessionStateDir, "project-context.json");
  const retryManifest = {
    schemaVersion: "1",
    topologies: ["retry-unrolled", "sequential"],
    personas: [
      {
        id: "coder",
        displayName: "Coder",
        systemPromptTemplate: "Coder",
        toolClearance: { allowlist: ["git"], banlist: [] },
      },
    ],
    relationships: [],
    topologyConstraints: { maxDepth: 3, maxRetries: 2 },
    pipeline: {
      entryNodeId: "secure-node",
      nodes: [
        {
          id: "secure-node",
          actorId: "secure_actor",
          personaId: "coder",
          // One retry is enough: attempt 1 violates, attempt 2 succeeds.
          constraints: { maxRetries: 1 },
        },
      ],
      edges: [],
    },
  } as const;
  let attemptCount = 0;
  const orchestrator = new SchemaDrivenExecutionEngine({
    manifest: retryManifest,
    settings: {
      workspaceRoot: workspaceDir,
      stateRoot: sessionStateDir,
      projectContextPath: contextFilePath,
      maxDepth: 3,
      maxRetries: 2,
      maxChildren: 2,
      // Downgrade security violations so the node can be retried.
      securityViolationHandling: "validation_fail",
      runtimeContext: {},
    },
    actorExecutors: {
      secure_actor: async () => {
        attemptCount += 1;
        if (attemptCount === 1) {
          throw new SecurityViolationError("first attempt blocked", {
            code: "PATH_TRAVERSAL_BLOCKED",
          });
        }
        return {
          status: "success",
          payload: { fixed: true },
        };
      },
    },
  });
  const outcome = await orchestrator.runSession({
    sessionId: "session-security-validation-retry",
    initialPayload: { task: "Security retry path" },
  });
  assert.equal(outcome.status, "success");
  assert.deepEqual(
    outcome.records.map((record) => `${record.nodeId}:${record.status}:${String(record.attempt)}`),
    ["secure-node:validation_fail:1", "secure-node:success:2"],
  );
});