Wire pipeline DAG execution to manager with events and project context

This commit is contained in:
2026-02-23 13:14:20 -05:00
parent 53af0d44cd
commit 889087daa1
13 changed files with 1668 additions and 380 deletions

112
src/agents/domain-events.ts Normal file
View File

@@ -0,0 +1,112 @@
import { randomUUID } from "node:crypto";
import type { JsonObject } from "./types.js";
/** Events emitted during the planning phase (requirements/tasks). */
export type PlanningDomainEventType = "requirements_defined" | "tasks_planned";
/** Events emitted during execution (commits, blocked tasks). */
export type ExecutionDomainEventType = "code_committed" | "task_blocked";
/** Events emitted by validation of a node's output. */
export type ValidationDomainEventType = "validation_passed" | "validation_failed";
/** Events emitted when a branch is integrated. */
export type IntegrationDomainEventType = "branch_merged";
/** Union of every domain event type the system can emit or route on. */
export type DomainEventType =
| PlanningDomainEventType
| ExecutionDomainEventType
| ValidationDomainEventType
| IntegrationDomainEventType;
/** Origin of an event: the pipeline runner itself or an actor executor. */
export type DomainEventSource = "pipeline" | "actor";
/** Optional free-form descriptive data attached to an event. */
export type DomainEventPayload = {
summary?: string;
details?: JsonObject;
errorCode?: string;
artifactPointer?: string;
};
/**
 * A fully populated event envelope: routing identity (session/node/attempt),
 * a unique id, an ISO-8601 timestamp (see createDomainEvent), and a payload.
 */
export type DomainEvent<TType extends DomainEventType = DomainEventType> = {
id: string;
type: TType;
source: DomainEventSource;
sessionId: string;
nodeId: string;
attempt: number;
timestamp: string;
payload: DomainEventPayload;
};
/**
 * An actor's request to emit an event; the envelope fields (id, timestamp,
 * session/node/attempt) are filled in when the event is created.
 */
export type DomainEventEmission<TType extends DomainEventType = DomainEventType> = {
type: TType;
payload?: DomainEventPayload;
};
/** Callback invoked for each published event; may be synchronous or async. */
export type DomainEventHandler<TType extends DomainEventType = DomainEventType> = (
event: DomainEvent<TType>,
) => Promise<void> | void;
// Canonical set of every domain event type the pipeline understands.
// Typed as ReadonlySet<string> so the guard below can probe arbitrary
// strings without a cast at the call site.
const DOMAIN_EVENT_TYPES: ReadonlySet<string> = new Set<DomainEventType>([
  "requirements_defined",
  "tasks_planned",
  "code_committed",
  "task_blocked",
  "validation_passed",
  "validation_failed",
  "branch_merged",
]);

/** Type guard: narrows an arbitrary string to DomainEventType. */
export function isDomainEventType(value: string): value is DomainEventType {
  return DOMAIN_EVENT_TYPES.has(value);
}
/**
 * Builds a fully populated DomainEvent envelope from caller-supplied routing
 * fields, stamping a fresh UUID and an ISO-8601 timestamp. An omitted payload
 * becomes an empty object.
 */
export function createDomainEvent(input: {
  type: DomainEventType;
  source: DomainEventSource;
  sessionId: string;
  nodeId: string;
  attempt: number;
  payload?: DomainEventPayload;
}): DomainEvent {
  const { type, source, sessionId, nodeId, attempt, payload } = input;
  return {
    id: randomUUID(),
    type,
    source,
    sessionId,
    nodeId,
    attempt,
    timestamp: new Date().toISOString(),
    payload: payload ?? {},
  };
}
/**
 * Minimal typed pub/sub bus for pipeline domain events.
 *
 * Handlers are keyed by event type; publishing awaits each handler
 * sequentially so side effects are applied in subscription order.
 */
export class DomainEventBus {
  private readonly handlers = new Map<DomainEventType, Set<DomainEventHandler>>();

  /**
   * Registers `handler` for events of `type`.
   *
   * @returns An unsubscribe function; calling it is idempotent and prunes
   * the per-type set once it becomes empty.
   */
  subscribe<TType extends DomainEventType>(
    type: TType,
    handler: DomainEventHandler<TType>,
  ): () => void {
    // The per-type sets store the erased handler type; the generic signature
    // still guarantees callers only pair a handler with its matching type.
    const typedHandler = handler as DomainEventHandler;
    const set = this.handlers.get(type);
    if (set) {
      set.add(typedHandler);
    } else {
      this.handlers.set(type, new Set([typedHandler]));
    }
    return () => {
      const current = this.handlers.get(type);
      current?.delete(typedHandler);
      if (current && current.size === 0) {
        this.handlers.delete(type);
      }
    };
  }

  /**
   * Delivers `event` to every handler subscribed to its type, awaiting each
   * handler in turn.
   *
   * The handler set is snapshotted before dispatch so handlers that
   * subscribe or unsubscribe while the event is being published cannot
   * affect which handlers receive *this* event (the original iterated the
   * live Set, so a mid-publish subscribe would receive the in-flight event).
   */
  async publish<TType extends DomainEventType>(event: DomainEvent<TType>): Promise<void> {
    const set = this.handlers.get(event.type);
    if (!set || set.size === 0) {
      return;
    }
    for (const handler of [...set]) {
      await handler(event);
    }
  }
}

View File

