Files
ai_ops/src/ui/provider-executor.ts
2026-02-24 18:57:20 -05:00

699 lines
18 KiB
TypeScript

import { Codex } from "@openai/codex-sdk";
import { query, type Options, type SDKMessage } from "@anthropic-ai/claude-agent-sdk";
import {
buildClaudeAuthEnv,
resolveAnthropicToken,
resolveOpenAiApiKey,
type AppConfig,
} from "../config.js";
import { isDomainEventType, type DomainEventEmission } from "../agents/domain-events.js";
import type { ActorExecutionInput, ActorExecutionResult, ActorExecutor } from "../agents/pipeline.js";
import { isRecord, type JsonObject, type JsonValue } from "../agents/types.js";
import { ClaudeObservabilityLogger } from "./claude-observability.js";
/** Provider backends an actor run can be executed with. */
export type RunProvider = "codex" | "claude";
/**
 * Per-run state shared by every provider actor invocation created from it.
 */
export type ProviderRunRuntime = {
/** Backend that executes actors for this run. */
provider: RunProvider;
/** Application configuration; provider settings are read from `config.provider`. */
config: Readonly<AppConfig>;
/** Base environment variables merged into the env of each provider invocation. */
sharedEnv: Record<string, string>;
/** Logger that records Claude query lifecycle events and streamed messages. */
claudeObservability: ClaudeObservabilityLogger;
/** Releases runtime resources; the runtime built in this file closes the observability logger. */
close: () => Promise<void>;
};
/**
 * Token, timing and cost figures gathered for a single provider turn.
 * Every field is optional; only fields that hold a number are copied into result metadata.
 */
type ProviderUsage = {
/** Input-side token count. */
tokenInput?: number;
/** Output-side token count. */
tokenOutput?: number;
/** Combined token count for the turn. */
tokenTotal?: number;
/** Turn duration in milliseconds. */
durationMs?: number;
/** Cost in US dollars; in this file only the Claude path sets it. */
costUsd?: number;
};
/**
 * Produces an environment map that contains only the entries of `input` whose
 * value is a string; `undefined` values are dropped.
 *
 * @param input - Environment-like record that may contain `undefined` values.
 * @returns A new record holding the string-valued entries in their original order.
 */
function sanitizeEnv(input: Record<string, string | undefined>): Record<string, string> {
  return Object.entries(input).reduce<Record<string, string>>((env, [name, setting]) => {
    if (typeof setting === "string") {
      env[name] = setting;
    }
    return env;
  }, {});
}
/**
 * JSON Schema for the object an actor is asked to reply with.
 * Only `status` is required; the remaining properties are optional and extra
 * properties are allowed. The schema is handed to Codex as `outputSchema` and
 * embedded in the Claude output-format descriptor.
 * NOTE(review): strict structured-output modes commonly require
 * `additionalProperties: false` — TODO confirm both providers accept this schema as-is.
 */
const ACTOR_RESPONSE_SCHEMA = {
type: "object",
additionalProperties: true,
properties: {
// Outcome of the node; the parser in this file accepts exactly these three values.
status: {
type: "string",
enum: ["success", "validation_fail", "failure"],
},
// Free-form result object.
payload: {
type: "object",
},
// Map of flag name to boolean.
stateFlags: {
type: "object",
additionalProperties: {
type: "boolean",
},
},
// Free-form metadata object.
stateMetadata: {
type: "object",
},
// List of event objects; their shape is not constrained by the schema.
events: {
type: "array",
items: {
type: "object",
additionalProperties: true,
},
},
failureKind: {
type: "string",
enum: ["soft", "hard"],
},
failureCode: {
type: "string",
},
},
required: ["status"],
};
/**
 * Output-format descriptor passed to the Claude SDK as `outputFormat`;
 * it wraps the shared actor response schema under a fixed name.
 */
const CLAUDE_OUTPUT_FORMAT = {
type: "json_schema",
name: "actor_execution_result",
schema: ACTOR_RESPONSE_SCHEMA,
} as const;
/**
 * Turns any thrown value into a message string.
 *
 * @param error - The caught value, which may or may not be an `Error`.
 * @returns `error.message` for `Error` instances, otherwise `String(error)`.
 */
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}
/**
 * Picks the directory a provider should run in for the given actor input.
 *
 * @param actorInput - Actor invocation whose execution context carries the security settings.
 * @returns The `worktreePath` from the execution context's security section.
 */
