Refactor pipeline policies, MCP registry, and unified config/runtime

This commit is contained in:
2026-02-23 13:56:45 -05:00
parent 889087daa1
commit 9b4216dda9
22 changed files with 1441 additions and 587 deletions

View File

@@ -0,0 +1,66 @@
import type { ActorExecutionResult, ActorFailureKind } from "./pipeline.js";
/** Best-effort conversion of an unknown thrown value into a readable message. */
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}
/**
 * True when the text mentions a transport/availability/auth signal
 * (timeouts, network errors, 403) that this codebase treats as a hard,
 * non-retryable failure.
 */
function containsHardFailureSignal(value: string): boolean {
  const hardSignalPattern =
    /(timeout|timed out|network|econnreset|econnrefused|enotfound|403|forbidden)/i;
  return hardSignalPattern.test(value);
}
/** Extracts a string `code` property (e.g. an errno code) from an Error, when present. */
function toFailureCodeFromError(error: unknown): string | undefined {
  if (!(error instanceof Error)) {
    return undefined;
  }
  const candidate = (error as Error & { code?: unknown }).code;
  if (typeof candidate === "string") {
    return candidate;
  }
  return undefined;
}
/**
 * Centralizes failure classification for pipeline execution: distinguishes
 * "hard" failures (transport/auth-level, not worth retrying) from "soft"
 * ones, and decides when a run of consecutive hard failures should abort.
 */
export class FailurePolicy {
  /**
   * A result is hard when it is explicitly tagged `failureKind: "hard"`, or
   * when it is a failure whose code/message carries a hard-failure signal.
   */
  isHardFailure(result: ActorExecutionResult): boolean {
    if (result.failureKind === "hard") {
      return true;
    }
    if (result.status !== "failure") {
      return false;
    }
    const rawMessage = result.payload?.error;
    const messageText = typeof rawMessage === "string" ? rawMessage : "";
    return containsHardFailureSignal(`${result.failureCode ?? ""} ${messageText}`);
  }

  /** Builds a failure descriptor (message, optional code, hard/soft kind) from a thrown value. */
  classifyFailureFromError(error: unknown): {
    payloadErrorMessage: string;
    failureCode?: string;
    failureKind: ActorFailureKind;
  } {
    const payloadErrorMessage = toErrorMessage(error);
    const failureCode = toFailureCodeFromError(error);
    const isHard = containsHardFailureSignal(`${failureCode ?? ""} ${payloadErrorMessage}`);
    const classified: {
      payloadErrorMessage: string;
      failureCode?: string;
      failureKind: ActorFailureKind;
    } = {
      payloadErrorMessage,
      failureKind: isHard ? "hard" : "soft",
    };
    // Only include the code when one was actually extracted, matching the
    // original spread-based omission of undefined codes.
    if (failureCode) {
      classified.failureCode = failureCode;
    }
    return classified;
  }

  /** Abort once the count of back-to-back hard failures reaches the configured threshold. */
  shouldAbortAfterSequentialHardFailures(
    sequentialHardFailureCount: number,
    threshold: number,
  ): boolean {
    return sequentialHardFailureCount >= threshold;
  }
}

View File

@@ -0,0 +1,105 @@
import { randomUUID } from "node:crypto";
import { mkdir, open, rename, stat, unlink, writeFile } from "node:fs/promises";
import { basename, dirname, resolve } from "node:path";
/** Resolves after roughly `ms` milliseconds. */
async function sleep(ms: number): Promise<void> {
  await new Promise<void>((done) => setTimeout(done, ms));
}
/** Deletes `path`, treating "already gone" (ENOENT) as success. */
async function cleanupFile(path: string): Promise<void> {
  try {
    await unlink(path);
  } catch (error) {
    const code = (error as { code?: unknown }).code;
    if (code !== "ENOENT") {
      throw error;
    }
  }
}
/**
 * Writes `content` to `path` atomically: data first goes to a uniquely-named
 * temp file in the same directory, then a rename swaps it into place so
 * readers never observe a partially written file. The temp file is removed
 * if either the write or the rename fails.
 */
export async function writeUtf8FileAtomic(path: string, content: string): Promise<void> {
  const directory = dirname(path);
  await mkdir(directory, { recursive: true });
  // pid + UUID keeps concurrent writers (and concurrent calls) from colliding.
  const uniqueSuffix = `${String(process.pid)}.${randomUUID()}`;
  const tempPath = resolve(directory, `.${basename(path)}.${uniqueSuffix}.tmp`);
  try {
    await writeFile(tempPath, content, "utf8");
    await rename(tempPath, path);
  } catch (error) {
    await cleanupFile(tempPath);
    throw error;
  }
}
/**
 * Attempts to create the lock file exclusively ("wx" = write, fail if it
 * exists). Returns the open handle on success, or undefined when another
 * holder already created it (EEXIST); any other error is rethrown. The lock
 * body records pid and acquisition time to aid stale-lock debugging.
 */
async function tryAcquireFileLock(
  lockPath: string,
): Promise<Awaited<ReturnType<typeof open>> | undefined> {
  try {
    const handle = await open(lockPath, "wx");
    const lockBody = JSON.stringify({ pid: process.pid, acquiredAt: new Date().toISOString() });
    await handle.writeFile(`${lockBody}\n`);
    return handle;
  } catch (error) {
    if ((error as { code?: unknown }).code === "EEXIST") {
      return undefined;
    }
    throw error;
  }
}
/**
 * Removes the lock file only when it is older than `staleAfterMs`
 * (presumably left behind by a crashed holder). A missing lock is fine.
 */
