diff --git a/README.md b/README.md index 16f6da8..434d189 100644 --- a/README.md +++ b/README.md @@ -11,11 +11,13 @@ TypeScript runtime for deterministic multi-agent execution with: - Resource provisioning (git worktrees + deterministic port ranges) - MCP configuration layer with handler policy hooks - Security middleware for shell/tool policy enforcement +- Runtime event fan-out (NDJSON analytics log + optional Discord webhook notifications) ## Architecture Summary - `SchemaDrivenExecutionEngine.runSession(...)` is the single execution entrypoint. - `PipelineExecutor` owns runtime control flow and topology dispatch while delegating failure classification and persistence/event side-effects to dedicated policies. +- Runtime events are emitted as best-effort side-channel telemetry and do not affect orchestration control flow. - `AgentManager` is an internal utility used by the pipeline when fan-out/retry-unrolled behavior is required. - Session state is persisted under `AGENT_STATE_ROOT`. - Project state is persisted under `AGENT_PROJECT_CONTEXT_PATH` with schema-versioned JSON (`schemaVersion`) and domains: @@ -39,6 +41,7 @@ TypeScript runtime for deterministic multi-agent execution with: - `provisioning.ts`: resource provisioning and child suballocation helpers - `src/mcp`: MCP config types/conversion/handlers - `src/security`: shell AST parsing, rules engine, secure executor, and audit sinks +- `src/telemetry`: runtime event schema, sink fan-out, file sink, and Discord webhook sink - `src/examples`: provider entrypoints (`codex.ts`, `claude.ts`) - `src/config.ts`: centralized env parsing/validation/defaulting - `tests`: manager, manifest, pipeline/orchestration, state, provisioning, MCP @@ -101,6 +104,78 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em - session closure aborts child recursive work - run summaries expose aggregate `status`: success requires successful terminal executed DAG nodes and no critical-path failure +## Runtime Events + +- The pipeline emits runtime lifecycle events (`session.started`, `node.attempt.completed`, `domain.*`, `session.completed`, `session.failed`). +- Runtime events are fan-out only and never used for edge-routing decisions. +- Default sink writes NDJSON to `AGENT_RUNTIME_EVENT_LOG_PATH`. +- Optional Discord sink posts high-visibility lifecycle/error events through webhook configuration. +- Existing security command audit output (`AGENT_SECURITY_AUDIT_LOG_PATH`) remains in place and is also mirrored into runtime events. + +### Runtime Event Fields + +Each runtime event is written as one NDJSON object with: + +- `id`, `timestamp`, `type`, `severity` +- `sessionId`, `nodeId`, `attempt` +- `message` +- optional `usage` (`tokenInput`, `tokenOutput`, `tokenTotal`, `toolCalls`, `durationMs`, `costUsd`) +- optional structured `metadata` + +### Runtime Event Setup + +Add these variables in `.env` (or use defaults): + +```bash +AGENT_RUNTIME_EVENT_LOG_PATH=.ai_ops/events/runtime-events.ndjson +AGENT_RUNTIME_DISCORD_WEBHOOK_URL= +AGENT_RUNTIME_DISCORD_MIN_SEVERITY=critical +AGENT_RUNTIME_DISCORD_ALWAYS_NOTIFY_TYPES=session.started,session.completed,session.failed +``` + +Notes: + +- File sink is always enabled and appends NDJSON. +- Discord sink is enabled only when `AGENT_RUNTIME_DISCORD_WEBHOOK_URL` is set. +- Discord notifications are sent when event severity is at or above `AGENT_RUNTIME_DISCORD_MIN_SEVERITY`. +- Event types in `AGENT_RUNTIME_DISCORD_ALWAYS_NOTIFY_TYPES` are always sent, regardless of severity. + +### Event Types Emitted by Runtime + +- Session lifecycle: + - `session.started` + - `session.completed` + - `session.failed` +- Node/domain lifecycle: + - `node.attempt.completed` + - `domain.` (for example `domain.validation_failed`) +- Security mirror events: + - `security.shell.command_profiled` + - `security.shell.command_allowed` + - `security.shell.command_blocked` + - `security.tool.invocation_allowed` + - `security.tool.invocation_blocked` + +### Analytics Quick Start + +Inspect latest events: + +```bash +tail -n 50 .ai_ops/events/runtime-events.ndjson +``` + +Count events by type: + +```bash +jq -r '.type' .ai_ops/events/runtime-events.ndjson | sort | uniq -c +``` + +Get only critical events: + +```bash +jq -c 'select(.severity=="critical")' .ai_ops/events/runtime-events.ndjson +``` + ## Security Middleware - Shell command parsing uses async `sh-syntax` (WASM-backed mvdan/sh parser) with fail-closed command/redirect extraction. @@ -117,6 +192,8 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em - optional uid/gid drop - stdout/stderr streaming hooks for audit - Every actor execution input now includes `security` helpers (`rulesEngine`, `createCommandExecutor(...)`) so executors can enforce shell/tool policy at the execution boundary. +- Every actor execution input now includes `mcp` helpers (`registry`, `resolveConfig(...)`) so MCP server config resolution stays centrally policy-controlled per persona/tool-clearance. +- For Claude-based executors, use `input.mcp.createClaudeCanUseTool()` as the SDK `canUseTool` callback to enforce persona allowlist/banlist before each tool invocation. - Pipeline behavior on `SecurityViolationError` is configurable: - `hard_abort` (default) - `validation_fail` (retry-unrolled remediation) @@ -127,6 +204,7 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em - `CODEX_API_KEY` - `OPENAI_API_KEY` +- `OPENAI_AUTH_MODE` (`auto`, `chatgpt`, or `api_key`) - `OPENAI_BASE_URL` - `CODEX_SKIP_GIT_CHECK` - `CLAUDE_CODE_OAUTH_TOKEN` (preferred for Claude auth; takes precedence over `ANTHROPIC_API_KEY`) @@ -171,6 +249,13 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em - `AGENT_SECURITY_DROP_UID` - `AGENT_SECURITY_DROP_GID` +### Runtime Events / Telemetry + +- `AGENT_RUNTIME_EVENT_LOG_PATH` +- `AGENT_RUNTIME_DISCORD_WEBHOOK_URL` +- `AGENT_RUNTIME_DISCORD_MIN_SEVERITY` (`info`, `warning`, or `critical`) +- `AGENT_RUNTIME_DISCORD_ALWAYS_NOTIFY_TYPES` (CSV event types such as `session.started,session.completed,session.failed`) + ### Runtime-Injected (Do Not Configure In `.env`) - `AGENT_REPO_ROOT` @@ -183,6 +268,14 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em Defaults are documented in `.env.example`. +Auth behavior notes: + +- OpenAI/Codex: + - `OPENAI_AUTH_MODE=auto` (default) prefers API keys when configured, and otherwise relies on existing Codex CLI login (`codex login` / ChatGPT plan auth). + - `OPENAI_AUTH_MODE=chatgpt` always omits API key injection so Codex uses ChatGPT subscription auth/session. +- Claude: + - If `CLAUDE_CODE_OAUTH_TOKEN` and `ANTHROPIC_API_KEY` are both unset, runtime auth options are omitted and Claude Agent SDK can use existing Claude Code login state. + ## Quality Gate ```bash diff --git a/docs/security-middleware.md b/docs/security-middleware.md index 092a543..529fad8 100644 --- a/docs/security-middleware.md +++ b/docs/security-middleware.md @@ -35,8 +35,14 @@ This middleware provides a first-pass hardening layer for agent-executed shell c `McpRegistry.resolveServerWithHandler(...)` now accepts optional tool clearance and applies it to resolved Codex MCP tool lists (`enabled_tools`, `disabled_tools`). +`ActorExecutionInput` now carries an `mcp` execution context with: + +- `registry`: resolved runtime `McpRegistry` +- `resolveConfig(...)`: centralized MCP config resolution with persona tool-clearance applied +- `createClaudeCanUseTool()`: helper for Claude SDK `canUseTool` callback so each tool invocation is allowlist/banlist-enforced before execution + ## Known limits and TODOs - AST validation is token-based and does not yet model full shell evaluation semantics (e.g. runtime-generated paths from env expansion). -- Audit output is line-oriented file logging; move to a centralized telemetry pipeline for long-term profiling. +- Audit output remains line-oriented file logging; runtime events now mirror security decisions for side-channel analytics and alerting. - Deno sandbox mode is not enforced yet. A future executor mode can wrap shell runs via `deno run` with strict `--allow-read/--allow-run` flags. diff --git a/src/agents/orchestration.ts b/src/agents/orchestration.ts index cedae23..11cc13e 100644 --- a/src/agents/orchestration.ts +++ b/src/agents/orchestration.ts @@ -1,6 +1,6 @@ import { resolve } from "node:path"; import { getConfig, loadConfig, type AppConfig } from "../config.js"; -import { createDefaultMcpRegistry, McpRegistry } from "../mcp.js"; +import { createDefaultMcpRegistry, loadMcpConfigFromEnv, McpRegistry } from "../mcp.js"; import { parseAgentManifest, type AgentManifest } from "./manifest.js"; import { AgentManager } from "./manager.js"; import { @@ -19,9 +19,17 @@ import { FileSystemStateContextManager, type StoredSessionState } from "./state- import type { JsonObject } from "./types.js"; import { SecureCommandExecutor, + type SecurityAuditEvent, + type SecurityAuditSink, SecurityRulesEngine, createFileSecurityAuditSink, } from "../security/index.js"; +import { + RuntimeEventPublisher, + createDiscordWebhookRuntimeEventSink, + createFileRuntimeEventSink, + type RuntimeEventSeverity, +} from "../telemetry/index.js"; export type OrchestrationSettings = { workspaceRoot: string; @@ -76,13 +84,106 @@ function getChildrenByParent(manifest: AgentManifest): Map; + settings: OrchestrationSettings; +}): RuntimeEventPublisher { + const sinks = [ + createFileRuntimeEventSink( + resolve(input.settings.workspaceRoot, input.config.runtimeEvents.logPath), + ), + ]; + + if (input.config.runtimeEvents.discordWebhookUrl) { + sinks.push( + createDiscordWebhookRuntimeEventSink({ + webhookUrl: input.config.runtimeEvents.discordWebhookUrl, + minSeverity: input.config.runtimeEvents.discordMinSeverity, + alwaysNotifyTypes: input.config.runtimeEvents.discordAlwaysNotifyTypes, + }), + ); + } + + return new RuntimeEventPublisher({ + sinks, + }); +} + function createActorSecurityContext(input: { config: Readonly; settings: OrchestrationSettings; + runtimeEventPublisher: RuntimeEventPublisher; }): ActorExecutionSecurityContext { - const auditSink = createFileSecurityAuditSink( + const fileAuditSink = createFileSecurityAuditSink( resolve(input.settings.workspaceRoot, input.config.security.auditLogPath), ); + const auditSink: SecurityAuditSink = (event): void => { + fileAuditSink(event); + void input.runtimeEventPublisher.publish({ + type: `security.${event.type}`, + severity: mapSecurityAuditSeverity(event), + message: toSecurityAuditMessage(event), + metadata: toSecurityAuditMetadata(event), + }); + }; const rulesEngine = new SecurityRulesEngine( { allowedBinaries: input.config.security.shellAllowedBinaries, @@ -126,10 +227,12 @@ export class SchemaDrivenExecutionEngine { private readonly stateManager: FileSystemStateContextManager; private readonly projectContextStore: FileSystemProjectContextStore; private readonly actorExecutors: ReadonlyMap; + private readonly config: Readonly; private readonly settings: OrchestrationSettings; private readonly childrenByParent: Map; private readonly manager: AgentManager; private readonly mcpRegistry: McpRegistry; + private readonly runtimeEventPublisher: RuntimeEventPublisher; private readonly securityContext: ActorExecutionSecurityContext; constructor(input: { @@ -147,6 +250,7 @@ export class SchemaDrivenExecutionEngine { this.manifest = parseAgentManifest(input.manifest); const config = input.config ?? getConfig(); + this.config = config; this.settings = { workspaceRoot: resolve(input.settings?.workspaceRoot ?? process.cwd()), stateRoot: resolve(input.settings?.stateRoot ?? config.orchestration.stateRoot), @@ -179,9 +283,14 @@ export class SchemaDrivenExecutionEngine { maxRecursiveDepth: config.agentManager.maxRecursiveDepth, }); this.mcpRegistry = input.mcpRegistry ?? createDefaultMcpRegistry(); + this.runtimeEventPublisher = createRuntimeEventPublisher({ + config, + settings: this.settings, + }); this.securityContext = createActorSecurityContext({ config, settings: this.settings, + runtimeEventPublisher: this.runtimeEventPublisher, }); for (const persona of this.manifest.personas) { @@ -261,8 +370,21 @@ export class SchemaDrivenExecutionEngine { managerSessionId, projectContextStore: this.projectContextStore, mcpRegistry: this.mcpRegistry, + resolveMcpConfig: ({ providerHint, prompt, toolClearance }) => + loadMcpConfigFromEnv( + { + providerHint, + prompt, + }, + { + config: this.config, + registry: this.mcpRegistry, + toolClearance, + }, + ), securityViolationHandling: this.settings.securityViolationHandling, securityContext: this.securityContext, + runtimeEventPublisher: this.runtimeEventPublisher, }, ); try { diff --git a/src/agents/pipeline.ts b/src/agents/pipeline.ts index 1f37c49..aefdc7b 100644 --- a/src/agents/pipeline.ts +++ b/src/agents/pipeline.ts @@ -17,6 +17,7 @@ import { import type { AgentManifest, PipelineEdge, PipelineNode, RouteCondition } from "./manifest.js"; import type { AgentManager, RecursiveChildIntent } from "./manager.js"; import type { McpRegistry } from "../mcp/handlers.js"; +import type { LoadedMcpConfig, McpLoadContext } from "../mcp/types.js"; import { PersonaRegistry } from "./persona-registry.js"; import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js"; import { @@ -27,11 +28,14 @@ import { import type { JsonObject } from "./types.js"; import { SecureCommandExecutor, + parseToolClearancePolicy, SecurityRulesEngine, SecurityViolationError, type ExecutionEnvPolicy, type SecurityViolationHandling, + type ToolClearancePolicy, } from "../security/index.js"; +import { type RuntimeEventPublisher } from "../telemetry/index.js"; export type ActorResultStatus = "success" | "validation_fail" | "failure"; export type ActorFailureKind = "soft" | "hard"; @@ -47,16 +51,43 @@ export type ActorExecutionResult = { failureCode?: string; }; +export type ActorToolPermissionResult = + | { + behavior: "allow"; + toolUseID?: string; + } + | { + behavior: "deny"; + message: string; + interrupt?: boolean; + toolUseID?: string; + }; + +export type ActorToolPermissionHandler = ( + toolName: string, + input: Record, + options: { + signal: AbortSignal; + toolUseID?: string; + agentID?: string; + }, +) => Promise; + +export type ActorExecutionMcpContext = { + registry: McpRegistry; + resolveConfig: (context?: McpLoadContext) => LoadedMcpConfig; + createToolPermissionHandler: () => ActorToolPermissionHandler; + createClaudeCanUseTool: () => ActorToolPermissionHandler; +}; + export type ActorExecutionInput = { sessionId: string; node: PipelineNode; prompt: string; context: NodeExecutionContext; signal: AbortSignal; - toolClearance: { - allowlist: string[]; - banlist: string[]; - }; + toolClearance: ToolClearancePolicy; + mcp: ActorExecutionMcpContext; security?: ActorExecutionSecurityContext; }; @@ -92,8 +123,10 @@ export type PipelineExecutorOptions = { failurePolicy?: FailurePolicy; lifecycleObserver?: PipelineLifecycleObserver; hardFailureThreshold?: number; + resolveMcpConfig?: (input: McpLoadContext & { toolClearance: ToolClearancePolicy }) => LoadedMcpConfig; securityViolationHandling?: SecurityViolationHandling; securityContext?: ActorExecutionSecurityContext; + runtimeEventPublisher?: RuntimeEventPublisher; }; export type ActorExecutionSecurityContext = { @@ -245,6 +278,63 @@ function toAbortError(signal: AbortSignal): Error { return error; } +function toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + return String(error); +} + +function dedupeStrings(values: readonly string[]): string[] { + const deduped: string[] = []; + const seen = new Set(); + + for (const value of values) { + const normalized = value.trim(); + if (!normalized || seen.has(normalized)) { + continue; + } + seen.add(normalized); + deduped.push(normalized); + } + + return deduped; +} + +function toToolNameCandidates(toolName: string): string[] { + const trimmed = toolName.trim(); + if (!trimmed) { + return []; + } + + const candidates = [trimmed]; + const maybeSuffix = (separator: string): void => { + const index = trimmed.lastIndexOf(separator); + if (index === -1 || index + separator.length >= trimmed.length) { + return; + } + candidates.push(trimmed.slice(index + separator.length)); + }; + + const doubleUnderscoreParts = trimmed.split("__").filter((entry) => entry.length > 0); + if (doubleUnderscoreParts.length >= 2) { + const last = doubleUnderscoreParts[doubleUnderscoreParts.length - 1]; + if (last) { + candidates.push(last); + } + if (doubleUnderscoreParts.length >= 3) { + candidates.push(doubleUnderscoreParts.slice(2).join("__")); + } + } + + maybeSuffix(":"); + maybeSuffix("."); + maybeSuffix("/"); + maybeSuffix("\\"); + + return dedupeStrings(candidates); +} + function defaultEventPayloadForStatus(status: ActorResultStatus): DomainEventPayload { if (status === "success") { return { @@ -320,6 +410,7 @@ export class PipelineExecutor { stateManager: this.stateManager, projectContextStore: this.options.projectContextStore, domainEventBus: this.domainEventBus, + runtimeEventPublisher: this.options.runtimeEventPublisher, }); for (const node of manifest.pipeline.nodes) { @@ -342,136 +433,171 @@ export class PipelineExecutor { initialState?: Partial; signal?: AbortSignal; }): Promise { - const projectContext = await this.options.projectContextStore.readState(); - - await this.stateManager.initializeSession(input.sessionId, { - ...(input.initialState ?? {}), - flags: { - ...projectContext.globalFlags, - ...(input.initialState?.flags ?? {}), - }, + await this.options.runtimeEventPublisher?.publish({ + type: "session.started", + severity: "info", + sessionId: input.sessionId, + message: "Pipeline session started.", metadata: { - project_context: { - globalFlags: { ...projectContext.globalFlags }, - artifactPointers: { ...projectContext.artifactPointers }, - taskQueue: projectContext.taskQueue.map((task) => ({ - id: task.id, - title: task.title, - status: task.status, - ...(task.assignee ? { assignee: task.assignee } : {}), - ...(task.metadata ? { metadata: task.metadata } : {}), - })), - }, - ...((input.initialState?.metadata ?? {}) as JsonObject), + entryNodeId: this.manifest.pipeline.entryNodeId, }, }); - await this.stateManager.writeHandoff(input.sessionId, { - nodeId: this.manifest.pipeline.entryNodeId, - payload: input.initialPayload, - }); + try { + const projectContext = await this.options.projectContextStore.readState(); - const records: PipelineExecutionRecord[] = []; - const events: DomainEvent[] = []; - const ready = new Map([[this.manifest.pipeline.entryNodeId, 0]]); - const completedNodes = new Set(); + await this.stateManager.initializeSession(input.sessionId, { + ...(input.initialState ?? {}), + flags: { + ...projectContext.globalFlags, + ...(input.initialState?.flags ?? {}), + }, + metadata: { + project_context: { + globalFlags: { ...projectContext.globalFlags }, + artifactPointers: { ...projectContext.artifactPointers }, + taskQueue: projectContext.taskQueue.map((task) => ({ + id: task.id, + title: task.title, + status: task.status, + ...(task.assignee ? { assignee: task.assignee } : {}), + ...(task.metadata ? { metadata: task.metadata } : {}), + })), + }, + ...((input.initialState?.metadata ?? {}) as JsonObject), + }, + }); - const maxExecutions = this.manifest.pipeline.nodes.length * (this.options.maxRetries + 4); - let executionCount = 0; - let sequentialHardFailures = 0; + await this.stateManager.writeHandoff(input.sessionId, { + nodeId: this.manifest.pipeline.entryNodeId, + payload: input.initialPayload, + }); - while (ready.size > 0) { - throwIfAborted(input.signal); - const frontier: QueueItem[] = [...ready.entries()].map(([nodeId, depth]) => ({ nodeId, depth })); - ready.clear(); + const records: PipelineExecutionRecord[] = []; + const events: DomainEvent[] = []; + const ready = new Map([[this.manifest.pipeline.entryNodeId, 0]]); + const completedNodes = new Set(); - for (const group of this.buildExecutionGroups(frontier)) { - const groupResults = group.concurrent - ? await Promise.all( - group.items.map((queueItem) => this.executeNode({ ...queueItem, sessionId: input.sessionId, signal: input.signal })), - ) - : await this.executeSequentialGroup(input.sessionId, group.items, input.signal); + const maxExecutions = this.manifest.pipeline.nodes.length * (this.options.maxRetries + 4); + let executionCount = 0; + let sequentialHardFailures = 0; - for (const nodeResult of groupResults) { - records.push(...nodeResult.records); - events.push(...nodeResult.events); + while (ready.size > 0) { + throwIfAborted(input.signal); + const frontier: QueueItem[] = [...ready.entries()].map(([nodeId, depth]) => ({ nodeId, depth })); + ready.clear(); - executionCount += nodeResult.records.length; - if (executionCount > maxExecutions) { - throw new Error("Pipeline execution exceeded the configured safe execution bound."); - } - - for (const wasHardFailure of nodeResult.hardFailureAttempts) { - if (wasHardFailure) { - sequentialHardFailures += 1; - } else { - sequentialHardFailures = 0; - } - - if ( - this.failurePolicy.shouldAbortAfterSequentialHardFailures( - sequentialHardFailures, - this.hardFailureThreshold, + for (const group of this.buildExecutionGroups(frontier)) { + const groupResults = group.concurrent + ? await Promise.all( + group.items.map((queueItem) => this.executeNode({ ...queueItem, sessionId: input.sessionId, signal: input.signal })), ) - ) { - throw new Error( - `Hard failure threshold reached (>=${String(this.hardFailureThreshold)} sequential API/network/403 failures). Pipeline aborted.`, - ); - } - } + : await this.executeSequentialGroup(input.sessionId, group.items, input.signal); - completedNodes.add(nodeResult.queueItem.nodeId); + for (const nodeResult of groupResults) { + records.push(...nodeResult.records); + events.push(...nodeResult.events); - const state = await this.stateManager.readState(input.sessionId); - const candidateEdges = this.edgesBySource.get(nodeResult.queueItem.nodeId) ?? []; - const eventTypes = new Set(nodeResult.finalEventTypes); - - for (const edge of candidateEdges) { - if (!shouldEdgeRun(edge, nodeResult.finalResult.status, eventTypes)) { - continue; + executionCount += nodeResult.records.length; + if (executionCount > maxExecutions) { + throw new Error("Pipeline execution exceeded the configured safe execution bound."); } - if (!(await edgeConditionsSatisfied(edge, state, this.options.workspaceRoot))) { - continue; + for (const wasHardFailure of nodeResult.hardFailureAttempts) { + if (wasHardFailure) { + sequentialHardFailures += 1; + } else { + sequentialHardFailures = 0; + } + + if ( + this.failurePolicy.shouldAbortAfterSequentialHardFailures( + sequentialHardFailures, + this.hardFailureThreshold, + ) + ) { + throw new Error( + `Hard failure threshold reached (>=${String(this.hardFailureThreshold)} sequential API/network/403 failures). Pipeline aborted.`, + ); + } } - if (completedNodes.has(edge.to)) { - continue; - } + completedNodes.add(nodeResult.queueItem.nodeId); - await this.stateManager.writeHandoff(input.sessionId, { - nodeId: edge.to, - fromNodeId: nodeResult.queueItem.nodeId, - payload: nodeResult.finalPayload, - }); + const state = await this.stateManager.readState(input.sessionId); + const candidateEdges = this.edgesBySource.get(nodeResult.queueItem.nodeId) ?? []; + const eventTypes = new Set(nodeResult.finalEventTypes); - const nextDepth = nodeResult.queueItem.depth + 1; - const existingDepth = ready.get(edge.to); - if (existingDepth === undefined || nextDepth < existingDepth) { - ready.set(edge.to, nextDepth); + for (const edge of candidateEdges) { + if (!shouldEdgeRun(edge, nodeResult.finalResult.status, eventTypes)) { + continue; + } + + if (!(await edgeConditionsSatisfied(edge, state, this.options.workspaceRoot))) { + continue; + } + + if (completedNodes.has(edge.to)) { + continue; + } + + await this.stateManager.writeHandoff(input.sessionId, { + nodeId: edge.to, + fromNodeId: nodeResult.queueItem.nodeId, + payload: nodeResult.finalPayload, + }); + + const nextDepth = nodeResult.queueItem.depth + 1; + const existingDepth = ready.get(edge.to); + if (existingDepth === undefined || nextDepth < existingDepth) { + ready.set(edge.to, nextDepth); + } } } } } - } - const finalState = await this.stateManager.readState(input.sessionId); - const status = this.computeAggregateStatus(records); - if (status === "success") { - await this.options.projectContextStore.patchState({ - artifactPointers: { - [`sessions/${input.sessionId}/final_state`]: this.stateManager.getSessionStatePath(input.sessionId), + const finalState = await this.stateManager.readState(input.sessionId); + const status = this.computeAggregateStatus(records); + if (status === "success") { + await this.options.projectContextStore.patchState({ + artifactPointers: { + [`sessions/${input.sessionId}/final_state`]: this.stateManager.getSessionStatePath(input.sessionId), + }, + }); + } + + await this.options.runtimeEventPublisher?.publish({ + type: "session.completed", + severity: status === "success" ? "info" : "critical", + sessionId: input.sessionId, + message: `Pipeline session completed with status "${status}".`, + metadata: { + status, + recordCount: records.length, + eventCount: events.length, }, }); - } - return { - sessionId: input.sessionId, - status, - records, - events, - finalState, - }; + return { + sessionId: input.sessionId, + status, + records, + events, + finalState, + }; + } catch (error) { + await this.options.runtimeEventPublisher?.publish({ + type: "session.failed", + severity: "critical", + sessionId: input.sessionId, + message: `Pipeline session failed: ${toErrorMessage(error)}`, + metadata: { + errorMessage: toErrorMessage(error), + }, + }); + throw error; + } } private computeAggregateStatus(records: PipelineExecutionRecord[]): PipelineAggregateStatus { @@ -730,13 +856,15 @@ export class PipelineExecutor { }): Promise { try { throwIfAborted(input.signal); + const toolClearance = this.personaRegistry.getToolClearance(input.node.personaId); return await input.executor({ sessionId: input.sessionId, node: input.node, prompt: input.prompt, context: input.context, signal: input.signal, - toolClearance: this.personaRegistry.getToolClearance(input.node.personaId), + toolClearance, + mcp: this.buildActorMcpContext(toolClearance), security: this.securityContext, }); } catch (error) { @@ -773,6 +901,113 @@ export class PipelineExecutor { } } + private buildActorMcpContext(toolClearance: ToolClearancePolicy): ActorExecutionMcpContext { + const resolveConfig = (context: McpLoadContext = {}): LoadedMcpConfig => { + if (!this.options.resolveMcpConfig) { + return {}; + } + + return this.options.resolveMcpConfig({ + ...context, + toolClearance, + }); + }; + + const createToolPermissionHandler = (): ActorToolPermissionHandler => + this.createToolPermissionHandler(toolClearance); + + return { + registry: this.options.mcpRegistry, + resolveConfig, + createToolPermissionHandler, + createClaudeCanUseTool: createToolPermissionHandler, + }; + } + + private createToolPermissionHandler(toolClearance: ToolClearancePolicy): ActorToolPermissionHandler { + const normalizedToolClearance = parseToolClearancePolicy(toolClearance); + const allowlist = new Set(normalizedToolClearance.allowlist); + const banlist = new Set(normalizedToolClearance.banlist); + const rulesEngine = this.securityContext?.rulesEngine; + + return async (toolName, _input, options) => { + const toolUseID = options.toolUseID; + if (options.signal.aborted) { + return { + behavior: "deny", + message: "Tool execution denied because the request signal is aborted.", + interrupt: true, + ...(toolUseID ? { toolUseID } : {}), + }; + } + + const candidates = toToolNameCandidates(toolName); + const banMatch = candidates.find((candidate) => banlist.has(candidate)); + if (banMatch) { + if (rulesEngine) { + try { + rulesEngine.assertToolInvocationAllowed({ + tool: banMatch, + toolClearance: normalizedToolClearance, + }); + } catch { + // Security audit event already emitted by rules engine. + } + } + + return { + behavior: "deny", + message: `Tool "${toolName}" is blocked by actor tool policy.`, + interrupt: true, + ...(toolUseID ? { toolUseID } : {}), + }; + } + + if (allowlist.size > 0) { + const allowMatch = candidates.find((candidate) => allowlist.has(candidate)); + if (!allowMatch) { + if (rulesEngine) { + try { + rulesEngine.assertToolInvocationAllowed({ + tool: toolName, + toolClearance: normalizedToolClearance, + }); + } catch { + // Security audit event already emitted by rules engine. + } + } + + return { + behavior: "deny", + message: `Tool "${toolName}" is not in the actor tool allowlist.`, + interrupt: true, + ...(toolUseID ? { toolUseID } : {}), + }; + } + + rulesEngine?.assertToolInvocationAllowed({ + tool: allowMatch, + toolClearance: normalizedToolClearance, + }); + + return { + behavior: "allow", + ...(toolUseID ? { toolUseID } : {}), + }; + } + + rulesEngine?.assertToolInvocationAllowed({ + tool: candidates[0] ?? toolName, + toolClearance: normalizedToolClearance, + }); + + return { + behavior: "allow", + ...(toolUseID ? { toolUseID } : {}), + }; + }; + } + private createAttemptDomainEvents(input: { sessionId: string; nodeId: string; diff --git a/tests/orchestration-engine.test.ts b/tests/orchestration-engine.test.ts index 195c19e..a09a1bf 100644 --- a/tests/orchestration-engine.test.ts +++ b/tests/orchestration-engine.test.ts @@ -5,6 +5,8 @@ import { tmpdir } from "node:os"; import { resolve } from "node:path"; import { SchemaDrivenExecutionEngine } from "../src/agents/orchestration.js"; import type { ActorExecutionResult } from "../src/agents/pipeline.js"; +import { loadConfig } from "../src/config.js"; +import { createDefaultMcpRegistry, createMcpHandlerShell } from "../src/mcp.js"; import { SecurityViolationError } from "../src/security/index.js"; function createManifest(): unknown { @@ -252,6 +254,163 @@ test("runs DAG pipeline with state-dependent routing and retry behavior", async assert.deepEqual(engine.planChildPersonas({ parentPersonaId: "task", depth: 1 }), ["coder"]); }); +test("injects mcp registry/config helpers and enforces Claude tool gate in actor executor", async () => { + const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-")); + const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-")); + const projectContextPath = resolve(stateRoot, "project-context.json"); + const mcpConfigPath = resolve(workspaceRoot, "mcp.config.json"); + + await writeFile( + mcpConfigPath, + JSON.stringify( + { + servers: { + "task-master-tools": { + handler: "claude-task-master", + type: "stdio", + command: "node", + args: ["task-master-mcp.js"], + enabled_tools: ["read_file", "write_file", "search"], + }, + }, + }, + null, + 2, + ), + "utf8", + ); + + const config = loadConfig({ + ...process.env, + MCP_CONFIG_PATH: mcpConfigPath, + }); + + const customRegistry = createDefaultMcpRegistry(); + customRegistry.register( + createMcpHandlerShell({ + id: "custom-task-mcp-handler", + description: "custom task handler", + matches: () => false, + }), + ); + + const manifest = { + schemaVersion: "1" as const, + topologies: ["sequential"], + personas: [ + { + id: "task", + displayName: "Task", + systemPromptTemplate: "Task executor", + toolClearance: { + allowlist: ["read_file", "write_file"], + banlist: ["rm"], + }, + }, + ], + relationships: [], + topologyConstraints: { + maxDepth: 2, + maxRetries: 0, + }, + pipeline: { + entryNodeId: "task-node", + nodes: [ + { + id: "task-node", + actorId: "task_actor", + personaId: "task", + }, + ], + edges: [], + }, + }; + + const engine = new SchemaDrivenExecutionEngine({ + manifest, + config, + mcpRegistry: customRegistry, + settings: { + workspaceRoot, + stateRoot, + projectContextPath, + maxChildren: 1, + maxDepth: 2, + maxRetries: 0, + }, + actorExecutors: { + task_actor: async (input) => { + assert.equal(input.mcp.registry, customRegistry); + + const codexConfig = input.mcp.resolveConfig({ + providerHint: "codex", + }); + const codexServer = (codexConfig.codexConfig?.mcp_servers as Record> | undefined)?.[ + "task-master-tools" + ]; + assert.ok(codexServer); + assert.deepEqual(codexServer.enabled_tools, ["read_file", "write_file"]); + assert.deepEqual(codexServer.disabled_tools, ["rm"]); + + const claudeConfig = input.mcp.resolveConfig({ + providerHint: "claude", + }); + assert.ok(claudeConfig.claudeMcpServers?.["task-master-tools"]); + + const canUseTool = input.mcp.createClaudeCanUseTool(); + const allow = await canUseTool( + "mcp__claude-task-master__read_file", + {}, + { + signal: new AbortController().signal, + toolUseID: "allow-1", + }, + ); + assert.deepEqual(allow, { + behavior: "allow", + toolUseID: "allow-1", + }); + + const denyBlocked = await canUseTool( + "mcp__claude-task-master__rm", + {}, + { + signal: new AbortController().signal, + toolUseID: "deny-1", + }, + ); + assert.equal(denyBlocked.behavior, "deny"); + + const denyMissingAllowlist = await canUseTool( + "mcp__claude-task-master__search", + {}, + { + signal: new AbortController().signal, + toolUseID: "deny-2", + }, + ); + assert.equal(denyMissingAllowlist.behavior, "deny"); + + return { + status: "success", + payload: { + ok: true, + }, + }; + }, + }, + }); + + const result = await engine.runSession({ + sessionId: "session-mcp-gate-1", + initialPayload: { + task: "verify mcp gate", + }, + }); + + assert.equal(result.status, "success"); +}); + test("runs parallel topology blocks concurrently and routes via domain-event edges", async () => { const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-")); const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-")); @@ -916,3 +1075,105 @@ test("can map security violations to validation_fail for retry-unrolled remediat ["secure-node:validation_fail:1", "secure-node:success:2"], ); }); + +test("runtime event side-channel logs session and node lifecycle without changing pipeline behavior", async () => { + const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-runtime-event-workspace-")); + const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-runtime-event-state-")); + const projectContextPath = resolve(stateRoot, "project-context.json"); + const runtimeEventLogRelativePath = ".ai_ops/events/test-runtime-events.ndjson"; + const runtimeEventLogPath = resolve(workspaceRoot, runtimeEventLogRelativePath); + + const manifest = { + schemaVersion: "1", + topologies: ["sequential"], + personas: [ + { + id: "runner", + displayName: "Runner", + systemPromptTemplate: "Runner", + toolClearance: { + allowlist: ["read_file"], + banlist: [], + }, + }, + ], + relationships: [], + topologyConstraints: { + maxDepth: 2, + maxRetries: 0, + }, + pipeline: { + entryNodeId: "node-1", + nodes: [ + { + id: "node-1", + actorId: "runner_actor", + personaId: "runner", + }, + ], + edges: [], + }, + } as const; + + const config = loadConfig({ + AGENT_RUNTIME_EVENT_LOG_PATH: runtimeEventLogRelativePath, + }); + + const engine = new SchemaDrivenExecutionEngine({ + manifest, + config, + settings: { + workspaceRoot, + stateRoot, + projectContextPath, + maxDepth: 2, + maxRetries: 0, + maxChildren: 1, + runtimeContext: {}, + }, + actorExecutors: { + runner_actor: async () => ({ + status: "success", + payload: { + complete: true, + usage: { + input_tokens: 120, + output_tokens: 80, + tool_calls: 2, + duration_ms: 450, + }, + }, + }), + }, + }); + + const result = await engine.runSession({ + sessionId: "session-runtime-events", + initialPayload: { + task: "Emit runtime events", + }, + }); + + assert.equal(result.status, "success"); + + const lines = (await readFile(runtimeEventLogPath, "utf8")) + .trim() + .split("\n") + .filter((line) => line.length > 0); + assert.ok(lines.length >= 4); + + const events = lines.map((line) => JSON.parse(line) as Record); + const eventTypes = new Set(events.map((event) => String(event.type))); + assert.ok(eventTypes.has("session.started")); + assert.ok(eventTypes.has("node.attempt.completed")); + assert.ok(eventTypes.has("domain.validation_passed")); + assert.ok(eventTypes.has("session.completed")); + + const nodeAttemptEvent = events.find((event) => event.type === "node.attempt.completed"); + assert.ok(nodeAttemptEvent); + const usage = nodeAttemptEvent.usage as Record; + assert.equal(usage.tokenInput, 120); + assert.equal(usage.tokenOutput, 80); + assert.equal(usage.toolCalls, 2); + assert.equal(usage.durationMs, 450); +});