// File: ai_ops/tests/orchestration-engine.test.ts
// (source listing metadata: 1198 lines, 30 KiB, TypeScript)
import test from "node:test";
import assert from "node:assert/strict";
import { mkdtemp, readFile, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { resolve } from "node:path";
import { SchemaDrivenExecutionEngine } from "../src/agents/orchestration.js";
import type { ActorExecutionResult } from "../src/agents/pipeline.js";
import { loadConfig } from "../src/config.js";
import { createDefaultMcpRegistry, createMcpHandlerShell } from "../src/mcp.js";
import { SecurityViolationError } from "../src/security/index.js";
/**
 * Builds the baseline manifest used by the DAG-routing test:
 * four personas (product → task → coder → qa), two parent/child
 * relationships, and a pipeline whose edges route on state flags
 * and on a history event. Returned as `unknown` so the engine's
 * own manifest validation is exercised by the caller.
 */
function createManifest(): unknown {
  // Personas with their prompt templates and tool clearances.
  const personas = [
    {
      id: "product",
      displayName: "Product",
      systemPromptTemplate: "Product planning for {{repo}}",
      toolClearance: { allowlist: ["read_file"], banlist: ["delete_file"] },
    },
    {
      id: "task",
      displayName: "Task",
      systemPromptTemplate: "Task planning for {{repo}}",
      toolClearance: { allowlist: ["read_file", "write_file"], banlist: ["git_reset"] },
    },
    {
      id: "coder",
      displayName: "Coder",
      systemPromptTemplate: "Coder implements {{ticket}}",
      toolClearance: { allowlist: ["read_file", "write_file"], banlist: ["rm"] },
    },
    {
      id: "qa",
      displayName: "QA",
      systemPromptTemplate: "QA validates {{ticket}}",
      toolClearance: { allowlist: ["read_file"], banlist: ["write_file"] },
    },
  ];

  // Hierarchy: product may spawn task children, task may spawn coder children.
  const relationships = [
    {
      parentPersonaId: "product",
      childPersonaId: "task",
      constraints: { maxChildren: 2, maxDepth: 2 },
    },
    {
      parentPersonaId: "task",
      childPersonaId: "coder",
      constraints: { maxChildren: 3, maxDepth: 3 },
    },
  ];

  // Pipeline DAG: the entry gate routes to task-plan or straight to coder-1
  // depending on the needs_bootstrap state flag; coder-1 only proceeds to QA
  // once a validation_failed event exists in the history.
  const pipeline = {
    entryNodeId: "project-gate",
    nodes: [
      { id: "project-gate", actorId: "project_gate", personaId: "product" },
      { id: "task-plan", actorId: "task_plan", personaId: "task" },
      {
        id: "coder-1",
        actorId: "coder",
        personaId: "coder",
        constraints: { maxRetries: 1 },
      },
      { id: "qa-1", actorId: "qa", personaId: "qa" },
    ],
    edges: [
      {
        from: "project-gate",
        to: "task-plan",
        on: "success",
        when: [{ type: "state_flag", key: "needs_bootstrap", equals: true }],
      },
      {
        from: "project-gate",
        to: "coder-1",
        on: "success",
        when: [{ type: "state_flag", key: "needs_bootstrap", equals: false }],
      },
      { from: "task-plan", to: "coder-1", on: "success" },
      {
        from: "coder-1",
        to: "qa-1",
        on: "success",
        when: [{ type: "history_has_event", event: "validation_failed" }],
      },
    ],
  };

  return {
    schemaVersion: "1",
    topologies: ["hierarchical", "retry-unrolled", "sequential"],
    personas,
    relationships,
    topologyConstraints: { maxDepth: 6, maxRetries: 2 },
    pipeline,
  };
}
test("runs DAG pipeline with state-dependent routing and retry behavior", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
await writeFile(resolve(workspaceRoot, "PRD.md"), "# PRD\n", "utf8");
let coderAttempts = 0;
const engine = new SchemaDrivenExecutionEngine({
manifest: createManifest(),
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
runtimeContext: {
repo: "ai_ops",
ticket: "AIOPS-123",
},
maxChildren: 4,
maxDepth: 8,
maxRetries: 3,
},
actorExecutors: {
project_gate: async () => ({
status: "success",
payload: {
phase: "gate",
},
stateFlags: {
needs_bootstrap: true,
},
}),
task_plan: async (input) => {
assert.match(input.prompt, /ai_ops/);
return {
status: "success",
payload: {
plan: "roadmap",
},
stateFlags: {
roadmap_ready: true,
},
};
},
coder: async (input): Promise<ActorExecutionResult> => {
assert.match(input.prompt, /AIOPS-123/);
assert.deepEqual(input.executionContext.allowedTools, ["read_file", "write_file"]);
assert.equal(input.executionContext.phase, "coder-1");
assert.equal(typeof input.executionContext.modelConstraint, "string");
assert.ok(input.executionContext.modelConstraint.length > 0);
assert.ok(input.security);
coderAttempts += 1;
if (coderAttempts === 1) {
return {
status: "validation_fail",
payload: {
issue: "missing test",
},
};
}
return {
status: "success",
payload: {
code: "done",
},
};
},
qa: async () => ({
status: "success",
payload: {
qa: "ok",
},
}),
},
behaviorHandlers: {
coder: {
onValidationFail: () => ({
lastValidationFailure: "coder-1",
}),
},
},
});
const result = await engine.runSession({
sessionId: "session-orchestration-1",
initialPayload: {
task: "Implement pipeline",
},
});
assert.equal(result.status, "success");
assert.deepEqual(
result.records.map((record) => `${record.nodeId}:${record.status}:${String(record.attempt)}`),
[
"project-gate:success:1",
"task-plan:success:1",
"coder-1:validation_fail:1",
"coder-1:success:2",
"qa-1:success:1",
],
);
assert.equal(result.finalState.flags.needs_bootstrap, true);
assert.equal(result.finalState.flags.roadmap_ready, true);
assert.equal(result.finalState.metadata.lastValidationFailure, "coder-1");
assert.deepEqual(engine.planChildPersonas({ parentPersonaId: "task", depth: 1 }), ["coder"]);
});
test("injects resolved mcp/helpers and enforces Claude tool gate in actor executor", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const mcpConfigPath = resolve(workspaceRoot, "mcp.config.json");
await writeFile(
mcpConfigPath,
JSON.stringify(
{
servers: {
"task-master-tools": {
handler: "claude-task-master",
type: "stdio",
command: "node",
args: ["task-master-mcp.js"],
enabled_tools: ["read_file", "write_file", "search"],
},
},
},
null,
2,
),
"utf8",
);
const config = loadConfig({
...process.env,
MCP_CONFIG_PATH: mcpConfigPath,
});
const customRegistry = createDefaultMcpRegistry();
customRegistry.register(
createMcpHandlerShell({
id: "custom-task-mcp-handler",
description: "custom task handler",
matches: () => false,
}),
);
const manifest = {
schemaVersion: "1" as const,
topologies: ["sequential"],
personas: [
{
id: "task",
displayName: "Task",
systemPromptTemplate: "Task executor",
modelConstraint: "claude-3-haiku",
toolClearance: {
allowlist: ["read_file", "write_file"],
banlist: ["rm"],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 2,
maxRetries: 0,
},
pipeline: {
entryNodeId: "task-node",
nodes: [
{
id: "task-node",
actorId: "task_actor",
personaId: "task",
},
],
edges: [],
},
};
const engine = new SchemaDrivenExecutionEngine({
manifest,
config,
mcpRegistry: customRegistry,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxChildren: 1,
maxDepth: 2,
maxRetries: 0,
},
actorExecutors: {
task_actor: async (input) => {
assert.deepEqual(input.executionContext.allowedTools, ["read_file", "write_file"]);
assert.equal(input.executionContext.phase, "task-node");
assert.equal(input.executionContext.modelConstraint, "claude-3-haiku");
assert.equal(input.executionContext.security.worktreePath, workspaceRoot);
assert.equal(input.executionContext.security.violationMode, "hard_abort");
const codexConfig = input.mcp.resolveConfig({
providerHint: "codex",
});
const codexServer = (codexConfig.codexConfig?.mcp_servers as Record<string, Record<string, unknown>> | undefined)?.[
"task-master-tools"
];
assert.ok(codexServer);
assert.deepEqual(codexServer.enabled_tools, ["read_file", "write_file"]);
assert.deepEqual(input.mcp.allowedTools, ["read_file", "write_file"]);
assert.deepEqual(
input.mcp.filterToolsForProvider(["read_file", "search", "write_file"]),
["read_file", "write_file"],
);
const claudeConfig = input.mcp.resolveConfig({
providerHint: "claude",
});
assert.ok(claudeConfig.claudeMcpServers?.["task-master-tools"]);
const canUseTool = input.mcp.createClaudeCanUseTool();
const allow = await canUseTool(
"mcp__claude-task-master__read_file",
{},
{
signal: new AbortController().signal,
toolUseID: "allow-1",
},
);
assert.deepEqual(allow, {
behavior: "allow",
toolUseID: "allow-1",
});
await assert.rejects(
() =>
canUseTool(
"mcp__claude-task-master__rm",
{},
{
signal: new AbortController().signal,
toolUseID: "deny-1",
},
),
/Tool .* is not present in allowlist/,
);
await assert.rejects(
() =>
canUseTool(
"mcp__claude-task-master__search",
{},
{
signal: new AbortController().signal,
toolUseID: "deny-2",
},
),
/Tool .* is not present in allowlist/,
);
return {
status: "success",
payload: {
ok: true,
},
};
},
},
});
const result = await engine.runSession({
sessionId: "session-mcp-gate-1",
initialPayload: {
task: "verify mcp gate",
},
});
assert.equal(result.status, "success");
});
test("runs parallel topology blocks concurrently and routes via domain-event edges", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const manifest = {
schemaVersion: "1",
topologies: ["parallel", "retry-unrolled", "sequential"],
personas: [
{
id: "planner",
displayName: "Planner",
systemPromptTemplate: "Planner {{repo}}",
toolClearance: {
allowlist: ["read_file"],
banlist: [],
},
},
{
id: "coder",
displayName: "Coder",
systemPromptTemplate: "Coder {{repo}}",
toolClearance: {
allowlist: ["read_file", "write_file"],
banlist: [],
},
},
{
id: "integrator",
displayName: "Integrator",
systemPromptTemplate: "Integrator {{repo}}",
toolClearance: {
allowlist: ["read_file"],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 5,
maxRetries: 2,
},
pipeline: {
entryNodeId: "plan",
nodes: [
{
id: "plan",
actorId: "plan_actor",
personaId: "planner",
},
{
id: "code-a",
actorId: "code_a",
personaId: "coder",
topology: {
kind: "parallel",
blockId: "implementation",
},
},
{
id: "code-b",
actorId: "code_b",
personaId: "coder",
topology: {
kind: "parallel",
blockId: "implementation",
},
},
{
id: "integrate",
actorId: "integrate_actor",
personaId: "integrator",
},
],
edges: [
{
from: "plan",
to: "code-a",
on: "success",
},
{
from: "plan",
to: "code-b",
on: "success",
},
{
from: "code-a",
to: "integrate",
event: "code_committed",
},
{
from: "code-b",
to: "integrate",
event: "code_committed",
},
],
},
} as const;
let activeCoders = 0;
let maxConcurrentCoders = 0;
let releaseCoders: (() => void) | undefined;
const codersReleased = new Promise<void>((resolve) => {
releaseCoders = resolve;
});
let coderStarts = 0;
let notifyBothCodersStarted: (() => void) | undefined;
const bothCodersStarted = new Promise<void>((resolve) => {
notifyBothCodersStarted = resolve;
});
const engine = new SchemaDrivenExecutionEngine({
manifest,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
runtimeContext: {
repo: "ai_ops",
},
maxDepth: 5,
maxRetries: 2,
maxChildren: 4,
},
actorExecutors: {
plan_actor: async () => ({
status: "success",
payload: {
phase: "plan",
},
}),
code_a: async () => {
activeCoders += 1;
maxConcurrentCoders = Math.max(maxConcurrentCoders, activeCoders);
coderStarts += 1;
if (coderStarts === 2) {
notifyBothCodersStarted?.();
}
await codersReleased;
activeCoders = Math.max(activeCoders - 1, 0);
return {
status: "success",
payload: {
branch: "feature/a",
},
events: [
{
type: "code_committed",
payload: {
summary: "Feature A committed",
},
},
],
projectContextPatch: {
artifactPointers: {
feature_a_commit: "feature/a@abc123",
},
},
};
},
code_b: async () => {
activeCoders += 1;
maxConcurrentCoders = Math.max(maxConcurrentCoders, activeCoders);
coderStarts += 1;
if (coderStarts === 2) {
notifyBothCodersStarted?.();
}
await codersReleased;
activeCoders = Math.max(activeCoders - 1, 0);
return {
status: "success",
payload: {
branch: "feature/b",
},
events: [
{
type: "code_committed",
payload: {
summary: "Feature B committed",
},
},
],
projectContextPatch: {
enqueueTasks: [
{
id: "task-integrate",
title: "Integrate feature branches",
status: "pending",
},
],
},
};
},
integrate_actor: async () => ({
status: "success",
payload: {
merged: true,
},
events: [
{
type: "branch_merged",
payload: {
summary: "Branches merged",
},
},
],
}),
},
});
const runPromise = engine.runSession({
sessionId: "session-parallel-domain-events",
initialPayload: {
task: "Parallel implementation",
},
});
await bothCodersStarted;
releaseCoders?.();
const result = await runPromise;
assert.equal(maxConcurrentCoders, 2);
assert.equal(result.status, "success");
assert.deepEqual(
result.records.map((record) => `${record.nodeId}:${record.status}`),
["plan:success", "code-a:success", "code-b:success", "integrate:success"],
);
const storedContextRaw = await readFile(projectContextPath, "utf8");
const storedContext = JSON.parse(storedContextRaw) as {
artifactPointers: Record<string, string>;
taskQueue: Array<{ id: string }>;
};
assert.equal(storedContext.artifactPointers.feature_a_commit, "feature/a@abc123");
assert.equal(storedContext.taskQueue[0]?.id, "task-integrate");
const finalStatePointer = storedContext.artifactPointers["sessions/session-parallel-domain-events/final_state"];
assert.ok(finalStatePointer);
assert.match(finalStatePointer, /state\.json$/);
});
test("fails fast after two sequential hard failures", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const manifest = {
schemaVersion: "1",
topologies: ["sequential"],
personas: [
{
id: "coder",
displayName: "Coder",
systemPromptTemplate: "Coder",
toolClearance: {
allowlist: [],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 4,
maxRetries: 0,
},
pipeline: {
entryNodeId: "first",
nodes: [
{
id: "first",
actorId: "first_actor",
personaId: "coder",
},
{
id: "second",
actorId: "second_actor",
personaId: "coder",
},
],
edges: [
{
from: "first",
to: "second",
on: "failure",
},
],
},
} as const;
const engine = new SchemaDrivenExecutionEngine({
manifest,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxDepth: 4,
maxRetries: 0,
maxChildren: 2,
runtimeContext: {},
},
actorExecutors: {
first_actor: async () => ({
status: "failure",
payload: {
error: "network timeout while reaching upstream API",
},
failureKind: "hard",
}),
second_actor: async () => ({
status: "failure",
payload: {
error: "HTTP 403 from provider",
},
failureKind: "hard",
}),
},
});
await assert.rejects(
() =>
engine.runSession({
sessionId: "session-hard-failure",
initialPayload: {
task: "Trigger hard failures",
},
}),
/Hard failure threshold reached/,
);
});
test("marks aggregate status as failure when a terminal node fails", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const manifest = {
schemaVersion: "1",
topologies: ["sequential"],
personas: [
{
id: "coder",
displayName: "Coder",
systemPromptTemplate: "Coder",
toolClearance: {
allowlist: [],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 3,
maxRetries: 0,
},
pipeline: {
entryNodeId: "build",
nodes: [
{
id: "build",
actorId: "build_actor",
personaId: "coder",
},
{
id: "verify",
actorId: "verify_actor",
personaId: "coder",
},
],
edges: [
{
from: "build",
to: "verify",
on: "success",
},
],
},
} as const;
const engine = new SchemaDrivenExecutionEngine({
manifest,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxDepth: 3,
maxRetries: 0,
maxChildren: 2,
runtimeContext: {},
},
actorExecutors: {
build_actor: async () => ({
status: "success",
payload: {
step: "build",
},
}),
verify_actor: async () => ({
status: "failure",
payload: {
error: "verification failed",
},
failureKind: "soft",
}),
},
});
const result = await engine.runSession({
sessionId: "session-terminal-failure",
initialPayload: {
task: "Aggregate failure status",
},
});
assert.equal(result.status, "failure");
assert.deepEqual(
result.records.map((record) => `${record.nodeId}:${record.status}`),
["build:success", "verify:failure"],
);
});
test("propagates abort signal into actor execution and stops the run", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const manifest = {
schemaVersion: "1",
topologies: ["sequential"],
personas: [
{
id: "coder",
displayName: "Coder",
systemPromptTemplate: "Coder",
toolClearance: {
allowlist: [],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 2,
maxRetries: 0,
},
pipeline: {
entryNodeId: "long-run",
nodes: [
{
id: "long-run",
actorId: "long_actor",
personaId: "coder",
},
],
edges: [],
},
} as const;
let observedAbort = false;
const engine = new SchemaDrivenExecutionEngine({
manifest,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxDepth: 2,
maxRetries: 0,
maxChildren: 2,
runtimeContext: {},
},
actorExecutors: {
long_actor: async (input) => {
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(resolve, 5000);
input.signal.addEventListener(
"abort",
() => {
observedAbort = true;
clearTimeout(timeout);
reject(input.signal.reason ?? new Error("aborted"));
},
{ once: true },
);
});
return {
status: "success",
payload: {
unreachable: true,
},
};
},
},
});
const controller = new AbortController();
const runPromise = engine.runSession({
sessionId: "session-abort",
initialPayload: {
task: "Abort test",
},
signal: controller.signal,
});
setTimeout(() => {
controller.abort(new Error("manual-abort"));
}, 20);
await assert.rejects(() => runPromise, /(AbortError|manual-abort|aborted)/i);
assert.equal(observedAbort, true);
});
test("hard-aborts pipeline on security violations by default", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const manifest = {
schemaVersion: "1",
topologies: ["retry-unrolled", "sequential"],
personas: [
{
id: "coder",
displayName: "Coder",
systemPromptTemplate: "Coder",
toolClearance: {
allowlist: ["git"],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 3,
maxRetries: 2,
},
pipeline: {
entryNodeId: "secure-node",
nodes: [
{
id: "secure-node",
actorId: "secure_actor",
personaId: "coder",
},
],
edges: [],
},
} as const;
const engine = new SchemaDrivenExecutionEngine({
manifest,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxDepth: 3,
maxRetries: 2,
maxChildren: 2,
runtimeContext: {},
},
actorExecutors: {
secure_actor: async () => {
throw new SecurityViolationError("blocked by policy", {
code: "TOOL_NOT_ALLOWED",
});
},
},
});
await assert.rejects(
() =>
engine.runSession({
sessionId: "session-security-hard-abort",
initialPayload: {
task: "Security hard abort",
},
}),
/blocked by policy/,
);
});
test("can map security violations to validation_fail for retry-unrolled remediation", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const manifest = {
schemaVersion: "1",
topologies: ["retry-unrolled", "sequential"],
personas: [
{
id: "coder",
displayName: "Coder",
systemPromptTemplate: "Coder",
toolClearance: {
allowlist: ["git"],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 3,
maxRetries: 2,
},
pipeline: {
entryNodeId: "secure-node",
nodes: [
{
id: "secure-node",
actorId: "secure_actor",
personaId: "coder",
constraints: {
maxRetries: 1,
},
},
],
edges: [],
},
} as const;
let attempts = 0;
const engine = new SchemaDrivenExecutionEngine({
manifest,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxDepth: 3,
maxRetries: 2,
maxChildren: 2,
securityViolationHandling: "validation_fail",
runtimeContext: {},
},
actorExecutors: {
secure_actor: async () => {
attempts += 1;
if (attempts === 1) {
throw new SecurityViolationError("first attempt blocked", {
code: "PATH_TRAVERSAL_BLOCKED",
});
}
return {
status: "success",
payload: {
fixed: true,
},
};
},
},
});
const result = await engine.runSession({
sessionId: "session-security-validation-retry",
initialPayload: {
task: "Security retry path",
},
});
assert.equal(result.status, "success");
assert.deepEqual(
result.records.map((record) => `${record.nodeId}:${record.status}:${String(record.attempt)}`),
["secure-node:validation_fail:1", "secure-node:success:2"],
);
});
test("runtime event side-channel logs session and node lifecycle without changing pipeline behavior", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-runtime-event-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-runtime-event-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const runtimeEventLogRelativePath = ".ai_ops/events/test-runtime-events.ndjson";
const runtimeEventLogPath = resolve(workspaceRoot, runtimeEventLogRelativePath);
const manifest = {
schemaVersion: "1",
topologies: ["sequential"],
personas: [
{
id: "runner",
displayName: "Runner",
systemPromptTemplate: "Runner",
toolClearance: {
allowlist: ["read_file"],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 2,
maxRetries: 0,
},
pipeline: {
entryNodeId: "node-1",
nodes: [
{
id: "node-1",
actorId: "runner_actor",
personaId: "runner",
},
],
edges: [],
},
} as const;
const config = loadConfig({
AGENT_RUNTIME_EVENT_LOG_PATH: runtimeEventLogRelativePath,
});
const engine = new SchemaDrivenExecutionEngine({
manifest,
config,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxDepth: 2,
maxRetries: 0,
maxChildren: 1,
runtimeContext: {},
},
actorExecutors: {
runner_actor: async () => ({
status: "success",
payload: {
complete: true,
usage: {
input_tokens: 120,
output_tokens: 80,
tool_calls: 2,
duration_ms: 450,
},
},
}),
},
});
const result = await engine.runSession({
sessionId: "session-runtime-events",
initialPayload: {
task: "Emit runtime events",
},
});
assert.equal(result.status, "success");
const lines = (await readFile(runtimeEventLogPath, "utf8"))
.trim()
.split("\n")
.filter((line) => line.length > 0);
assert.ok(lines.length >= 4);
const events = lines.map((line) => JSON.parse(line) as Record<string, unknown>);
const eventTypes = new Set(events.map((event) => String(event.type)));
assert.ok(eventTypes.has("session.started"));
assert.ok(eventTypes.has("node.attempt.completed"));
assert.ok(eventTypes.has("domain.validation_passed"));
assert.ok(eventTypes.has("session.completed"));
const nodeAttemptEvent = events.find((event) => event.type === "node.attempt.completed");
assert.ok(nodeAttemptEvent);
const usage = nodeAttemptEvent.usage as Record<string, unknown>;
assert.equal(usage.tokenInput, 120);
assert.equal(usage.tokenOutput, 80);
assert.equal(usage.toolCalls, 2);
assert.equal(usage.durationMs, 450);
});