Enforce actor-level MCP policy wiring and Claude tool gates

This commit is contained in:
2026-02-23 17:28:55 -05:00
parent 20e944f7d4
commit 3ca9bd3db8
5 changed files with 828 additions and 111 deletions


@@ -11,11 +11,13 @@ TypeScript runtime for deterministic multi-agent execution with:
- Resource provisioning (git worktrees + deterministic port ranges)
- MCP configuration layer with handler policy hooks
- Security middleware for shell/tool policy enforcement
- Runtime event fan-out (NDJSON analytics log + optional Discord webhook notifications)
## Architecture Summary
- `SchemaDrivenExecutionEngine.runSession(...)` is the single execution entrypoint.
- `PipelineExecutor` owns runtime control flow and topology dispatch while delegating failure classification and persistence/event side-effects to dedicated policies.
- Runtime events are emitted as best-effort side-channel telemetry and do not affect orchestration control flow.
- `AgentManager` is an internal utility used by the pipeline when fan-out/retry-unrolled behavior is required.
- Session state is persisted under `AGENT_STATE_ROOT`.
- Project state is persisted under `AGENT_PROJECT_CONTEXT_PATH` with schema-versioned JSON (`schemaVersion`) and domains:
@@ -39,6 +41,7 @@ TypeScript runtime for deterministic multi-agent execution with:
- `provisioning.ts`: resource provisioning and child suballocation helpers
- `src/mcp`: MCP config types/conversion/handlers
- `src/security`: shell AST parsing, rules engine, secure executor, and audit sinks
- `src/telemetry`: runtime event schema, sink fan-out, file sink, and Discord webhook sink
- `src/examples`: provider entrypoints (`codex.ts`, `claude.ts`)
- `src/config.ts`: centralized env parsing/validation/defaulting
- `tests`: manager, manifest, pipeline/orchestration, state, provisioning, MCP
@@ -101,6 +104,78 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em
- session closure aborts child recursive work
- run summaries expose aggregate `status`: success requires successful terminal executed DAG nodes and no critical-path failure
## Runtime Events
- The pipeline emits runtime lifecycle events (`session.started`, `node.attempt.completed`, `domain.*`, `session.completed`, `session.failed`).
- Runtime events are fan-out only and never used for edge-routing decisions.
- Default sink writes NDJSON to `AGENT_RUNTIME_EVENT_LOG_PATH`.
- Optional Discord sink posts high-visibility lifecycle/error events through webhook configuration.
- Existing security command audit output (`AGENT_SECURITY_AUDIT_LOG_PATH`) remains in place and is also mirrored into runtime events.
### Runtime Event Fields
Each runtime event is written as one NDJSON object with:
- `id`, `timestamp`, `type`, `severity`
- `sessionId`, `nodeId`, `attempt`
- `message`
- optional `usage` (`tokenInput`, `tokenOutput`, `tokenTotal`, `toolCalls`, `durationMs`, `costUsd`)
- optional structured `metadata`
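The field list above can be sketched as a TypeScript shape. This is illustrative only; the authoritative schema lives in `src/telemetry`, and the type/field names here beyond the documented ones are assumptions.

```typescript
// Illustrative runtime event shape; the real schema is defined in src/telemetry.
type RuntimeEventUsage = {
  tokenInput?: number;
  tokenOutput?: number;
  tokenTotal?: number;
  toolCalls?: number;
  durationMs?: number;
  costUsd?: number;
};

type RuntimeEvent = {
  id: string;
  timestamp: string;
  type: string;
  severity: "info" | "warning" | "critical";
  sessionId?: string;
  nodeId?: string;
  attempt?: number;
  message: string;
  usage?: RuntimeEventUsage;
  metadata?: Record<string, unknown>;
};

// One NDJSON line is simply one serialized event object.
const event: RuntimeEvent = {
  id: "evt-123",
  timestamp: new Date().toISOString(),
  type: "node.attempt.completed",
  severity: "info",
  sessionId: "session-1",
  nodeId: "plan",
  attempt: 1,
  message: "Node attempt completed.",
  usage: { tokenTotal: 1024, durationMs: 5400 },
};
const ndjsonLine = JSON.stringify(event);
```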
### Runtime Event Setup
Add these variables in `.env` (or use defaults):
```bash
AGENT_RUNTIME_EVENT_LOG_PATH=.ai_ops/events/runtime-events.ndjson
AGENT_RUNTIME_DISCORD_WEBHOOK_URL=
AGENT_RUNTIME_DISCORD_MIN_SEVERITY=critical
AGENT_RUNTIME_DISCORD_ALWAYS_NOTIFY_TYPES=session.started,session.completed,session.failed
```
Notes:
- File sink is always enabled and appends NDJSON.
- Discord sink is enabled only when `AGENT_RUNTIME_DISCORD_WEBHOOK_URL` is set.
- Discord notifications are sent when event severity is at or above `AGENT_RUNTIME_DISCORD_MIN_SEVERITY`.
- Event types in `AGENT_RUNTIME_DISCORD_ALWAYS_NOTIFY_TYPES` are always sent, regardless of severity.
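The two Discord rules above (severity threshold plus always-notify list) compose as a simple predicate; this standalone sketch mirrors the documented behavior, with the function name being illustrative:

```typescript
type Severity = "info" | "warning" | "critical";

// Ranks mirror the documented ordering: info < warning < critical.
const severityRank: Record<Severity, number> = { info: 0, warning: 1, critical: 2 };

// Notify when the event type is in the always-notify list, or when the
// event severity is at or above the configured minimum severity.
function shouldNotifyDiscord(
  eventType: string,
  severity: Severity,
  minSeverity: Severity,
  alwaysNotifyTypes: readonly string[],
): boolean {
  if (alwaysNotifyTypes.includes(eventType)) {
    return true;
  }
  return severityRank[severity] >= severityRank[minSeverity];
}
```

For example, with `AGENT_RUNTIME_DISCORD_MIN_SEVERITY=critical`, a `session.started` info event still notifies when it is listed in the always-notify types.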
### Event Types Emitted by Runtime
- Session lifecycle:
- `session.started`
- `session.completed`
- `session.failed`
- Node/domain lifecycle:
- `node.attempt.completed`
- `domain.<domain_event_type>` (for example `domain.validation_failed`)
- Security mirror events:
- `security.shell.command_profiled`
- `security.shell.command_allowed`
- `security.shell.command_blocked`
- `security.tool.invocation_allowed`
- `security.tool.invocation_blocked`
### Analytics Quick Start
Inspect latest events:
```bash
tail -n 50 .ai_ops/events/runtime-events.ndjson
```
Count events by type:
```bash
jq -r '.type' .ai_ops/events/runtime-events.ndjson | sort | uniq -c
```
Get only critical events:
```bash
jq -c 'select(.severity=="critical")' .ai_ops/events/runtime-events.ndjson
```
## Security Middleware
- Shell command parsing uses async `sh-syntax` (WASM-backed mvdan/sh parser) with fail-closed command/redirect extraction.
@@ -117,6 +192,8 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em
- optional uid/gid drop
- stdout/stderr streaming hooks for audit
- Every actor execution input now includes `security` helpers (`rulesEngine`, `createCommandExecutor(...)`) so executors can enforce shell/tool policy at the execution boundary.
- Every actor execution input now includes `mcp` helpers (`registry`, `resolveConfig(...)`) so MCP server config resolution stays centrally policy-controlled per persona/tool-clearance.
- For Claude-based executors, use `input.mcp.createClaudeCanUseTool()` as the SDK `canUseTool` callback to enforce persona allowlist/banlist before each tool invocation.
- Pipeline behavior on `SecurityViolationError` is configurable:
  - `hard_abort` (default)
  - `validation_fail` (retry-unrolled remediation)
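A standalone sketch of the allowlist/banlist check that a `canUseTool`-style gate performs before each tool invocation. The real helper is `input.mcp.createClaudeCanUseTool()`; the factory name, clearance shape, and deny messages here are illustrative assumptions:

```typescript
type PermissionResult =
  | { behavior: "allow" }
  | { behavior: "deny"; message: string };

// Hypothetical standalone version of the persona tool gate: banlist wins,
// then a non-empty allowlist restricts everything not explicitly listed.
function createCanUseTool(clearance: { allowlist: string[]; banlist: string[] }) {
  return async (toolName: string): Promise<PermissionResult> => {
    if (clearance.banlist.includes(toolName)) {
      return { behavior: "deny", message: `Tool "${toolName}" is banned for this persona.` };
    }
    if (clearance.allowlist.length > 0 && !clearance.allowlist.includes(toolName)) {
      return { behavior: "deny", message: `Tool "${toolName}" is not on the persona allowlist.` };
    }
    return { behavior: "allow" };
  };
}
```

Banlist-before-allowlist ordering ensures an explicitly banned tool stays blocked even if it also appears on the allowlist.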
@@ -127,6 +204,7 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em
- `CODEX_API_KEY`
- `OPENAI_API_KEY`
- `OPENAI_AUTH_MODE` (`auto`, `chatgpt`, or `api_key`)
- `OPENAI_BASE_URL`
- `CODEX_SKIP_GIT_CHECK`
- `CLAUDE_CODE_OAUTH_TOKEN` (preferred for Claude auth; takes precedence over `ANTHROPIC_API_KEY`)
@@ -171,6 +249,13 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em
- `AGENT_SECURITY_DROP_UID`
- `AGENT_SECURITY_DROP_GID`
### Runtime Events / Telemetry
- `AGENT_RUNTIME_EVENT_LOG_PATH`
- `AGENT_RUNTIME_DISCORD_WEBHOOK_URL`
- `AGENT_RUNTIME_DISCORD_MIN_SEVERITY` (`info`, `warning`, or `critical`)
- `AGENT_RUNTIME_DISCORD_ALWAYS_NOTIFY_TYPES` (CSV event types such as `session.started,session.completed,session.failed`)
### Runtime-Injected (Do Not Configure In `.env`)
- `AGENT_REPO_ROOT`
@@ -183,6 +268,14 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em
Defaults are documented in `.env.example`.
Auth behavior notes:
- OpenAI/Codex:
- `OPENAI_AUTH_MODE=auto` (default) prefers API keys when configured, and otherwise relies on existing Codex CLI login (`codex login` / ChatGPT plan auth).
- `OPENAI_AUTH_MODE=chatgpt` always omits API key injection so Codex uses ChatGPT subscription auth/session.
- Claude:
- If `CLAUDE_CODE_OAUTH_TOKEN` and `ANTHROPIC_API_KEY` are both unset, runtime auth options are omitted and Claude Agent SDK can use existing Claude Code login state.
## Quality Gate
```bash


@@ -35,8 +35,14 @@ This middleware provides a first-pass hardening layer for agent-executed shell c
`McpRegistry.resolveServerWithHandler(...)` now accepts optional tool clearance and applies it to resolved Codex MCP tool lists (`enabled_tools`, `disabled_tools`).
`ActorExecutionInput` now carries an `mcp` execution context with:
- `registry`: resolved runtime `McpRegistry`
- `resolveConfig(...)`: centralized MCP config resolution with persona tool-clearance applied
- `createClaudeCanUseTool()`: helper for Claude SDK `canUseTool` callback so each tool invocation is allowlist/banlist-enforced before execution
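As a standalone sketch of how a persona tool clearance might be applied to a resolved Codex MCP server config (the real logic lives in `McpRegistry.resolveServerWithHandler(...)`; the `applyToolClearance` helper and the clearance/server shapes here are hypothetical):

```typescript
type ToolClearance = { allowlist: string[]; banlist: string[] };
type CodexMcpServer = { command: string; enabled_tools?: string[]; disabled_tools?: string[] };

// A non-empty allowlist replaces the server's enabled_tools; the banlist
// is merged (deduplicated) into disabled_tools.
function applyToolClearance(server: CodexMcpServer, clearance: ToolClearance): CodexMcpServer {
  const enabled = clearance.allowlist.length > 0 ? clearance.allowlist : server.enabled_tools;
  const disabled = [...new Set([...(server.disabled_tools ?? []), ...clearance.banlist])];
  return {
    ...server,
    ...(enabled ? { enabled_tools: enabled } : {}),
    ...(disabled.length > 0 ? { disabled_tools: disabled } : {}),
  };
}
```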
## Known limits and TODOs
- AST validation is token-based and does not yet model full shell evaluation semantics (e.g. runtime-generated paths from env expansion).
- Audit output remains line-oriented file logging; runtime events now mirror security decisions for side-channel analytics and alerting.
- Deno sandbox mode is not enforced yet. A future executor mode can wrap shell runs via `deno run` with strict `--allow-read/--allow-run` flags.


@@ -1,6 +1,6 @@
import { resolve } from "node:path";
import { getConfig, loadConfig, type AppConfig } from "../config.js";
import { createDefaultMcpRegistry, loadMcpConfigFromEnv, McpRegistry } from "../mcp.js";
import { parseAgentManifest, type AgentManifest } from "./manifest.js";
import { AgentManager } from "./manager.js";
import {
@@ -19,9 +19,17 @@ import { FileSystemStateContextManager, type StoredSessionState } from "./state-
import type { JsonObject } from "./types.js";
import {
SecureCommandExecutor,
type SecurityAuditEvent,
type SecurityAuditSink,
SecurityRulesEngine,
createFileSecurityAuditSink,
} from "../security/index.js";
import {
RuntimeEventPublisher,
createDiscordWebhookRuntimeEventSink,
createFileRuntimeEventSink,
type RuntimeEventSeverity,
} from "../telemetry/index.js";
export type OrchestrationSettings = {
workspaceRoot: string;
@@ -76,13 +84,106 @@ function getChildrenByParent(manifest: AgentManifest): Map<string, AgentManifest
return map;
}
function mapSecurityAuditSeverity(event: SecurityAuditEvent): RuntimeEventSeverity {
if (event.type === "shell.command_blocked" || event.type === "tool.invocation_blocked") {
return "critical";
}
return "info";
}
function toSecurityAuditMessage(event: SecurityAuditEvent): string {
if (event.type === "shell.command_blocked") {
return event.reason;
}
if (event.type === "tool.invocation_blocked") {
return event.reason;
}
if (event.type === "shell.command_allowed") {
return "Shell command passed security validation.";
}
if (event.type === "shell.command_profiled") {
return "Shell command parsed for security validation.";
}
return `Tool "${event.tool}" passed policy checks.`;
}
function toSecurityAuditMetadata(event: SecurityAuditEvent): JsonObject {
if (event.type === "shell.command_profiled") {
return {
command: event.command,
cwd: event.cwd,
commandCount: event.parsed.commandCount,
};
}
if (event.type === "shell.command_allowed") {
return {
command: event.command,
cwd: event.cwd,
commandCount: event.commandCount,
};
}
if (event.type === "shell.command_blocked") {
return {
command: event.command,
cwd: event.cwd,
reason: event.reason,
code: event.code,
};
}
if (event.type === "tool.invocation_allowed") {
return {
tool: event.tool,
};
}
return {
tool: event.tool,
reason: event.reason,
code: event.code,
};
}
function createRuntimeEventPublisher(input: {
config: Readonly<AppConfig>;
settings: OrchestrationSettings;
}): RuntimeEventPublisher {
const sinks = [
createFileRuntimeEventSink(
resolve(input.settings.workspaceRoot, input.config.runtimeEvents.logPath),
),
];
if (input.config.runtimeEvents.discordWebhookUrl) {
sinks.push(
createDiscordWebhookRuntimeEventSink({
webhookUrl: input.config.runtimeEvents.discordWebhookUrl,
minSeverity: input.config.runtimeEvents.discordMinSeverity,
alwaysNotifyTypes: input.config.runtimeEvents.discordAlwaysNotifyTypes,
}),
);
}
return new RuntimeEventPublisher({
sinks,
});
}
function createActorSecurityContext(input: {
config: Readonly<AppConfig>;
settings: OrchestrationSettings;
runtimeEventPublisher: RuntimeEventPublisher;
}): ActorExecutionSecurityContext {
const fileAuditSink = createFileSecurityAuditSink(
resolve(input.settings.workspaceRoot, input.config.security.auditLogPath),
);
const auditSink: SecurityAuditSink = (event): void => {
fileAuditSink(event);
void input.runtimeEventPublisher.publish({
type: `security.${event.type}`,
severity: mapSecurityAuditSeverity(event),
message: toSecurityAuditMessage(event),
metadata: toSecurityAuditMetadata(event),
});
};
const rulesEngine = new SecurityRulesEngine(
{
allowedBinaries: input.config.security.shellAllowedBinaries,
@@ -126,10 +227,12 @@ export class SchemaDrivenExecutionEngine {
private readonly stateManager: FileSystemStateContextManager;
private readonly projectContextStore: FileSystemProjectContextStore;
private readonly actorExecutors: ReadonlyMap<string, ActorExecutor>;
private readonly config: Readonly<AppConfig>;
private readonly settings: OrchestrationSettings;
private readonly childrenByParent: Map<string, AgentManifest["relationships"]>;
private readonly manager: AgentManager;
private readonly mcpRegistry: McpRegistry;
private readonly runtimeEventPublisher: RuntimeEventPublisher;
private readonly securityContext: ActorExecutionSecurityContext;
constructor(input: {
@@ -147,6 +250,7 @@ export class SchemaDrivenExecutionEngine {
this.manifest = parseAgentManifest(input.manifest);
const config = input.config ?? getConfig();
this.config = config;
this.settings = {
workspaceRoot: resolve(input.settings?.workspaceRoot ?? process.cwd()),
stateRoot: resolve(input.settings?.stateRoot ?? config.orchestration.stateRoot),
@@ -179,9 +283,14 @@ export class SchemaDrivenExecutionEngine {
maxRecursiveDepth: config.agentManager.maxRecursiveDepth,
});
this.mcpRegistry = input.mcpRegistry ?? createDefaultMcpRegistry();
this.runtimeEventPublisher = createRuntimeEventPublisher({
config,
settings: this.settings,
});
this.securityContext = createActorSecurityContext({
config,
settings: this.settings,
runtimeEventPublisher: this.runtimeEventPublisher,
});
for (const persona of this.manifest.personas) {
@@ -261,8 +370,21 @@ export class SchemaDrivenExecutionEngine {
managerSessionId,
projectContextStore: this.projectContextStore,
mcpRegistry: this.mcpRegistry,
resolveMcpConfig: ({ providerHint, prompt, toolClearance }) =>
loadMcpConfigFromEnv(
{
providerHint,
prompt,
},
{
config: this.config,
registry: this.mcpRegistry,
toolClearance,
},
),
securityViolationHandling: this.settings.securityViolationHandling,
securityContext: this.securityContext,
runtimeEventPublisher: this.runtimeEventPublisher,
},
);
try {


@@ -17,6 +17,7 @@ import {
import type { AgentManifest, PipelineEdge, PipelineNode, RouteCondition } from "./manifest.js";
import type { AgentManager, RecursiveChildIntent } from "./manager.js";
import type { McpRegistry } from "../mcp/handlers.js";
import type { LoadedMcpConfig, McpLoadContext } from "../mcp/types.js";
import { PersonaRegistry } from "./persona-registry.js";
import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js";
import {
@@ -27,11 +28,14 @@ import {
import type { JsonObject } from "./types.js";
import {
SecureCommandExecutor,
parseToolClearancePolicy,
SecurityRulesEngine,
SecurityViolationError,
type ExecutionEnvPolicy,
type SecurityViolationHandling,
type ToolClearancePolicy,
} from "../security/index.js";
import { type RuntimeEventPublisher } from "../telemetry/index.js";
export type ActorResultStatus = "success" | "validation_fail" | "failure";
export type ActorFailureKind = "soft" | "hard";
@@ -47,16 +51,43 @@ export type ActorExecutionResult = {
failureCode?: string;
};
export type ActorToolPermissionResult =
| {
behavior: "allow";
toolUseID?: string;
}
| {
behavior: "deny";
message: string;
interrupt?: boolean;
toolUseID?: string;
};
export type ActorToolPermissionHandler = (
toolName: string,
input: Record<string, unknown>,
options: {
signal: AbortSignal;
toolUseID?: string;
agentID?: string;
},
) => Promise<ActorToolPermissionResult>;
export type ActorExecutionMcpContext = {
registry: McpRegistry;
resolveConfig: (context?: McpLoadContext) => LoadedMcpConfig;
createToolPermissionHandler: () => ActorToolPermissionHandler;
createClaudeCanUseTool: () => ActorToolPermissionHandler;
};
export type ActorExecutionInput = {
sessionId: string;
node: PipelineNode;
prompt: string;
context: NodeExecutionContext;
signal: AbortSignal;
toolClearance: ToolClearancePolicy;
mcp: ActorExecutionMcpContext;
security?: ActorExecutionSecurityContext;
};
@@ -92,8 +123,10 @@ export type PipelineExecutorOptions = {
failurePolicy?: FailurePolicy;
lifecycleObserver?: PipelineLifecycleObserver;
hardFailureThreshold?: number;
resolveMcpConfig?: (input: McpLoadContext & { toolClearance: ToolClearancePolicy }) => LoadedMcpConfig;
securityViolationHandling?: SecurityViolationHandling;
securityContext?: ActorExecutionSecurityContext;
runtimeEventPublisher?: RuntimeEventPublisher;
};
export type ActorExecutionSecurityContext = {
@@ -245,6 +278,63 @@ function toAbortError(signal: AbortSignal): Error {
return error;
}
function toErrorMessage(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
return String(error);
}
function dedupeStrings(values: readonly string[]): string[] {
const deduped: string[] = [];
const seen = new Set<string>();
for (const value of values) {
const normalized = value.trim();
if (!normalized || seen.has(normalized)) {
continue;
}
seen.add(normalized);
deduped.push(normalized);
}
return deduped;
}
function toToolNameCandidates(toolName: string): string[] {
const trimmed = toolName.trim();
if (!trimmed) {
return [];
}
const candidates = [trimmed];
const maybeSuffix = (separator: string): void => {
const index = trimmed.lastIndexOf(separator);
if (index === -1 || index + separator.length >= trimmed.length) {
return;
}
candidates.push(trimmed.slice(index + separator.length));
};
const doubleUnderscoreParts = trimmed.split("__").filter((entry) => entry.length > 0);
if (doubleUnderscoreParts.length >= 2) {
const last = doubleUnderscoreParts[doubleUnderscoreParts.length - 1];
if (last) {
candidates.push(last);
}
if (doubleUnderscoreParts.length >= 3) {
candidates.push(doubleUnderscoreParts.slice(2).join("__"));
}
}
maybeSuffix(":");
maybeSuffix(".");
maybeSuffix("/");
maybeSuffix("\\");
return dedupeStrings(candidates);
}
function defaultEventPayloadForStatus(status: ActorResultStatus): DomainEventPayload {
if (status === "success") {
return {
@@ -320,6 +410,7 @@ export class PipelineExecutor {
stateManager: this.stateManager,
projectContextStore: this.options.projectContextStore,
domainEventBus: this.domainEventBus,
runtimeEventPublisher: this.options.runtimeEventPublisher,
});
for (const node of manifest.pipeline.nodes) {
@@ -342,136 +433,171 @@ export class PipelineExecutor {
initialState?: Partial<StoredSessionState>;
signal?: AbortSignal;
}): Promise<PipelineRunSummary> {
await this.options.runtimeEventPublisher?.publish({
type: "session.started",
severity: "info",
sessionId: input.sessionId,
message: "Pipeline session started.",
metadata: {
entryNodeId: this.manifest.pipeline.entryNodeId,
},
});
try {
const projectContext = await this.options.projectContextStore.readState();
await this.stateManager.initializeSession(input.sessionId, {
...(input.initialState ?? {}),
flags: {
...projectContext.globalFlags,
...(input.initialState?.flags ?? {}),
},
metadata: {
project_context: {
globalFlags: { ...projectContext.globalFlags },
artifactPointers: { ...projectContext.artifactPointers },
taskQueue: projectContext.taskQueue.map((task) => ({
id: task.id,
title: task.title,
status: task.status,
...(task.assignee ? { assignee: task.assignee } : {}),
...(task.metadata ? { metadata: task.metadata } : {}),
})),
},
...((input.initialState?.metadata ?? {}) as JsonObject),
},
});
await this.stateManager.writeHandoff(input.sessionId, {
nodeId: this.manifest.pipeline.entryNodeId,
payload: input.initialPayload,
});
const records: PipelineExecutionRecord[] = [];
const events: DomainEvent[] = [];
const ready = new Map<string, number>([[this.manifest.pipeline.entryNodeId, 0]]);
const completedNodes = new Set<string>();
const maxExecutions = this.manifest.pipeline.nodes.length * (this.options.maxRetries + 4);
let executionCount = 0;
let sequentialHardFailures = 0;
while (ready.size > 0) {
throwIfAborted(input.signal);
const frontier: QueueItem[] = [...ready.entries()].map(([nodeId, depth]) => ({ nodeId, depth }));
ready.clear();
for (const group of this.buildExecutionGroups(frontier)) {
const groupResults = group.concurrent
? await Promise.all(
group.items.map((queueItem) => this.executeNode({ ...queueItem, sessionId: input.sessionId, signal: input.signal })),
)
: await this.executeSequentialGroup(input.sessionId, group.items, input.signal);
for (const nodeResult of groupResults) {
records.push(...nodeResult.records);
events.push(...nodeResult.events);
executionCount += nodeResult.records.length;
if (executionCount > maxExecutions) {
throw new Error("Pipeline execution exceeded the configured safe execution bound.");
}
for (const wasHardFailure of nodeResult.hardFailureAttempts) {
if (wasHardFailure) {
sequentialHardFailures += 1;
} else {
sequentialHardFailures = 0;
}
if (
this.failurePolicy.shouldAbortAfterSequentialHardFailures(
sequentialHardFailures,
this.hardFailureThreshold,
)
) {
throw new Error(
`Hard failure threshold reached (>=${String(this.hardFailureThreshold)} sequential API/network/403 failures). Pipeline aborted.`,
);
}
}
completedNodes.add(nodeResult.queueItem.nodeId);
const state = await this.stateManager.readState(input.sessionId);
const candidateEdges = this.edgesBySource.get(nodeResult.queueItem.nodeId) ?? [];
const eventTypes = new Set(nodeResult.finalEventTypes);
for (const edge of candidateEdges) {
if (!shouldEdgeRun(edge, nodeResult.finalResult.status, eventTypes)) {
continue;
}
if (!(await edgeConditionsSatisfied(edge, state, this.options.workspaceRoot))) {
continue;
}
if (completedNodes.has(edge.to)) {
continue;
}
await this.stateManager.writeHandoff(input.sessionId, {
nodeId: edge.to,
fromNodeId: nodeResult.queueItem.nodeId,
payload: nodeResult.finalPayload,
});
const nextDepth = nodeResult.queueItem.depth + 1;
const existingDepth = ready.get(edge.to);
if (existingDepth === undefined || nextDepth < existingDepth) {
ready.set(edge.to, nextDepth);
}
}
}
}
}
const finalState = await this.stateManager.readState(input.sessionId);
const status = this.computeAggregateStatus(records);
if (status === "success") {
await this.options.projectContextStore.patchState({
artifactPointers: {
[`sessions/${input.sessionId}/final_state`]: this.stateManager.getSessionStatePath(input.sessionId),
},
});
}
await this.options.runtimeEventPublisher?.publish({
type: "session.completed",
severity: status === "success" ? "info" : "critical",
sessionId: input.sessionId,
message: `Pipeline session completed with status "${status}".`,
metadata: {
status,
recordCount: records.length,
eventCount: events.length,
},
});
return {
sessionId: input.sessionId,
status,
records,
events,
finalState,
};
} catch (error) {
await this.options.runtimeEventPublisher?.publish({
type: "session.failed",
severity: "critical",
sessionId: input.sessionId,
message: `Pipeline session failed: ${toErrorMessage(error)}`,
metadata: {
errorMessage: toErrorMessage(error),
},
});
throw error;
}
}
private computeAggregateStatus(records: PipelineExecutionRecord[]): PipelineAggregateStatus {
@@ -730,13 +856,15 @@ export class PipelineExecutor {
}): Promise<ActorExecutionResult> {
try {
throwIfAborted(input.signal);
const toolClearance = this.personaRegistry.getToolClearance(input.node.personaId);
return await input.executor({
sessionId: input.sessionId,
node: input.node,
prompt: input.prompt,
context: input.context,
signal: input.signal,
toolClearance,
mcp: this.buildActorMcpContext(toolClearance),
security: this.securityContext,
});
} catch (error) {
@@ -773,6 +901,113 @@ export class PipelineExecutor {
}
} }
private buildActorMcpContext(toolClearance: ToolClearancePolicy): ActorExecutionMcpContext {
const resolveConfig = (context: McpLoadContext = {}): LoadedMcpConfig => {
if (!this.options.resolveMcpConfig) {
return {};
}
return this.options.resolveMcpConfig({
...context,
toolClearance,
});
};
const createToolPermissionHandler = (): ActorToolPermissionHandler =>
this.createToolPermissionHandler(toolClearance);
return {
registry: this.options.mcpRegistry,
resolveConfig,
createToolPermissionHandler,
createClaudeCanUseTool: createToolPermissionHandler,
};
}
private createToolPermissionHandler(toolClearance: ToolClearancePolicy): ActorToolPermissionHandler {
const normalizedToolClearance = parseToolClearancePolicy(toolClearance);
const allowlist = new Set(normalizedToolClearance.allowlist);
const banlist = new Set(normalizedToolClearance.banlist);
const rulesEngine = this.securityContext?.rulesEngine;
return async (toolName, _input, options) => {
const toolUseID = options.toolUseID;
if (options.signal.aborted) {
return {
behavior: "deny",
message: "Tool execution denied because the request signal is aborted.",
interrupt: true,
...(toolUseID ? { toolUseID } : {}),
};
}
const candidates = toToolNameCandidates(toolName);
const banMatch = candidates.find((candidate) => banlist.has(candidate));
if (banMatch) {
if (rulesEngine) {
try {
rulesEngine.assertToolInvocationAllowed({
tool: banMatch,
toolClearance: normalizedToolClearance,
});
} catch {
// Security audit event already emitted by rules engine.
}
}
return {
behavior: "deny",
message: `Tool "${toolName}" is blocked by actor tool policy.`,
interrupt: true,
...(toolUseID ? { toolUseID } : {}),
};
}
if (allowlist.size > 0) {
const allowMatch = candidates.find((candidate) => allowlist.has(candidate));
if (!allowMatch) {
if (rulesEngine) {
try {
rulesEngine.assertToolInvocationAllowed({
tool: toolName,
toolClearance: normalizedToolClearance,
});
} catch {
// Security audit event already emitted by rules engine.
}
}
return {
behavior: "deny",
message: `Tool "${toolName}" is not in the actor tool allowlist.`,
interrupt: true,
...(toolUseID ? { toolUseID } : {}),
};
}
rulesEngine?.assertToolInvocationAllowed({
tool: allowMatch,
toolClearance: normalizedToolClearance,
});
return {
behavior: "allow",
...(toolUseID ? { toolUseID } : {}),
};
}
rulesEngine?.assertToolInvocationAllowed({
tool: candidates[0] ?? toolName,
toolClearance: normalizedToolClearance,
});
return {
behavior: "allow",
...(toolUseID ? { toolUseID } : {}),
};
};
}
private createAttemptDomainEvents(input: {
sessionId: string;
nodeId: string;

View File

@@ -5,6 +5,8 @@ import { tmpdir } from "node:os";
import { resolve } from "node:path";
import { SchemaDrivenExecutionEngine } from "../src/agents/orchestration.js";
import type { ActorExecutionResult } from "../src/agents/pipeline.js";
import { loadConfig } from "../src/config.js";
import { createDefaultMcpRegistry, createMcpHandlerShell } from "../src/mcp.js";
import { SecurityViolationError } from "../src/security/index.js";
function createManifest(): unknown {
@@ -252,6 +254,163 @@ test("runs DAG pipeline with state-dependent routing and retry behavior", async
assert.deepEqual(engine.planChildPersonas({ parentPersonaId: "task", depth: 1 }), ["coder"]);
});
test("injects mcp registry/config helpers and enforces Claude tool gate in actor executor", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const mcpConfigPath = resolve(workspaceRoot, "mcp.config.json");
await writeFile(
mcpConfigPath,
JSON.stringify(
{
servers: {
"task-master-tools": {
handler: "claude-task-master",
type: "stdio",
command: "node",
args: ["task-master-mcp.js"],
enabled_tools: ["read_file", "write_file", "search"],
},
},
},
null,
2,
),
"utf8",
);
const config = loadConfig({
...process.env,
MCP_CONFIG_PATH: mcpConfigPath,
});
const customRegistry = createDefaultMcpRegistry();
customRegistry.register(
createMcpHandlerShell({
id: "custom-task-mcp-handler",
description: "custom task handler",
matches: () => false,
}),
);
const manifest = {
schemaVersion: "1" as const,
topologies: ["sequential"],
personas: [
{
id: "task",
displayName: "Task",
systemPromptTemplate: "Task executor",
toolClearance: {
allowlist: ["read_file", "write_file"],
banlist: ["rm"],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 2,
maxRetries: 0,
},
pipeline: {
entryNodeId: "task-node",
nodes: [
{
id: "task-node",
actorId: "task_actor",
personaId: "task",
},
],
edges: [],
},
};
const engine = new SchemaDrivenExecutionEngine({
manifest,
config,
mcpRegistry: customRegistry,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxChildren: 1,
maxDepth: 2,
maxRetries: 0,
},
actorExecutors: {
task_actor: async (input) => {
assert.equal(input.mcp.registry, customRegistry);
const codexConfig = input.mcp.resolveConfig({
providerHint: "codex",
});
const codexServer = (codexConfig.codexConfig?.mcp_servers as Record<string, Record<string, unknown>> | undefined)?.[
"task-master-tools"
];
assert.ok(codexServer);
assert.deepEqual(codexServer.enabled_tools, ["read_file", "write_file"]);
assert.deepEqual(codexServer.disabled_tools, ["rm"]);
const claudeConfig = input.mcp.resolveConfig({
providerHint: "claude",
});
assert.ok(claudeConfig.claudeMcpServers?.["task-master-tools"]);
const canUseTool = input.mcp.createClaudeCanUseTool();
const allow = await canUseTool(
"mcp__claude-task-master__read_file",
{},
{
signal: new AbortController().signal,
toolUseID: "allow-1",
},
);
assert.deepEqual(allow, {
behavior: "allow",
toolUseID: "allow-1",
});
const denyBlocked = await canUseTool(
"mcp__claude-task-master__rm",
{},
{
signal: new AbortController().signal,
toolUseID: "deny-1",
},
);
assert.equal(denyBlocked.behavior, "deny");
const denyMissingAllowlist = await canUseTool(
"mcp__claude-task-master__search",
{},
{
signal: new AbortController().signal,
toolUseID: "deny-2",
},
);
assert.equal(denyMissingAllowlist.behavior, "deny");
return {
status: "success",
payload: {
ok: true,
},
};
},
},
});
const result = await engine.runSession({
sessionId: "session-mcp-gate-1",
initialPayload: {
task: "verify mcp gate",
},
});
assert.equal(result.status, "success");
});
test("runs parallel topology blocks concurrently and routes via domain-event edges", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
@@ -916,3 +1075,105 @@ test("can map security violations to validation_fail for retry-unrolled remediat
["secure-node:validation_fail:1", "secure-node:success:2"],
);
});
test("runtime event side-channel logs session and node lifecycle without changing pipeline behavior", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-runtime-event-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-runtime-event-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const runtimeEventLogRelativePath = ".ai_ops/events/test-runtime-events.ndjson";
const runtimeEventLogPath = resolve(workspaceRoot, runtimeEventLogRelativePath);
const manifest = {
schemaVersion: "1",
topologies: ["sequential"],
personas: [
{
id: "runner",
displayName: "Runner",
systemPromptTemplate: "Runner",
toolClearance: {
allowlist: ["read_file"],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 2,
maxRetries: 0,
},
pipeline: {
entryNodeId: "node-1",
nodes: [
{
id: "node-1",
actorId: "runner_actor",
personaId: "runner",
},
],
edges: [],
},
} as const;
const config = loadConfig({
AGENT_RUNTIME_EVENT_LOG_PATH: runtimeEventLogRelativePath,
});
const engine = new SchemaDrivenExecutionEngine({
manifest,
config,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxDepth: 2,
maxRetries: 0,
maxChildren: 1,
runtimeContext: {},
},
actorExecutors: {
runner_actor: async () => ({
status: "success",
payload: {
complete: true,
usage: {
input_tokens: 120,
output_tokens: 80,
tool_calls: 2,
duration_ms: 450,
},
},
}),
},
});
const result = await engine.runSession({
sessionId: "session-runtime-events",
initialPayload: {
task: "Emit runtime events",
},
});
assert.equal(result.status, "success");
const lines = (await readFile(runtimeEventLogPath, "utf8"))
.trim()
.split("\n")
.filter((line) => line.length > 0);
assert.ok(lines.length >= 4);
const events = lines.map((line) => JSON.parse(line) as Record<string, unknown>);
const eventTypes = new Set(events.map((event) => String(event.type)));
assert.ok(eventTypes.has("session.started"));
assert.ok(eventTypes.has("node.attempt.completed"));
assert.ok(eventTypes.has("domain.validation_passed"));
assert.ok(eventTypes.has("session.completed"));
const nodeAttemptEvent = events.find((event) => event.type === "node.attempt.completed");
assert.ok(nodeAttemptEvent);
const usage = nodeAttemptEvent.usage as Record<string, unknown>;
assert.equal(usage.tokenInput, 120);
assert.equal(usage.tokenOutput, 80);
assert.equal(usage.toolCalls, 2);
assert.equal(usage.durationMs, 450);
});
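
The tool gate added in this commit is an allowlist/banlist policy: the banlist always wins, and a non-empty allowlist means default-deny for unlisted tools, with Claude-style `mcp__<server>__<tool>` names matched by both their full and bare forms. The following is a minimal standalone sketch of that pattern — the names (`createGate`, `ToolClearance`, `toToolNameCandidates`) are illustrative, not the runtime's actual exports, and it omits the rules-engine audit hook:

```typescript
type GateDecision = { behavior: "allow" | "deny"; message?: string };

interface ToolClearance {
  allowlist: string[];
  banlist: string[];
}

// Claude-style MCP tool names look like "mcp__<server>__<tool>"; match both
// the full name and the bare tool suffix against the policy sets.
function toToolNameCandidates(toolName: string): string[] {
  const parts = toolName.split("__");
  return parts.length === 3 ? [toolName, parts[2]] : [toolName];
}

function createGate(clearance: ToolClearance) {
  const allow = new Set(clearance.allowlist);
  const ban = new Set(clearance.banlist);
  return (toolName: string): GateDecision => {
    const candidates = toToolNameCandidates(toolName);
    // Banlist wins over everything, including the allowlist.
    if (candidates.some((c) => ban.has(c))) {
      return { behavior: "deny", message: `Tool "${toolName}" is blocked by actor tool policy.` };
    }
    // A non-empty allowlist means default-deny for unlisted tools.
    if (allow.size > 0 && !candidates.some((c) => allow.has(c))) {
      return { behavior: "deny", message: `Tool "${toolName}" is not in the actor tool allowlist.` };
    }
    return { behavior: "allow" };
  };
}

const gate = createGate({ allowlist: ["read_file", "write_file"], banlist: ["rm"] });
console.log(gate("mcp__claude-task-master__read_file").behavior); // "allow"
console.log(gate("mcp__claude-task-master__rm").behavior); // "deny" (banlisted)
console.log(gate("mcp__claude-task-master__search").behavior); // "deny" (not allowlisted)
```

This mirrors the decision order exercised by the `canUseTool` assertions in the test above: ban check first, then allowlist membership, then default allow.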