export function resolveProviderWorkingDirectory(actorInput: ActorExecutionInput): string {
  const { worktreePath } = actorInput.executionContext.security;
  return worktreePath;
}
/**
 * Assembles the environment passed to a provider process.
 *
 * Layers, later ones winning: the runtime's shared env, the Claude auth
 * variables (only when `includeClaudeAuth` is truthy), and finally
 * `AGENT_WORKTREE_PATH` set to the actor's working directory. Entries whose
 * value is not a string are removed.
 *
 * @param input.runtime - Run-level runtime supplying the shared env and config.
 * @param input.actorInput - Actor invocation used to resolve the working directory.
 * @param input.includeClaudeAuth - When truthy, merge the Claude auth variables.
 * @returns The merged, string-only environment record.
 */
export function buildProviderRuntimeEnv(input: {
  runtime: ProviderRunRuntime;
  actorInput: ActorExecutionInput;
  includeClaudeAuth?: boolean;
}): Record<string, string> {
  const { runtime, actorInput, includeClaudeAuth } = input;
  const worktreePath = resolveProviderWorkingDirectory(actorInput);
  const claudeAuthEnv = includeClaudeAuth ? buildClaudeAuthEnv(runtime.config.provider) : {};
  return sanitizeEnv({
    ...runtime.sharedEnv,
    ...claudeAuthEnv,
    AGENT_WORKTREE_PATH: worktreePath,
  });
}
/**
 * Deep-copies `value` by serializing it to JSON and parsing it back, which
 * leaves only JSON-representable data.
 *
 * @param value - Any value; it must be serializable by `JSON.stringify`.
 * @returns The JSON round-tripped copy.
 * @throws When serialization fails or yields nothing parseable (for example `undefined`).
 */
function toJsonValue(value: unknown): JsonValue {
  const serialized = JSON.stringify(value);
  const revived: JsonValue = JSON.parse(serialized);
  return revived;
}
/**
 * Converts an arbitrary value into a JSON-safe object copy.
 *
 * @param value - Candidate value.
 * @returns A JSON round-tripped copy when `value` is a record and its copy is
 * still a record; `undefined` otherwise, including when serialization throws.
 */
function toJsonObject(value: unknown): JsonObject | undefined {
  if (isRecord(value)) {
    try {
      const copy = toJsonValue(value);
      if (isRecord(copy)) {
        return copy as JsonObject;
      }
    } catch {
      // The value could not be round-tripped through JSON; report it as absent.
    }
  }
  return undefined;
}
/**
 * Extracts the boolean-valued entries of a record.
 *
 * @param value - Candidate value.
 * @returns A record with only the boolean entries, or `undefined` when
 * `value` is not a record or no boolean entry was kept.
 */
function toBooleanRecord(value: unknown): Record<string, boolean> | undefined {
  if (!isRecord(value)) {
    return undefined;
  }
  const flags = Object.entries(value).reduce<Record<string, boolean>>((kept, [name, candidate]) => {
    if (typeof candidate === "boolean") {
      kept[name] = candidate;
    }
    return kept;
  }, {});
  return Object.keys(flags).length === 0 ? undefined : flags;
}
/**
 * Filters a model-supplied list down to well-formed domain event emissions.
 *
 * Items that are not records, or whose `type` is not a recognized domain
 * event type, are skipped. A `payload` is attached only when it converts to a
 * JSON object.
 *
 * @param value - Candidate `events` value from the model output.
 * @returns The accepted emissions, or `undefined` when `value` is not an
 * array or nothing was accepted.
 */
function toEventEmissions(value: unknown): DomainEventEmission[] | undefined {
  if (!Array.isArray(value)) {
    return undefined;
  }
  const emissions = value.flatMap((item: unknown): DomainEventEmission[] => {
    if (!isRecord(item)) {
      return [];
    }
    const { type } = item;
    if (typeof type !== "string" || !isDomainEventType(type)) {
      return [];
    }
    const payload = toJsonObject(item.payload);
    return [payload ? { type, payload } : { type }];
  });
  return emissions.length === 0 ? undefined : emissions;
}
/**
 * Extracts JSON from Markdown code fences in `text`.
 *
 * Every fence (with an optional `json` language tag) is tried in document
 * order and the first one whose body parses as JSON wins, so a leading
 * non-JSON fence (for example a shell snippet) does not hide a later JSON
 * answer.
 *
 * @param text - Raw model output that may contain fenced blocks.
 * @returns The parsed value of the first parseable fence, or `undefined`
 * when no fence holds valid JSON.
 */
