Remove legacy orchestration and MCP aliases; reroute recursive pipeline API

This commit is contained in:
2026-02-23 16:02:20 -05:00
parent 62e2491cde
commit 1363bceecc
15 changed files with 88 additions and 102 deletions

View File

@@ -78,9 +78,10 @@ npm run dev -- claude "List potential improvements."
Pipeline edges can route via: Pipeline edges can route via:
- legacy status triggers (`on`: `success`, `validation_fail`, `failure`, `always`, ...) - legacy status triggers (`on`: `success`, `validation_fail`, `failure`, `always`)
- domain event triggers (`event`: typed domain events) - domain event triggers (`event`: typed domain events)
- conditions (`state_flag`, `history_has_event`, `file_exists`, `always`) - conditions (`state_flag`, `history_has_event`, `file_exists`, `always`)
- `history_has_event` evaluates persisted domain event history (for example `validation_failed`)
## Domain Events ## Domain Events
@@ -190,4 +191,9 @@ npm run build
## Notes ## Notes
- `AgentManager.runRecursiveAgent(...)` remains available for low-level testing, but pipeline execution should use `SchemaDrivenExecutionEngine.runSession(...)`. - Recursive execution APIs on `AgentManager` are internal runtime plumbing; use `SchemaDrivenExecutionEngine.runSession(...)` as the public orchestration entrypoint.
## MCP Migration Note
- Shared MCP server configs no longer accept the legacy `http_headers` alias.
- Use `headers` instead.

View File

@@ -42,6 +42,7 @@ Node payloads are persisted under the state root. Nodes do not inherit in-memory
- validation: `validation_passed`, `validation_failed` - validation: `validation_passed`, `validation_failed`
- integration: `branch_merged` - integration: `branch_merged`
- Pipeline edges can trigger on domain events (`edge.event`) in addition to legacy status triggers (`edge.on`). - Pipeline edges can trigger on domain events (`edge.event`) in addition to legacy status triggers (`edge.on`).
- `history_has_event` route conditions evaluate persisted domain event history entries (`validation_failed`, `task_blocked`, etc.).
## Security note ## Security note

View File