async function clearStaleLock(lockPath: string, staleAfterMs: number): Promise<void> {
  try {
    const { mtimeMs } = await stat(lockPath);
    if (Date.now() - mtimeMs > staleAfterMs) {
      await cleanupFile(lockPath);
    }
  } catch (error) {
    if ((error as { code?: unknown }).code !== "ENOENT") {
      throw error;
    }
  }
}
/**
 * Runs `operation` while holding an exclusive, file-based advisory lock,
 * coordinating writers across processes. Polls for the lock, clears stale
 * locks left by dead holders, and always releases on exit.
 *
 * @param lockPath path of the lock file (created exclusively to acquire).
 * @param operation work to run while the lock is held; its result is returned.
 * @param options maxWaitMs: total acquisition timeout; retryDelayMs: poll
 *   interval; staleAfterMs: age after which an existing lock is deleted.
 * @throws Error when the lock cannot be acquired within maxWaitMs.
 */
export async function withFileLock<T>(
lockPath: string,
operation: () => Promise<T>,
options?: {
maxWaitMs?: number;
retryDelayMs?: number;
staleAfterMs?: number;
},
): Promise<T> {
// Defaults: wait up to 5s, poll every 25ms, treat locks older than 30s as abandoned.
const maxWaitMs = options?.maxWaitMs ?? 5000;
const retryDelayMs = options?.retryDelayMs ?? 25;
const staleAfterMs = options?.staleAfterMs ?? 30_000;
await mkdir(dirname(lockPath), { recursive: true });
const startedAt = Date.now();
// Busy-wait with bounded retries to coordinate concurrent writers across processes.
while (true) {
const handle = await tryAcquireFileLock(lockPath);
if (handle) {
try {
return await operation();
} finally {
// Always release: close the handle first, then remove the lock file so
// the next waiter can acquire it.
await handle.close();
await cleanupFile(lockPath);
}
}
// Lock is held by someone else: clear it if stale, then retry until timeout.
await clearStaleLock(lockPath, staleAfterMs);
if (Date.now() - startedAt >= maxWaitMs) {
throw new Error(`Timed out waiting for file lock: ${lockPath}`);
}
await sleep(retryDelayMs);
}
}

View File

@@ -0,0 +1,126 @@
import {
DomainEventBus,
type DomainEvent,
type DomainEventType,
} from "./domain-events.js";
import type { PipelineNode } from "./manifest.js";
import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js";
import { PersonaRegistry } from "./persona-registry.js";
import {
FileSystemStateContextManager,
type SessionHistoryEntry,
} from "./state-context.js";
import type { ActorExecutionResult, ActorResultStatus } from "./pipeline.js";
/**
 * Snapshot of a single node-execution attempt, handed to lifecycle observers
 * after an actor finishes (successfully or not).
 */
export type PipelineNodeAttemptObservedEvent = {
sessionId: string;
node: PipelineNode;
// 1-based attempt counter for this node within the run.
attempt: number;
result: ActorExecutionResult;
// Domain events produced during this attempt, in emission order.
domainEvents: DomainEvent[];
};
/**
 * Maps an actor result status to the persona behavior hook it should
 * trigger; statuses with no associated hook yield undefined.
 */
function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValidationFail" | undefined {
  switch (status) {
    case "success":
      return "onTaskComplete";
    case "validation_fail":
      return "onValidationFail";
    default:
      return undefined;
  }
}
/**
 * Hook invoked after every node attempt; implementations persist or
 * broadcast the attempt's outcome.
 */
export interface PipelineLifecycleObserver {
onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise<void>;
}
/**
 * Default lifecycle observer: persists each node attempt's outcome into the
 * session state store and the shared project context, fires persona behavior
 * hooks, and republishes the attempt's domain events on the bus (when one is
 * provided). The ordering below (behavior hook -> state patch -> event bus ->
 * project context) is load-bearing; do not reorder casually.
 */
export class PersistenceLifecycleObserver implements PipelineLifecycleObserver {
constructor(
private readonly input: {
personaRegistry: PersonaRegistry;
stateManager: FileSystemStateContextManager;
projectContextStore: FileSystemProjectContextStore;
// Optional: when absent, domain events are persisted but not broadcast.
domainEventBus?: DomainEventBus;
},
) {}
async onNodeAttempt(event: PipelineNodeAttemptObservedEvent): Promise<void> {
// Fire the persona behavior hook (if the status maps to one) and keep any
// metadata patch it returns for merging into session state below.
const behaviorEvent = toBehaviorEvent(event.result.status);
const behaviorPatch = behaviorEvent
? await this.input.personaRegistry.emitBehaviorEvent({
personaId: event.node.personaId,
event: behaviorEvent,
sessionId: event.sessionId,
nodeId: event.node.id,
payload: event.result.payload ?? {},
})
: {};
// Legacy single-entry history record keyed by the raw actor status.
const legacyHistoryEvent: SessionHistoryEntry = {
nodeId: event.node.id,
event: event.result.status,
timestamp: new Date().toISOString(),
...(event.result.payload ? { data: event.result.payload } : {}),
};
// Project each domain event into a history entry, copying only the payload
// fields that are actually present.
const domainHistoryEvents: SessionHistoryEntry[] = event.domainEvents.map((domainEvent) => ({
nodeId: event.node.id,
event: domainEvent.type,
timestamp: domainEvent.timestamp,
data: {
source: domainEvent.source,
attempt: domainEvent.attempt,
...(domainEvent.payload.summary ? { summary: domainEvent.payload.summary } : {}),
...(domainEvent.payload.errorCode ? { errorCode: domainEvent.payload.errorCode } : {}),
...(domainEvent.payload.artifactPointer
? { artifactPointer: domainEvent.payload.artifactPointer }
: {}),
...(domainEvent.payload.details ? { details: domainEvent.payload.details } : {}),
},
}));
// Persist flags, merged metadata (behavior patch wins on key conflicts),
// and both legacy and domain-event history in a single state patch.
await this.input.stateManager.patchState(event.sessionId, {
...(event.result.stateFlags ? { flags: event.result.stateFlags } : {}),
metadata: {
...(event.result.stateMetadata ?? {}),
...behaviorPatch,
},
historyEvent: legacyHistoryEvent,
historyEvents: domainHistoryEvents,
});
// Broadcast after persistence so subscribers observe already-saved state.
const domainEventBus = this.input.domainEventBus;
if (domainEventBus) {
for (const domainEvent of event.domainEvents) {
await domainEventBus.publish(domainEvent);
}
}
// Record bookkeeping pointers for this attempt; an explicit patch from the
// result may override them via the trailing spread.
const patch: ProjectContextPatch = {
...(event.result.projectContextPatch ?? {}),
artifactPointers: {
[`sessions/${event.sessionId}/last_completed_node`]: event.node.id,
[`sessions/${event.sessionId}/last_attempt`]: String(event.attempt),
...(event.result.projectContextPatch?.artifactPointers ?? {}),
},
};
await this.input.projectContextStore.patchState(patch);
}
}
/** In-memory accumulator for domain events observed during a pipeline run. */
export class DomainEventCollector {
  private readonly events: DomainEvent[] = [];