function extractJsonFromFencedBlock(text: string): unknown {
  for (const fence of text.matchAll(/```(?:json)?\s*([\s\S]*?)```/gi)) {
    const body = fence[1];
    if (!body) {
      // Empty fence: nothing to parse.
      continue;
    }
    try {
      return JSON.parse(body);
    } catch {
      // Not JSON (or tagged with another language); keep looking at later fences.
    }
  }
  return undefined;
}
/**
 * Finds the index of the `}` that closes the object opened at `start`,
 * ignoring braces inside double-quoted strings (with backslash escapes).
 *
 * @returns The closing brace index, or -1 when the braces never balance.
 */
function findBalancedObjectEnd(text: string, start: number): number {
  let depth = 0;
  let inString = false;
  let escaped = false;
  for (let index = start; index < text.length; index += 1) {
    const character = text[index];
    if (inString) {
      if (escaped) {
        escaped = false;
      } else if (character === "\\") {
        escaped = true;
      } else if (character === '"') {
        inString = false;
      }
      continue;
    }
    if (character === '"') {
      inString = true;
    } else if (character === "{") {
      depth += 1;
    } else if (character === "}") {
      depth -= 1;
      if (depth === 0) {
        return index;
      }
    }
  }
  return -1;
}

/**
 * Extracts the first brace-balanced span of `text` that parses as JSON.
 *
 * Candidates start at each `{` in document order. A candidate that does not
 * balance or does not parse (for example `{placeholder}` in surrounding prose)
 * is skipped and the search resumes at the next `{`, instead of giving up
 * after the first attempt.
 *
 * @param text - Raw model output that may embed a JSON object.
 * @returns The parsed object, or `undefined` when no candidate parses.
 */
function extractFirstBalancedJsonObject(text: string): unknown {
  // Bound the retries so brace-heavy, unparseable text cannot trigger quadratic scanning.
  const maxCandidates = 64;
  let attempts = 0;
  for (
    let start = text.indexOf("{");
    start >= 0 && attempts < maxCandidates;
    start = text.indexOf("{", start + 1)
  ) {
    attempts += 1;
    const end = findBalancedObjectEnd(text, start);
    if (end < 0) {
      continue;
    }
    try {
      return JSON.parse(text.slice(start, end + 1));
    } catch {
      // Balanced but not JSON; try the next opening brace.
    }
  }
  return undefined;
}
/**
 * Obtains a parsed response value from provider output.
 *
 * Order of preference: the provider's structured output when one was
 * supplied, the whole trimmed text parsed as JSON, JSON found in a fenced
 * block, and finally a brace-balanced object embedded in the text.
 *
 * @param rawText - Final text returned by the provider.
 * @param structuredOutput - Already-structured output, if the provider gave one.
 * @returns The parsed value, or `undefined` when nothing could be parsed.
 */
function tryParseResponseObject(rawText: string, structuredOutput?: unknown): unknown {
  if (structuredOutput !== undefined) {
    return structuredOutput;
  }
  const candidate = rawText.trim();
  if (candidate.length === 0) {
    return undefined;
  }
  try {
    return JSON.parse(candidate);
  } catch {
    // The text as a whole is not JSON; look for JSON embedded in it instead.
  }
  const fromFence = extractJsonFromFencedBlock(candidate);
  return fromFence === undefined ? extractFirstBalancedJsonObject(candidate) : fromFence;
}
/**
 * Returns a copy of `result` whose `stateMetadata.usage` includes the
 * provider-reported usage figures.
 *
 * Usage already present in the result's metadata is kept; each provider
 * figure that is a number overwrites the entry of the same name.
 *
 * @param input.result - Actor result produced from the model output.
 * @param input.providerUsage - Usage figures measured for the turn.
 * @returns A new result object; `input.result` is not mutated.
 */