@@ -8,7 +8,6 @@ import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./
import { PersonaRegistry } from "./persona-registry.js"; import { PersonaRegistry } from "./persona-registry.js";
import { import {
FileSystemStateContextManager, FileSystemStateContextManager,
type SessionHistoryEntry,
} from "./state-context.js"; } from "./state-context.js";
import type { ActorExecutionResult, ActorResultStatus } from "./pipeline.js"; import type { ActorExecutionResult, ActorResultStatus } from "./pipeline.js";
@@ -56,14 +55,7 @@ export class PersistenceLifecycleObserver implements PipelineLifecycleObserver {
}) })
: {}; : {};
const legacyHistoryEvent: SessionHistoryEntry = { const domainHistoryEvents = event.domainEvents.map((domainEvent) => ({
nodeId: event.node.id,
event: event.result.status,
timestamp: new Date().toISOString(),
...(event.result.payload ? { data: event.result.payload } : {}),
};
const domainHistoryEvents: SessionHistoryEntry[] = event.domainEvents.map((domainEvent) => ({
nodeId: event.node.id, nodeId: event.node.id,
event: domainEvent.type, event: domainEvent.type,
timestamp: domainEvent.timestamp, timestamp: domainEvent.timestamp,
@@ -85,7 +77,6 @@ export class PersistenceLifecycleObserver implements PipelineLifecycleObserver {
...(event.result.stateMetadata ?? {}), ...(event.result.stateMetadata ?? {}),
...behaviorPatch, ...behaviorPatch,
}, },
historyEvent: legacyHistoryEvent,
historyEvents: domainHistoryEvents, historyEvents: domainHistoryEvents,
}); });

View File

@@ -132,6 +132,25 @@ export type RecursiveRunOutput<TIntent extends RecursiveChildIntent, TOutput> =
| RecursiveCompleteResult<TOutput> | RecursiveCompleteResult<TOutput>
| RecursiveFanoutPlan<TIntent, TOutput>; | RecursiveFanoutPlan<TIntent, TOutput>;
export type RecursiveRunInput<TIntent extends RecursiveChildIntent, TOutput> = {
sessionId: string;
depth: number;
signal?: AbortSignal;
run: (
input: RecursiveRunContext<TIntent>,
) => Promise<RecursiveRunOutput<TIntent, TOutput>> | RecursiveRunOutput<TIntent, TOutput>;
childMiddleware?: RecursiveChildMiddleware<TIntent>;
};
type SessionRecursiveRunInput<TIntent extends RecursiveChildIntent, TOutput> = Omit<
RecursiveRunInput<TIntent, TOutput>,
"sessionId"
>;
type SessionRecursiveRunner = <TIntent extends RecursiveChildIntent, TOutput>(
input: SessionRecursiveRunInput<TIntent, TOutput>,
) => Promise<TOutput>;
type RecursiveChildOutcome = { status: "success" } | { status: "failure"; error: Error }; type RecursiveChildOutcome = { status: "success" } | { status: "failure"; error: Error };
export type RecursiveChildMiddleware<TIntent extends RecursiveChildIntent> = { export type RecursiveChildMiddleware<TIntent extends RecursiveChildIntent> = {
@@ -193,6 +212,7 @@ export class AgentSession {
constructor( constructor(
private readonly manager: AgentManager, private readonly manager: AgentManager,
public readonly id: string, public readonly id: string,
private readonly recursiveRunner: SessionRecursiveRunner,
) {} ) {}
async runAgent<T>(input: { async runAgent<T>(input: {
@@ -211,6 +231,12 @@ export class AgentSession {
close(): void { close(): void {
this.manager.closeSession(this.id); this.manager.closeSession(this.id);
} }
async runRecursive<TIntent extends RecursiveChildIntent, TOutput>(
input: SessionRecursiveRunInput<TIntent, TOutput>,
): Promise<TOutput> {
return this.recursiveRunner(input);
}
} }
export class AgentManager { export class AgentManager {
@@ -256,7 +282,12 @@ export class AgentManager {
this.sessions.get(parentSessionId)?.childSessionIds.add(sessionId); this.sessions.get(parentSessionId)?.childSessionIds.add(sessionId);
} }
return new AgentSession(this, sessionId); return new AgentSession(this, sessionId, (input) =>
this.runRecursiveNode({
sessionId,
...input,
}),
);
} }
closeSession(sessionId: string): void { closeSession(sessionId: string): void {
@@ -297,28 +328,6 @@ export class AgentManager {
} }
} }
/**
* @deprecated Prefer running recursive topologies through SchemaDrivenExecutionEngine.runSession.
* This method remains available for internal orchestration and low-level tests.
*/
async runRecursiveAgent<TIntent extends RecursiveChildIntent, TOutput>(input: {
sessionId: string;
depth: number;
signal?: AbortSignal;
run: (
input: RecursiveRunContext<TIntent>,
) => Promise<RecursiveRunOutput<TIntent, TOutput>> | RecursiveRunOutput<TIntent, TOutput>;
childMiddleware?: RecursiveChildMiddleware<TIntent>;
}): Promise<TOutput> {
return this.runRecursiveNode({
sessionId: input.sessionId,
depth: input.depth,
signal: input.signal,
run: input.run,
childMiddleware: input.childMiddleware,
});
}
getLimits(): AgentManagerLimits { getLimits(): AgentManagerLimits {
return { ...this.limits }; return { ...this.limits };
} }

View File

@@ -65,13 +65,7 @@ export type PipelineNode = {
export type PipelineEdge = { export type PipelineEdge = {
from: string; from: string;
to: string; to: string;
on?: on?: "success" | "validation_fail" | "failure" | "always";
| "success"
| "validation_fail"
| "failure"
| "always"
| "onTaskComplete"
| "onValidationFail";
event?: DomainEventType; event?: DomainEventType;
when?: RouteCondition[]; when?: RouteCondition[];
}; };
@@ -278,8 +272,6 @@ function parsePipelineEdge(value: unknown): PipelineEdge {
"validation_fail", "validation_fail",
"failure", "failure",
"always", "always",
"onTaskComplete",
"onValidationFail",
]; ];
const rawOn = value.on; const rawOn = value.on;

View File

@@ -164,12 +164,6 @@ function shouldEdgeRun(
if (edge.on === "failure" && status === "failure") { if (edge.on === "failure" && status === "failure") {
return true; return true;
} }
if (edge.on === "onTaskComplete" && status === "success") {
return true;
}
if (edge.on === "onValidationFail" && status === "validation_fail") {
return true;
}
return false; return false;
} }
@@ -598,8 +592,7 @@ export class PipelineExecutor {
const maxRetriesForNode = this.getMaxRetriesForNode(node); const maxRetriesForNode = this.getMaxRetriesForNode(node);
try { try {
const output = await this.options.manager.runRecursiveAgent<RetryIntent, NodeAttemptResult>({ const output = await managerRunSession.runRecursive<RetryIntent, NodeAttemptResult>({
sessionId: managerRunSessionId,
depth, depth,
signal, signal,
run: async ({ intent, depth: recursiveDepth, signal: recursiveSignal }) => { run: async ({ intent, depth: recursiveDepth, signal: recursiveSignal }) => {

View File

@@ -209,7 +209,6 @@ export class FileSystemStateContextManager {
patch: { patch: {
flags?: Record<string, boolean>; flags?: Record<string, boolean>;
metadata?: JsonObject; metadata?: JsonObject;
historyEvent?: SessionHistoryEntry;
historyEvents?: SessionHistoryEntry[]; historyEvents?: SessionHistoryEntry[];
}, },
): Promise<StoredSessionState> { ): Promise<StoredSessionState> {
@@ -221,9 +220,6 @@ export class FileSystemStateContextManager {
if (patch.metadata) { if (patch.metadata) {
Object.assign(current.metadata, patch.metadata); Object.assign(current.metadata, patch.metadata);
} }
if (patch.historyEvent) {
current.history.push(patch.historyEvent);
}
if (patch.historyEvents && patch.historyEvents.length > 0) { if (patch.historyEvents && patch.historyEvents.length > 0) {
current.history.push(...patch.historyEvents); current.history.push(...patch.historyEvents);
} }

View File

@@ -9,7 +9,6 @@ export type ProviderRuntimeConfig = {
codexSkipGitCheck: boolean; codexSkipGitCheck: boolean;
anthropicOauthToken?: string; anthropicOauthToken?: string;
anthropicApiKey?: string; anthropicApiKey?: string;
anthropicToken?: string;
claudeModel?: string; claudeModel?: string;
claudeCodePath?: string; claudeCodePath?: string;
}; };
@@ -211,24 +210,19 @@ function deepFreeze<T>(value: T): Readonly<T> {
} }
export function resolveAnthropicToken( export function resolveAnthropicToken(
provider: Pick<ProviderRuntimeConfig, "anthropicOauthToken" | "anthropicApiKey" | "anthropicToken">, provider: Pick<ProviderRuntimeConfig, "anthropicOauthToken" | "anthropicApiKey">,
): string | undefined { ): string | undefined {
const oauthToken = provider.anthropicOauthToken?.trim(); const oauthToken = provider.anthropicOauthToken?.trim();
if (oauthToken) { if (oauthToken) {
return oauthToken; return oauthToken;
} }
const configuredToken = provider.anthropicToken?.trim();
if (configuredToken) {
return configuredToken;
}
const apiKey = provider.anthropicApiKey?.trim(); const apiKey = provider.anthropicApiKey?.trim();
return apiKey || undefined; return apiKey || undefined;
} }
export function buildClaudeAuthEnv( export function buildClaudeAuthEnv(
provider: Pick<ProviderRuntimeConfig, "anthropicOauthToken" | "anthropicApiKey" | "anthropicToken">, provider: Pick<ProviderRuntimeConfig, "anthropicOauthToken" | "anthropicApiKey">,
): Record<string, string | undefined> { ): Record<string, string | undefined> {
const oauthToken = provider.anthropicOauthToken?.trim(); const oauthToken = provider.anthropicOauthToken?.trim();
if (oauthToken) { if (oauthToken) {
@@ -262,7 +256,6 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): Readonly<AppCo
codexSkipGitCheck: readBooleanWithFallback(env, "CODEX_SKIP_GIT_CHECK", true), codexSkipGitCheck: readBooleanWithFallback(env, "CODEX_SKIP_GIT_CHECK", true),
anthropicOauthToken, anthropicOauthToken,
anthropicApiKey, anthropicApiKey,
anthropicToken: anthropicOauthToken ?? anthropicApiKey,
claudeModel: readOptionalString(env, "CLAUDE_MODEL"), claudeModel: readOptionalString(env, "CLAUDE_MODEL"),
claudeCodePath: readOptionalString(env, "CLAUDE_CODE_PATH"), claudeCodePath: readOptionalString(env, "CLAUDE_CODE_PATH"),
}, },

View File

@@ -8,7 +8,6 @@ import type {
function mergeHeaders(server: SharedMcpServer): Record<string, string> | undefined { function mergeHeaders(server: SharedMcpServer): Record<string, string> | undefined {
const merged = { const merged = {
...(server.http_headers ?? {}),
...(server.headers ?? {}), ...(server.headers ?? {}),
}; };
@@ -16,7 +15,7 @@ function mergeHeaders(server: SharedMcpServer): Record<string, string> | undefin
} }
export function normalizeSharedMcpServer(server: SharedMcpServer): SharedMcpServer { export function normalizeSharedMcpServer(server: SharedMcpServer): SharedMcpServer {
const { headers: _headers, http_headers: _httpHeaders, ...rest } = server; const { headers: _headers, ...rest } = server;
const normalizedHeaders = mergeHeaders(server); const normalizedHeaders = mergeHeaders(server);
return { return {

View File

@@ -30,7 +30,6 @@ export type SharedMcpServer = {
startup_timeout_sec?: number; startup_timeout_sec?: number;
tool_timeout_sec?: number; tool_timeout_sec?: number;
bearer_token_env_var?: string; bearer_token_env_var?: string;
http_headers?: Record<string, string>;
env_http_headers?: Record<string, string>; env_http_headers?: Record<string, string>;
env_vars?: string[]; env_vars?: string[];
handler?: string; handler?: string;
@@ -111,7 +110,6 @@ export const sharedMcpServerSchema: z.ZodType<SharedMcpServer> = z
startup_timeout_sec: nonNegativeFiniteNumberSchema.optional(), startup_timeout_sec: nonNegativeFiniteNumberSchema.optional(),
tool_timeout_sec: nonNegativeFiniteNumberSchema.optional(), tool_timeout_sec: nonNegativeFiniteNumberSchema.optional(),
bearer_token_env_var: nonEmptyStringSchema.optional(), bearer_token_env_var: nonEmptyStringSchema.optional(),
http_headers: stringRecordSchema.optional(),
env_http_headers: stringRecordSchema.optional(), env_http_headers: stringRecordSchema.optional(),
env_vars: stringArraySchema.optional(), env_vars: stringArraySchema.optional(),
handler: nonEmptyStringSchema.optional(), handler: nonEmptyStringSchema.optional(),

View File

@@ -107,8 +107,7 @@ test("recursive fanout/fan-in avoids deadlock at maxConcurrentAgents=1", async (
const session = manager.createSession("recursive-deadlock"); const session = manager.createSession("recursive-deadlock");
const executionOrder: string[] = []; const executionOrder: string[] = [];
const result = await manager.runRecursiveAgent({ const result = await session.runRecursive({
sessionId: session.id,
depth: 0, depth: 0,
run: async ({ sessionId, intent }) => { run: async ({ sessionId, intent }) => {
executionOrder.push(`${sessionId}:${intent?.task ?? "root"}`); executionOrder.push(`${sessionId}:${intent?.task ?? "root"}`);
@@ -151,8 +150,7 @@ test("rejects recursive child spawn above depth limit", async () => {
await assert.rejects( await assert.rejects(
() => () =>
manager.runRecursiveAgent({ session.runRecursive({
sessionId: session.id,
depth: 0, depth: 0,
run: async ({ depth }) => { run: async ({ depth }) => {
if (depth < 3) { if (depth < 3) {
@@ -195,8 +193,7 @@ test("closing parent session aborts active recursive work and releases child res
let abortCount = 0; let abortCount = 0;
let releaseCount = 0; let releaseCount = 0;
const runPromise = manager.runRecursiveAgent({ const runPromise = session.runRecursive({
sessionId: session.id,
depth: 0, depth: 0,
run: async ({ intent, signal }) => { run: async ({ intent, signal }) => {
if (!intent) { if (!intent) {
@@ -286,8 +283,7 @@ test("recursive children can be isolated via middleware-backed suballocation", a
const childLeases = new Map<string, Awaited<ReturnType<typeof provisioner.provisionChildSession>>>(); const childLeases = new Map<string, Awaited<ReturnType<typeof provisioner.provisionChildSession>>>();
try { try {
await manager.runRecursiveAgent({ await session.runRecursive({
sessionId: session.id,
depth: 0, depth: 0,
run: async ({ intent, sessionId }) => { run: async ({ intent, sessionId }) => {
if (!intent) { if (!intent) {

View File

@@ -37,7 +37,6 @@ test("prefers CLAUDE_CODE_OAUTH_TOKEN over ANTHROPIC_API_KEY", () => {
assert.equal(config.provider.anthropicOauthToken, "oauth-token"); assert.equal(config.provider.anthropicOauthToken, "oauth-token");
assert.equal(config.provider.anthropicApiKey, "api-key"); assert.equal(config.provider.anthropicApiKey, "api-key");
assert.equal(config.provider.anthropicToken, "oauth-token");
assert.equal(resolveAnthropicToken(config.provider), "oauth-token"); assert.equal(resolveAnthropicToken(config.provider), "oauth-token");
const authEnv = buildClaudeAuthEnv(config.provider); const authEnv = buildClaudeAuthEnv(config.provider);
@@ -52,7 +51,6 @@ test("falls back to ANTHROPIC_API_KEY when oauth token is absent", () => {
assert.equal(config.provider.anthropicOauthToken, undefined); assert.equal(config.provider.anthropicOauthToken, undefined);
assert.equal(config.provider.anthropicApiKey, "api-key"); assert.equal(config.provider.anthropicApiKey, "api-key");
assert.equal(config.provider.anthropicToken, "api-key");
assert.equal(resolveAnthropicToken(config.provider), "api-key"); assert.equal(resolveAnthropicToken(config.provider), "api-key");
const authEnv = buildClaudeAuthEnv(config.provider); const authEnv = buildClaudeAuthEnv(config.provider);

View File

@@ -117,3 +117,22 @@ test("rejects relationship cycles", () => {
assert.throws(() => parseAgentManifest(manifest), /Relationship graph must be acyclic/); assert.throws(() => parseAgentManifest(manifest), /Relationship graph must be acyclic/);
}); });
test("rejects legacy edge trigger aliases", () => {
const manifest = validManifest() as {
pipeline: {
edges: Array<{ from: string; to: string; on: string }>;
};
};
manifest.pipeline.edges[0] = {
from: "product-node",
to: "coder-node",
on: "onValidationFail",
};
assert.throws(
() => parseAgentManifest(manifest),
/unsupported event "onValidationFail"/,
);
});

View File

@@ -43,40 +43,35 @@ test("maps shared headers to codex http_headers", () => {
}); });
}); });
test("normalizes header aliases into a single headers object", () => { test("normalizes headers into a single headers object", () => {
const normalized = normalizeSharedMcpServer({ const normalized = normalizeSharedMcpServer({
url: "http://localhost:3000/mcp", url: "http://localhost:3000/mcp",
http_headers: {
"X-Source": "legacy",
},
headers: { headers: {
Authorization: "Bearer token", Authorization: "Bearer token",
}, },
}); });
assert.deepEqual(normalized.headers, { assert.deepEqual(normalized.headers, {
"X-Source": "legacy",
Authorization: "Bearer token", Authorization: "Bearer token",
}); });
assert.equal("http_headers" in normalized, false);
}); });
test("maps legacy http_headers alias for claude conversion", () => { test("rejects legacy shared MCP header alias http_headers", () => {
const claudeConfig = toClaudeServerConfig("legacy-http-headers", { assert.throws(
type: "http", () =>
url: "http://localhost:3000/mcp", parseMcpConfig({
http_headers: { servers: {
Authorization: "Bearer token", legacy: {
}, type: "http",
}); url: "http://localhost:3000/mcp",
http_headers: {
assert.deepEqual(claudeConfig, { Authorization: "Bearer token",
type: "http", },
url: "http://localhost:3000/mcp", },
headers: { },
Authorization: "Bearer token", }),
}, /unrecognized key/i,
}); );
}); });
test("throws for claude http server without url", () => { test("throws for claude http server without url", () => {

View File

@@ -135,7 +135,7 @@ function createManifest(): unknown {
when: [ when: [
{ {
type: "history_has_event", type: "history_has_event",
event: "validation_fail", event: "validation_failed",
}, },
], ],
}, },