diff --git a/README.md b/README.md index df32c47..4676063 100644 --- a/README.md +++ b/README.md @@ -14,10 +14,10 @@ TypeScript runtime for deterministic multi-agent execution with: ## Architecture Summary - `SchemaDrivenExecutionEngine.runSession(...)` is the single execution entrypoint. -- `PipelineExecutor` owns runtime control flow and topology dispatch. +- `PipelineExecutor` owns runtime control flow and topology dispatch while delegating failure classification and persistence/event side-effects to dedicated policies. - `AgentManager` is an internal utility used by the pipeline when fan-out/retry-unrolled behavior is required. - Session state is persisted under `AGENT_STATE_ROOT`. -- Project state is persisted under `AGENT_PROJECT_CONTEXT_PATH` with domains: +- Project state is persisted under `AGENT_PROJECT_CONTEXT_PATH` with schema-versioned JSON (`schemaVersion`) and domains: - `globalFlags` - `artifactPointers` - `taskQueue` @@ -26,7 +26,9 @@ TypeScript runtime for deterministic multi-agent execution with: - `src/agents` - `orchestration.ts`: engine facade and runtime wiring - - `pipeline.ts`: DAG runner, retry matrix, abort propagation, domain-event routing + - `pipeline.ts`: DAG runner, retry matrix, aggregate session status, abort propagation, domain-event routing + - `failure-policy.ts`: hard/soft failure classification policy + - `lifecycle-observer.ts`: persistence/event lifecycle hooks for node attempts - `manifest.ts`: schema parsing/validation for personas/topologies/edges - `manager.ts`: recursive fan-out utility used by pipeline - `state-context.ts`: persisted node handoffs + session state @@ -36,6 +38,7 @@ TypeScript runtime for deterministic multi-agent execution with: - `provisioning.ts`: resource provisioning and child suballocation helpers - `src/mcp`: MCP config types/conversion/handlers - `src/examples`: provider entrypoints (`codex.ts`, `claude.ts`) +- `src/config.ts`: centralized env parsing/validation/defaulting - `tests`: manager, 
manifest, pipeline/orchestration, state, provisioning, MCP ## Setup @@ -93,6 +96,7 @@ Actors can emit events in `ActorExecutionResult.events`. Pipeline status also em - hard failures: timeout/network/403-like failures tracked sequentially; at 2 consecutive hard failures the pipeline aborts fast - `AbortSignal` is passed into every actor execution input - session closure aborts child recursive work +- run summaries expose an aggregate `status`: `success` requires every executed terminal DAG node to finish with `success` and no critical-path failure ## Environment Variables diff --git a/docs/pipeline-policies.md b/docs/pipeline-policies.md new file mode 100644 index 0000000..f443cc1 --- /dev/null +++ b/docs/pipeline-policies.md @@ -0,0 +1,30 @@ +# Pipeline Policies and Lifecycle Hooks + +## Why this exists + +`PipelineExecutor` previously handled DAG traversal, failure heuristics, state persistence, and domain-event emission in one execution loop. This made behavior harder to isolate and test. + +## Current structure + +- `FailurePolicy` (`src/agents/failure-policy.ts`) + - Owns hard vs soft failure classification. + - Determines whether a sequence of hard failures should abort execution. +- `PersistenceLifecycleObserver` (`src/agents/lifecycle-observer.ts`) + - Handles state patching, project-context updates, and domain-event publishing for each node attempt. +- `PipelineExecutor` (`src/agents/pipeline.ts`) + - Coordinates DAG traversal and retry behavior. + - Computes aggregate run status from executed terminal nodes plus critical-path failures. + +## Aggregate status semantics + +Run status is `success` only when both are true: + +1. All executed terminal nodes (leaves in the executed subgraph) have final status `success`. +2. No executed node in the critical path has final status `failure`. + +Otherwise the status is `failure`; a run that executed no nodes is also reported as `failure`. + +## Persistence guarantees + +State and project-context writes are now atomic via temp-file + rename. 
+Project-context patch/write operations are serialized both in-process (promise queue) and cross-process (lock file). diff --git a/src/agents/failure-policy.ts b/src/agents/failure-policy.ts new file mode 100644 index 0000000..66dd739 --- /dev/null +++ b/src/agents/failure-policy.ts @@ -0,0 +1,66 @@ +import type { ActorExecutionResult, ActorFailureKind } from "./pipeline.js"; + +function toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + return String(error); +} + +function containsHardFailureSignal(value: string): boolean { + return /(timeout|timed out|network|econnreset|econnrefused|enotfound|403|forbidden)/i.test(value); +} + +function toFailureCodeFromError(error: unknown): string | undefined { + if (!(error instanceof Error)) { + return undefined; + } + + const maybeCode = (error as NodeJS.ErrnoException).code; + return typeof maybeCode === "string" ? maybeCode : undefined; +} + +export class FailurePolicy { + isHardFailure(result: ActorExecutionResult): boolean { + if (result.failureKind === "hard") { + return true; + } + + if (result.status !== "failure") { + return false; + } + + const payloadText = (() => { + const message = result.payload?.error; + return typeof message === "string" ? message : ""; + })(); + + const codeText = result.failureCode ?? ""; + return containsHardFailureSignal(`${codeText} ${payloadText}`); + } + + classifyFailureFromError(error: unknown): { + payloadErrorMessage: string; + failureCode?: string; + failureKind: ActorFailureKind; + } { + const message = toErrorMessage(error); + const failureCode = toFailureCodeFromError(error); + const failureKind = containsHardFailureSignal(`${failureCode ?? ""} ${message}`) + ? "hard" + : "soft"; + + return { + payloadErrorMessage: message, + ...(failureCode ? 
{ failureCode } : {}), + failureKind, + }; + } + + shouldAbortAfterSequentialHardFailures( + sequentialHardFailureCount: number, + threshold: number, + ): boolean { + return sequentialHardFailureCount >= threshold; + } +} diff --git a/src/agents/file-persistence.ts b/src/agents/file-persistence.ts new file mode 100644 index 0000000..59b251d --- /dev/null +++ b/src/agents/file-persistence.ts @@ -0,0 +1,105 @@ +import { randomUUID } from "node:crypto"; +import { mkdir, open, rename, stat, unlink, writeFile } from "node:fs/promises"; +import { basename, dirname, resolve } from "node:path"; + +function sleep(ms: number): Promise { + return new Promise((resolveSleep) => { + setTimeout(resolveSleep, ms); + }); +} + +async function cleanupFile(path: string): Promise { + try { + await unlink(path); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + throw error; + } + } +} + +export async function writeUtf8FileAtomic(path: string, content: string): Promise { + const directory = dirname(path); + await mkdir(directory, { recursive: true }); + + const tempFileName = `.${basename(path)}.${String(process.pid)}.${randomUUID()}.tmp`; + const tempPath = resolve(directory, tempFileName); + + try { + await writeFile(tempPath, content, "utf8"); + await rename(tempPath, path); + } catch (error) { + await cleanupFile(tempPath); + throw error; + } +} + +async function tryAcquireFileLock( + lockPath: string, +): Promise> | undefined> { + try { + const handle = await open(lockPath, "wx"); + await handle.writeFile(`${JSON.stringify({ pid: process.pid, acquiredAt: new Date().toISOString() })}\n`); + return handle; + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "EEXIST") { + return undefined; + } + throw error; + } +} + +async function clearStaleLock(lockPath: string, staleAfterMs: number): Promise { + try { + const stats = await stat(lockPath); + const ageMs = Date.now() - stats.mtimeMs; + if (ageMs <= staleAfterMs) { + return; + } + + 
await cleanupFile(lockPath); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + throw error; + } + } +} + +export async function withFileLock( + lockPath: string, + operation: () => Promise, + options?: { + maxWaitMs?: number; + retryDelayMs?: number; + staleAfterMs?: number; + }, +): Promise { + const maxWaitMs = options?.maxWaitMs ?? 5000; + const retryDelayMs = options?.retryDelayMs ?? 25; + const staleAfterMs = options?.staleAfterMs ?? 30_000; + + await mkdir(dirname(lockPath), { recursive: true }); + + const startedAt = Date.now(); + + // Busy-wait with bounded retries to coordinate concurrent writers across processes. + while (true) { + const handle = await tryAcquireFileLock(lockPath); + if (handle) { + try { + return await operation(); + } finally { + await handle.close(); + await cleanupFile(lockPath); + } + } + + await clearStaleLock(lockPath, staleAfterMs); + + if (Date.now() - startedAt >= maxWaitMs) { + throw new Error(`Timed out waiting for file lock: ${lockPath}`); + } + + await sleep(retryDelayMs); + } +} diff --git a/src/agents/lifecycle-observer.ts b/src/agents/lifecycle-observer.ts new file mode 100644 index 0000000..079d62e --- /dev/null +++ b/src/agents/lifecycle-observer.ts @@ -0,0 +1,126 @@ +import { + DomainEventBus, + type DomainEvent, + type DomainEventType, +} from "./domain-events.js"; +import type { PipelineNode } from "./manifest.js"; +import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js"; +import { PersonaRegistry } from "./persona-registry.js"; +import { + FileSystemStateContextManager, + type SessionHistoryEntry, +} from "./state-context.js"; +import type { ActorExecutionResult, ActorResultStatus } from "./pipeline.js"; + +export type PipelineNodeAttemptObservedEvent = { + sessionId: string; + node: PipelineNode; + attempt: number; + result: ActorExecutionResult; + domainEvents: DomainEvent[]; +}; + +function toBehaviorEvent(status: ActorResultStatus): 
"onTaskComplete" | "onValidationFail" | undefined { + if (status === "success") { + return "onTaskComplete"; + } + if (status === "validation_fail") { + return "onValidationFail"; + } + return undefined; +} + +export interface PipelineLifecycleObserver { + onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise; +} + +export class PersistenceLifecycleObserver implements PipelineLifecycleObserver { + constructor( + private readonly input: { + personaRegistry: PersonaRegistry; + stateManager: FileSystemStateContextManager; + projectContextStore: FileSystemProjectContextStore; + domainEventBus?: DomainEventBus; + }, + ) {} + + async onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise { + const behaviorEvent = toBehaviorEvent(event.result.status); + const behaviorPatch = behaviorEvent + ? await this.input.personaRegistry.emitBehaviorEvent({ + personaId: event.node.personaId, + event: behaviorEvent, + sessionId: event.sessionId, + nodeId: event.node.id, + payload: event.result.payload ?? {}, + }) + : {}; + + const legacyHistoryEvent: SessionHistoryEntry = { + nodeId: event.node.id, + event: event.result.status, + timestamp: new Date().toISOString(), + ...(event.result.payload ? { data: event.result.payload } : {}), + }; + + const domainHistoryEvents: SessionHistoryEntry[] = event.domainEvents.map((domainEvent) => ({ + nodeId: event.node.id, + event: domainEvent.type, + timestamp: domainEvent.timestamp, + data: { + source: domainEvent.source, + attempt: domainEvent.attempt, + ...(domainEvent.payload.summary ? { summary: domainEvent.payload.summary } : {}), + ...(domainEvent.payload.errorCode ? { errorCode: domainEvent.payload.errorCode } : {}), + ...(domainEvent.payload.artifactPointer + ? { artifactPointer: domainEvent.payload.artifactPointer } + : {}), + ...(domainEvent.payload.details ? { details: domainEvent.payload.details } : {}), + }, + })); + + await this.input.stateManager.patchState(event.sessionId, { + ...(event.result.stateFlags ? 
{ flags: event.result.stateFlags } : {}), + metadata: { + ...(event.result.stateMetadata ?? {}), + ...behaviorPatch, + }, + historyEvent: legacyHistoryEvent, + historyEvents: domainHistoryEvents, + }); + + const domainEventBus = this.input.domainEventBus; + if (domainEventBus) { + for (const domainEvent of event.domainEvents) { + await domainEventBus.publish(domainEvent); + } + } + + const patch: ProjectContextPatch = { + ...(event.result.projectContextPatch ?? {}), + artifactPointers: { + [`sessions/${event.sessionId}/last_completed_node`]: event.node.id, + [`sessions/${event.sessionId}/last_attempt`]: String(event.attempt), + ...(event.result.projectContextPatch?.artifactPointers ?? {}), + }, + }; + + await this.input.projectContextStore.patchState(patch); + } +} + +export class DomainEventCollector { + private readonly events: DomainEvent[] = []; + + record(events: DomainEvent[]): void { + this.events.push(...events); + } + + toEventTypes(events: DomainEvent[]): DomainEventType[] { + return events.map((event) => event.type); + } + + getAll(): DomainEvent[] { + return [...this.events]; + } +} diff --git a/src/agents/orchestration.ts b/src/agents/orchestration.ts index 2657f80..1fb549f 100644 --- a/src/agents/orchestration.ts +++ b/src/agents/orchestration.ts @@ -1,4 +1,6 @@ import { resolve } from "node:path"; +import { getConfig, loadConfig, type AppConfig } from "../config.js"; +import { createDefaultMcpRegistry, McpRegistry } from "../mcp.js"; import { parseAgentManifest, type AgentManifest } from "./manifest.js"; import { AgentManager } from "./manager.js"; import { @@ -8,7 +10,6 @@ import { } from "./persona-registry.js"; import { PipelineExecutor, type ActorExecutor, type PipelineRunSummary } from "./pipeline.js"; import { FileSystemProjectContextStore } from "./project-context.js"; -import { loadAgentManagerLimitsFromEnv } from "./runtime.js"; import { FileSystemStateContextManager, type StoredSessionState } from "./state-context.js"; import type { 
JsonObject } from "./types.js"; @@ -26,59 +27,16 @@ export type BehaviorHandlerRegistry = Partial< Record>> >; -function readOptionalIntegerEnv( - key: - | "AGENT_TOPOLOGY_MAX_DEPTH" - | "AGENT_TOPOLOGY_MAX_RETRIES" - | "AGENT_RELATIONSHIP_MAX_CHILDREN", - fallback: number, - min: number, -): number { - const raw = process.env[key]?.trim(); - if (!raw) { - return fallback; - } - - const parsed = Number(raw); - if (!Number.isInteger(parsed) || parsed < min) { - throw new Error(`Environment variable ${key} must be an integer >= ${String(min)}.`); - } - - return parsed; -} - -function readOptionalStringEnv(key: "AGENT_STATE_ROOT", fallback: string): string { - const raw = process.env[key]?.trim(); - if (!raw) { - return fallback; - } - return raw; -} - -function readOptionalProjectContextPathEnv( - key: "AGENT_PROJECT_CONTEXT_PATH", - fallback: string, -): string { - const raw = process.env[key]?.trim(); - if (!raw) { - return fallback; - } - return raw; -} - -export function loadOrchestrationSettingsFromEnv(): Omit< - OrchestrationSettings, - "workspaceRoot" | "runtimeContext" -> { +export function loadOrchestrationSettingsFromEnv( + env: NodeJS.ProcessEnv = process.env, +): Omit { + const config = loadConfig(env); return { - stateRoot: readOptionalStringEnv("AGENT_STATE_ROOT", ".ai_ops/state"), - projectContextPath: readOptionalProjectContextPathEnv( - "AGENT_PROJECT_CONTEXT_PATH", - ".ai_ops/project-context.json", - ), - maxDepth: readOptionalIntegerEnv("AGENT_TOPOLOGY_MAX_DEPTH", 4, 1), - maxRetries: readOptionalIntegerEnv("AGENT_TOPOLOGY_MAX_RETRIES", 2, 0), - maxChildren: readOptionalIntegerEnv("AGENT_RELATIONSHIP_MAX_CHILDREN", 4, 1), + stateRoot: config.orchestration.stateRoot, + projectContextPath: config.orchestration.projectContextPath, + maxDepth: config.orchestration.maxDepth, + maxRetries: config.orchestration.maxRetries, + maxChildren: config.orchestration.maxChildren, }; } @@ -115,6 +73,7 @@ export class SchemaDrivenExecutionEngine { private readonly 
settings: OrchestrationSettings; private readonly childrenByParent: Map; private readonly manager: AgentManager; + private readonly mcpRegistry: McpRegistry; constructor(input: { manifest: AgentManifest | unknown; @@ -125,17 +84,21 @@ export class SchemaDrivenExecutionEngine { runtimeContext?: Record; }; manager?: AgentManager; + mcpRegistry?: McpRegistry; + config?: Readonly; }) { this.manifest = parseAgentManifest(input.manifest); - const defaults = loadOrchestrationSettingsFromEnv(); + const config = input.config ?? getConfig(); this.settings = { workspaceRoot: resolve(input.settings?.workspaceRoot ?? process.cwd()), - stateRoot: resolve(input.settings?.stateRoot ?? defaults.stateRoot), - projectContextPath: resolve(input.settings?.projectContextPath ?? defaults.projectContextPath), - maxDepth: input.settings?.maxDepth ?? defaults.maxDepth, - maxRetries: input.settings?.maxRetries ?? defaults.maxRetries, - maxChildren: input.settings?.maxChildren ?? defaults.maxChildren, + stateRoot: resolve(input.settings?.stateRoot ?? config.orchestration.stateRoot), + projectContextPath: resolve( + input.settings?.projectContextPath ?? config.orchestration.projectContextPath, + ), + maxDepth: input.settings?.maxDepth ?? config.orchestration.maxDepth, + maxRetries: input.settings?.maxRetries ?? config.orchestration.maxRetries, + maxChildren: input.settings?.maxChildren ?? config.orchestration.maxChildren, runtimeContext: { ...(input.settings?.runtimeContext ?? {}), }, @@ -149,7 +112,14 @@ export class SchemaDrivenExecutionEngine { }); this.actorExecutors = toExecutorMap(input.actorExecutors); - this.manager = input.manager ?? new AgentManager(loadAgentManagerLimitsFromEnv()); + this.manager = + input.manager ?? + new AgentManager({ + maxConcurrentAgents: config.agentManager.maxConcurrentAgents, + maxSessionAgents: config.agentManager.maxSessionAgents, + maxRecursiveDepth: config.agentManager.maxRecursiveDepth, + }); + this.mcpRegistry = input.mcpRegistry ?? 
createDefaultMcpRegistry(); for (const persona of this.manifest.personas) { this.personaRegistry.register({ @@ -227,6 +197,7 @@ export class SchemaDrivenExecutionEngine { manager: this.manager, managerSessionId, projectContextStore: this.projectContextStore, + mcpRegistry: this.mcpRegistry, }, ); try { diff --git a/src/agents/pipeline.ts b/src/agents/pipeline.ts index f78b792..2144dc8 100644 --- a/src/agents/pipeline.ts +++ b/src/agents/pipeline.ts @@ -9,14 +9,19 @@ import { type DomainEventPayload, type DomainEventType, } from "./domain-events.js"; +import { FailurePolicy } from "./failure-policy.js"; +import { + PersistenceLifecycleObserver, + type PipelineLifecycleObserver, +} from "./lifecycle-observer.js"; import type { AgentManifest, PipelineEdge, PipelineNode, RouteCondition } from "./manifest.js"; import type { AgentManager, RecursiveChildIntent } from "./manager.js"; +import type { McpRegistry } from "../mcp/handlers.js"; import { PersonaRegistry } from "./persona-registry.js"; import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js"; import { FileSystemStateContextManager, type NodeExecutionContext, - type SessionHistoryEntry, type StoredSessionState, } from "./state-context.js"; import type { JsonObject } from "./types.js"; @@ -59,11 +64,14 @@ export type PipelineExecutionRecord = { export type PipelineRunSummary = { sessionId: string; + status: PipelineAggregateStatus; records: PipelineExecutionRecord[]; events: DomainEvent[]; finalState: StoredSessionState; }; +export type PipelineAggregateStatus = "success" | "failure"; + export type PipelineExecutorOptions = { workspaceRoot: string; runtimeContext: Record; @@ -72,6 +80,10 @@ export type PipelineExecutorOptions = { manager: AgentManager; managerSessionId: string; projectContextStore: FileSystemProjectContextStore; + mcpRegistry: McpRegistry; + failurePolicy?: FailurePolicy; + lifecycleObserver?: PipelineLifecycleObserver; + hardFailureThreshold?: number; }; 
type QueueItem = { @@ -110,16 +122,6 @@ type NodeExecutionOutcome = { hardFailureAttempts: boolean[]; }; -function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValidationFail" | undefined { - if (status === "success") { - return "onTaskComplete"; - } - if (status === "validation_fail") { - return "onValidationFail"; - } - return undefined; -} - function shouldEdgeRun( edge: PipelineEdge, status: ActorResultStatus, @@ -228,51 +230,6 @@ function toAbortError(signal: AbortSignal): Error { return error; } -function toErrorMessage(error: unknown): string { - if (error instanceof Error) { - return error.message; - } - return String(error); -} - -function toErrorPayload(error: unknown): JsonObject { - const errorMessage = toErrorMessage(error); - return { - error: errorMessage, - }; -} - -function toFailureCodeFromError(error: unknown): string | undefined { - if (!(error instanceof Error)) { - return undefined; - } - - const maybeCode = (error as NodeJS.ErrnoException).code; - return typeof maybeCode === "string" ? maybeCode : undefined; -} - -function containsHardFailureSignal(value: string): boolean { - return /(timeout|timed out|network|econnreset|econnrefused|enotfound|403|forbidden)/i.test(value); -} - -function inferHardFailure(result: ActorExecutionResult): boolean { - if (result.failureKind === "hard") { - return true; - } - - if (result.status !== "failure") { - return false; - } - - const payloadText = (() => { - const message = result.payload?.error; - return typeof message === "string" ? message : ""; - })(); - - const codeText = result.failureCode ?? 
""; - return containsHardFailureSignal(`${codeText} ${payloadText}`); -} - function defaultEventPayloadForStatus(status: ActorResultStatus): DomainEventPayload { if (status === "success") { return { @@ -323,6 +280,9 @@ export class PipelineExecutor { private readonly nodeById = new Map(); private readonly edgesBySource = new Map(); private readonly domainEventBus = new DomainEventBus(); + private readonly failurePolicy: FailurePolicy; + private readonly lifecycleObserver: PipelineLifecycleObserver; + private readonly hardFailureThreshold: number; private managerRunCounter = 0; constructor( @@ -332,6 +292,17 @@ export class PipelineExecutor { private readonly actorExecutors: ReadonlyMap, private readonly options: PipelineExecutorOptions, ) { + this.failurePolicy = options.failurePolicy ?? new FailurePolicy(); + this.hardFailureThreshold = options.hardFailureThreshold ?? 2; + this.lifecycleObserver = + options.lifecycleObserver ?? + new PersistenceLifecycleObserver({ + personaRegistry: this.personaRegistry, + stateManager: this.stateManager, + projectContextStore: this.options.projectContextStore, + domainEventBus: this.domainEventBus, + }); + for (const node of manifest.pipeline.nodes) { this.nodeById.set(node.id, node); } @@ -418,9 +389,14 @@ export class PipelineExecutor { sequentialHardFailures = 0; } - if (sequentialHardFailures >= 2) { + if ( + this.failurePolicy.shouldAbortAfterSequentialHardFailures( + sequentialHardFailures, + this.hardFailureThreshold, + ) + ) { throw new Error( - "Hard failure threshold reached (>=2 sequential API/network/403 failures). Pipeline aborted.", + `Hard failure threshold reached (>=${String(this.hardFailureThreshold)} sequential API/network/403 failures). 
Pipeline aborted.`, ); } } @@ -461,7 +437,8 @@ export class PipelineExecutor { } const finalState = await this.stateManager.readState(input.sessionId); - if (records.length > 0 && records[records.length - 1]?.status === "success") { + const status = this.computeAggregateStatus(records); + if (status === "success") { await this.options.projectContextStore.patchState({ artifactPointers: { [`sessions/${input.sessionId}/final_state`]: this.stateManager.getSessionStatePath(input.sessionId), @@ -471,12 +448,39 @@ export class PipelineExecutor { return { sessionId: input.sessionId, + status, records, events, finalState, }; } + private computeAggregateStatus(records: PipelineExecutionRecord[]): PipelineAggregateStatus { + if (records.length === 0) { + return "failure"; + } + + const finalStatusByNode = new Map(); + for (const record of records) { + finalStatusByNode.set(record.nodeId, record.status); + } + + const executedNodeIds = new Set(finalStatusByNode.keys()); + const terminalNodeIds = [...executedNodeIds].filter((nodeId) => { + const outgoingEdges = this.edgesBySource.get(nodeId) ?? []; + return !outgoingEdges.some((edge) => executedNodeIds.has(edge.to)); + }); + + const allTerminalNodesSucceeded = + terminalNodeIds.length > 0 && + terminalNodeIds.every((nodeId) => finalStatusByNode.get(nodeId) === "success"); + const hasCriticalPathFailure = [...finalStatusByNode.values()].some( + (status) => status === "failure", + ); + + return allTerminalNodesSucceeded && !hasCriticalPathFailure ? 
"success" : "failure"; + } + private buildExecutionGroups(frontier: QueueItem[]): ExecutionGroup[] { const groupsByKey = new Map(); @@ -611,7 +615,7 @@ export class PipelineExecutor { customEvents: result.events, }); - await this.persistNodeAttempt({ + await this.lifecycleObserver.onNodeAttempt({ sessionId, node, attempt, @@ -629,7 +633,7 @@ export class PipelineExecutor { }); nodeEvents.push(...domainEvents); - const hardFailure = inferHardFailure(result); + const hardFailure = this.failurePolicy.isHardFailure(result); hardFailureAttempts.push(hardFailure); const payloadForNext = result.payload ?? context.handoff.payload; @@ -721,16 +725,15 @@ export class PipelineExecutor { throw toAbortError(input.signal); } - const failureCode = toFailureCodeFromError(error); - const failureKind = containsHardFailureSignal(`${failureCode ?? ""} ${toErrorMessage(error)}`) - ? "hard" - : "soft"; + const classified = this.failurePolicy.classifyFailureFromError(error); return { status: "failure", - payload: toErrorPayload(error), - failureCode, - failureKind, + payload: { + error: classified.payloadErrorMessage, + }, + failureCode: classified.failureCode, + failureKind: classified.failureKind, }; } } @@ -763,68 +766,4 @@ export class PipelineExecutor { }), ); } - - private async persistNodeAttempt(input: { - sessionId: string; - node: PipelineNode; - attempt: number; - result: ActorExecutionResult; - domainEvents: DomainEvent[]; - }): Promise { - const behaviorEvent = toBehaviorEvent(input.result.status); - const behaviorPatch = behaviorEvent - ? await this.personaRegistry.emitBehaviorEvent({ - personaId: input.node.personaId, - event: behaviorEvent, - sessionId: input.sessionId, - nodeId: input.node.id, - payload: input.result.payload ?? {}, - }) - : {}; - - const legacyHistoryEvent: SessionHistoryEntry = { - nodeId: input.node.id, - event: input.result.status, - timestamp: new Date().toISOString(), - ...(input.result.payload ? 
{ data: input.result.payload } : {}), - }; - - const domainHistoryEvents: SessionHistoryEntry[] = input.domainEvents.map((event) => ({ - nodeId: input.node.id, - event: event.type, - timestamp: event.timestamp, - data: { - source: event.source, - attempt: event.attempt, - ...(event.payload.summary ? { summary: event.payload.summary } : {}), - ...(event.payload.errorCode ? { errorCode: event.payload.errorCode } : {}), - ...(event.payload.artifactPointer ? { artifactPointer: event.payload.artifactPointer } : {}), - ...(event.payload.details ? { details: event.payload.details } : {}), - }, - })); - - await this.stateManager.patchState(input.sessionId, { - ...(input.result.stateFlags ? { flags: input.result.stateFlags } : {}), - metadata: { - ...(input.result.stateMetadata ?? {}), - ...behaviorPatch, - }, - historyEvent: legacyHistoryEvent, - historyEvents: domainHistoryEvents, - }); - - for (const event of input.domainEvents) { - await this.domainEventBus.publish(event); - } - - const patch: ProjectContextPatch = { - ...(input.result.projectContextPatch ?? {}), - artifactPointers: { - [`sessions/${input.sessionId}/last_completed_node`]: input.node.id, - [`sessions/${input.sessionId}/last_attempt`]: String(input.attempt), - ...(input.result.projectContextPatch?.artifactPointers ?? 
{}), - }, - }; - await this.options.projectContextStore.patchState(patch); - } } diff --git a/src/agents/project-context.ts b/src/agents/project-context.ts index e96e07c..45d575a 100644 --- a/src/agents/project-context.ts +++ b/src/agents/project-context.ts @@ -1,7 +1,10 @@ -import { mkdir, readFile, writeFile } from "node:fs/promises"; -import { dirname, resolve } from "node:path"; +import { readFile } from "node:fs/promises"; +import { resolve } from "node:path"; +import { withFileLock, writeUtf8FileAtomic } from "./file-persistence.js"; import { deepCloneJson, isRecord, type JsonObject, type JsonValue } from "./types.js"; +export const PROJECT_CONTEXT_SCHEMA_VERSION = 1; + export type ProjectTaskStatus = "pending" | "in_progress" | "blocked" | "done"; export type ProjectTask = { @@ -13,6 +16,7 @@ export type ProjectTask = { }; export type ProjectContextState = { + schemaVersion: number; globalFlags: Record; artifactPointers: Record; taskQueue: ProjectTask[]; @@ -27,6 +31,7 @@ export type ProjectContextPatch = { }; const DEFAULT_PROJECT_CONTEXT: ProjectContextState = { + schemaVersion: PROJECT_CONTEXT_SCHEMA_VERSION, globalFlags: {}, artifactPointers: {}, taskQueue: [], @@ -103,20 +108,41 @@ function toStringRecord(value: unknown, label: string): Record { return out; } +function toSchemaVersion(value: unknown): number { + if (value === undefined) { + return PROJECT_CONTEXT_SCHEMA_VERSION; + } + + if (typeof value !== "number" || !Number.isInteger(value) || value < 1) { + throw new Error("Project context schemaVersion must be an integer >= 1."); + } + + return value; +} + function toProjectContextState(value: unknown): ProjectContextState { if (!isRecord(value)) { throw new Error("Project context store is malformed."); } const tasksRaw = value.taskQueue; - if (!Array.isArray(tasksRaw)) { + if (tasksRaw !== undefined && !Array.isArray(tasksRaw)) { throw new Error("Project context taskQueue is malformed."); } return { - globalFlags: 
toBooleanRecord(value.globalFlags, "Project context globalFlags"), - artifactPointers: toStringRecord(value.artifactPointers, "Project context artifactPointers"), - taskQueue: tasksRaw.map((task, index) => toProjectTask(task, `Project context taskQueue[${String(index)}]`)), + schemaVersion: toSchemaVersion(value.schemaVersion), + globalFlags: + value.globalFlags === undefined + ? { ...DEFAULT_PROJECT_CONTEXT.globalFlags } + : toBooleanRecord(value.globalFlags, "Project context globalFlags"), + artifactPointers: + value.artifactPointers === undefined + ? { ...DEFAULT_PROJECT_CONTEXT.artifactPointers } + : toStringRecord(value.artifactPointers, "Project context artifactPointers"), + taskQueue: (tasksRaw ?? []).map((task, index) => + toProjectTask(task, `Project context taskQueue[${String(index)}]`), + ), }; } @@ -142,10 +168,12 @@ function mergeUpsertTasks(current: ProjectTask[], upserts: ProjectTask[]): Proje export class FileSystemProjectContextStore { private readonly filePath: string; + private readonly lockPath: string; private queue: Promise = Promise.resolve(); constructor(input: { filePath: string }) { this.filePath = resolve(input.filePath); + this.lockPath = `${this.filePath}.lock`; } getFilePath(): string { @@ -153,6 +181,57 @@ export class FileSystemProjectContextStore { } async readState(): Promise { + return this.readStateFromDisk(); + } + + async writeState(state: ProjectContextState): Promise { + await this.runSerialized(async () => { + await withFileLock(this.lockPath, async () => { + const normalizedState = toProjectContextState(state); + await writeUtf8FileAtomic(this.filePath, `${JSON.stringify(normalizedState, null, 2)}\n`); + }); + }); + } + + async patchState(patch: ProjectContextPatch): Promise { + return this.runSerialized(async () => + withFileLock(this.lockPath, async () => { + const current = await this.readStateFromDisk(); + + if (patch.globalFlags) { + Object.assign(current.globalFlags, patch.globalFlags); + } + if 
(patch.artifactPointers) { + Object.assign(current.artifactPointers, patch.artifactPointers); + } + if (patch.taskQueue) { + current.taskQueue = patch.taskQueue.map((task, index) => + toProjectTask(task, `Project context patch taskQueue[${String(index)}]`), + ); + } + if (patch.enqueueTasks && patch.enqueueTasks.length > 0) { + current.taskQueue.push( + ...patch.enqueueTasks.map((task, index) => + toProjectTask(task, `Project context patch enqueueTasks[${String(index)}]`), + ), + ); + } + if (patch.upsertTasks && patch.upsertTasks.length > 0) { + const upsertTasks = patch.upsertTasks.map((task, index) => + toProjectTask(task, `Project context patch upsertTasks[${String(index)}]`), + ); + current.taskQueue = mergeUpsertTasks(current.taskQueue, upsertTasks); + } + + current.schemaVersion = Math.max(current.schemaVersion, PROJECT_CONTEXT_SCHEMA_VERSION); + + await writeUtf8FileAtomic(this.filePath, `${JSON.stringify(current, null, 2)}\n`); + return current; + }), + ); + } + + private async readStateFromDisk(): Promise { try { const content = await readFile(this.filePath, "utf8"); const parsed = JSON.parse(content) as unknown; @@ -165,48 +244,6 @@ export class FileSystemProjectContextStore { } } - async writeState(state: ProjectContextState): Promise { - await this.runSerialized(async () => { - await mkdir(dirname(this.filePath), { recursive: true }); - await writeFile(this.filePath, `${JSON.stringify(state, null, 2)}\n`, "utf8"); - }); - } - - async patchState(patch: ProjectContextPatch): Promise { - return this.runSerialized(async () => { - const current = await this.readState(); - - if (patch.globalFlags) { - Object.assign(current.globalFlags, patch.globalFlags); - } - if (patch.artifactPointers) { - Object.assign(current.artifactPointers, patch.artifactPointers); - } - if (patch.taskQueue) { - current.taskQueue = patch.taskQueue.map((task, index) => - toProjectTask(task, `Project context patch taskQueue[${String(index)}]`), - ); - } - if (patch.enqueueTasks && 
patch.enqueueTasks.length > 0) { - current.taskQueue.push( - ...patch.enqueueTasks.map((task, index) => - toProjectTask(task, `Project context patch enqueueTasks[${String(index)}]`), - ), - ); - } - if (patch.upsertTasks && patch.upsertTasks.length > 0) { - const upsertTasks = patch.upsertTasks.map((task, index) => - toProjectTask(task, `Project context patch upsertTasks[${String(index)}]`), - ); - current.taskQueue = mergeUpsertTasks(current.taskQueue, upsertTasks); - } - - await mkdir(dirname(this.filePath), { recursive: true }); - await writeFile(this.filePath, `${JSON.stringify(current, null, 2)}\n`, "utf8"); - return current; - }); - } - private runSerialized(operation: () => Promise): Promise { const run = this.queue.then(operation, operation); this.queue = run.then( diff --git a/src/agents/runtime.ts b/src/agents/runtime.ts index 4f4712d..31a0318 100644 --- a/src/agents/runtime.ts +++ b/src/agents/runtime.ts @@ -1,3 +1,4 @@ +import { getConfig, loadConfig, type AppConfig } from "../config.js"; import { AgentManager, type AgentManagerLimits } from "./manager.js"; import { createDefaultResourceProvisioningOrchestrator, @@ -5,143 +6,58 @@ import { type ResourceProvisioningOrchestrator, } from "./provisioning.js"; -const DEFAULT_LIMITS: AgentManagerLimits = { - maxConcurrentAgents: 4, - maxSessionAgents: 2, - maxRecursiveDepth: 3, -}; - -const DEFAULT_PROVISIONING_CONFIG: BuiltInProvisioningConfigInput = { - gitWorktree: { - rootDirectory: ".ai_ops/worktrees", - baseRef: "HEAD", - }, - portRange: { - basePort: 36000, - blockSize: 32, - blockCount: 512, - primaryPortOffset: 0, - lockDirectory: ".ai_ops/locks/ports", - }, -}; - -function readPositiveIntegerEnv( - key: "AGENT_MAX_CONCURRENT" | "AGENT_MAX_SESSION" | "AGENT_MAX_RECURSIVE_DEPTH", - fallback: number, -): number { - const rawValue = process.env[key]?.trim(); - if (!rawValue) { - return fallback; - } - - const parsed = Number(rawValue); - if (!Number.isInteger(parsed) || parsed < 1) { - throw new 
Error(`Environment variable ${key} must be a positive integer.`); - } - - return parsed; -} - -function readOptionalStringEnv(key: string, fallback: string): string { - const rawValue = process.env[key]?.trim(); - if (!rawValue) { - return fallback; - } - return rawValue; -} - -function readIntegerEnv( - key: string, - fallback: number, - bounds: { - min: number; - }, -): number { - const rawValue = process.env[key]?.trim(); - if (!rawValue) { - return fallback; - } - - const parsed = Number(rawValue); - if (!Number.isInteger(parsed) || parsed < bounds.min) { - throw new Error(`Environment variable ${key} must be an integer >= ${String(bounds.min)}.`); - } - return parsed; -} - -export function loadAgentManagerLimitsFromEnv(): AgentManagerLimits { +function toProvisioningConfig(input: Readonly): BuiltInProvisioningConfigInput { return { - maxConcurrentAgents: readPositiveIntegerEnv( - "AGENT_MAX_CONCURRENT", - DEFAULT_LIMITS.maxConcurrentAgents, - ), - maxSessionAgents: readPositiveIntegerEnv( - "AGENT_MAX_SESSION", - DEFAULT_LIMITS.maxSessionAgents, - ), - maxRecursiveDepth: readPositiveIntegerEnv( - "AGENT_MAX_RECURSIVE_DEPTH", - DEFAULT_LIMITS.maxRecursiveDepth, - ), + gitWorktree: { + rootDirectory: input.provisioning.gitWorktree.rootDirectory, + baseRef: input.provisioning.gitWorktree.baseRef, + }, + portRange: { + basePort: input.provisioning.portRange.basePort, + blockSize: input.provisioning.portRange.blockSize, + blockCount: input.provisioning.portRange.blockCount, + primaryPortOffset: input.provisioning.portRange.primaryPortOffset, + lockDirectory: input.provisioning.portRange.lockDirectory, + }, + }; +} + +export function loadAgentManagerLimitsFromEnv(env: NodeJS.ProcessEnv = process.env): AgentManagerLimits { + const config = loadConfig(env); + return { + maxConcurrentAgents: config.agentManager.maxConcurrentAgents, + maxSessionAgents: config.agentManager.maxSessionAgents, + maxRecursiveDepth: config.agentManager.maxRecursiveDepth, }; } let 
managerSingleton: AgentManager | undefined; let provisioningSingleton: ResourceProvisioningOrchestrator | undefined; -export function getAgentManager(): AgentManager { +export function getAgentManager(config: Readonly = getConfig()): AgentManager { if (!managerSingleton) { - managerSingleton = new AgentManager(loadAgentManagerLimitsFromEnv()); + managerSingleton = new AgentManager({ + maxConcurrentAgents: config.agentManager.maxConcurrentAgents, + maxSessionAgents: config.agentManager.maxSessionAgents, + maxRecursiveDepth: config.agentManager.maxRecursiveDepth, + }); } return managerSingleton; } -export function loadProvisioningConfigFromEnv(): BuiltInProvisioningConfigInput { - return { - gitWorktree: { - rootDirectory: readOptionalStringEnv( - "AGENT_WORKTREE_ROOT", - DEFAULT_PROVISIONING_CONFIG.gitWorktree?.rootDirectory ?? ".ai_ops/worktrees", - ), - baseRef: readOptionalStringEnv( - "AGENT_WORKTREE_BASE_REF", - DEFAULT_PROVISIONING_CONFIG.gitWorktree?.baseRef ?? "HEAD", - ), - }, - portRange: { - basePort: readIntegerEnv( - "AGENT_PORT_BASE", - DEFAULT_PROVISIONING_CONFIG.portRange?.basePort ?? 36000, - { min: 1 }, - ), - blockSize: readIntegerEnv( - "AGENT_PORT_BLOCK_SIZE", - DEFAULT_PROVISIONING_CONFIG.portRange?.blockSize ?? 32, - { min: 1 }, - ), - blockCount: readIntegerEnv( - "AGENT_PORT_BLOCK_COUNT", - DEFAULT_PROVISIONING_CONFIG.portRange?.blockCount ?? 512, - { min: 1 }, - ), - primaryPortOffset: readIntegerEnv( - "AGENT_PORT_PRIMARY_OFFSET", - DEFAULT_PROVISIONING_CONFIG.portRange?.primaryPortOffset ?? 0, - { min: 0 }, - ), - lockDirectory: readOptionalStringEnv( - "AGENT_PORT_LOCK_DIR", - DEFAULT_PROVISIONING_CONFIG.portRange?.lockDirectory ?? 
".ai_ops/locks/ports", - ), - }, - }; +export function loadProvisioningConfigFromEnv( + env: NodeJS.ProcessEnv = process.env, +): BuiltInProvisioningConfigInput { + return toProvisioningConfig(loadConfig(env)); } -export function getResourceProvisioningOrchestrator(): ResourceProvisioningOrchestrator { +export function getResourceProvisioningOrchestrator( + config: Readonly = getConfig(), +): ResourceProvisioningOrchestrator { if (!provisioningSingleton) { provisioningSingleton = createDefaultResourceProvisioningOrchestrator( - loadProvisioningConfigFromEnv(), + toProvisioningConfig(config), ); } return provisioningSingleton; diff --git a/src/agents/state-context.ts b/src/agents/state-context.ts index 6a8ae97..df3a80d 100644 --- a/src/agents/state-context.ts +++ b/src/agents/state-context.ts @@ -1,5 +1,6 @@ -import { mkdir, readFile, writeFile } from "node:fs/promises"; +import { mkdir, readFile } from "node:fs/promises"; import { dirname, resolve } from "node:path"; +import { writeUtf8FileAtomic } from "./file-persistence.js"; import { deepCloneJson, isRecord, type JsonObject, type JsonValue } from "./types.js"; export type SessionHistoryEntry = { @@ -200,7 +201,7 @@ export class FileSystemStateContextManager { async writeState(sessionId: string, state: StoredSessionState): Promise { const path = toStatePath(this.rootDirectory, sessionId); await mkdir(dirname(path), { recursive: true }); - await writeFile(path, `${JSON.stringify(state, null, 2)}\n`, "utf8"); + await writeUtf8FileAtomic(path, `${JSON.stringify(state, null, 2)}\n`); } async patchState( @@ -248,7 +249,7 @@ export class FileSystemStateContextManager { const path = toHandoffPath(this.rootDirectory, sessionId, handoff.nodeId); await mkdir(dirname(path), { recursive: true }); - await writeFile(path, `${JSON.stringify(nodeHandoff, null, 2)}\n`, "utf8"); + await writeUtf8FileAtomic(path, `${JSON.stringify(nodeHandoff, null, 2)}\n`); return nodeHandoff; } diff --git a/src/config.ts b/src/config.ts new file 
mode 100644 index 0000000..8f6f14c --- /dev/null +++ b/src/config.ts @@ -0,0 +1,277 @@ +import type { AgentManagerLimits } from "./agents/manager.js"; +import type { BuiltInProvisioningConfig } from "./agents/provisioning.js"; + +export type ProviderRuntimeConfig = { + codexApiKey?: string; + openAiApiKey?: string; + openAiBaseUrl?: string; + codexSkipGitCheck: boolean; + anthropicApiKey?: string; + claudeModel?: string; + claudeCodePath?: string; +}; + +export type McpRuntimeConfig = { + configPath: string; +}; + +export type OrchestrationRuntimeConfig = { + stateRoot: string; + projectContextPath: string; + maxDepth: number; + maxRetries: number; + maxChildren: number; +}; + +export type DiscoveryRuntimeConfig = { + fileRelativePath: string; +}; + +export type AppConfig = { + provider: ProviderRuntimeConfig; + mcp: McpRuntimeConfig; + agentManager: AgentManagerLimits; + orchestration: OrchestrationRuntimeConfig; + provisioning: BuiltInProvisioningConfig; + discovery: DiscoveryRuntimeConfig; +}; + +const DEFAULT_AGENT_MANAGER: AgentManagerLimits = { + maxConcurrentAgents: 4, + maxSessionAgents: 2, + maxRecursiveDepth: 3, +}; + +const DEFAULT_ORCHESTRATION: OrchestrationRuntimeConfig = { + stateRoot: ".ai_ops/state", + projectContextPath: ".ai_ops/project-context.json", + maxDepth: 4, + maxRetries: 2, + maxChildren: 4, +}; + +const DEFAULT_PROVISIONING: BuiltInProvisioningConfig = { + gitWorktree: { + rootDirectory: ".ai_ops/worktrees", + baseRef: "HEAD", + }, + portRange: { + basePort: 36000, + blockSize: 32, + blockCount: 512, + primaryPortOffset: 0, + lockDirectory: ".ai_ops/locks/ports", + }, +}; + +const DEFAULT_DISCOVERY: DiscoveryRuntimeConfig = { + fileRelativePath: ".agent-context/resources.json", +}; + +function readOptionalString( + env: NodeJS.ProcessEnv, + key: string, +): string | undefined { + const value = env[key]?.trim(); + if (!value) { + return undefined; + } + return value; +} + +function readStringWithFallback( + env: NodeJS.ProcessEnv, + key: 
string, + fallback: string, +): string { + return readOptionalString(env, key) ?? fallback; +} + +function readIntegerWithBounds( + env: NodeJS.ProcessEnv, + key: string, + fallback: number, + bounds: { + min: number; + }, +): number { + const raw = env[key]?.trim(); + if (!raw) { + return fallback; + } + + const parsed = Number(raw); + if (!Number.isInteger(parsed) || parsed < bounds.min) { + throw new Error(`Environment variable ${key} must be an integer >= ${String(bounds.min)}.`); + } + + return parsed; +} + +function readBooleanWithFallback( + env: NodeJS.ProcessEnv, + key: string, + fallback: boolean, +): boolean { + const raw = env[key]?.trim(); + if (!raw) { + return fallback; + } + + if (raw === "true") { + return true; + } + if (raw === "false") { + return false; + } + + throw new Error(`Environment variable ${key} must be "true" or "false".`); +} + +function deepFreeze(value: T): Readonly { + if (value === null || typeof value !== "object") { + return value; + } + + const record = value as Record; + for (const nested of Object.values(record)) { + deepFreeze(nested); + } + + return Object.freeze(value); +} + +export function loadConfig(env: NodeJS.ProcessEnv = process.env): Readonly { + const config: AppConfig = { + provider: { + codexApiKey: readOptionalString(env, "CODEX_API_KEY"), + openAiApiKey: readOptionalString(env, "OPENAI_API_KEY"), + openAiBaseUrl: readOptionalString(env, "OPENAI_BASE_URL"), + codexSkipGitCheck: readBooleanWithFallback(env, "CODEX_SKIP_GIT_CHECK", true), + anthropicApiKey: readOptionalString(env, "ANTHROPIC_API_KEY"), + claudeModel: readOptionalString(env, "CLAUDE_MODEL"), + claudeCodePath: readOptionalString(env, "CLAUDE_CODE_PATH"), + }, + mcp: { + configPath: readStringWithFallback(env, "MCP_CONFIG_PATH", "./mcp.config.json"), + }, + agentManager: { + maxConcurrentAgents: readIntegerWithBounds( + env, + "AGENT_MAX_CONCURRENT", + DEFAULT_AGENT_MANAGER.maxConcurrentAgents, + { min: 1 }, + ), + maxSessionAgents: 
readIntegerWithBounds( + env, + "AGENT_MAX_SESSION", + DEFAULT_AGENT_MANAGER.maxSessionAgents, + { min: 1 }, + ), + maxRecursiveDepth: readIntegerWithBounds( + env, + "AGENT_MAX_RECURSIVE_DEPTH", + DEFAULT_AGENT_MANAGER.maxRecursiveDepth, + { min: 1 }, + ), + }, + orchestration: { + stateRoot: readStringWithFallback( + env, + "AGENT_STATE_ROOT", + DEFAULT_ORCHESTRATION.stateRoot, + ), + projectContextPath: readStringWithFallback( + env, + "AGENT_PROJECT_CONTEXT_PATH", + DEFAULT_ORCHESTRATION.projectContextPath, + ), + maxDepth: readIntegerWithBounds( + env, + "AGENT_TOPOLOGY_MAX_DEPTH", + DEFAULT_ORCHESTRATION.maxDepth, + { min: 1 }, + ), + maxRetries: readIntegerWithBounds( + env, + "AGENT_TOPOLOGY_MAX_RETRIES", + DEFAULT_ORCHESTRATION.maxRetries, + { min: 0 }, + ), + maxChildren: readIntegerWithBounds( + env, + "AGENT_RELATIONSHIP_MAX_CHILDREN", + DEFAULT_ORCHESTRATION.maxChildren, + { min: 1 }, + ), + }, + provisioning: { + gitWorktree: { + rootDirectory: readStringWithFallback( + env, + "AGENT_WORKTREE_ROOT", + DEFAULT_PROVISIONING.gitWorktree.rootDirectory, + ), + baseRef: readStringWithFallback( + env, + "AGENT_WORKTREE_BASE_REF", + DEFAULT_PROVISIONING.gitWorktree.baseRef, + ), + }, + portRange: { + basePort: readIntegerWithBounds( + env, + "AGENT_PORT_BASE", + DEFAULT_PROVISIONING.portRange.basePort, + { min: 1 }, + ), + blockSize: readIntegerWithBounds( + env, + "AGENT_PORT_BLOCK_SIZE", + DEFAULT_PROVISIONING.portRange.blockSize, + { min: 1 }, + ), + blockCount: readIntegerWithBounds( + env, + "AGENT_PORT_BLOCK_COUNT", + DEFAULT_PROVISIONING.portRange.blockCount, + { min: 1 }, + ), + primaryPortOffset: readIntegerWithBounds( + env, + "AGENT_PORT_PRIMARY_OFFSET", + DEFAULT_PROVISIONING.portRange.primaryPortOffset, + { min: 0 }, + ), + lockDirectory: readStringWithFallback( + env, + "AGENT_PORT_LOCK_DIR", + DEFAULT_PROVISIONING.portRange.lockDirectory, + ), + }, + }, + discovery: { + fileRelativePath: readStringWithFallback( + env, + 
"AGENT_DISCOVERY_FILE_RELATIVE_PATH", + DEFAULT_DISCOVERY.fileRelativePath, + ), + }, + }; + + return deepFreeze(config); +} + +let configSingleton: Readonly | undefined; + +export function getConfig(): Readonly { + if (!configSingleton) { + configSingleton = loadConfig(process.env); + } + + return configSingleton; +} + +export function clearConfigCacheForTests(): void { + configSingleton = undefined; +} diff --git a/src/examples/claude.ts b/src/examples/claude.ts index fc3f68b..7db2f2d 100644 --- a/src/examples/claude.ts +++ b/src/examples/claude.ts @@ -1,99 +1,75 @@ import "dotenv/config"; import { query, type Options } from "@anthropic-ai/claude-agent-sdk"; import { pathToFileURL } from "node:url"; -import { getAgentManager, getResourceProvisioningOrchestrator } from "../agents/runtime.js"; -import { loadMcpConfigFromEnv } from "../mcp.js"; +import { getConfig } from "../config.js"; +import { createSessionContext } from "./session-context.js"; function requiredPrompt(argv: string[]): string { const prompt = argv.slice(2).join(" ").trim(); if (!prompt) { - throw new Error("Usage: npm run claude -- \"your prompt\""); + throw new Error('Usage: npm run claude -- "your prompt"'); } return prompt; } -function buildOptions(): Options { +function buildOptions(config = getConfig()): Options { return { maxTurns: 1, - ...(process.env.CLAUDE_MODEL ? { model: process.env.CLAUDE_MODEL } : {}), - ...(process.env.CLAUDE_CODE_PATH - ? { pathToClaudeCodeExecutable: process.env.CLAUDE_CODE_PATH } + ...(config.provider.claudeModel ? { model: config.provider.claudeModel } : {}), + ...(config.provider.claudeCodePath + ? 
{ pathToClaudeCodeExecutable: config.provider.claudeCodePath } : {}), }; } export async function runClaudePrompt(prompt: string): Promise { - const agentManager = getAgentManager(); - const agentSession = agentManager.createSession(); - const resourceProvisioning = getResourceProvisioningOrchestrator(); - const mcp = loadMcpConfigFromEnv({ - providerHint: "claude", + const config = getConfig(); + const sessionContext = await createSessionContext("claude", { prompt, + config, }); - let provisionedResources: - | Awaited> - | undefined; try { - provisionedResources = await resourceProvisioning.provisionSession({ - sessionId: agentSession.id, - resources: [{ kind: "git-worktree" }, { kind: "port-range" }], - }); - const runtimeInjection = await provisionedResources.buildRuntimeInjection({ - discoveryFileRelativePath: process.env.AGENT_DISCOVERY_FILE_RELATIVE_PATH, - baseEnv: process.env, - }); - const promptWithContext = provisionedResources.composePrompt(prompt, [ - `Discovery file: ${runtimeInjection.discoveryFilePath}`, - "Resource env vars are pre-injected (AGENT_WORKTREE_PATH, AGENT_PORT_RANGE_START, AGENT_PORT_RANGE_END, AGENT_PORT_PRIMARY).", - ]); + const finalResponse = await sessionContext.runInSession(async () => { + const session = query({ + prompt: sessionContext.promptWithContext, + options: { + ...buildOptions(config), + ...(sessionContext.mcp.claudeMcpServers ? { mcpServers: sessionContext.mcp.claudeMcpServers } : {}), + cwd: sessionContext.runtimeInjection.workingDirectory, + env: sessionContext.runtimeInjection.env, + }, + }); - const finalResponse = await agentSession.runAgent({ - depth: 0, - run: async () => { - const session = query({ - prompt: promptWithContext, - options: { - ...buildOptions(), - ...(mcp.claudeMcpServers ? 
{ mcpServers: mcp.claudeMcpServers } : {}), - cwd: runtimeInjection.workingDirectory, - env: runtimeInjection.env, - }, - }); + let result = ""; - let result = ""; - - try { - for await (const message of session) { - if (message.type === "result" && message.subtype === "success") { - result = message.result.trim(); - } - - if (message.type === "result" && message.subtype !== "success") { - const detail = message.errors.join("; "); - throw new Error( - `Claude query failed (${message.subtype})${detail ? `: ${detail}` : ""}`, - ); - } + try { + for await (const message of session) { + if (message.type === "result" && message.subtype === "success") { + result = message.result.trim(); } - } finally { - session.close(); - } - if (!result) { - throw new Error("Claude run completed without a final result."); + if (message.type === "result" && message.subtype !== "success") { + const detail = message.errors.join("; "); + throw new Error( + `Claude query failed (${message.subtype})${detail ? `: ${detail}` : ""}`, + ); + } } + } finally { + session.close(); + } - return result; - }, + if (!result) { + throw new Error("Claude run completed without a final result."); + } + + return result; }); console.log(finalResponse); } finally { - if (provisionedResources) { - await provisionedResources.release(); - } - agentSession.close(); + await sessionContext.close(); } } diff --git a/src/examples/codex.ts b/src/examples/codex.ts index 97ecb12..d12d4d9 100644 --- a/src/examples/codex.ts +++ b/src/examples/codex.ts @@ -1,67 +1,43 @@ import "dotenv/config"; import { Codex } from "@openai/codex-sdk"; import { pathToFileURL } from "node:url"; -import { getAgentManager, getResourceProvisioningOrchestrator } from "../agents/runtime.js"; -import { loadMcpConfigFromEnv } from "../mcp.js"; +import { getConfig } from "../config.js"; +import { createSessionContext } from "./session-context.js"; function requiredPrompt(argv: string[]): string { const prompt = argv.slice(2).join(" ").trim(); if 
(!prompt) { - throw new Error("Usage: npm run codex -- \"your prompt\""); + throw new Error('Usage: npm run codex -- "your prompt"'); } return prompt; } export async function runCodexPrompt(prompt: string): Promise { - const agentManager = getAgentManager(); - const agentSession = agentManager.createSession(); - const resourceProvisioning = getResourceProvisioningOrchestrator(); - const apiKey = process.env.CODEX_API_KEY ?? process.env.OPENAI_API_KEY; - const mcp = loadMcpConfigFromEnv({ - providerHint: "codex", + const config = getConfig(); + const sessionContext = await createSessionContext("codex", { prompt, + config, }); - let provisionedResources: - | Awaited> - | undefined; try { - provisionedResources = await resourceProvisioning.provisionSession({ - sessionId: agentSession.id, - resources: [{ kind: "git-worktree" }, { kind: "port-range" }], - }); - const runtimeInjection = await provisionedResources.buildRuntimeInjection({ - discoveryFileRelativePath: process.env.AGENT_DISCOVERY_FILE_RELATIVE_PATH, - baseEnv: process.env, - }); + const apiKey = config.provider.codexApiKey ?? config.provider.openAiApiKey; const codex = new Codex({ ...(apiKey ? { apiKey } : {}), - ...(process.env.OPENAI_BASE_URL ? { baseUrl: process.env.OPENAI_BASE_URL } : {}), - ...(mcp.codexConfig ? { config: mcp.codexConfig } : {}), - env: runtimeInjection.env, + ...(config.provider.openAiBaseUrl ? { baseUrl: config.provider.openAiBaseUrl } : {}), + ...(sessionContext.mcp.codexConfig ? 
{ config: sessionContext.mcp.codexConfig } : {}), + env: sessionContext.runtimeInjection.env, }); const thread = codex.startThread({ - workingDirectory: runtimeInjection.workingDirectory, - skipGitRepoCheck: process.env.CODEX_SKIP_GIT_CHECK !== "false", + workingDirectory: sessionContext.runtimeInjection.workingDirectory, + skipGitRepoCheck: config.provider.codexSkipGitCheck, }); - const promptWithContext = provisionedResources.composePrompt(prompt, [ - `Discovery file: ${runtimeInjection.discoveryFilePath}`, - "Resource env vars are pre-injected (AGENT_WORKTREE_PATH, AGENT_PORT_RANGE_START, AGENT_PORT_RANGE_END, AGENT_PORT_PRIMARY).", - ]); - - const turn = await agentSession.runAgent({ - depth: 0, - run: () => thread.run(promptWithContext), - }); + const turn = await sessionContext.runInSession(() => thread.run(sessionContext.promptWithContext)); console.log(turn.finalResponse.trim() || "(No response text returned)"); } finally { - if (provisionedResources) { - await provisionedResources.release(); - } - agentSession.close(); + await sessionContext.close(); } } diff --git a/src/examples/session-context.ts b/src/examples/session-context.ts new file mode 100644 index 0000000..32c9db6 --- /dev/null +++ b/src/examples/session-context.ts @@ -0,0 +1,105 @@ +import { getConfig, type AppConfig } from "../config.js"; +import type { AgentSession } from "../agents/manager.js"; +import type { ProvisionedResources } from "../agents/provisioning.js"; +import { + getAgentManager, + getResourceProvisioningOrchestrator, +} from "../agents/runtime.js"; +import { + getDefaultMcpRegistry, + loadMcpConfigFromEnv, + type LoadedMcpConfig, + type McpRegistry, +} from "../mcp.js"; + +export type SessionProvider = "codex" | "claude"; + +export type SessionContext = { + provider: SessionProvider; + sessionId: string; + mcp: LoadedMcpConfig; + promptWithContext: string; + runtimeInjection: Awaited>; + runInSession: (run: () => Promise) => Promise; + close: () => Promise; +}; + +export async 
function createSessionContext( + provider: SessionProvider, + input: { + prompt: string; + config?: Readonly; + mcpRegistry?: McpRegistry; + }, +): Promise { + const config = input.config ?? getConfig(); + const mcpRegistry = input.mcpRegistry ?? getDefaultMcpRegistry(); + const agentManager = getAgentManager(config); + const agentSession = agentManager.createSession(); + const resourceProvisioning = getResourceProvisioningOrchestrator(config); + + let provisionedResources: ProvisionedResources | undefined; + let closed = false; + + const close = async (): Promise => { + if (closed) { + return; + } + closed = true; + + if (provisionedResources) { + await provisionedResources.release(); + } + + agentSession.close(); + }; + + try { + provisionedResources = await resourceProvisioning.provisionSession({ + sessionId: agentSession.id, + resources: [{ kind: "git-worktree" }, { kind: "port-range" }], + }); + + const runtimeInjection = await provisionedResources.buildRuntimeInjection({ + discoveryFileRelativePath: config.discovery.fileRelativePath, + baseEnv: process.env, + }); + + const promptWithContext = provisionedResources.composePrompt(input.prompt, [ + `Discovery file: ${runtimeInjection.discoveryFilePath}`, + "Resource env vars are pre-injected (AGENT_WORKTREE_PATH, AGENT_PORT_RANGE_START, AGENT_PORT_RANGE_END, AGENT_PORT_PRIMARY).", + ]); + + const mcp = loadMcpConfigFromEnv( + { + providerHint: provider, + prompt: input.prompt, + }, + { + config, + registry: mcpRegistry, + }, + ); + + return { + provider, + sessionId: agentSession.id, + mcp, + promptWithContext, + runtimeInjection, + runInSession: (run: () => Promise) => + runWithAgentSession(agentSession, run), + close, + }; + } catch (error) { + await close(); + throw error; + } +} + +async function runWithAgentSession(agentSession: AgentSession, run: () => Promise): Promise { + return agentSession.runAgent({ + depth: 0, + run, + }); +} diff --git a/src/mcp.ts b/src/mcp.ts index 2e8bb3a..47cd4a6 100644 --- 
a/src/mcp.ts +++ b/src/mcp.ts @@ -1,17 +1,18 @@ import { existsSync, readFileSync } from "node:fs"; import { resolve } from "node:path"; import type { CodexOptions } from "@openai/codex-sdk"; +import { getConfig, type AppConfig } from "./config.js"; +import { normalizeSharedMcpConfigFile } from "./mcp/converters.js"; import { + createDefaultMcpRegistry, createMcpHandlerShell, - listMcpHandlers, - registerMcpHandler, - resolveServerWithHandler, type McpHandlerBusinessLogic, type McpHandlerBusinessLogicInput, type McpHandlerInput, type McpHandlerResult, type McpHandlerShellOptions, type McpHandlerUtils, + McpRegistry, type McpServerHandler, } from "./mcp/handlers.js"; import type { @@ -24,16 +25,15 @@ function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null && !Array.isArray(value); } -function readConfigFile(pathFromEnv: string | undefined): { +function readConfigFile(configPath: string): { config?: SharedMcpConfigFile; sourcePath?: string; } { - const explicitPath = pathFromEnv?.trim(); - const candidatePath = explicitPath || "./mcp.config.json"; + const candidatePath = configPath.trim() || "./mcp.config.json"; const resolvedPath = resolve(process.cwd(), candidatePath); if (!existsSync(resolvedPath)) { - if (explicitPath) { + if (candidatePath !== "./mcp.config.json") { throw new Error(`MCP config file not found: ${resolvedPath}`); } return {}; @@ -45,11 +45,29 @@ function readConfigFile(pathFromEnv: string | undefined): { throw new Error(`MCP config file must contain a JSON object: ${resolvedPath}`); } - return { config: parsed as SharedMcpConfigFile, sourcePath: resolvedPath }; + return { + config: normalizeSharedMcpConfigFile(parsed as SharedMcpConfigFile), + sourcePath: resolvedPath, + }; } -export function loadMcpConfigFromEnv(context: McpLoadContext = {}): LoadedMcpConfig { - const { config, sourcePath } = readConfigFile(process.env.MCP_CONFIG_PATH); +const defaultMcpRegistry = createDefaultMcpRegistry(); + 
+export function getDefaultMcpRegistry(): McpRegistry { + return defaultMcpRegistry; +} + +export function loadMcpConfigFromEnv( + context: McpLoadContext = {}, + options?: { + config?: Readonly; + registry?: McpRegistry; + }, +): LoadedMcpConfig { + const runtimeConfig = options?.config ?? getConfig(); + const registry = options?.registry ?? defaultMcpRegistry; + + const { config, sourcePath } = readConfigFile(runtimeConfig.mcp.configPath); if (!config) { return {}; } @@ -59,7 +77,7 @@ export function loadMcpConfigFromEnv(context: McpLoadContext = {}): LoadedMcpCon const resolvedHandlers: Record = {}; for (const [serverName, server] of Object.entries(config.servers ?? {})) { - const resolved = resolveServerWithHandler({ + const resolved = registry.resolveServerWithHandler({ serverName, server, context, @@ -102,7 +120,15 @@ export function loadMcpConfigFromEnv(context: McpLoadContext = {}): LoadedMcpCon }; } -export { createMcpHandlerShell, listMcpHandlers, registerMcpHandler }; +export function registerMcpHandler(handler: McpServerHandler): void { + defaultMcpRegistry.register(handler); +} + +export function listMcpHandlers(): McpServerHandler[] { + return defaultMcpRegistry.listHandlers(); +} + +export { createDefaultMcpRegistry, createMcpHandlerShell, McpRegistry }; export type { LoadedMcpConfig, McpHandlerBusinessLogic, diff --git a/src/mcp/converters.ts b/src/mcp/converters.ts index 01e5a06..bf98594 100644 --- a/src/mcp/converters.ts +++ b/src/mcp/converters.ts @@ -1,5 +1,42 @@ import type { McpServerConfig } from "@anthropic-ai/claude-agent-sdk"; -import type { CodexConfigObject, SharedMcpServer, Transport } from "./types.js"; +import type { + CodexConfigObject, + SharedMcpConfigFile, + SharedMcpServer, + Transport, +} from "./types.js"; + +function mergeHeaders(server: SharedMcpServer): Record | undefined { + const merged = { + ...(server.http_headers ?? {}), + ...(server.headers ?? {}), + }; + + return Object.keys(merged).length > 0 ? 
merged : undefined; +} + +export function normalizeSharedMcpServer(server: SharedMcpServer): SharedMcpServer { + const { headers: _headers, http_headers: _httpHeaders, ...rest } = server; + const normalizedHeaders = mergeHeaders(server); + + return { + ...rest, + ...(normalizedHeaders ? { headers: normalizedHeaders } : {}), + }; +} + +export function normalizeSharedMcpConfigFile(config: SharedMcpConfigFile): SharedMcpConfigFile { + const normalizedServers: Record = {}; + + for (const [serverName, server] of Object.entries(config.servers ?? {})) { + normalizedServers[serverName] = normalizeSharedMcpServer(server); + } + + return { + ...config, + ...(Object.keys(normalizedServers).length > 0 ? { servers: normalizedServers } : {}), + }; +} export function inferTransport(server: SharedMcpServer): Transport { if (server.type) { @@ -10,6 +47,7 @@ export function inferTransport(server: SharedMcpServer): Transport { export function toCodexServerConfig(serverName: string, server: SharedMcpServer): CodexConfigObject { const type = inferTransport(server); + const headers = mergeHeaders(server); if (type === "stdio" && !server.command) { throw new Error(`Shared MCP server "${serverName}" requires "command" for stdio transport.`); @@ -38,9 +76,8 @@ export function toCodexServerConfig(serverName: string, server: SharedMcpServer) if (server.bearer_token_env_var) { config.bearer_token_env_var = server.bearer_token_env_var; } - const httpHeaders = server.http_headers ?? 
server.headers; - if (httpHeaders) { - config.http_headers = httpHeaders; + if (headers) { + config.http_headers = headers; } if (server.env_http_headers) config.env_http_headers = server.env_http_headers; if (server.env_vars) config.env_vars = server.env_vars; @@ -50,6 +87,7 @@ export function toCodexServerConfig(serverName: string, server: SharedMcpServer) export function toClaudeServerConfig(serverName: string, server: SharedMcpServer): McpServerConfig { const type = inferTransport(server); + const headers = mergeHeaders(server); if (type === "stdio") { if (!server.command) { @@ -70,7 +108,6 @@ export function toClaudeServerConfig(serverName: string, server: SharedMcpServer return { type, url: server.url, - ...(server.headers ? { headers: server.headers } : {}), + ...(headers ? { headers } : {}), }; } - diff --git a/src/mcp/handlers.ts b/src/mcp/handlers.ts index 8668c57..40e941c 100644 --- a/src/mcp/handlers.ts +++ b/src/mcp/handlers.ts @@ -127,107 +127,121 @@ function applyEnabledByDefault(input: McpHandlerBusinessLogicInput): McpHandlerR : input.baseResult; } -const context7Handler = createMcpHandlerShell({ - id: "context7", - description: - "Dedicated extension point for Context7 policy/behavior. Business logic belongs in applyBusinessLogic.", - matches: (input) => isNamedLike(input, ["context7"]), - applyBusinessLogic: applyEnabledByDefault, -}); - -const claudeTaskMasterHandler = createMcpHandlerShell({ - id: "claude-task-master", - description: - "Dedicated extension point for Claude Task Master policy/behavior. 
Business logic belongs in applyBusinessLogic.", - matches: (input) => - isNamedLike(input, ["claude-task-master", "task-master", "taskmaster"]), - applyBusinessLogic: applyEnabledByDefault, -}); - -const genericHandler: McpServerHandler = { - id: "generic", - description: "Default passthrough mapping for project-specific MCP servers.", - matches: () => true, - resolve: ({ serverName, server, utils: localUtils }) => - createDefaultResult({ serverName, server, localUtils }), -}; - -const handlerRegistry = new Map(); -const handlerOrder: string[] = []; - -function installBuiltinHandlers(): void { - registerMcpHandler(context7Handler); - registerMcpHandler(claudeTaskMasterHandler); - registerMcpHandler(genericHandler); -} - -export function registerMcpHandler(handler: McpServerHandler): void { - if (handlerRegistry.has(handler.id)) { - handlerRegistry.set(handler.id, handler); - return; - } - handlerRegistry.set(handler.id, handler); - handlerOrder.push(handler.id); -} - -export function listMcpHandlers(): McpServerHandler[] { - return handlerOrder - .map((id) => handlerRegistry.get(id)) - .filter((handler): handler is McpServerHandler => Boolean(handler)); -} - -function resolveHandler(serverName: string, server: SharedMcpServer): McpServerHandler { - if (server.handler) { - const explicit = handlerRegistry.get(server.handler); - if (!explicit) { - throw new Error( - `Unknown MCP handler "${server.handler}" configured for server "${serverName}".`, - ); - } - return explicit; - } - - for (const id of handlerOrder) { - const handler = handlerRegistry.get(id); - if (!handler || id === "generic") { - continue; - } - if (handler.matches({ serverName, server })) { - return handler; - } - } - - const fallback = handlerRegistry.get("generic"); - if (!fallback) { - throw new Error('No MCP fallback handler registered. 
Expected handler id "generic".'); - } - return fallback; -} - -export function resolveServerWithHandler(input: { - serverName: string; - server: SharedMcpServer; - context: McpLoadContext; - fullConfig: SharedMcpConfigFile; -}): McpHandlerResult & { handlerId: string } { - const { serverName, server, context, fullConfig } = input; - const handler = resolveHandler(serverName, server); - const handlerConfig = { - ...(fullConfig.handlerSettings?.[handler.id] ?? {}), - ...(server.handlerOptions ?? {}), - }; - const result = handler.resolve({ - serverName, - server, - context, - handlerConfig, - fullConfig, - utils, +function createBuiltinHandlers(): McpServerHandler[] { + const context7Handler = createMcpHandlerShell({ + id: "context7", + description: + "Dedicated extension point for Context7 policy/behavior. Business logic belongs in applyBusinessLogic.", + matches: (input) => isNamedLike(input, ["context7"]), + applyBusinessLogic: applyEnabledByDefault, }); - return { - ...result, - handlerId: handler.id, + + const claudeTaskMasterHandler = createMcpHandlerShell({ + id: "claude-task-master", + description: + "Dedicated extension point for Claude Task Master policy/behavior. 
Business logic belongs in applyBusinessLogic.", + matches: (input) => + isNamedLike(input, ["claude-task-master", "task-master", "taskmaster"]), + applyBusinessLogic: applyEnabledByDefault, + }); + + const genericHandler: McpServerHandler = { + id: "generic", + description: "Default passthrough mapping for project-specific MCP servers.", + matches: () => true, + resolve: ({ serverName, server, utils: localUtils }) => + createDefaultResult({ serverName, server, localUtils }), }; + + return [context7Handler, claudeTaskMasterHandler, genericHandler]; } -installBuiltinHandlers(); +export class McpRegistry { + private readonly handlerRegistry = new Map(); + private readonly handlerOrder: string[] = []; + + constructor(input?: { handlers?: McpServerHandler[] }) { + for (const handler of input?.handlers ?? []) { + this.register(handler); + } + } + + register(handler: McpServerHandler): void { + if (this.handlerRegistry.has(handler.id)) { + this.handlerRegistry.set(handler.id, handler); + return; + } + + this.handlerRegistry.set(handler.id, handler); + this.handlerOrder.push(handler.id); + } + + listHandlers(): McpServerHandler[] { + return this.handlerOrder + .map((id) => this.handlerRegistry.get(id)) + .filter((handler): handler is McpServerHandler => Boolean(handler)); + } + + resolveServerWithHandler(input: { + serverName: string; + server: SharedMcpServer; + context: McpLoadContext; + fullConfig: SharedMcpConfigFile; + }): McpHandlerResult & { handlerId: string } { + const { serverName, server, context, fullConfig } = input; + const handler = this.resolveHandler(serverName, server); + const handlerConfig = { + ...(fullConfig.handlerSettings?.[handler.id] ?? {}), + ...(server.handlerOptions ?? 
{}), + }; + + const result = handler.resolve({ + serverName, + server, + context, + handlerConfig, + fullConfig, + utils, + }); + + return { + ...result, + handlerId: handler.id, + }; + } + + private resolveHandler(serverName: string, server: SharedMcpServer): McpServerHandler { + if (server.handler) { + const explicit = this.handlerRegistry.get(server.handler); + if (!explicit) { + throw new Error( + `Unknown MCP handler "${server.handler}" configured for server "${serverName}".`, + ); + } + return explicit; + } + + for (const id of this.handlerOrder) { + const handler = this.handlerRegistry.get(id); + if (!handler || id === "generic") { + continue; + } + if (handler.matches({ serverName, server })) { + return handler; + } + } + + const fallback = this.handlerRegistry.get("generic"); + if (!fallback) { + throw new Error('No MCP fallback handler registered. Expected handler id "generic".'); + } + + return fallback; + } +} + +export function createDefaultMcpRegistry(): McpRegistry { + return new McpRegistry({ + handlers: createBuiltinHandlers(), + }); +} diff --git a/tests/config.test.ts b/tests/config.test.ts new file mode 100644 index 0000000..bea5122 --- /dev/null +++ b/tests/config.test.ts @@ -0,0 +1,22 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { loadConfig } from "../src/config.js"; + +test("loads defaults and freezes config", () => { + const config = loadConfig({}); + + assert.equal(config.agentManager.maxConcurrentAgents, 4); + assert.equal(config.orchestration.maxDepth, 4); + assert.equal(config.provisioning.portRange.basePort, 36000); + assert.equal(config.discovery.fileRelativePath, ".agent-context/resources.json"); + assert.equal(Object.isFrozen(config), true); + assert.equal(Object.isFrozen(config.orchestration), true); +}); + +test("validates boolean env values", () => { + assert.throws( + () => loadConfig({ CODEX_SKIP_GIT_CHECK: "maybe" }), + /must be "true" or "false"/, + ); +}); + diff --git 
a/tests/mcp-converters.test.ts b/tests/mcp-converters.test.ts index 8d2c191..2070967 100644 --- a/tests/mcp-converters.test.ts +++ b/tests/mcp-converters.test.ts @@ -2,6 +2,7 @@ import test from "node:test"; import assert from "node:assert/strict"; import { inferTransport, + normalizeSharedMcpServer, toClaudeServerConfig, toCodexServerConfig, } from "../src/mcp/converters.js"; @@ -41,6 +42,42 @@ test("maps shared headers to codex http_headers", () => { }); }); +test("normalizes header aliases into a single headers object", () => { + const normalized = normalizeSharedMcpServer({ + url: "http://localhost:3000/mcp", + http_headers: { + "X-Source": "legacy", + }, + headers: { + Authorization: "Bearer token", + }, + }); + + assert.deepEqual(normalized.headers, { + "X-Source": "legacy", + Authorization: "Bearer token", + }); + assert.equal("http_headers" in normalized, false); +}); + +test("maps legacy http_headers alias for claude conversion", () => { + const claudeConfig = toClaudeServerConfig("legacy-http-headers", { + type: "http", + url: "http://localhost:3000/mcp", + http_headers: { + Authorization: "Bearer token", + }, + }); + + assert.deepEqual(claudeConfig, { + type: "http", + url: "http://localhost:3000/mcp", + headers: { + Authorization: "Bearer token", + }, + }); +}); + test("throws for claude http server without url", () => { assert.throws( () => toClaudeServerConfig("bad-http", { type: "http" }), diff --git a/tests/mcp-registry.test.ts b/tests/mcp-registry.test.ts new file mode 100644 index 0000000..83ee9ff --- /dev/null +++ b/tests/mcp-registry.test.ts @@ -0,0 +1,65 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { + McpRegistry, + createDefaultMcpRegistry, + createMcpHandlerShell, +} from "../src/mcp/handlers.js"; + +test("mcp registries are isolated instances", () => { + const registryA = createDefaultMcpRegistry(); + const registryB = createDefaultMcpRegistry(); + + registryA.register( + createMcpHandlerShell({ + id: 
"custom-a", + description: "custom handler", + matches: () => false, + }), + ); + + assert.equal(registryA.listHandlers().some((handler) => handler.id === "custom-a"), true); + assert.equal(registryB.listHandlers().some((handler) => handler.id === "custom-a"), false); +}); + +test("mcp registry resolves generic fallback by default", () => { + const registry = createDefaultMcpRegistry(); + + const resolved = registry.resolveServerWithHandler({ + serverName: "local-files", + server: { + type: "stdio", + command: "npx", + args: ["-y", "@modelcontextprotocol/server-filesystem", "."], + }, + context: {}, + fullConfig: { + servers: {}, + }, + }); + + assert.equal(resolved.handlerId, "generic"); + assert.ok(resolved.codex); + assert.ok(resolved.claude); +}); + +test("mcp registry rejects unknown explicit handlers", () => { + const registry = new McpRegistry(); + + assert.throws( + () => + registry.resolveServerWithHandler({ + serverName: "broken", + server: { + type: "http", + url: "http://localhost:3000/mcp", + handler: "missing-handler", + }, + context: {}, + fullConfig: { + servers: {}, + }, + }), + /Unknown MCP handler/, + ); +}); diff --git a/tests/orchestration-engine.test.ts b/tests/orchestration-engine.test.ts index 0768199..3d3cf7b 100644 --- a/tests/orchestration-engine.test.ts +++ b/tests/orchestration-engine.test.ts @@ -230,6 +230,7 @@ test("runs DAG pipeline with state-dependent routing and retry behavior", async task: "Implement pipeline", }, }); + assert.equal(result.status, "success"); assert.deepEqual( result.records.map((record) => `${record.nodeId}:${record.status}:${String(record.attempt)}`), @@ -471,6 +472,7 @@ test("runs parallel topology blocks concurrently and routes via domain-event edg const result = await runPromise; assert.equal(maxConcurrentCoders, 2); + assert.equal(result.status, "success"); assert.deepEqual( result.records.map((record) => `${record.nodeId}:${record.status}`), ["plan:success", "code-a:success", "code-b:success", 
"integrate:success"], @@ -577,6 +579,96 @@ test("fails fast after two sequential hard failures", async () => { ); }); +test("marks aggregate status as failure when a terminal node fails", async () => { + const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-")); + const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-")); + const projectContextPath = resolve(stateRoot, "project-context.json"); + + const manifest = { + schemaVersion: "1", + topologies: ["sequential"], + personas: [ + { + id: "coder", + displayName: "Coder", + systemPromptTemplate: "Coder", + toolClearance: { + allowlist: [], + banlist: [], + }, + }, + ], + relationships: [], + topologyConstraints: { + maxDepth: 3, + maxRetries: 0, + }, + pipeline: { + entryNodeId: "build", + nodes: [ + { + id: "build", + actorId: "build_actor", + personaId: "coder", + }, + { + id: "verify", + actorId: "verify_actor", + personaId: "coder", + }, + ], + edges: [ + { + from: "build", + to: "verify", + on: "success", + }, + ], + }, + } as const; + + const engine = new SchemaDrivenExecutionEngine({ + manifest, + settings: { + workspaceRoot, + stateRoot, + projectContextPath, + maxDepth: 3, + maxRetries: 0, + maxChildren: 2, + runtimeContext: {}, + }, + actorExecutors: { + build_actor: async () => ({ + status: "success", + payload: { + step: "build", + }, + }), + verify_actor: async () => ({ + status: "failure", + payload: { + error: "verification failed", + }, + failureKind: "soft", + }), + }, + }); + + const result = await engine.runSession({ + sessionId: "session-terminal-failure", + initialPayload: { + task: "Aggregate failure status", + }, + }); + + assert.equal(result.status, "failure"); + assert.deepEqual( + result.records.map((record) => `${record.nodeId}:${record.status}`), + ["build:success", "verify:failure"], + ); +}); + test("propagates abort signal into actor execution and stops the run", async () => { const workspaceRoot = await mkdtemp(resolve(tmpdir(), 
"ai-ops-workspace-")); const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-")); diff --git a/tests/project-context.test.ts b/tests/project-context.test.ts index d7de7b8..7c7ec5b 100644 --- a/tests/project-context.test.ts +++ b/tests/project-context.test.ts @@ -1,6 +1,6 @@ import test from "node:test"; import assert from "node:assert/strict"; -import { mkdtemp } from "node:fs/promises"; +import { mkdtemp, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { resolve } from "node:path"; import { FileSystemProjectContextStore } from "../src/agents/project-context.js"; @@ -13,6 +13,7 @@ test("project context store reads defaults and applies domain patches", async () const initial = await store.readState(); assert.deepEqual(initial, { + schemaVersion: 1, globalFlags: {}, artifactPointers: {}, taskQueue: [], @@ -55,4 +56,35 @@ test("project context store reads defaults and applies domain patches", async () updated.taskQueue.map((task) => `${task.id}:${task.status}`), ["task-1:in_progress", "task-2:pending"], ); + assert.equal(updated.schemaVersion, 1); +}); + +test("project context parser merges missing root keys with defaults", async () => { + const root = await mkdtemp(resolve(tmpdir(), "ai-ops-project-context-")); + const filePath = resolve(root, "project-context.json"); + const store = new FileSystemProjectContextStore({ filePath }); + + await writeFile( + filePath, + `${JSON.stringify( + { + taskQueue: [ + { + id: "task-1", + title: "Migrate", + status: "pending", + }, + ], + }, + null, + 2, + )}\n`, + "utf8", + ); + + const state = await store.readState(); + assert.equal(state.schemaVersion, 1); + assert.deepEqual(state.globalFlags, {}); + assert.deepEqual(state.artifactPointers, {}); + assert.equal(state.taskQueue[0]?.id, "task-1"); });