From 1363bceecc95729a1d69733ffe1efbe7aec89b0a Mon Sep 17 00:00:00 2001 From: Josh Rzemien Date: Mon, 23 Feb 2026 16:02:20 -0500 Subject: [PATCH] Remove legacy orchestration and MCP aliases; reroute recursive pipeline API --- README.md | 10 ++++-- docs/orchestration-engine.md | 1 + src/agents/lifecycle-observer.ts | 11 +----- src/agents/manager.ts | 55 +++++++++++++++++------------- src/agents/manifest.ts | 10 +----- src/agents/pipeline.ts | 9 +---- src/agents/state-context.ts | 4 --- src/config.ts | 11 ++---- src/mcp/converters.ts | 3 +- src/mcp/types.ts | 2 -- tests/agent-manager.test.ts | 12 +++---- tests/config.test.ts | 2 -- tests/manifest-schema.test.ts | 19 +++++++++++ tests/mcp-converters.test.ts | 39 +++++++++------------ tests/orchestration-engine.test.ts | 2 +- 15 files changed, 88 insertions(+), 102 deletions(-) diff --git a/README.md b/README.md index bc80d3a..222d5b6 100644 --- a/README.md +++ b/README.md @@ -78,9 +78,10 @@ npm run dev -- claude "List potential improvements." Pipeline edges can route via: -- legacy status triggers (`on`: `success`, `validation_fail`, `failure`, `always`, ...) +- legacy status triggers (`on`: `success`, `validation_fail`, `failure`, `always`) - domain event triggers (`event`: typed domain events) - conditions (`state_flag`, `history_has_event`, `file_exists`, `always`) +- `history_has_event` evaluates persisted domain event history (for example `validation_failed`) ## Domain Events @@ -190,4 +191,9 @@ npm run build ## Notes -- `AgentManager.runRecursiveAgent(...)` remains available for low-level testing, but pipeline execution should use `SchemaDrivenExecutionEngine.runSession(...)`. +- Recursive execution APIs on `AgentManager` are internal runtime plumbing; use `SchemaDrivenExecutionEngine.runSession(...)` as the public orchestration entrypoint. + +## MCP Migration Note + +- Shared MCP server configs no longer accept the legacy `http_headers` alias. +- Use `headers` instead. diff --git a/docs/orchestration-engine.md b/docs/orchestration-engine.md index f80ef0d..5261a0a 100644 --- a/docs/orchestration-engine.md +++ b/docs/orchestration-engine.md @@ -42,6 +42,7 @@ Node payloads are persisted under the state root. Nodes do not inherit in-memory - validation: `validation_passed`, `validation_failed` - integration: `branch_merged` - Pipeline edges can trigger on domain events (`edge.event`) in addition to legacy status triggers (`edge.on`). +- `history_has_event` route conditions evaluate persisted domain event history entries (`validation_failed`, `task_blocked`, etc.). ## Security note diff --git a/src/agents/lifecycle-observer.ts b/src/agents/lifecycle-observer.ts index 079d62e..654462f 100644 --- a/src/agents/lifecycle-observer.ts +++ b/src/agents/lifecycle-observer.ts @@ -8,7 +8,6 @@ import { type ProjectContextPatch, type FileSystemProjectContextStore } from "./ import { PersonaRegistry } from "./persona-registry.js"; import { FileSystemStateContextManager, - type SessionHistoryEntry, } from "./state-context.js"; import type { ActorExecutionResult, ActorResultStatus } from "./pipeline.js"; @@ -56,14 +55,7 @@ export class PersistenceLifecycleObserver implements PipelineLifecycleObserver { }) : {}; - const legacyHistoryEvent: SessionHistoryEntry = { - nodeId: event.node.id, - event: event.result.status, - timestamp: new Date().toISOString(), - ...(event.result.payload ? { data: event.result.payload } : {}), - }; - - const domainHistoryEvents: SessionHistoryEntry[] = event.domainEvents.map((domainEvent) => ({ + const domainHistoryEvents = event.domainEvents.map((domainEvent) => ({ nodeId: event.node.id, event: domainEvent.type, timestamp: domainEvent.timestamp, @@ -85,7 +77,6 @@ export class PersistenceLifecycleObserver implements PipelineLifecycleObserver { ...(event.result.stateMetadata ?? {}), ...behaviorPatch, }, - historyEvent: legacyHistoryEvent, historyEvents: domainHistoryEvents, }); diff --git a/src/agents/manager.ts b/src/agents/manager.ts index 6b267db..c4e29b8 100644 --- a/src/agents/manager.ts +++ b/src/agents/manager.ts @@ -132,6 +132,25 @@ export type RecursiveRunOutput = | RecursiveCompleteResult | RecursiveFanoutPlan; +export type RecursiveRunInput = { + sessionId: string; + depth: number; + signal?: AbortSignal; + run: ( + input: RecursiveRunContext, + ) => Promise> | RecursiveRunOutput; + childMiddleware?: RecursiveChildMiddleware; +}; + +type SessionRecursiveRunInput = Omit< + RecursiveRunInput, + "sessionId" +>; + +type SessionRecursiveRunner = ( + input: SessionRecursiveRunInput, +) => Promise; + type RecursiveChildOutcome = { status: "success" } | { status: "failure"; error: Error }; export type RecursiveChildMiddleware = { @@ -193,6 +212,7 @@ export class AgentSession { constructor( private readonly manager: AgentManager, public readonly id: string, + private readonly recursiveRunner: SessionRecursiveRunner, ) {} async runAgent(input: { @@ -211,6 +231,12 @@ export class AgentSession { close(): void { this.manager.closeSession(this.id); } + + async runRecursive( + input: SessionRecursiveRunInput, + ): Promise { + return this.recursiveRunner(input); + } } export class AgentManager { @@ -256,7 +282,12 @@ export class AgentManager { this.sessions.get(parentSessionId)?.childSessionIds.add(sessionId); } - return new AgentSession(this, sessionId); + return new AgentSession(this, sessionId, (input) => + this.runRecursiveNode({ + sessionId, + ...input, + }), + ); } closeSession(sessionId: string): void { @@ -297,28 +328,6 @@ export class AgentManager { } } - /** - * @deprecated Prefer running recursive topologies through SchemaDrivenExecutionEngine.runSession. - * This method remains available for internal orchestration and low-level tests. - */ - async runRecursiveAgent(input: { - sessionId: string; - depth: number; - signal?: AbortSignal; - run: ( - input: RecursiveRunContext, - ) => Promise> | RecursiveRunOutput; - childMiddleware?: RecursiveChildMiddleware; - }): Promise { - return this.runRecursiveNode({ - sessionId: input.sessionId, - depth: input.depth, - signal: input.signal, - run: input.run, - childMiddleware: input.childMiddleware, - }); - } - getLimits(): AgentManagerLimits { return { ...this.limits }; } diff --git a/src/agents/manifest.ts b/src/agents/manifest.ts index 30bc48c..cb58840 100644 --- a/src/agents/manifest.ts +++ b/src/agents/manifest.ts @@ -65,13 +65,7 @@ export type PipelineNode = { export type PipelineEdge = { from: string; to: string; - on?: - | "success" - | "validation_fail" - | "failure" - | "always" - | "onTaskComplete" - | "onValidationFail"; + on?: "success" | "validation_fail" | "failure" | "always"; event?: DomainEventType; when?: RouteCondition[]; }; @@ -278,8 +272,6 @@ function parsePipelineEdge(value: unknown): PipelineEdge { "validation_fail", "failure", "always", - "onTaskComplete", - "onValidationFail", ]; const rawOn = value.on; diff --git a/src/agents/pipeline.ts b/src/agents/pipeline.ts index fdf5d89..1f37c49 100644 --- a/src/agents/pipeline.ts +++ b/src/agents/pipeline.ts @@ -164,12 +164,6 @@ function shouldEdgeRun( if (edge.on === "failure" && status === "failure") { return true; } - if (edge.on === "onTaskComplete" && status === "success") { - return true; - } - if (edge.on === "onValidationFail" && status === "validation_fail") { - return true; - } return false; } @@ -598,8 +592,7 @@ export class PipelineExecutor { const maxRetriesForNode = this.getMaxRetriesForNode(node); try { - const output = await this.options.manager.runRecursiveAgent({ - sessionId: managerRunSessionId, + const output = await managerRunSession.runRecursive({ depth, signal, run: async ({ intent, depth: recursiveDepth, signal: recursiveSignal }) => { diff --git a/src/agents/state-context.ts b/src/agents/state-context.ts index df3a80d..0683576 100644 --- a/src/agents/state-context.ts +++ b/src/agents/state-context.ts @@ -209,7 +209,6 @@ export class FileSystemStateContextManager { patch: { flags?: Record; metadata?: JsonObject; - historyEvent?: SessionHistoryEntry; historyEvents?: SessionHistoryEntry[]; }, ): Promise { @@ -221,9 +220,6 @@ export class FileSystemStateContextManager { if (patch.metadata) { Object.assign(current.metadata, patch.metadata); } - if (patch.historyEvent) { - current.history.push(patch.historyEvent); - } if (patch.historyEvents && patch.historyEvents.length > 0) { current.history.push(...patch.historyEvents); } diff --git a/src/config.ts b/src/config.ts index 96c4f58..1a316b3 100644 --- a/src/config.ts +++ b/src/config.ts @@ -9,7 +9,6 @@ export type ProviderRuntimeConfig = { codexSkipGitCheck: boolean; anthropicOauthToken?: string; anthropicApiKey?: string; - anthropicToken?: string; claudeModel?: string; claudeCodePath?: string; }; @@ -211,24 +210,19 @@ function deepFreeze(value: T): Readonly { } export function resolveAnthropicToken( - provider: Pick, + provider: Pick, ): string | undefined { const oauthToken = provider.anthropicOauthToken?.trim(); if (oauthToken) { return oauthToken; } - const configuredToken = provider.anthropicToken?.trim(); - if (configuredToken) { - return configuredToken; - } - const apiKey = provider.anthropicApiKey?.trim(); return apiKey || undefined; } export function buildClaudeAuthEnv( - provider: Pick, + provider: Pick, ): Record { const oauthToken = provider.anthropicOauthToken?.trim(); if (oauthToken) { @@ -262,7 +256,6 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): Readonly | undefined { const merged = { - ...(server.http_headers ?? {}), ...(server.headers ?? {}), }; @@ -16,7 +15,7 @@ function mergeHeaders(server: SharedMcpServer): Record | undefin } export function normalizeSharedMcpServer(server: SharedMcpServer): SharedMcpServer { - const { headers: _headers, http_headers: _httpHeaders, ...rest } = server; + const { headers: _headers, ...rest } = server; const normalizedHeaders = mergeHeaders(server); return { diff --git a/src/mcp/types.ts b/src/mcp/types.ts index ea6e4be..597a711 100644 --- a/src/mcp/types.ts +++ b/src/mcp/types.ts @@ -30,7 +30,6 @@ export type SharedMcpServer = { startup_timeout_sec?: number; tool_timeout_sec?: number; bearer_token_env_var?: string; - http_headers?: Record; env_http_headers?: Record; env_vars?: string[]; handler?: string; @@ -111,7 +110,6 @@ export const sharedMcpServerSchema: z.ZodType = z startup_timeout_sec: nonNegativeFiniteNumberSchema.optional(), tool_timeout_sec: nonNegativeFiniteNumberSchema.optional(), bearer_token_env_var: nonEmptyStringSchema.optional(), - http_headers: stringRecordSchema.optional(), env_http_headers: stringRecordSchema.optional(), env_vars: stringArraySchema.optional(), handler: nonEmptyStringSchema.optional(), diff --git a/tests/agent-manager.test.ts b/tests/agent-manager.test.ts index d474b97..ac4219f 100644 --- a/tests/agent-manager.test.ts +++ b/tests/agent-manager.test.ts @@ -107,8 +107,7 @@ test("recursive fanout/fan-in avoids deadlock at maxConcurrentAgents=1", async ( const session = manager.createSession("recursive-deadlock"); const executionOrder: string[] = []; - const result = await manager.runRecursiveAgent({ - sessionId: session.id, + const result = await session.runRecursive({ depth: 0, run: async ({ sessionId, intent }) => { executionOrder.push(`${sessionId}:${intent?.task ?? "root"}`); @@ -151,8 +150,7 @@ test("rejects recursive child spawn above depth limit", async () => { await assert.rejects( () => - manager.runRecursiveAgent({ - sessionId: session.id, + session.runRecursive({ depth: 0, run: async ({ depth }) => { if (depth < 3) { @@ -195,8 +193,7 @@ test("closing parent session aborts active recursive work and releases child res let abortCount = 0; let releaseCount = 0; - const runPromise = manager.runRecursiveAgent({ - sessionId: session.id, + const runPromise = session.runRecursive({ depth: 0, run: async ({ intent, signal }) => { if (!intent) { @@ -286,8 +283,7 @@ test("recursive children can be isolated via middleware-backed suballocation", a const childLeases = new Map>>(); try { - await manager.runRecursiveAgent({ - sessionId: session.id, + await session.runRecursive({ depth: 0, run: async ({ intent, sessionId }) => { if (!intent) { diff --git a/tests/config.test.ts b/tests/config.test.ts index 1b2970c..2f061b4 100644 --- a/tests/config.test.ts +++ b/tests/config.test.ts @@ -37,7 +37,6 @@ test("prefers CLAUDE_CODE_OAUTH_TOKEN over ANTHROPIC_API_KEY", () => { assert.equal(config.provider.anthropicOauthToken, "oauth-token"); assert.equal(config.provider.anthropicApiKey, "api-key"); - assert.equal(config.provider.anthropicToken, "oauth-token"); assert.equal(resolveAnthropicToken(config.provider), "oauth-token"); const authEnv = buildClaudeAuthEnv(config.provider); @@ -52,7 +51,6 @@ test("falls back to ANTHROPIC_API_KEY when oauth token is absent", () => { assert.equal(config.provider.anthropicOauthToken, undefined); assert.equal(config.provider.anthropicApiKey, "api-key"); - assert.equal(config.provider.anthropicToken, "api-key"); assert.equal(resolveAnthropicToken(config.provider), "api-key"); const authEnv = buildClaudeAuthEnv(config.provider); diff --git a/tests/manifest-schema.test.ts b/tests/manifest-schema.test.ts index 9ee4aee..0bffa9f 100644 --- a/tests/manifest-schema.test.ts +++ b/tests/manifest-schema.test.ts @@ -117,3 +117,22 @@ test("rejects relationship cycles", () => { assert.throws(() => parseAgentManifest(manifest), /Relationship graph must be acyclic/); }); + +test("rejects legacy edge trigger aliases", () => { + const manifest = validManifest() as { + pipeline: { + edges: Array<{ from: string; to: string; on: string }>; + }; + }; + + manifest.pipeline.edges[0] = { + from: "product-node", + to: "coder-node", + on: "onValidationFail", + }; + + assert.throws( + () => parseAgentManifest(manifest), + /unsupported event "onValidationFail"/, + ); +}); diff --git a/tests/mcp-converters.test.ts b/tests/mcp-converters.test.ts index e6bdc4a..5b3bbf3 100644 --- a/tests/mcp-converters.test.ts +++ b/tests/mcp-converters.test.ts @@ -43,40 +43,35 @@ test("maps shared headers to codex http_headers", () => { }); }); -test("normalizes header aliases into a single headers object", () => { +test("normalizes headers into a single headers object", () => { const normalized = normalizeSharedMcpServer({ url: "http://localhost:3000/mcp", - http_headers: { - "X-Source": "legacy", - }, headers: { Authorization: "Bearer token", }, }); assert.deepEqual(normalized.headers, { - "X-Source": "legacy", Authorization: "Bearer token", }); - assert.equal("http_headers" in normalized, false); }); -test("maps legacy http_headers alias for claude conversion", () => { - const claudeConfig = toClaudeServerConfig("legacy-http-headers", { - type: "http", - url: "http://localhost:3000/mcp", - http_headers: { - Authorization: "Bearer token", - }, - }); - - assert.deepEqual(claudeConfig, { - type: "http", - url: "http://localhost:3000/mcp", - headers: { - Authorization: "Bearer token", - }, - }); +test("rejects legacy shared MCP header alias http_headers", () => { + assert.throws( + () => + parseMcpConfig({ + servers: { + legacy: { + type: "http", + url: "http://localhost:3000/mcp", + http_headers: { + Authorization: "Bearer token", + }, + }, + }, + }), + /unrecognized key/i, + ); }); test("throws for claude http server without url", () => { diff --git a/tests/orchestration-engine.test.ts b/tests/orchestration-engine.test.ts index 3de3d8c..195c19e 100644 --- a/tests/orchestration-engine.test.ts +++ b/tests/orchestration-engine.test.ts @@ -135,7 +135,7 @@ function createManifest(): unknown { when: [ { type: "history_has_event", - event: "validation_fail", + event: "validation_failed", }, ], },