@@ -297,6 +297,10 @@ export class AgentManager {
}
}
/**
* @deprecated Prefer running recursive topologies through SchemaDrivenExecutionEngine.runSession.
* This method remains available for internal orchestration and low-level tests.
*/
async runRecursiveAgent<TIntent extends RecursiveChildIntent, TOutput>(input: {
sessionId: string;
depth: number;

View File

@@ -1,4 +1,5 @@
import { isRecord } from "./types.js";
import { isDomainEventType, type DomainEventType } from "./domain-events.js";
export type ToolClearancePolicy = {
allowlist: string[];
@@ -45,23 +46,32 @@ export type PipelineConstraint = {
maxRetries?: number;
};
export type NodeTopologyKind = "sequential" | "parallel" | "hierarchical" | "retry-unrolled";
export type PipelineNodeTopology = {
kind: NodeTopologyKind;
blockId?: string;
};
export type PipelineNode = {
id: string;
actorId: string;
personaId: string;
constraints?: PipelineConstraint;
topology?: PipelineNodeTopology;
};
export type PipelineEdge = {
from: string;
to: string;
on:
on?:
| "success"
| "validation_fail"
| "failure"
| "always"
| "onTaskComplete"
| "onValidationFail";
event?: DomainEventType;
when?: RouteCondition[];
};
@@ -71,7 +81,7 @@ export type PipelineGraph = {
edges: PipelineEdge[];
};
export type TopologyKind = "hierarchical" | "retry-unrolled" | "sequential";
export type TopologyKind = "hierarchical" | "parallel" | "retry-unrolled" | "sequential";
export type TopologyConstraint = {
maxDepth: number;
@@ -216,6 +226,34 @@ function parsePipelineNode(value: unknown): PipelineNode {
throw new Error("Pipeline node must be an object.");
}
const topology = value.topology;
let parsedTopology: PipelineNodeTopology | undefined;
if (topology !== undefined) {
if (!isRecord(topology)) {
throw new Error("Pipeline node topology must be an object when provided.");
}
const kind = readString(topology, "kind");
if (
kind !== "sequential" &&
kind !== "parallel" &&
kind !== "hierarchical" &&
kind !== "retry-unrolled"
) {
throw new Error(`Pipeline node topology kind "${kind}" is not supported.`);
}
const blockIdRaw = topology.blockId;
if (blockIdRaw !== undefined && (typeof blockIdRaw !== "string" || blockIdRaw.trim().length === 0)) {
throw new Error("Pipeline node topology blockId must be a non-empty string when provided.");
}
parsedTopology = {
kind,
...(typeof blockIdRaw === "string" ? { blockId: blockIdRaw.trim() } : {}),
};
}
const constraints = isRecord(value.constraints)
? {
maxRetries: readOptionalInteger(value.constraints, "maxRetries", { min: 0 }),
@@ -227,6 +265,7 @@ function parsePipelineNode(value: unknown): PipelineNode {
actorId: readString(value, "actorId"),
personaId: readString(value, "personaId"),
constraints,
...(parsedTopology ? { topology: parsedTopology } : {}),
};
}
@@ -235,8 +274,7 @@ function parsePipelineEdge(value: unknown): PipelineEdge {
throw new Error("Pipeline edge must be an object.");
}
const on = readString(value, "on");
const validEvents: PipelineEdge["on"][] = [
const validEvents: NonNullable<PipelineEdge["on"]>[] = [
"success",
"validation_fail",
"failure",
@@ -245,8 +283,29 @@ function parsePipelineEdge(value: unknown): PipelineEdge {
"onValidationFail",
];
if (!validEvents.includes(on as PipelineEdge["on"])) {
throw new Error(`Pipeline edge field \"on\" has unsupported event \"${on}\".`);
const rawOn = value.on;
let on: PipelineEdge["on"];
if (rawOn !== undefined) {
if (typeof rawOn !== "string" || !validEvents.includes(rawOn as NonNullable<PipelineEdge["on"]>)) {
throw new Error(`Pipeline edge field "on" has unsupported event "${String(rawOn)}".`);
}
on = rawOn as NonNullable<PipelineEdge["on"]>;
}
const rawDomainEvent = value.event;
let event: DomainEventType | undefined;
if (rawDomainEvent !== undefined) {
if (typeof rawDomainEvent !== "string" || !isDomainEventType(rawDomainEvent)) {
throw new Error(`Pipeline edge field "event" has unsupported domain event "${String(rawDomainEvent)}".`);
}
event = rawDomainEvent;
}
if (!on && !event) {
throw new Error('Pipeline edge must provide either an "on" trigger or an "event" trigger.');
}
if (on && event) {
throw new Error('Pipeline edge cannot define both "on" and "event" triggers simultaneously.');
}
const rawWhen = value.when;
@@ -263,7 +322,8 @@ function parsePipelineEdge(value: unknown): PipelineEdge {
return {
from: readString(value, "from"),
to: readString(value, "to"),
on: on as PipelineEdge["on"],
...(on ? { on } : {}),
...(event ? { event } : {}),
...(when.length > 0 ? { when } : {}),
};
}
@@ -298,7 +358,7 @@ function parseTopologies(value: unknown): TopologyKind[] {
throw new Error("Manifest topologies must be a non-empty array.");
}
const valid = new Set<TopologyKind>(["hierarchical", "retry-unrolled", "sequential"]);
const valid = new Set<TopologyKind>(["hierarchical", "parallel", "retry-unrolled", "sequential"]);
const result: TopologyKind[] = [];
for (const item of value) {
@@ -498,6 +558,12 @@ export function parseAgentManifest(input: unknown): AgentManifest {
if (!personaIds.has(node.personaId)) {
throw new Error(`Pipeline node \"${node.id}\" references unknown persona \"${node.personaId}\".`);
}
if (node.topology && !manifest.topologies.includes(node.topology.kind as TopologyKind)) {
throw new Error(
`Pipeline node "${node.id}" topology "${node.topology.kind}" is not listed in manifest topologies.`,
);
}
}
assertPipelineDag(manifest.pipeline);

View File

@@ -1,17 +1,21 @@
import { resolve } from "node:path";
import { parseAgentManifest, type AgentManifest } from "./manifest.js";
import { AgentManager } from "./manager.js";
import {
PersonaRegistry,
type PersonaBehaviorEvent,
type PersonaBehaviorHandler,
} from "./persona-registry.js";
import { PipelineExecutor, type ActorExecutor, type PipelineRunSummary } from "./pipeline.js";
import { FileSystemProjectContextStore } from "./project-context.js";
import { loadAgentManagerLimitsFromEnv } from "./runtime.js";
import { FileSystemStateContextManager, type StoredSessionState } from "./state-context.js";
import type { JsonObject } from "./types.js";
export type OrchestrationSettings = {
workspaceRoot: string;
stateRoot: string;
projectContextPath: string;
maxDepth: number;
maxRetries: number;
maxChildren: number;
@@ -51,12 +55,27 @@ function readOptionalStringEnv(key: "AGENT_STATE_ROOT", fallback: string): strin
return raw;
}
/**
 * Reads AGENT_PROJECT_CONTEXT_PATH from the environment, trimming
 * surrounding whitespace. Unset or whitespace-only values fall back to the
 * provided default.
 */
function readOptionalProjectContextPathEnv(
  key: "AGENT_PROJECT_CONTEXT_PATH",
  fallback: string,
): string {
  const trimmed = (process.env[key] ?? "").trim();
  return trimmed.length > 0 ? trimmed : fallback;
}
export function loadOrchestrationSettingsFromEnv(): Omit<
OrchestrationSettings,
"workspaceRoot" | "runtimeContext"
> {
return {
stateRoot: readOptionalStringEnv("AGENT_STATE_ROOT", ".ai_ops/state"),
projectContextPath: readOptionalProjectContextPathEnv(
"AGENT_PROJECT_CONTEXT_PATH",
".ai_ops/project-context.json",
),
maxDepth: readOptionalIntegerEnv("AGENT_TOPOLOGY_MAX_DEPTH", 4, 1),
maxRetries: readOptionalIntegerEnv("AGENT_TOPOLOGY_MAX_RETRIES", 2, 0),
maxChildren: readOptionalIntegerEnv("AGENT_RELATIONSHIP_MAX_CHILDREN", 4, 1),
@@ -91,9 +110,11 @@ export class SchemaDrivenExecutionEngine {
private readonly manifest: AgentManifest;
private readonly personaRegistry = new PersonaRegistry();
private readonly stateManager: FileSystemStateContextManager;
private readonly projectContextStore: FileSystemProjectContextStore;
private readonly actorExecutors: ReadonlyMap<string, ActorExecutor>;
private readonly settings: OrchestrationSettings;
private readonly childrenByParent: Map<string, AgentManifest["relationships"]>;
private readonly manager: AgentManager;
constructor(input: {
manifest: AgentManifest | unknown;
@@ -103,6 +124,7 @@ export class SchemaDrivenExecutionEngine {
workspaceRoot?: string;
runtimeContext?: Record<string, string | number | boolean>;
};
manager?: AgentManager;
}) {
this.manifest = parseAgentManifest(input.manifest);
@@ -110,6 +132,7 @@ export class SchemaDrivenExecutionEngine {
this.settings = {
workspaceRoot: resolve(input.settings?.workspaceRoot ?? process.cwd()),
stateRoot: resolve(input.settings?.stateRoot ?? defaults.stateRoot),
projectContextPath: resolve(input.settings?.projectContextPath ?? defaults.projectContextPath),
maxDepth: input.settings?.maxDepth ?? defaults.maxDepth,
maxRetries: input.settings?.maxRetries ?? defaults.maxRetries,
maxChildren: input.settings?.maxChildren ?? defaults.maxChildren,
@@ -121,8 +144,12 @@ export class SchemaDrivenExecutionEngine {
this.stateManager = new FileSystemStateContextManager({
rootDirectory: this.settings.stateRoot,
});
this.projectContextStore = new FileSystemProjectContextStore({
filePath: this.settings.projectContextPath,
});
this.actorExecutors = toExecutorMap(input.actorExecutors);
this.manager = input.manager ?? new AgentManager(loadAgentManagerLimitsFromEnv());
for (const persona of this.manifest.personas) {
this.personaRegistry.register({
@@ -182,7 +209,11 @@ export class SchemaDrivenExecutionEngine {
sessionId: string;
initialPayload: JsonObject;
initialState?: Partial<StoredSessionState>;
signal?: AbortSignal;
}): Promise<PipelineRunSummary> {
const managerSessionId = `${input.sessionId}__pipeline`;
const managerSession = this.manager.createSession(managerSessionId);
const executor = new PipelineExecutor(
this.manifest,
this.personaRegistry,
@@ -193,14 +224,21 @@ export class SchemaDrivenExecutionEngine {
runtimeContext: this.settings.runtimeContext,
maxDepth: Math.min(this.settings.maxDepth, this.manifest.topologyConstraints.maxDepth),
maxRetries: Math.min(this.settings.maxRetries, this.manifest.topologyConstraints.maxRetries),
manager: this.manager,
managerSessionId,
projectContextStore: this.projectContextStore,
},
);
return executor.run({
sessionId: input.sessionId,
initialPayload: input.initialPayload,
initialState: input.initialState,
});
try {
return await executor.run({
sessionId: input.sessionId,
initialPayload: input.initialPayload,
initialState: input.initialState,
signal: input.signal,
});
} finally {
managerSession.close();
}
}
private assertRelationshipConstraints(): void {

View File

@@ -1,8 +1,18 @@
import { access } from "node:fs/promises";
import { constants as fsConstants } from "node:fs";
import { access } from "node:fs/promises";
import { resolve } from "node:path";
import {
createDomainEvent,
DomainEventBus,
type DomainEvent,
type DomainEventEmission,
type DomainEventPayload,
type DomainEventType,
} from "./domain-events.js";
import type { AgentManifest, PipelineEdge, PipelineNode, RouteCondition } from "./manifest.js";
import type { AgentManager, RecursiveChildIntent } from "./manager.js";
import { PersonaRegistry } from "./persona-registry.js";
import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./project-context.js";
import {
FileSystemStateContextManager,
type NodeExecutionContext,
@@ -12,12 +22,17 @@ import {
import type { JsonObject } from "./types.js";
export type ActorResultStatus = "success" | "validation_fail" | "failure";
export type ActorFailureKind = "soft" | "hard";
export type ActorExecutionResult = {
status: ActorResultStatus;
payload?: JsonObject;
stateFlags?: Record<string, boolean>;
stateMetadata?: JsonObject;
events?: DomainEventEmission[];
projectContextPatch?: ProjectContextPatch;
failureKind?: ActorFailureKind;
failureCode?: string;
};
export type ActorExecutionInput = {
@@ -25,6 +40,7 @@ export type ActorExecutionInput = {
node: PipelineNode;
prompt: string;
context: NodeExecutionContext;
signal: AbortSignal;
toolClearance: {
allowlist: string[];
banlist: string[];
@@ -38,11 +54,13 @@ export type PipelineExecutionRecord = {
depth: number;
attempt: number;
status: ActorResultStatus;
emittedEvents: DomainEventType[];
};
export type PipelineRunSummary = {
sessionId: string;
records: PipelineExecutionRecord[];
events: DomainEvent[];
finalState: StoredSessionState;
};
@@ -51,6 +69,9 @@ export type PipelineExecutorOptions = {
runtimeContext: Record<string, string | number | boolean>;
maxDepth: number;
maxRetries: number;
manager: AgentManager;
managerSessionId: string;
projectContextStore: FileSystemProjectContextStore;
};
type QueueItem = {
@@ -58,6 +79,37 @@ type QueueItem = {
depth: number;
};
type ExecutionGroup = {
concurrent: boolean;
items: QueueItem[];
};
type RetryIntent = RecursiveChildIntent & {
context: {
attempt: number;
};
};
type NodeAttemptResult = {
attempt: number;
depth: number;
result: ActorExecutionResult;
payloadForNext: JsonObject;
domainEvents: DomainEvent[];
hardFailure: boolean;
};
type NodeExecutionOutcome = {
queueItem: QueueItem;
finalResult: ActorExecutionResult;
finalPayload: JsonObject;
finalAttempt: number;
finalEventTypes: DomainEventType[];
records: PipelineExecutionRecord[];
events: DomainEvent[];
hardFailureAttempts: boolean[];
};
function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValidationFail" | undefined {
if (status === "success") {
return "onTaskComplete";
@@ -68,7 +120,15 @@ function toBehaviorEvent(status: ActorResultStatus): "onTaskComplete" | "onValid
return undefined;
}
function shouldEdgeRun(edge: PipelineEdge, status: ActorResultStatus): boolean {
function shouldEdgeRun(
edge: PipelineEdge,
status: ActorResultStatus,
emittedEventTypes: ReadonlySet<DomainEventType>,
): boolean {
if (edge.event) {
return emittedEventTypes.has(edge.event);
}
if (edge.on === "always") {
return true;
}
@@ -136,9 +196,134 @@ async function edgeConditionsSatisfied(
return true;
}
/**
 * Throws if `signal` is already aborted; otherwise returns.
 *
 * An Error abort reason is rethrown as-is; any other reason is wrapped in a
 * new Error named "AbortError" whose message includes the stringified reason
 * (or a generic message when no reason was given).
 */
function throwIfAborted(signal?: AbortSignal): void {
  if (!signal || !signal.aborted) {
    return;
  }
  const { reason } = signal;
  if (reason instanceof Error) {
    throw reason;
  }
  const message =
    reason === undefined
      ? "The pipeline run was aborted."
      : `The pipeline run was aborted: ${String(reason)}.`;
  const abortError = new Error(message);
  abortError.name = "AbortError";
  throw abortError;
}
/**
 * Converts an aborted signal's reason into an Error without throwing.
 *
 * An Error reason is returned unchanged; any other reason is wrapped in a
 * new Error named "AbortError" with the stringified reason in the message
 * (or a generic message when no reason was given).
 */
function toAbortError(signal: AbortSignal): Error {
  const { reason } = signal;
  if (reason instanceof Error) {
    return reason;
  }
  const message =
    reason === undefined
      ? "The pipeline run was aborted."
      : `The pipeline run was aborted: ${String(reason)}.`;
  const abortError = new Error(message);
  abortError.name = "AbortError";
  return abortError;
}
/** Extracts a human-readable message from any thrown value. */
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}
/** Wraps any thrown value into a `{ error: message }` JSON payload. */
function toErrorPayload(error: unknown): JsonObject {
  const message = error instanceof Error ? error.message : String(error);
  return { error: message };
}
/**
 * Pulls a string `code` (errno-style, e.g. "ECONNRESET") off a thrown Error,
 * returning undefined for non-Errors or non-string codes.
 */
function toFailureCodeFromError(error: unknown): string | undefined {
  if (!(error instanceof Error)) {
    return undefined;
  }
  const { code } = error as NodeJS.ErrnoException;
  return typeof code === "string" ? code : undefined;
}
// Case-insensitive markers of infrastructure-level ("hard") failures:
// timeouts, network errors, and HTTP 403 / forbidden responses.
const HARD_FAILURE_PATTERN =
  /(timeout|timed out|network|econnreset|econnrefused|enotfound|403|forbidden)/i;

/** True when `value` mentions any hard-failure marker (see pattern above). */
function containsHardFailureSignal(value: string): boolean {
  return HARD_FAILURE_PATTERN.test(value);
}
/**
 * Classifies an actor result as a hard failure.
 *
 * An explicit `failureKind: "hard"` wins outright; otherwise only
 * status "failure" results are considered, and they count as hard when the
 * failure code or payload error message contains a hard-failure marker
 * (timeout/network/403 — see containsHardFailureSignal).
 */
function inferHardFailure(result: ActorExecutionResult): boolean {
  if (result.failureKind === "hard") {
    return true;
  }
  if (result.status !== "failure") {
    return false;
  }
  const rawMessage = result.payload?.error;
  const messageText = typeof rawMessage === "string" ? rawMessage : "";
  return containsHardFailureSignal(`${result.failureCode ?? ""} ${messageText}`);
}
/** Default human-readable event summary for each actor result status. */
function defaultEventPayloadForStatus(status: ActorResultStatus): DomainEventPayload {
  switch (status) {
    case "success":
      return { summary: "Node completed successfully." };
    case "validation_fail":
      return { summary: "Node failed validation and requested retry/unrolled remediation." };
    default:
      return { summary: "Node execution failed and was marked blocked." };
  }
}
/** Default domain event type(s) implied by each actor result status. */
function defaultEventsForStatus(status: ActorResultStatus): DomainEventType[] {
  switch (status) {
    case "success":
      return ["validation_passed"];
    case "validation_fail":
      return ["validation_failed"];
    default:
      return ["task_blocked"];
  }
}
/**
 * Maps a node's optional topology to its scheduling mode.
 * "parallel" and "hierarchical" pass through; everything else (including an
 * absent topology and "retry-unrolled") runs sequentially.
 */
function getExecutionMode(node: PipelineNode): "sequential" | "parallel" | "hierarchical" {
  const kind = node.topology?.kind;
  return kind === "parallel" || kind === "hierarchical" ? kind : "sequential";
}
/**
 * Builds the grouping key used to bucket frontier items.
 * Sequential nodes never share a bucket (keyed by node id); parallel and
 * hierarchical nodes share a bucket per blockId, falling back to the node id
 * when no (non-empty) blockId is set.
 */
function toBlockKey(item: QueueItem, node: PipelineNode): string {
  const mode = getExecutionMode(node);
  if (mode === "sequential") {
    return `sequential:${item.nodeId}`;
  }
  // `||` (not `??`) so an empty blockId also falls back to the node id.
  return `${mode}:${node.topology?.blockId || item.nodeId}`;
}
export class PipelineExecutor {
private readonly nodeById = new Map<string, PipelineNode>();
private readonly edgesBySource = new Map<string, PipelineEdge[]>();
private readonly domainEventBus = new DomainEventBus();
private managerRunCounter = 0;
constructor(
private readonly manifest: AgentManifest,
@@ -165,150 +350,481 @@ export class PipelineExecutor {
sessionId: string;
initialPayload: JsonObject;
initialState?: Partial<StoredSessionState>;
signal?: AbortSignal;
}): Promise<PipelineRunSummary> {
await this.stateManager.initializeSession(input.sessionId, input.initialState);
const projectContext = await this.options.projectContextStore.readState();
await this.stateManager.initializeSession(input.sessionId, {
...(input.initialState ?? {}),
flags: {
...projectContext.globalFlags,
...(input.initialState?.flags ?? {}),
},
metadata: {
project_context: {
globalFlags: { ...projectContext.globalFlags },
artifactPointers: { ...projectContext.artifactPointers },
taskQueue: projectContext.taskQueue.map((task) => ({
id: task.id,
title: task.title,
status: task.status,
...(task.assignee ? { assignee: task.assignee } : {}),
...(task.metadata ? { metadata: task.metadata } : {}),
})),
},
...((input.initialState?.metadata ?? {}) as JsonObject),
},
});
await this.stateManager.writeHandoff(input.sessionId, {
nodeId: this.manifest.pipeline.entryNodeId,
payload: input.initialPayload,
});
const records: PipelineExecutionRecord[] = [];
const queue: QueueItem[] = [{ nodeId: this.manifest.pipeline.entryNodeId, depth: 0 }];
const attemptsByNode = new Map<string, number>();
const events: DomainEvent[] = [];
const ready = new Map<string, number>([[this.manifest.pipeline.entryNodeId, 0]]);
const completedNodes = new Set<string>();
const maxExecutions = this.manifest.pipeline.nodes.length * (this.options.maxRetries + 3);
const maxExecutions = this.manifest.pipeline.nodes.length * (this.options.maxRetries + 4);
let executionCount = 0;
let sequentialHardFailures = 0;
while (queue.length > 0) {
const item = queue.shift();
if (!item) {
continue;
while (ready.size > 0) {
throwIfAborted(input.signal);
const frontier: QueueItem[] = [...ready.entries()].map(([nodeId, depth]) => ({ nodeId, depth }));
ready.clear();
for (const group of this.buildExecutionGroups(frontier)) {
const groupResults = group.concurrent
? await Promise.all(
group.items.map((queueItem) => this.executeNode({ ...queueItem, sessionId: input.sessionId, signal: input.signal })),
)
: await this.executeSequentialGroup(input.sessionId, group.items, input.signal);
for (const nodeResult of groupResults) {
records.push(...nodeResult.records);
events.push(...nodeResult.events);
executionCount += nodeResult.records.length;
if (executionCount > maxExecutions) {
throw new Error("Pipeline execution exceeded the configured safe execution bound.");
}
for (const wasHardFailure of nodeResult.hardFailureAttempts) {
if (wasHardFailure) {
sequentialHardFailures += 1;
} else {
sequentialHardFailures = 0;
}
if (sequentialHardFailures >= 2) {
throw new Error(
"Hard failure threshold reached (>=2 sequential API/network/403 failures). Pipeline aborted.",
);
}
}
completedNodes.add(nodeResult.queueItem.nodeId);
const state = await this.stateManager.readState(input.sessionId);
const candidateEdges = this.edgesBySource.get(nodeResult.queueItem.nodeId) ?? [];
const eventTypes = new Set(nodeResult.finalEventTypes);
for (const edge of candidateEdges) {
if (!shouldEdgeRun(edge, nodeResult.finalResult.status, eventTypes)) {
continue;
}
if (!(await edgeConditionsSatisfied(edge, state, this.options.workspaceRoot))) {
continue;
}
if (completedNodes.has(edge.to)) {
continue;
}
await this.stateManager.writeHandoff(input.sessionId, {
nodeId: edge.to,
fromNodeId: nodeResult.queueItem.nodeId,
payload: nodeResult.finalPayload,
});
const nextDepth = nodeResult.queueItem.depth + 1;
const existingDepth = ready.get(edge.to);
if (existingDepth === undefined || nextDepth < existingDepth) {
ready.set(edge.to, nextDepth);
}
}
}
}
}
executionCount += 1;
if (executionCount > maxExecutions) {
throw new Error("Pipeline execution exceeded the configured safe execution bound.");
}
if (item.depth > this.options.maxDepth) {
throw new Error(
`Pipeline depth ${String(item.depth)} exceeds configured maxDepth ${String(this.options.maxDepth)}.`,
);
}
const node = this.nodeById.get(item.nodeId);
if (!node) {
throw new Error(`Pipeline node \"${item.nodeId}\" is not defined.`);
}
const executor = this.actorExecutors.get(node.actorId);
if (!executor) {
throw new Error(`No actor executor registered for actor \"${node.actorId}\".`);
}
const context = await this.stateManager.buildFreshNodeContext(input.sessionId, node.id);
const prompt = this.personaRegistry.renderSystemPrompt({
personaId: node.personaId,
runtimeContext: {
...this.options.runtimeContext,
session_id: input.sessionId,
node_id: node.id,
depth: item.depth,
const finalState = await this.stateManager.readState(input.sessionId);
if (records.length > 0 && records[records.length - 1]?.status === "success") {
await this.options.projectContextStore.patchState({
artifactPointers: {
[`sessions/${input.sessionId}/final_state`]: this.stateManager.getSessionStatePath(input.sessionId),
},
});
const result = await executor({
sessionId: input.sessionId,
node,
prompt,
context,
toolClearance: this.personaRegistry.getToolClearance(node.personaId),
});
const attempt = (attemptsByNode.get(node.id) ?? 0) + 1;
attemptsByNode.set(node.id, attempt);
records.push({
nodeId: node.id,
depth: item.depth,
attempt,
status: result.status,
});
const behaviorEvent = toBehaviorEvent(result.status);
const behaviorPatch = behaviorEvent
? await this.personaRegistry.emitBehaviorEvent({
personaId: node.personaId,
event: behaviorEvent,
sessionId: input.sessionId,
nodeId: node.id,
payload: result.payload ?? {},
})
: {};
const historyEvent: SessionHistoryEntry = {
nodeId: node.id,
event: result.status,
timestamp: new Date().toISOString(),
...(result.payload ? { data: result.payload } : {}),
};
await this.stateManager.patchState(input.sessionId, {
...(result.stateFlags ? { flags: result.stateFlags } : {}),
metadata: {
...(result.stateMetadata ?? {}),
...behaviorPatch,
},
historyEvent,
});
const maxRetriesForNode = Math.min(
node.constraints?.maxRetries ?? this.manifest.topologyConstraints.maxRetries,
this.options.maxRetries,
);
if (result.status !== "success" && attempt <= maxRetriesForNode + 1) {
await this.stateManager.writeHandoff(input.sessionId, {
nodeId: node.id,
fromNodeId: node.id,
payload: result.payload ?? context.handoff.payload,
});
queue.push({
nodeId: node.id,
depth: item.depth,
});
continue;
}
const state = await this.stateManager.readState(input.sessionId);
const candidateEdges = this.edgesBySource.get(node.id) ?? [];
for (const edge of candidateEdges) {
if (!shouldEdgeRun(edge, result.status)) {
continue;
}
if (!(await edgeConditionsSatisfied(edge, state, this.options.workspaceRoot))) {
continue;
}
await this.stateManager.writeHandoff(input.sessionId, {
nodeId: edge.to,
fromNodeId: node.id,
payload: result.payload ?? context.handoff.payload,
});
queue.push({
nodeId: edge.to,
depth: item.depth + 1,
});
}
}
return {
sessionId: input.sessionId,
records,
finalState: await this.stateManager.readState(input.sessionId),
events,
finalState,
};
}
/**
 * Partitions a frontier of ready queue items into execution groups.
 *
 * Items are bucketed by toBlockKey: sequential nodes each get their own
 * group, while "parallel"/"hierarchical" nodes sharing a block key are
 * merged into one group marked `concurrent`, which the caller runs via
 * Promise.all. Throws if an item references an undefined pipeline node.
 */
private buildExecutionGroups(frontier: QueueItem[]): ExecutionGroup[] {
const groupsByKey = new Map<string, ExecutionGroup>();
for (const item of frontier) {
const node = this.nodeById.get(item.nodeId);
if (!node) {
throw new Error(`Pipeline node "${item.nodeId}" is not defined.`);
}
const mode = getExecutionMode(node);
const key = toBlockKey(item, node);
const existing = groupsByKey.get(key);
if (existing) {
// Later items join the first group created for this key; the group's
// `concurrent` flag is fixed by the first node seen for the key.
existing.items.push(item);
} else {
groupsByKey.set(key, {
concurrent: mode === "parallel" || mode === "hierarchical",
items: [item],
});
}
}
// Map preserves insertion order, so group order follows the frontier.
return [...groupsByKey.values()];
}
/**
 * Runs the given queue items strictly one after another, in order, and
 * collects each node's execution outcome.
 */
private async executeSequentialGroup(
  sessionId: string,
  items: QueueItem[],
  signal?: AbortSignal,
): Promise<NodeExecutionOutcome[]> {
  const outcomes: NodeExecutionOutcome[] = [];
  for (const queueItem of items) {
    const outcome = await this.executeNode({ ...queueItem, sessionId, signal });
    outcomes.push(outcome);
  }
  return outcomes;
}
/**
 * A validation failure is retried when the node itself opts into the
 * "retry-unrolled" topology, or when the manifest lists that topology
 * globally.
 */
private shouldRetryValidation(node: PipelineNode): boolean {
  return (
    node.topology?.kind === "retry-unrolled" ||
    this.manifest.topologies.includes("retry-unrolled")
  );
}
/**
 * Effective retry budget for a node: its own constraint (falling back to the
 * manifest-wide limit), capped by the executor-level maxRetries option.
 */
private getMaxRetriesForNode(node: PipelineNode): number {
  const manifestLimit =
    node.constraints?.maxRetries ?? this.manifest.topologyConstraints.maxRetries;
  return Math.min(manifestLimit, this.options.maxRetries);
}
/**
 * Produces a unique manager sub-session id for one node run:
 * `<managerSessionId>__<sanitizedNodeId>__run_<counter>`, where the node id
 * is sanitized to [A-Za-z0-9_-] and the counter increments per call.
 */
private buildManagerRunSessionId(nodeId: string): string {
  this.managerRunCounter += 1;
  const sanitizedNodeId = nodeId.replace(/[^a-zA-Z0-9_-]/g, "_");
  const runSuffix = `run_${String(this.managerRunCounter)}`;
  return [this.options.managerSessionId, sanitizedNodeId, runSuffix].join("__");
}
/**
 * Executes one pipeline node (including its validation-retry loop) through
 * the AgentManager's recursive-agent machinery.
 *
 * Opens a dedicated manager sub-session (closed in `finally`), then drives
 * attempts via runRecursiveAgent: each attempt builds a fresh node context
 * and persona prompt, invokes the actor executor, derives domain events,
 * persists the attempt, and records it. A "validation_fail" result retries
 * (as a single fan-out child carrying attempt+1) while the node allows
 * retry-unrolled remediation and the retry budget permits; any other result
 * completes. Throws when depth exceeds maxDepth, or when the node/actor
 * executor is unknown.
 */
private async executeNode(input: {
sessionId: string;
nodeId: string;
depth: number;
signal?: AbortSignal;
}): Promise<NodeExecutionOutcome> {
const { sessionId, nodeId, depth, signal } = input;
if (depth > this.options.maxDepth) {
throw new Error(
`Pipeline depth ${String(depth)} exceeds configured maxDepth ${String(this.options.maxDepth)}.`,
);
}
const node = this.nodeById.get(nodeId);
if (!node) {
throw new Error(`Pipeline node "${nodeId}" is not defined.`);
}
const executor = this.actorExecutors.get(node.actorId);
if (!executor) {
throw new Error(`No actor executor registered for actor "${node.actorId}".`);
}
// Child session of the pipeline-level manager session; one per node run.
const managerRunSessionId = this.buildManagerRunSessionId(node.id);
const managerRunSession = this.options.manager.createSession(managerRunSessionId, {
parentSessionId: this.options.managerSessionId,
});
// Accumulated across all attempts of this node (closed over by `run`).
const nodeRecords: PipelineExecutionRecord[] = [];
const nodeEvents: DomainEvent[] = [];
const hardFailureAttempts: boolean[] = [];
const maxRetriesForNode = this.getMaxRetriesForNode(node);
try {
const output = await this.options.manager.runRecursiveAgent<RetryIntent, NodeAttemptResult>({
sessionId: managerRunSessionId,
depth,
signal,
run: async ({ intent, depth: recursiveDepth, signal: recursiveSignal }) => {
// Attempt number travels in the retry intent; the first (intent-less)
// invocation defaults to attempt 1.
const attempt = (() => {
const intentAttempt = intent?.context?.attempt;
if (typeof intentAttempt === "number" && Number.isInteger(intentAttempt) && intentAttempt >= 1) {
return intentAttempt;
}
return 1;
})();
const context = await this.stateManager.buildFreshNodeContext(sessionId, node.id);
const prompt = this.personaRegistry.renderSystemPrompt({
personaId: node.personaId,
runtimeContext: {
...this.options.runtimeContext,
session_id: sessionId,
node_id: node.id,
depth: recursiveDepth,
attempt,
},
});
const result = await this.invokeActorExecutor({
sessionId,
node,
prompt,
context,
signal: recursiveSignal,
executor,
});
// Defaults for the status plus any actor-supplied emissions.
const domainEvents = this.createAttemptDomainEvents({
sessionId,
nodeId: node.id,
attempt,
status: result.status,
customEvents: result.events,
});
// Persist before recording so state reflects the attempt on return.
await this.persistNodeAttempt({
sessionId,
node,
attempt,
result,
domainEvents,
});
const emittedEventTypes = domainEvents.map((event) => event.type);
nodeRecords.push({
nodeId: node.id,
depth: recursiveDepth,
attempt,
status: result.status,
emittedEvents: emittedEventTypes,
});
nodeEvents.push(...domainEvents);
const hardFailure = inferHardFailure(result);
hardFailureAttempts.push(hardFailure);
// No payload from the actor -> forward the incoming handoff payload.
const payloadForNext = result.payload ?? context.handoff.payload;
const shouldRetry =
result.status === "validation_fail" &&
this.shouldRetryValidation(node) &&
attempt <= maxRetriesForNode;
if (!shouldRetry) {
return {
type: "complete" as const,
output: {
attempt,
depth: recursiveDepth,
result,
payloadForNext,
domainEvents,
hardFailure,
},
};
}
// Re-handoff to the same node so the retry attempt sees this payload.
await this.stateManager.writeHandoff(sessionId, {
nodeId: node.id,
fromNodeId: node.id,
payload: payloadForNext,
});
return {
type: "fanout" as const,
intents: [
{
persona: node.personaId,
task: `retry-${node.id}-attempt-${String(attempt + 1)}`,
context: {
attempt: attempt + 1,
},
},
],
// Single-child fan-out: the retry's output becomes this attempt's output.
aggregate: ({ childResults }) => {
const first = childResults[0];
if (!first) {
throw new Error(`Retry aggregation for node "${node.id}" did not receive child output.`);
}
return first.output;
},
};
},
});
return {
queueItem: {
nodeId: node.id,
depth,
},
finalResult: output.result,
finalPayload: output.payloadForNext,
finalAttempt: output.attempt,
finalEventTypes: output.domainEvents.map((event) => event.type),
records: nodeRecords,
events: nodeEvents,
hardFailureAttempts,
};
} finally {
managerRunSession.close();
}
}
/**
 * Invokes the actor executor for one attempt, converting thrown errors into
 * failure results instead of propagating them.
 *
 * Aborts are the exception: if the signal is already aborted before the call,
 * or is aborted when the executor throws, an abort error is (re)thrown so the
 * pipeline stops rather than recording a failure. Other throws become a
 * `status: "failure"` result whose failureKind is "hard" when the error code
 * or message carries a hard-failure marker (timeout/network/403).
 */
private async invokeActorExecutor(input: {
sessionId: string;
node: PipelineNode;
prompt: string;
context: NodeExecutionContext;
signal: AbortSignal;
executor: ActorExecutor;
}): Promise<ActorExecutionResult> {
try {
throwIfAborted(input.signal);
return await input.executor({
sessionId: input.sessionId,
node: input.node,
prompt: input.prompt,
context: input.context,
signal: input.signal,
toolClearance: this.personaRegistry.getToolClearance(input.node.personaId),
});
} catch (error) {
// An executor throw during/after abort is treated as the abort itself.
if (input.signal.aborted) {
throw toAbortError(input.signal);
}
const failureCode = toFailureCodeFromError(error);
const failureKind = containsHardFailureSignal(`${failureCode ?? ""} ${toErrorMessage(error)}`)
? "hard"
: "soft";
return {
status: "failure",
payload: toErrorPayload(error),
failureCode,
failureKind,
};
}
}
/**
 * Builds the domain events emitted for a single node attempt.
 *
 * Default events derived from the attempt status are collected first; any
 * custom emission with the same event type replaces the default payload
 * (last write wins in the map), so at most one event per type is produced.
 */
private createAttemptDomainEvents(input: {
  sessionId: string;
  nodeId: string;
  attempt: number;
  status: ActorResultStatus;
  customEvents?: DomainEventEmission[];
}): DomainEvent[] {
  const payloadByType = new Map<DomainEventType, DomainEventPayload>();
  for (const defaultType of defaultEventsForStatus(input.status)) {
    payloadByType.set(defaultType, defaultEventPayloadForStatus(input.status));
  }
  for (const { type, payload } of input.customEvents ?? []) {
    // Custom emissions override any status-derived default of the same type.
    payloadByType.set(type, payload ?? {});
  }
  const events: DomainEvent[] = [];
  for (const [type, payload] of payloadByType) {
    events.push(
      createDomainEvent({
        type,
        source: "actor",
        sessionId: input.sessionId,
        nodeId: input.nodeId,
        attempt: input.attempt,
        payload,
      }),
    );
  }
  return events;
}
/**
 * Persists the outcome of one node attempt across the three stores this
 * manager coordinates, in a fixed order: session state patch, then domain
 * event bus publication, then the shared project context patch.
 */
private async persistNodeAttempt(input: {
  sessionId: string;
  node: PipelineNode;
  attempt: number;
  result: ActorExecutionResult;
  domainEvents: DomainEvent[];
}): Promise<void> {
  // Not every status maps to a persona behavior event; when one does, the
  // registry returns a metadata patch that is merged into session state below.
  const behaviorEvent = toBehaviorEvent(input.result.status);
  const behaviorPatch = behaviorEvent
    ? await this.personaRegistry.emitBehaviorEvent({
        personaId: input.node.personaId,
        event: behaviorEvent,
        sessionId: input.sessionId,
        nodeId: input.node.id,
        payload: input.result.payload ?? {},
      })
    : {};
  // Raw-status history entry kept alongside the domain events for
  // backward-compatible session history consumers.
  const legacyHistoryEvent: SessionHistoryEntry = {
    nodeId: input.node.id,
    event: input.result.status,
    timestamp: new Date().toISOString(),
    ...(input.result.payload ? { data: input.result.payload } : {}),
  };
  // Each domain event is flattened into a history entry; optional payload
  // fields are only included when present so the stored JSON stays sparse.
  const domainHistoryEvents: SessionHistoryEntry[] = input.domainEvents.map((event) => ({
    nodeId: input.node.id,
    event: event.type,
    timestamp: event.timestamp,
    data: {
      source: event.source,
      attempt: event.attempt,
      ...(event.payload.summary ? { summary: event.payload.summary } : {}),
      ...(event.payload.errorCode ? { errorCode: event.payload.errorCode } : {}),
      ...(event.payload.artifactPointer ? { artifactPointer: event.payload.artifactPointer } : {}),
      ...(event.payload.details ? { details: event.payload.details } : {}),
    },
  }));
  // 1) Session state: flags, result metadata merged with the behavior patch
  //    (behavior patch keys win on collision), and both history streams.
  await this.stateManager.patchState(input.sessionId, {
    ...(input.result.stateFlags ? { flags: input.result.stateFlags } : {}),
    metadata: {
      ...(input.result.stateMetadata ?? {}),
      ...behaviorPatch,
    },
    historyEvent: legacyHistoryEvent,
    historyEvents: domainHistoryEvents,
  });
  // 2) Event bus: published only after state is durable, sequentially to
  //    preserve emission order.
  for (const event of input.domainEvents) {
    await this.domainEventBus.publish(event);
  }
  // 3) Project context: bookkeeping pointers are defaults; any pointers the
  //    actor supplied in its own patch override them (spread last wins).
  const patch: ProjectContextPatch = {
    ...(input.result.projectContextPatch ?? {}),
    artifactPointers: {
      [`sessions/${input.sessionId}/last_completed_node`]: input.node.id,
      [`sessions/${input.sessionId}/last_attempt`]: String(input.attempt),
      ...(input.result.projectContextPatch?.artifactPointers ?? {}),
    },
  };
  await this.options.projectContextStore.patchState(patch);
}
}

View File

@@ -0,0 +1,218 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { dirname, resolve } from "node:path";
import { deepCloneJson, isRecord, type JsonObject, type JsonValue } from "./types.js";
// Lifecycle states a project task can be in.
export type ProjectTaskStatus = "pending" | "in_progress" | "blocked" | "done";
// A single unit of work tracked in the shared project task queue.
export type ProjectTask = {
  id: string;
  title: string;
  status: ProjectTaskStatus;
  // Optional owner; when present it is a non-empty string.
  assignee?: string;
  // Free-form structured data attached to the task.
  metadata?: JsonObject;
};
// Full shape of the persisted project-wide context.
export type ProjectContextState = {
  globalFlags: Record<string, boolean>;
  // Named pointers to artifacts (keys are logical paths, values are locations).
  artifactPointers: Record<string, string>;
  taskQueue: ProjectTask[];
};
// Partial update applied to the stored state. Flag/pointer maps are merged
// (never cleared); taskQueue replaces the whole queue, enqueueTasks appends,
// and upsertTasks replaces-by-id or appends.
export type ProjectContextPatch = {
  globalFlags?: Record<string, boolean>;
  artifactPointers?: Record<string, string>;
  taskQueue?: ProjectTask[];
  enqueueTasks?: ProjectTask[];
  upsertTasks?: ProjectTask[];
};
// State returned when no context file exists yet (always handed out as a clone).
const DEFAULT_PROJECT_CONTEXT: ProjectContextState = {
  globalFlags: {},
  artifactPointers: {},
  taskQueue: [],
};
/**
 * Validates that `value` is a string with visible content and returns it
 * trimmed.
 *
 * @throws Error when the value is missing, not a string, or whitespace-only.
 */
function assertNonEmptyString(value: unknown, label: string): string {
  if (typeof value !== "string") {
    throw new Error(`${label} must be a non-empty string.`);
  }
  const trimmed = value.trim();
  if (trimmed.length === 0) {
    throw new Error(`${label} must be a non-empty string.`);
  }
  return trimmed;
}
/**
 * Narrows an unknown value to a JSON object, throwing with `label` context
 * when the value is not a plain record.
 */
function toJsonObject(value: unknown, label: string): JsonObject {
  if (isRecord(value)) {
    return value as JsonObject;
  }
  throw new Error(`${label} is malformed.`);
}
/**
 * Validates that `value` is one of the supported task statuses and returns
 * it typed; throws with `label` context otherwise.
 */
function toTaskStatus(value: unknown, label: string): ProjectTaskStatus {
  const allowed = ["pending", "in_progress", "blocked", "done"] as const;
  if (typeof value === "string" && (allowed as readonly string[]).includes(value)) {
    return value as ProjectTaskStatus;
  }
  throw new Error(`${label} has unsupported status "${String(value)}".`);
}
/**
 * Validates an unknown value as a `ProjectTask`, throwing with `label`
 * context on the first malformed field. The optional `assignee` is checked
 * up front (before id/title/status) to preserve the established error order,
 * and is stored trimmed; `metadata` must be a plain record when present.
 */
function toProjectTask(value: unknown, label: string): ProjectTask {
  if (!isRecord(value)) {
    throw new Error(`${label} is malformed.`);
  }
  const rawAssignee = value.assignee;
  if (
    rawAssignee !== undefined &&
    (typeof rawAssignee !== "string" || rawAssignee.trim().length === 0)
  ) {
    throw new Error(`${label}.assignee must be a non-empty string when provided.`);
  }
  const task: ProjectTask = {
    id: assertNonEmptyString(value.id, `${label}.id`),
    title: assertNonEmptyString(value.title, `${label}.title`),
    status: toTaskStatus(value.status, `${label}.status`),
  };
  if (typeof rawAssignee === "string") {
    task.assignee = rawAssignee.trim();
  }
  if (value.metadata !== undefined) {
    task.metadata = toJsonObject(value.metadata, `${label}.metadata`);
  }
  return task;
}
/**
 * Validates an unknown value as a map of boolean flags; throws with
 * `label.key` context on the first non-boolean entry.
 */
function toBooleanRecord(value: unknown, label: string): Record<string, boolean> {
  if (!isRecord(value)) {
    throw new Error(`${label} is malformed.`);
  }
  const flags: Record<string, boolean> = {};
  for (const key of Object.keys(value)) {
    const raw = value[key];
    if (typeof raw !== "boolean") {
      throw new Error(`${label}.${key} must be a boolean.`);
    }
    flags[key] = raw;
  }
  return flags;
}
/**
 * Validates an unknown value as a map of non-empty strings (each value is
 * trimmed); throws with `label.key` context on the first bad entry.
 */
function toStringRecord(value: unknown, label: string): Record<string, string> {
  if (!isRecord(value)) {
    throw new Error(`${label} is malformed.`);
  }
  return Object.fromEntries(
    Object.entries(value).map(([key, raw]) => [
      key,
      assertNonEmptyString(raw, `${label}.${key}`),
    ]),
  );
}
/**
 * Validates a parsed JSON document as the full persisted project context.
 * The taskQueue array shape is checked before any field conversion so the
 * error order matches the established behavior.
 */
function toProjectContextState(value: unknown): ProjectContextState {
  if (!isRecord(value)) {
    throw new Error("Project context store is malformed.");
  }
  const rawTasks = value.taskQueue;
  if (!Array.isArray(rawTasks)) {
    throw new Error("Project context taskQueue is malformed.");
  }
  const globalFlags = toBooleanRecord(value.globalFlags, "Project context globalFlags");
  const artifactPointers = toStringRecord(
    value.artifactPointers,
    "Project context artifactPointers",
  );
  const taskQueue = rawTasks.map((task, index) =>
    toProjectTask(task, `Project context taskQueue[${String(index)}]`),
  );
  return { globalFlags, artifactPointers, taskQueue };
}
/** Returns a deep copy of `state` so callers can't mutate shared defaults. */
function cloneState(state: ProjectContextState): ProjectContextState {
  const snapshot = deepCloneJson(state as JsonValue);
  return snapshot as ProjectContextState;
}
/**
 * Merges `upserts` into `current` by task id: an upsert replaces the
 * existing task in place (keeping its queue position), otherwise it is
 * appended. With no upserts the original array is returned unchanged.
 */
function mergeUpsertTasks(current: ProjectTask[], upserts: ProjectTask[]): ProjectTask[] {
  if (upserts.length === 0) {
    return current;
  }
  const merged = new Map<string, ProjectTask>();
  for (const task of [...current, ...upserts]) {
    merged.set(task.id, task);
  }
  return [...merged.values()];
}
/**
 * Project-wide context persisted as a single pretty-printed JSON file.
 *
 * Mutations (`writeState`, `patchState`) are chained through an in-process
 * promise queue so their read-modify-write cycles never interleave within
 * this process. NOTE(review): `readState` is not routed through the queue,
 * so a read that races a concurrent write may observe the file mid-write —
 * confirm whether that is an accepted tradeoff.
 */
export class FileSystemProjectContextStore {
  // Absolute path of the backing JSON file.
  private readonly filePath: string;
  // Tail of the serialization chain; every mutating operation awaits it.
  private queue: Promise<void> = Promise.resolve();
  constructor(input: { filePath: string }) {
    this.filePath = resolve(input.filePath);
  }
  getFilePath(): string {
    return this.filePath;
  }
  /**
   * Reads and validates the stored state. A missing file (ENOENT) yields a
   * fresh clone of the defaults; any other I/O, parse, or validation error
   * propagates to the caller.
   */
  async readState(): Promise<ProjectContextState> {
    try {
      const content = await readFile(this.filePath, "utf8");
      const parsed = JSON.parse(content) as unknown;
      return toProjectContextState(parsed);
    } catch (error) {
      if ((error as NodeJS.ErrnoException).code === "ENOENT") {
        return cloneState(DEFAULT_PROJECT_CONTEXT);
      }
      throw error;
    }
  }
  /** Replaces the entire stored state, serialized behind prior mutations. */
  async writeState(state: ProjectContextState): Promise<void> {
    await this.runSerialized(async () => {
      await mkdir(dirname(this.filePath), { recursive: true });
      await writeFile(this.filePath, `${JSON.stringify(state, null, 2)}\n`, "utf8");
    });
  }
  /**
   * Applies a partial update in a fixed order — flag merge, pointer merge,
   * full taskQueue replacement, enqueue (append), upsert (replace-by-id or
   * append) — then persists and returns the resulting state. Task payloads
   * arriving through the patch are re-validated before use.
   */
  async patchState(patch: ProjectContextPatch): Promise<ProjectContextState> {
    return this.runSerialized(async () => {
      const current = await this.readState();
      if (patch.globalFlags) {
        // Merge only: existing flags are never removed by a patch.
        Object.assign(current.globalFlags, patch.globalFlags);
      }
      if (patch.artifactPointers) {
        Object.assign(current.artifactPointers, patch.artifactPointers);
      }
      if (patch.taskQueue) {
        // Full replacement runs before enqueue/upsert, which then build on it.
        current.taskQueue = patch.taskQueue.map((task, index) =>
          toProjectTask(task, `Project context patch taskQueue[${String(index)}]`),
        );
      }
      if (patch.enqueueTasks && patch.enqueueTasks.length > 0) {
        current.taskQueue.push(
          ...patch.enqueueTasks.map((task, index) =>
            toProjectTask(task, `Project context patch enqueueTasks[${String(index)}]`),
          ),
        );
      }
      if (patch.upsertTasks && patch.upsertTasks.length > 0) {
        const upsertTasks = patch.upsertTasks.map((task, index) =>
          toProjectTask(task, `Project context patch upsertTasks[${String(index)}]`),
        );
        current.taskQueue = mergeUpsertTasks(current.taskQueue, upsertTasks);
      }
      await mkdir(dirname(this.filePath), { recursive: true });
      await writeFile(this.filePath, `${JSON.stringify(current, null, 2)}\n`, "utf8");
      return current;
    });
  }
  /**
   * Chains `operation` after the previous queued operation, running it
   * whether the predecessor resolved or rejected. The stored queue tail
   * swallows both outcomes so one failure never poisons later operations;
   * the caller still receives this operation's own result or rejection.
   */
  private runSerialized<T>(operation: () => Promise<T>): Promise<T> {
    const run = this.queue.then(operation, operation);
    this.queue = run.then(
      () => undefined,
      () => undefined,
    );
    return run;
  }
}

View File

@@ -161,6 +161,10 @@ export class FileSystemStateContextManager {
return this.rootDirectory;
}
getSessionStatePath(sessionId: string): string {
return toStatePath(this.rootDirectory, sessionId);
}
async initializeSession(
sessionId: string,
initialState: Partial<StoredSessionState> = {},
@@ -205,6 +209,7 @@ export class FileSystemStateContextManager {
flags?: Record<string, boolean>;
metadata?: JsonObject;
historyEvent?: SessionHistoryEntry;
historyEvents?: SessionHistoryEntry[];
},
): Promise<StoredSessionState> {
const current = await this.readState(sessionId);
@@ -218,6 +223,9 @@ export class FileSystemStateContextManager {
if (patch.historyEvent) {
current.history.push(patch.historyEvent);
}
if (patch.historyEvents && patch.historyEvents.length > 0) {
current.history.push(...patch.historyEvents);
}
await this.writeState(sessionId, current);
return current;