  /** Appends a batch of events to the collection, preserving order. */
  record(events: DomainEvent[]): void {
    for (const event of events) {
      this.events.push(event);
    }
  }

  /** Projects a list of events down to their type tags. */
  toEventTypes(events: DomainEvent[]): DomainEventType[] {
    return events.map(({ type }) => type);
  }

  /** Returns a defensive copy of everything recorded so far. */
  getAll(): DomainEvent[] {
    return this.events.slice();
  }
}

View File

@@ -1,4 +1,6 @@
import { resolve } from "node:path";
import { getConfig, loadConfig, type AppConfig } from "../config.js";
import { createDefaultMcpRegistry, McpRegistry } from "../mcp.js";
import { parseAgentManifest, type AgentManifest } from "./manifest.js";
import { AgentManager } from "./manager.js";
import {
@@ -8,7 +10,6 @@ import {
} from "./persona-registry.js";
import { PipelineExecutor, type ActorExecutor, type PipelineRunSummary } from "./pipeline.js";
import { FileSystemProjectContextStore } from "./project-context.js";
import { loadAgentManagerLimitsFromEnv } from "./runtime.js";
import { FileSystemStateContextManager, type StoredSessionState } from "./state-context.js";
import type { JsonObject } from "./types.js";
@@ -26,59 +27,16 @@ export type BehaviorHandlerRegistry = Partial<
Record<string, Partial<Record<PersonaBehaviorEvent, PersonaBehaviorHandler>>>
>;
function readOptionalIntegerEnv(
key:
| "AGENT_TOPOLOGY_MAX_DEPTH"
| "AGENT_TOPOLOGY_MAX_RETRIES"
| "AGENT_RELATIONSHIP_MAX_CHILDREN",
fallback: number,
min: number,
): number {
const raw = process.env[key]?.trim();
if (!raw) {
return fallback;
}
const parsed = Number(raw);
if (!Number.isInteger(parsed) || parsed < min) {
throw new Error(`Environment variable ${key} must be an integer >= ${String(min)}.`);
}
return parsed;
}
function readOptionalStringEnv(key: "AGENT_STATE_ROOT", fallback: string): string {
const raw = process.env[key]?.trim();
if (!raw) {
return fallback;
}
return raw;
}
function readOptionalProjectContextPathEnv(
key: "AGENT_PROJECT_CONTEXT_PATH",
fallback: string,
): string {
const raw = process.env[key]?.trim();
if (!raw) {
return fallback;
}
return raw;
}
export function loadOrchestrationSettingsFromEnv(): Omit<
OrchestrationSettings,
"workspaceRoot" | "runtimeContext"
> {
export function loadOrchestrationSettingsFromEnv(
env: NodeJS.ProcessEnv = process.env,
): Omit<OrchestrationSettings, "workspaceRoot" | "runtimeContext"> {
const config = loadConfig(env);
return {
stateRoot: readOptionalStringEnv("AGENT_STATE_ROOT", ".ai_ops/state"),
projectContextPath: readOptionalProjectContextPathEnv(
"AGENT_PROJECT_CONTEXT_PATH",
".ai_ops/project-context.json",
),
maxDepth: readOptionalIntegerEnv("AGENT_TOPOLOGY_MAX_DEPTH", 4, 1),
maxRetries: readOptionalIntegerEnv("AGENT_TOPOLOGY_MAX_RETRIES", 2, 0),
maxChildren: readOptionalIntegerEnv("AGENT_RELATIONSHIP_MAX_CHILDREN", 4, 1),
stateRoot: config.orchestration.stateRoot,
projectContextPath: config.orchestration.projectContextPath,
maxDepth: config.orchestration.maxDepth,
maxRetries: config.orchestration.maxRetries,
maxChildren: config.orchestration.maxChildren,
};
}
@@ -115,6 +73,7 @@ export class SchemaDrivenExecutionEngine {
private readonly settings: OrchestrationSettings;
private readonly childrenByParent: Map<string, AgentManifest["relationships"]>;
private readonly manager: AgentManager;
private readonly mcpRegistry: McpRegistry;
constructor(input: {
manifest: AgentManifest | unknown;
@@ -125,17 +84,21 @@ export class SchemaDrivenExecutionEngine {
runtimeContext?: Record<string, string | number | boolean>;
};
manager?: AgentManager;
mcpRegistry?: McpRegistry;
config?: Readonly<AppConfig>;
}) {
this.manifest = parseAgentManifest(input.manifest);
const defaults = loadOrchestrationSettingsFromEnv();
const config = input.config ?? getConfig();
this.settings = {
workspaceRoot: resolve(input.settings?.workspaceRoot ?? process.cwd()),
stateRoot: resolve(input.settings?.stateRoot ?? defaults.stateRoot),
projectContextPath: resolve(input.settings?.projectContextPath ?? defaults.projectContextPath),
maxDepth: input.settings?.maxDepth ?? defaults.maxDepth,
maxRetries: input.settings?.maxRetries ?? defaults.maxRetries,
maxChildren: input.settings?.maxChildren ?? defaults.maxChildren,
stateRoot: resolve(input.settings?.stateRoot ?? config.orchestration.stateRoot),
projectContextPath: resolve(
input.settings?.projectContextPath ?? config.orchestration.projectContextPath,
),
maxDepth: input.settings?.maxDepth ?? config.orchestration.maxDepth,
maxRetries: input.settings?.maxRetries ?? config.orchestration.maxRetries,
maxChildren: input.settings?.maxChildren ?? config.orchestration.maxChildren,
runtimeContext: {
...(input.settings?.runtimeContext ?? {}),
},
@@ -149,7 +112,14 @@ export class SchemaDrivenExecutionEngine {
});
this.actorExecutors = toExecutorMap(input.actorExecutors);
this.manager = input.manager ?? new AgentManager(loadAgentManagerLimitsFromEnv());
this.manager =
input.manager ??
new AgentManager({
maxConcurrentAgents: config.agentManager.maxConcurrentAgents,
maxSessionAgents: config.agentManager.maxSessionAgents,
maxRecursiveDepth: config.agentManager.maxRecursiveDepth,
});
this.mcpRegistry = input.mcpRegistry ?? createDefaultMcpRegistry();
for (const persona of this.manifest.personas) {
this.personaRegistry.register({
@@ -227,6 +197,7 @@ export class SchemaDrivenExecutionEngine {
manager: this.manager,
managerSessionId,
projectContextStore: this.projectContextStore,
mcpRegistry: this.mcpRegistry,
},
);
try {

View File

@@ -9,14 +9,19 @@ import {
type DomainEventPayload,
type DomainEventType,
} from "./domain-events.js";
import { FailurePolicy } from "./failure-policy.js";
import {
PersistenceLifecycleObserver,
type PipelineLifecycleObserver,
} from "./lifecycle-observer.js";
import type { AgentManifest, PipelineEdge, PipelineNode, RouteCondition } from "./manifest.js";
import type { AgentManager, RecursiveChildIntent } from "./manager.js";
import type { McpRegistry } from "../mcp/handlers.js";
import { PersonaRegistry } from "./persona-registry.js";
import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js";
import {
FileSystemStateContextManager,
type NodeExecutionContext,
type SessionHistoryEntry,
type StoredSessionState,
} from "./state-context.js";
import type { JsonObject } from "./types.js";
@@ -59,11 +64,14 @@ export type PipelineExecutionRecord = {
export type PipelineRunSummary = {
sessionId: string;
status: PipelineAggregateStatus;
records: PipelineExecutionRecord[];
events: DomainEvent[];
finalState: StoredSessionState;
};
export type PipelineAggregateStatus = "success" | "failure";
export type PipelineExecutorOptions = {
workspaceRoot: string;
runtimeContext: Record<string, string | number | boolean>;
@@ -72,6 +80,10 @@ export type PipelineExecutorOptions = {
manager: AgentManager;
managerSessionId: string;
projectContextStore: FileSystemProjectContextStore;
mcpRegistry: McpRegistry;
failurePolicy?: FailurePolicy;
lifecycleObserver?: PipelineLifecycleObserver;
hardFailureThreshold?: number;
};
type QueueItem = {
@@ -110,16 +122,6 @@ type NodeExecutionOutcome = {
hardFailureAttempts: boolean[];
};
function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValidationFail" | undefined {
if (status === "success") {
return "onTaskComplete";
}
if (status === "validation_fail") {
return "onValidationFail";
}
return undefined;
}
function shouldEdgeRun(
edge: PipelineEdge,
status: ActorResultStatus,
@@ -228,51 +230,6 @@ function toAbortError(signal: AbortSignal): Error {
return error;
}
function toErrorMessage(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
return String(error);
}
function toErrorPayload(error: unknown): JsonObject {
const errorMessage = toErrorMessage(error);
return {
error: errorMessage,
};
}
function toFailureCodeFromError(error: unknown): string | undefined {
if (!(error instanceof Error)) {
return undefined;
}
const maybeCode = (error as NodeJS.ErrnoException).code;
return typeof maybeCode === "string" ? maybeCode : undefined;
}
function containsHardFailureSignal(value: string): boolean {
return /(timeout|timed out|network|econnreset|econnrefused|enotfound|403|forbidden)/i.test(value);
}
function inferHardFailure(result: ActorExecutionResult): boolean {
if (result.failureKind === "hard") {
return true;
}
if (result.status !== "failure") {
return false;
}
const payloadText = (() => {
const message = result.payload?.error;
return typeof message === "string" ? message : "";
})();
const codeText = result.failureCode ?? "";
return containsHardFailureSignal(`${codeText} ${payloadText}`);
}
function defaultEventPayloadForStatus(status: ActorResultStatus): DomainEventPayload {
if (status === "success") {
return {
@@ -323,6 +280,9 @@ export class PipelineExecutor {
private readonly nodeById = new Map<string, PipelineNode>();
private readonly edgesBySource = new Map<string, PipelineEdge[]>();
private readonly domainEventBus = new DomainEventBus();
private readonly failurePolicy: FailurePolicy;
private readonly lifecycleObserver: PipelineLifecycleObserver;
private readonly hardFailureThreshold: number;
private managerRunCounter = 0;
constructor(
@@ -332,6 +292,17 @@ export class PipelineExecutor {
private readonly actorExecutors: ReadonlyMap<string, ActorExecutor>,
private readonly options: PipelineExecutorOptions,
) {
this.failurePolicy = options.failurePolicy ?? new FailurePolicy();
this.hardFailureThreshold = options.hardFailureThreshold ?? 2;
this.lifecycleObserver =
options.lifecycleObserver ??
new PersistenceLifecycleObserver({
personaRegistry: this.personaRegistry,
stateManager: this.stateManager,
projectContextStore: this.options.projectContextStore,
domainEventBus: this.domainEventBus,
});
for (const node of manifest.pipeline.nodes) {
this.nodeById.set(node.id, node);
}
@@ -418,9 +389,14 @@ export class PipelineExecutor {
sequentialHardFailures = 0;
}
if (sequentialHardFailures >= 2) {
if (
this.failurePolicy.shouldAbortAfterSequentialHardFailures(
sequentialHardFailures,
this.hardFailureThreshold,
)
) {
throw new Error(
"Hard failure threshold reached (>=2 sequential API/network/403 failures). Pipeline aborted.",
`Hard failure threshold reached (>=${String(this.hardFailureThreshold)} sequential API/network/403 failures). Pipeline aborted.`,
);
}
}
@@ -461,7 +437,8 @@ export class PipelineExecutor {
}
const finalState = await this.stateManager.readState(input.sessionId);
if (records.length > 0 && records[records.length - 1]?.status === "success") {
const status = this.computeAggregateStatus(records);
if (status === "success") {
await this.options.projectContextStore.patchState({
artifactPointers: {
[`sessions/${input.sessionId}/final_state`]: this.stateManager.getSessionStatePath(input.sessionId),
@@ -471,12 +448,39 @@ export class PipelineExecutor {
return {
sessionId: input.sessionId,
status,
records,
events,
finalState,
};
}
/**
 * Derives the run-level status from per-node execution records. Heuristic:
 * keep each node's most recent status, find the "terminal" executed nodes
 * (those with no executed successor — i.e. where the run actually stopped),
 * and call the run a success only when every terminal node succeeded and no
 * node's final status is "failure".
 */
private computeAggregateStatus(records: PipelineExecutionRecord[]): PipelineAggregateStatus {
if (records.length === 0) {
return "failure";
}
// Later records win: retries overwrite earlier statuses for the same node.
const finalStatusByNode = new Map<string, ActorResultStatus>();
for (const record of records) {
finalStatusByNode.set(record.nodeId, record.status);
}
const executedNodeIds = new Set(finalStatusByNode.keys());
// Terminal = executed node none of whose outgoing edges lead to another
// executed node.
const terminalNodeIds = [...executedNodeIds].filter((nodeId) => {
const outgoingEdges = this.edgesBySource.get(nodeId) ?? [];
return !outgoingEdges.some((edge) => executedNodeIds.has(edge.to));
});
const allTerminalNodesSucceeded =
terminalNodeIds.length > 0 &&
terminalNodeIds.every((nodeId) => finalStatusByNode.get(nodeId) === "success");
// NOTE(review): despite the name, this scans every executed node's *final*
// status, not only nodes on a critical path — confirm intent.
const hasCriticalPathFailure = [...finalStatusByNode.values()].some(
(status) => status === "failure",
);
return allTerminalNodesSucceeded && !hasCriticalPathFailure ? "success" : "failure";
}
private buildExecutionGroups(frontier: QueueItem[]): ExecutionGroup[] {
const groupsByKey = new Map<string, ExecutionGroup>();
@@ -611,7 +615,7 @@ export class PipelineExecutor {
customEvents: result.events,
});
await this.persistNodeAttempt({
await this.lifecycleObserver.onNodeAttempt({
sessionId,
node,
attempt,
@@ -629,7 +633,7 @@ export class PipelineExecutor {
});
nodeEvents.push(...domainEvents);
const hardFailure = inferHardFailure(result);
const hardFailure = this.failurePolicy.isHardFailure(result);
hardFailureAttempts.push(hardFailure);
const payloadForNext = result.payload ?? context.handoff.payload;
@@ -721,16 +725,15 @@ export class PipelineExecutor {
throw toAbortError(input.signal);
}
const failureCode = toFailureCodeFromError(error);
const failureKind = containsHardFailureSignal(`${failureCode ?? ""} ${toErrorMessage(error)}`)
? "hard"
: "soft";
const classified = this.failurePolicy.classifyFailureFromError(error);
return {
status: "failure",
payload: toErrorPayload(error),
failureCode,
failureKind,
payload: {
error: classified.payloadErrorMessage,
},
failureCode: classified.failureCode,
failureKind: classified.failureKind,
};
}
}
@@ -763,68 +766,4 @@ export class PipelineExecutor {
}),
);
}
private async persistNodeAttempt(input: {
sessionId: string;
node: PipelineNode;
attempt: number;
result: ActorExecutionResult;
domainEvents: DomainEvent[];
}): Promise<void> {
const behaviorEvent = toBehaviorEvent(input.result.status);
const behaviorPatch = behaviorEvent
? await this.personaRegistry.emitBehaviorEvent({
personaId: input.node.personaId,
event: behaviorEvent,
sessionId: input.sessionId,
nodeId: input.node.id,
payload: input.result.payload ?? {},
})
: {};
const legacyHistoryEvent: SessionHistoryEntry = {
nodeId: input.node.id,
event: input.result.status,
timestamp: new Date().toISOString(),
...(input.result.payload ? { data: input.result.payload } : {}),
};
const domainHistoryEvents: SessionHistoryEntry[] = input.domainEvents.map((event) => ({
nodeId: input.node.id,
event: event.type,
timestamp: event.timestamp,
data: {
source: event.source,
attempt: event.attempt,
...(event.payload.summary ? { summary: event.payload.summary } : {}),
...(event.payload.errorCode ? { errorCode: event.payload.errorCode } : {}),
...(event.payload.artifactPointer ? { artifactPointer: event.payload.artifactPointer } : {}),
...(event.payload.details ? { details: event.payload.details } : {}),
},
}));
await this.stateManager.patchState(input.sessionId, {
...(input.result.stateFlags ? { flags: input.result.stateFlags } : {}),
metadata: {
...(input.result.stateMetadata ?? {}),
...behaviorPatch,
},
historyEvent: legacyHistoryEvent,
historyEvents: domainHistoryEvents,
});
for (const event of input.domainEvents) {
await this.domainEventBus.publish(event);
}
const patch: ProjectContextPatch = {
...(input.result.projectContextPatch ?? {}),
artifactPointers: {
[`sessions/${input.sessionId}/last_completed_node`]: input.node.id,
[`sessions/${input.sessionId}/last_attempt`]: String(input.attempt),
...(input.result.projectContextPatch?.artifactPointers ?? {}),
},
};
await this.options.projectContextStore.patchState(patch);
}
}

View File

@@ -1,7 +1,10 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { dirname, resolve } from "node:path";
import { readFile } from "node:fs/promises";
import { resolve } from "node:path";
import { withFileLock, writeUtf8FileAtomic } from "./file-persistence.js";
import { deepCloneJson, isRecord, type JsonObject, type JsonValue } from "./types.js";
export const PROJECT_CONTEXT_SCHEMA_VERSION = 1;
export type ProjectTaskStatus = "pending" | "in_progress" | "blocked" | "done";
export type ProjectTask = {
@@ -13,6 +16,7 @@ export type ProjectTask = {
};
export type ProjectContextState = {
schemaVersion: number;
globalFlags: Record<string, boolean>;
artifactPointers: Record<string, string>;
taskQueue: ProjectTask[];
@@ -27,6 +31,7 @@ export type ProjectContextPatch = {
};
const DEFAULT_PROJECT_CONTEXT: ProjectContextState = {
schemaVersion: PROJECT_CONTEXT_SCHEMA_VERSION,
globalFlags: {},
artifactPointers: {},
taskQueue: [],
@@ -103,20 +108,41 @@ function toStringRecord(value: unknown, label: string): Record<string, string> {
return out;
}
/**
 * Validates a persisted schemaVersion value. A missing value falls back to
 * the current PROJECT_CONTEXT_SCHEMA_VERSION; anything else must be an
 * integer >= 1.
 */
function toSchemaVersion(value: unknown): number {
  if (value === undefined) {
    return PROJECT_CONTEXT_SCHEMA_VERSION;
  }
  if (typeof value === "number" && Number.isInteger(value) && value >= 1) {
    return value;
  }
  throw new Error("Project context schemaVersion must be an integer >= 1.");
}
function toProjectContextState(value: unknown): ProjectContextState {
if (!isRecord(value)) {
throw new Error("Project context store is malformed.");
}
const tasksRaw = value.taskQueue;
if (!Array.isArray(tasksRaw)) {
if (tasksRaw !== undefined && !Array.isArray(tasksRaw)) {
throw new Error("Project context taskQueue is malformed.");
}
return {
globalFlags: toBooleanRecord(value.globalFlags, "Project context globalFlags"),
artifactPointers: toStringRecord(value.artifactPointers, "Project context artifactPointers"),
taskQueue: tasksRaw.map((task, index) => toProjectTask(task, `Project context taskQueue[${String(index)}]`)),
schemaVersion: toSchemaVersion(value.schemaVersion),
globalFlags:
value.globalFlags === undefined
? { ...DEFAULT_PROJECT_CONTEXT.globalFlags }
: toBooleanRecord(value.globalFlags, "Project context globalFlags"),
artifactPointers:
value.artifactPointers === undefined
? { ...DEFAULT_PROJECT_CONTEXT.artifactPointers }
: toStringRecord(value.artifactPointers, "Project context artifactPointers"),
taskQueue: (tasksRaw ?? []).map((task, index) =>
toProjectTask(task, `Project context taskQueue[${String(index)}]`),
),
};
}
@@ -142,10 +168,12 @@ function mergeUpsertTasks(current: ProjectTask[], upserts: ProjectTask[]): Proje
export class FileSystemProjectContextStore {
private readonly filePath: string;
private readonly lockPath: string;
private queue: Promise<void> = Promise.resolve();
constructor(input: { filePath: string }) {
this.filePath = resolve(input.filePath);
this.lockPath = `${this.filePath}.lock`;
}
getFilePath(): string {
@@ -153,6 +181,57 @@ export class FileSystemProjectContextStore {
}
/** Reads the current project context from disk (defaults when the file is absent). */
async readState(): Promise<ProjectContextState> {
return this.readStateFromDisk();
}
/**
 * Replaces the entire stored context. The state is re-validated/normalized,
 * then written atomically while holding both the in-process serialization
 * queue and the cross-process file lock.
 */
async writeState(state: ProjectContextState): Promise<void> {
await this.runSerialized(async () => {
await withFileLock(this.lockPath, async () => {
const normalizedState = toProjectContextState(state);
await writeUtf8FileAtomic(this.filePath, `${JSON.stringify(normalizedState, null, 2)}\n`);
});
});
}
/**
 * Applies a partial update to the stored context under both the in-process
 * queue and the cross-process file lock (read-modify-write), then persists
 * atomically and returns the resulting state.
 */
async patchState(patch: ProjectContextPatch): Promise<ProjectContextState> {
return this.runSerialized(async () =>
withFileLock(this.lockPath, async () => {
const current = await this.readStateFromDisk();
// Flag and pointer patches are shallow-merged over existing entries.
if (patch.globalFlags) {
Object.assign(current.globalFlags, patch.globalFlags);
}
if (patch.artifactPointers) {
Object.assign(current.artifactPointers, patch.artifactPointers);
}
// taskQueue replaces the queue wholesale (validated per entry).
if (patch.taskQueue) {
current.taskQueue = patch.taskQueue.map((task, index) =>
toProjectTask(task, `Project context patch taskQueue[${String(index)}]`),
);
}
// enqueueTasks appends; upsertTasks merges by identity via mergeUpsertTasks.
if (patch.enqueueTasks && patch.enqueueTasks.length > 0) {
current.taskQueue.push(
...patch.enqueueTasks.map((task, index) =>
toProjectTask(task, `Project context patch enqueueTasks[${String(index)}]`),
),
);
}
if (patch.upsertTasks && patch.upsertTasks.length > 0) {
const upsertTasks = patch.upsertTasks.map((task, index) =>
toProjectTask(task, `Project context patch upsertTasks[${String(index)}]`),
);
current.taskQueue = mergeUpsertTasks(current.taskQueue, upsertTasks);
}
// Never downgrade the persisted schema version below the current one.
current.schemaVersion = Math.max(current.schemaVersion, PROJECT_CONTEXT_SCHEMA_VERSION);
await writeUtf8FileAtomic(this.filePath, `${JSON.stringify(current, null, 2)}\n`);
return current;
}),
);
}
private async readStateFromDisk(): Promise<ProjectContextState> {
try {
const content = await readFile(this.filePath, "utf8");
const parsed = JSON.parse(content) as unknown;
@@ -165,48 +244,6 @@ export class FileSystemProjectContextStore {
}
}
async writeState(state: ProjectContextState): Promise<void> {
await this.runSerialized(async () => {
await mkdir(dirname(this.filePath), { recursive: true });
await writeFile(this.filePath, `${JSON.stringify(state, null, 2)}\n`, "utf8");
});
}
async patchState(patch: ProjectContextPatch): Promise<ProjectContextState> {
return this.runSerialized(async () => {
const current = await this.readState();
if (patch.globalFlags) {
Object.assign(current.globalFlags, patch.globalFlags);
}
if (patch.artifactPointers) {
Object.assign(current.artifactPointers, patch.artifactPointers);
}
if (patch.taskQueue) {
current.taskQueue = patch.taskQueue.map((task, index) =>
toProjectTask(task, `Project context patch taskQueue[${String(index)}]`),
);
}
if (patch.enqueueTasks && patch.enqueueTasks.length > 0) {
current.taskQueue.push(
...patch.enqueueTasks.map((task, index) =>
toProjectTask(task, `Project context patch enqueueTasks[${String(index)}]`),
),
);
}
if (patch.upsertTasks && patch.upsertTasks.length > 0) {
const upsertTasks = patch.upsertTasks.map((task, index) =>
toProjectTask(task, `Project context patch upsertTasks[${String(index)}]`),
);
current.taskQueue = mergeUpsertTasks(current.taskQueue, upsertTasks);
}
await mkdir(dirname(this.filePath), { recursive: true });
await writeFile(this.filePath, `${JSON.stringify(current, null, 2)}\n`, "utf8");
return current;
});
}
private runSerialized<T>(operation: () => Promise<T>): Promise<T> {
const run = this.queue.then(operation, operation);
this.queue = run.then(

View File

@@ -1,3 +1,4 @@
import { getConfig, loadConfig, type AppConfig } from "../config.js";
import { AgentManager, type AgentManagerLimits } from "./manager.js";
import {
createDefaultResourceProvisioningOrchestrator,
@@ -5,143 +6,58 @@ import {
type ResourceProvisioningOrchestrator,
} from "./provisioning.js";
const DEFAULT_LIMITS: AgentManagerLimits = {
maxConcurrentAgents: 4,
maxSessionAgents: 2,
maxRecursiveDepth: 3,
};
const DEFAULT_PROVISIONING_CONFIG: BuiltInProvisioningConfigInput = {
gitWorktree: {
rootDirectory: ".ai_ops/worktrees",
baseRef: "HEAD",
},
portRange: {
basePort: 36000,
blockSize: 32,
blockCount: 512,
primaryPortOffset: 0,
lockDirectory: ".ai_ops/locks/ports",
},
};
function readPositiveIntegerEnv(
key: "AGENT_MAX_CONCURRENT" | "AGENT_MAX_SESSION" | "AGENT_MAX_RECURSIVE_DEPTH",
fallback: number,
): number {
const rawValue = process.env[key]?.trim();
if (!rawValue) {
return fallback;
}
const parsed = Number(rawValue);
if (!Number.isInteger(parsed) || parsed < 1) {
throw new Error(`Environment variable ${key} must be a positive integer.`);
}
return parsed;
}
function readOptionalStringEnv(key: string, fallback: string): string {
const rawValue = process.env[key]?.trim();
if (!rawValue) {
return fallback;
}
return rawValue;
}
function readIntegerEnv(
key: string,
fallback: number,
bounds: {
min: number;
},
): number {
const rawValue = process.env[key]?.trim();
if (!rawValue) {
return fallback;
}
const parsed = Number(rawValue);
if (!Number.isInteger(parsed) || parsed < bounds.min) {
throw new Error(`Environment variable ${key} must be an integer >= ${String(bounds.min)}.`);
}
return parsed;
}
export function loadAgentManagerLimitsFromEnv(): AgentManagerLimits {
function toProvisioningConfig(input: Readonly<AppConfig>): BuiltInProvisioningConfigInput {
return {
maxConcurrentAgents: readPositiveIntegerEnv(
"AGENT_MAX_CONCURRENT",
DEFAULT_LIMITS.maxConcurrentAgents,
),
maxSessionAgents: readPositiveIntegerEnv(
"AGENT_MAX_SESSION",
DEFAULT_LIMITS.maxSessionAgents,
),
maxRecursiveDepth: readPositiveIntegerEnv(
"AGENT_MAX_RECURSIVE_DEPTH",
DEFAULT_LIMITS.maxRecursiveDepth,
),
gitWorktree: {
rootDirectory: input.provisioning.gitWorktree.rootDirectory,
baseRef: input.provisioning.gitWorktree.baseRef,
},
portRange: {
basePort: input.provisioning.portRange.basePort,
blockSize: input.provisioning.portRange.blockSize,
blockCount: input.provisioning.portRange.blockCount,
primaryPortOffset: input.provisioning.portRange.primaryPortOffset,
lockDirectory: input.provisioning.portRange.lockDirectory,
},
};
}
export function loadAgentManagerLimitsFromEnv(env: NodeJS.ProcessEnv = process.env): AgentManagerLimits {
const config = loadConfig(env);
return {
maxConcurrentAgents: config.agentManager.maxConcurrentAgents,
maxSessionAgents: config.agentManager.maxSessionAgents,
maxRecursiveDepth: config.agentManager.maxRecursiveDepth,
};
}
let managerSingleton: AgentManager | undefined;
let provisioningSingleton: ResourceProvisioningOrchestrator | undefined;
export function getAgentManager(): AgentManager {
export function getAgentManager(config: Readonly<AppConfig> = getConfig()): AgentManager {
if (!managerSingleton) {
managerSingleton = new AgentManager(loadAgentManagerLimitsFromEnv());
managerSingleton = new AgentManager({
maxConcurrentAgents: config.agentManager.maxConcurrentAgents,
maxSessionAgents: config.agentManager.maxSessionAgents,
maxRecursiveDepth: config.agentManager.maxRecursiveDepth,
});
}
return managerSingleton;
}
export function loadProvisioningConfigFromEnv(): BuiltInProvisioningConfigInput {
return {
gitWorktree: {
rootDirectory: readOptionalStringEnv(
"AGENT_WORKTREE_ROOT",
DEFAULT_PROVISIONING_CONFIG.gitWorktree?.rootDirectory ?? ".ai_ops/worktrees",
),
baseRef: readOptionalStringEnv(
"AGENT_WORKTREE_BASE_REF",
DEFAULT_PROVISIONING_CONFIG.gitWorktree?.baseRef ?? "HEAD",
),
},
portRange: {
basePort: readIntegerEnv(
"AGENT_PORT_BASE",
DEFAULT_PROVISIONING_CONFIG.portRange?.basePort ?? 36000,
{ min: 1 },
),
blockSize: readIntegerEnv(
"AGENT_PORT_BLOCK_SIZE",
DEFAULT_PROVISIONING_CONFIG.portRange?.blockSize ?? 32,
{ min: 1 },
),
blockCount: readIntegerEnv(
"AGENT_PORT_BLOCK_COUNT",
DEFAULT_PROVISIONING_CONFIG.portRange?.blockCount ?? 512,
{ min: 1 },
),
primaryPortOffset: readIntegerEnv(
"AGENT_PORT_PRIMARY_OFFSET",
DEFAULT_PROVISIONING_CONFIG.portRange?.primaryPortOffset ?? 0,
{ min: 0 },
),
lockDirectory: readOptionalStringEnv(
"AGENT_PORT_LOCK_DIR",
DEFAULT_PROVISIONING_CONFIG.portRange?.lockDirectory ?? ".ai_ops/locks/ports",
),
},
};
export function loadProvisioningConfigFromEnv(
env: NodeJS.ProcessEnv = process.env,
): BuiltInProvisioningConfigInput {
return toProvisioningConfig(loadConfig(env));
}
export function getResourceProvisioningOrchestrator(): ResourceProvisioningOrchestrator {
export function getResourceProvisioningOrchestrator(
config: Readonly<AppConfig> = getConfig(),
): ResourceProvisioningOrchestrator {
if (!provisioningSingleton) {
provisioningSingleton = createDefaultResourceProvisioningOrchestrator(
loadProvisioningConfigFromEnv(),
toProvisioningConfig(config),
);
}
return provisioningSingleton;

View File

@@ -1,5 +1,6 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { mkdir, readFile } from "node:fs/promises";
import { dirname, resolve } from "node:path";
import { writeUtf8FileAtomic } from "./file-persistence.js";
import { deepCloneJson, isRecord, type JsonObject, type JsonValue } from "./types.js";
export type SessionHistoryEntry = {
@@ -200,7 +201,7 @@ export class FileSystemStateContextManager {
async writeState(sessionId: string, state: StoredSessionState): Promise<void> {
const path = toStatePath(this.rootDirectory, sessionId);
await mkdir(dirname(path), { recursive: true });
await writeFile(path, `${JSON.stringify(state, null, 2)}\n`, "utf8");
await writeUtf8FileAtomic(path, `${JSON.stringify(state, null, 2)}\n`);
}
async patchState(
@@ -248,7 +249,7 @@ export class FileSystemStateContextManager {
const path = toHandoffPath(this.rootDirectory, sessionId, handoff.nodeId);
await mkdir(dirname(path), { recursive: true });
await writeFile(path, `${JSON.stringify(nodeHandoff, null, 2)}\n`, "utf8");
await writeUtf8FileAtomic(path, `${JSON.stringify(nodeHandoff, null, 2)}\n`);
return nodeHandoff;
}