function ensureUsageMetadata(input: {
  result: ActorExecutionResult;
  providerUsage: ProviderUsage;
}): ActorExecutionResult {
  const { result, providerUsage } = input;
  const baseMetadata = toJsonObject(result.stateMetadata) ?? {};
  const mergedUsage: JsonObject = { ...(toJsonObject(baseMetadata.usage) ?? {}) };
  const reportedFields = ["tokenInput", "tokenOutput", "tokenTotal", "durationMs", "costUsd"] as const;
  for (const field of reportedFields) {
    const reported = providerUsage[field];
    if (typeof reported === "number") {
      mergedUsage[field] = reported;
    }
  }
  return {
    ...result,
    stateMetadata: {
      ...baseMetadata,
      usage: mergedUsage,
    },
  };
}
/**
 * Interprets provider output as an `ActorExecutionResult`.
 *
 * When the output does not parse to a record, or its `status` is not one of
 * `success`, `validation_fail` or `failure`, the call yields a `success`
 * result whose payload carries the trimmed raw text as `assistantResponse`.
 * Otherwise the recognized fields are copied over; optional fields are
 * included only when they hold a usable value.
 *
 * @param input.rawText - Final text returned by the provider.
 * @param input.structuredOutput - Structured output, preferred over the text when defined.
 * @returns The normalized actor result.
 */
export function parseActorExecutionResultFromModelOutput(input: {
  rawText: string;
  structuredOutput?: unknown;
}): ActorExecutionResult {
  const rawResponsePayload = () => ({ assistantResponse: input.rawText.trim() });

  const parsed = tryParseResponseObject(input.rawText, input.structuredOutput);
  if (!isRecord(parsed)) {
    return { status: "success", payload: rawResponsePayload() };
  }

  const { status } = parsed;
  if (status !== "success" && status !== "validation_fail" && status !== "failure") {
    return { status: "success", payload: rawResponsePayload() };
  }

  const payload = toJsonObject(parsed.payload) ?? rawResponsePayload();
  const stateMetadata = toJsonObject(parsed.stateMetadata);
  const stateFlags = toBooleanRecord(parsed.stateFlags);
  const events = toEventEmissions(parsed.events);

  const { failureKind: rawFailureKind, failureCode: rawFailureCode } = parsed;
  const failureKind =
    rawFailureKind === "soft" || rawFailureKind === "hard" ? rawFailureKind : undefined;
  const failureCode = typeof rawFailureCode === "string" ? rawFailureCode : undefined;

  return {
    status,
    payload,
    ...(stateFlags ? { stateFlags } : {}),
    ...(stateMetadata ? { stateMetadata } : {}),
    ...(events ? { events } : {}),
    ...(failureKind ? { failureKind } : {}),
    ...(failureCode ? { failureCode } : {}),
  };
}
/**
 * Renders the prompt sent to a provider for one orchestration node.
 *
 * The prompt consists of fixed instructions, a pretty-printed example of the
 * expected JSON reply, the node's own prompt, and pretty-printed JSON dumps
 * of the execution context, handoff payload, session flags and the last 15
 * history entries. Sections are separated by blank lines.
 *
 * @param input - Actor invocation supplying the prompt and context to render.
 * @returns The full prompt text.
 */
function buildActorPrompt(input: ActorExecutionInput): string {
  const pretty = (value: unknown): string => JSON.stringify(value, null, 2);
  const { handoff, state } = input.context;

  // Example reply shown to the model; the values are descriptive placeholders.
  const responseShape = {
    status: "success | validation_fail | failure",
    payload: {},
    stateFlags: {
      optional_boolean_flag: true,
    },
    stateMetadata: {
      optional_metadata: "value",
    },
    events: [
      {
        type: "requirements_defined | tasks_planned | code_committed | task_ready_for_review | task_blocked | validation_passed | validation_failed | branch_merged | merge_conflict_detected | merge_conflict_resolved | merge_conflict_unresolved | merge_retry_started",
        payload: {
          summary: "optional",
          details: {},
          errorCode: "optional",
          artifactPointer: "optional",
        },
      },
    ],
    failureKind: "soft | hard",
    failureCode: "optional",
  };

  const sections: string[] = [
    "You are executing one orchestration node in a schema-driven DAG runtime.",
    "Return ONLY JSON with this object shape:",
    pretty(responseShape),
    "Do not include markdown or extra explanation outside JSON.",
    `Node Prompt:\n${input.prompt}`,
    `Execution Context:\n${pretty(input.executionContext)}`,
    `Current Handoff Payload:\n${pretty(handoff.payload)}`,
    `Session Flags:\n${pretty(state.flags)}`,
    `Recent Domain History:\n${pretty(state.history.slice(-15))}`,
  ];
  return sections.join("\n\n");
}
/**
 * Executes one actor turn through the Codex SDK.
 *
 * Builds the actor prompt, starts a Codex thread in the actor's working
 * directory, runs a single turn constrained by the actor response schema, and
 * converts the final response into an `ActorExecutionResult` with usage
 * metadata attached.
 *
 * @param input.runtime - Run-level runtime (config and shared environment).
 * @param input.actorInput - Actor invocation, including the abort signal and MCP config.
 * @returns The parsed actor result enriched with token and duration usage.
 * @throws Whatever the Codex SDK throws; no error is caught here.
 */
async function runCodexActor(input: {
  runtime: ProviderRunRuntime;
  actorInput: ActorExecutionInput;
}): Promise<ActorExecutionResult> {
  const { runtime, actorInput } = input;
  const prompt = buildActorPrompt(actorInput);
  const startedAt = Date.now();
  const apiKey = resolveOpenAiApiKey(runtime.config.provider);
  const workingDirectory = resolveProviderWorkingDirectory(actorInput);
  const codex = new Codex({
    ...(apiKey ? { apiKey } : {}),
    ...(runtime.config.provider.openAiBaseUrl
      ? { baseUrl: runtime.config.provider.openAiBaseUrl }
      : {}),
    ...(actorInput.mcp.resolvedConfig.codexConfig
      ? { config: actorInput.mcp.resolvedConfig.codexConfig }
      : {}),
    env: buildProviderRuntimeEnv({
      runtime,
      actorInput,
    }),
  });
  const thread = codex.startThread({
    workingDirectory,
    skipGitRepoCheck: runtime.config.provider.codexSkipGitCheck,
  });
  const turn = await thread.run(prompt, {
    signal: actorInput.signal,
    outputSchema: ACTOR_RESPONSE_SCHEMA,
  });
  const usage: ProviderUsage = {
    ...(turn.usage
      ? {
          // `cached_input_tokens` is the cached portion of `input_tokens`, not an
          // additional amount, so adding the two would count cached tokens twice.
          tokenInput: turn.usage.input_tokens,
          tokenOutput: turn.usage.output_tokens,
          tokenTotal: turn.usage.input_tokens + turn.usage.output_tokens,
        }
      : {}),
    // Wall-clock time measured locally, from prompt construction to turn completion.
    durationMs: Date.now() - startedAt,
  };
  const parsed = parseActorExecutionResultFromModelOutput({
    rawText: turn.finalResponse,
  });
  return ensureUsageMetadata({
    result: parsed,
    providerUsage: usage,
  });
}
/** Outcome of a single Claude query as gathered from its result message. */
type ClaudeTurnResult = {
/** Final result text; falls back to the serialized structured output when the text is empty. */
text: string;
/** Structured output carried by the result message, if any. */
structuredOutput?: unknown;
/** Usage figures for the turn. */
usage: ProviderUsage;
};
/**
 * Derives the identifiers attached to Claude observability records.
 *
 * @param actorInput - Actor invocation being traced.
 * @returns The session id, node id, attempt and depth of the invocation.
 */
function toClaudeTraceContext(actorInput: ActorExecutionInput): {
  sessionId: string;
  nodeId: string;
  attempt: number;
  depth: number;
} {
  const { sessionId, node, attempt, depth } = actorInput;
  return { sessionId, nodeId: node.id, attempt, depth };
}
/**
 * Converts usage figures into a JSON object, keeping only the fields whose
 * value is a number.
 *
 * @param usage - Usage figures for a turn.
 * @returns A JSON object holding the numeric usage fields.
 */
function toProviderUsageJson(usage: ProviderUsage): JsonObject {
  const fields = ["tokenInput", "tokenOutput", "tokenTotal", "durationMs", "costUsd"] as const;
  const output: JsonObject = {};
  for (const field of fields) {
    const figure = usage[field];
    if (typeof figure === "number") {
      output[field] = figure;
    }
  }
  return output;
}
/**
 * Builds the Claude SDK options for one actor invocation.
 *
 * Optional settings (model, executable path, credentials, MCP servers) are
 * included only when configured. The configured OAuth token takes precedence
 * as `authToken`; otherwise a resolved Anthropic token, if any, is passed as
 * `apiKey`. Observability overrides are spread after the base settings, and
 * `outputFormat` is set last.
 *
 * @param input.runtime - Run-level runtime (config, shared env, observability logger).
 * @param input.actorInput - Actor invocation (working directory, MCP config, tool gate).
 * @returns The options object handed to the Claude `query` call.
 */
function buildClaudeOptions(input: {
  runtime: ProviderRunRuntime;
  actorInput: ActorExecutionInput;
}): Options {
  const { runtime, actorInput } = input;
  const providerConfig = runtime.config.provider;
  const cwd = resolveProviderWorkingDirectory(actorInput);

  // The fallback token is resolved only when no OAuth token is configured.
  const oauthToken = providerConfig.anthropicOauthToken;
  const apiToken = oauthToken ? undefined : resolveAnthropicToken(providerConfig);

  const env = buildProviderRuntimeEnv({ runtime, actorInput, includeClaudeAuth: true });
  const context = toClaudeTraceContext(actorInput);
  const mcpServers = actorInput.mcp.resolvedConfig.claudeMcpServers;

  return {
    maxTurns: providerConfig.claudeMaxTurns,
    ...(providerConfig.claudeModel ? { model: providerConfig.claudeModel } : {}),
    ...(providerConfig.claudeCodePath
      ? { pathToClaudeCodeExecutable: providerConfig.claudeCodePath }
      : {}),
    ...(oauthToken ? { authToken: oauthToken } : apiToken ? { apiKey: apiToken } : {}),
    ...(mcpServers ? { mcpServers: mcpServers as Options["mcpServers"] } : {}),
    canUseTool: actorInput.mcp.createClaudeCanUseTool(),
    cwd,
    env,
    ...runtime.claudeObservability.toOptionOverrides({ context }),
    outputFormat: CLAUDE_OUTPUT_FORMAT,
  };
}
/**
 * Runs a single Claude query and collects its final result.
 *
 * Every streamed message is forwarded to the observability logger. The
 * `result` message supplies the text, structured output and usage figures.
 * The stream is closed when the actor's abort signal fires and always closed
 * on exit.
 *
 * @param input.runtime - Run-level runtime (config and observability logger).
 * @param input.actorInput - Actor invocation, including its abort signal.
 * @param input.prompt - Prompt text sent to Claude.
 * @returns The result text, optional structured output, and usage for the turn.
 * @throws Error when the signal is already aborted on entry, when the result
 * message reports a non-success subtype, or when the stream ends without any
 * result text or structured output.
 */
async function runClaudeTurn(input: {
  runtime: ProviderRunRuntime;
  actorInput: ActorExecutionInput;
  prompt: string;
}): Promise<ClaudeTurnResult> {
  const { signal } = input.actorInput;
  // An "abort" listener added to an already-aborted signal never fires, so a
  // pre-cancelled run would otherwise execute to completion. Reject up front,
  // before any query is started.
  if (signal.aborted) {
    throw new Error("Claude query aborted before it started.");
  }
  const traceContext = toClaudeTraceContext(input.actorInput);
  const options = buildClaudeOptions({
    runtime: input.runtime,
    actorInput: input.actorInput,
  });
  input.runtime.claudeObservability.recordQueryStarted({
    context: traceContext,
    data: {
      ...(options.model ? { model: options.model } : {}),
      maxTurns: options.maxTurns ?? input.runtime.config.provider.claudeMaxTurns,
      ...(typeof options.cwd === "string" ? { cwd: options.cwd } : {}),
    },
  });
  const startedAt = Date.now();
  const stream = query({
    prompt: input.prompt,
    options,
  });
  let resultText = "";
  let structuredOutput: unknown;
  let usage: ProviderUsage = {};
  let messageCount = 0;
  const onAbort = (): void => {
    stream.close();
  };
  // No await separates the aborted check above from this registration, so the
  // signal cannot flip unnoticed in between.
  signal.addEventListener("abort", onAbort, { once: true });
  try {
    for await (const message of stream as AsyncIterable<SDKMessage>) {
      messageCount += 1;
      input.runtime.claudeObservability.recordMessage({
        context: traceContext,
        message,
      });
      if (message.type !== "result") {
        continue;
      }
      if (message.subtype !== "success") {
        const detail = message.errors.join("; ");
        throw new Error(
          `Claude query failed (${message.subtype})${detail ? `: ${detail}` : ""}`,
        );
      }
      resultText = message.result.trim();
      structuredOutput = message.structured_output;
      usage = {
        tokenInput: message.usage.input_tokens,
        tokenOutput: message.usage.output_tokens,
        tokenTotal: message.usage.input_tokens + message.usage.output_tokens,
        durationMs: message.duration_ms,
        costUsd: message.total_cost_usd,
      };
    }
  } catch (error) {
    input.runtime.claudeObservability.recordQueryError({
      context: traceContext,
      error,
    });
    throw error;
  } finally {
    signal.removeEventListener("abort", onAbort);
    stream.close();
  }
  // Structured output alone still counts as a result: serialize it as the text.
  if (!resultText && structuredOutput !== undefined) {
    resultText = JSON.stringify(structuredOutput);
  }
  if (!resultText) {
    const error = new Error("Claude run completed without a final result.");
    input.runtime.claudeObservability.recordQueryError({
      context: traceContext,
      error,
    });
    throw error;
  }
  input.runtime.claudeObservability.recordQueryCompleted({
    context: traceContext,
    data: {
      messageCount,
      usage: toProviderUsageJson(usage),
    },
  });
  return {
    text: resultText,
    structuredOutput,
    usage: {
      ...usage,
      // Fall back to locally measured wall-clock time when none was reported.
      durationMs: usage.durationMs ?? Date.now() - startedAt,
    },
  };
}
/**
 * Executes one actor turn through Claude.
 *
 * Builds the actor prompt, runs a Claude turn, parses the reply (preferring
 * structured output) and attaches the turn's usage figures.
 *
 * @param input.runtime - Run-level runtime.
 * @param input.actorInput - Actor invocation to execute.
 * @returns The parsed actor result enriched with usage metadata.
 */
async function runClaudeActor(input: {
  runtime: ProviderRunRuntime;
  actorInput: ActorExecutionInput;
}): Promise<ActorExecutionResult> {
  const { runtime, actorInput } = input;
  const { text, structuredOutput, usage } = await runClaudeTurn({
    runtime,
    actorInput,
    prompt: buildActorPrompt(actorInput),
  });
  const result = parseActorExecutionResultFromModelOutput({
    rawText: text,
    structuredOutput,
  });
  return ensureUsageMetadata({ result, providerUsage: usage });
}
/**
 * Creates the runtime shared by all actor executions of a run.
 *
 * @param input.provider - Backend that will execute actors.
 * @param input.config - Application configuration.
 * @param input.observabilityRootPath - Workspace root for the Claude
 * observability logger; defaults to the current working directory.
 * @param input.baseEnv - Base environment; defaults to `process.env`.
 * Entries without a string value are dropped.
 * @returns The runtime, whose `close` closes the observability logger.
 */
export async function createProviderRunRuntime(input: {
  provider: RunProvider;
  config: Readonly<AppConfig>;
  observabilityRootPath?: string;
  baseEnv?: Record<string, string | undefined>;
}): Promise<ProviderRunRuntime> {
  const { provider, config, observabilityRootPath, baseEnv } = input;
  const claudeObservability = new ClaudeObservabilityLogger({
    workspaceRoot: observabilityRootPath ?? process.cwd(),
    config: config.provider.claudeObservability,
  });
  const sharedEnv = sanitizeEnv(baseEnv ?? process.env);
  return {
    provider,
    config,
    sharedEnv,
    claudeObservability,
    close: async () => claudeObservability.close(),
  };
}
/**
 * Creates the actor executor for a run.
 *
 * The returned function dispatches each invocation to the Codex or Claude
 * runner according to `runtime.provider`. Any error thrown by the runner is
 * converted into a hard `failure` result whose payload holds the error
 * message, so the executor itself does not reject.
 *
 * @param runtime - Run-level runtime created for the provider.
 * @returns An executor that resolves to an `ActorExecutionResult`.
 */
export function createProviderActorExecutor(runtime: ProviderRunRuntime): ActorExecutor {
  return async (actorInput) => {
    // Chosen per call so the executor always follows the runtime's current provider.
    const execute = runtime.provider === "codex" ? runCodexActor : runClaudeActor;
    try {
      return await execute({ runtime, actorInput });
    } catch (error) {
      return {
        status: "failure",
        payload: {
          error: toErrorMessage(error),
        },
        failureKind: "hard",
        failureCode: `provider_${runtime.provider}_execution_error`,
      };
    }
  };
}