import { randomUUID } from "node:crypto";
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { resolve } from "node:path";
import { JSONFilePreset } from "lowdb/node";
import { SchemaDrivenExecutionEngine } from "../agents/orchestration.js";
import { parseAgentManifest, type AgentManifest } from "../agents/manifest.js";
import { FileSystemProjectContextStore } from "../agents/project-context.js";
import type {
  ActorExecutionResult,
  ActorExecutor,
  PipelineAggregateStatus,
} from "../agents/pipeline.js";
import {
  FileSystemSessionMetadataStore,
  SessionWorktreeManager,
  type SessionMetadata,
} from "../agents/session-lifecycle.js";
import { loadConfig, type AppConfig } from "../config.js";
import { parseEnvFile } from "../store/env-store.js";
import {
  createProviderActorExecutor,
  createProviderRunRuntime,
  type RunProvider,
} from "../agents/provider-executor.js";

// NOTE(review): the reviewed text of this file had lost its generic type
// arguments (`Record;`, `Promise;`, `Extract {`, ...). They are reconstructed
// below from how each value is used in this file; the value types of
// `initialFlags` / `runtimeContextOverrides` and the nullability of
// `readSession` depend on modules not visible here — confirm against them.

/** File name of the per-session run record written under `<stateRoot>/<sessionId>/`. */
const RUN_META_FILE_NAME = "ui-run-meta.json";

/** Lifecycle state of a run as tracked by {@link UiRunService}. */
export type RunStatus = "running" | "success" | "failure" | "cancelled";

/** `mock` uses the in-process simulated executors; `provider` uses a provider runtime. */
export type RunExecutionMode = "mock" | "provider";

/** Input accepted by {@link UiRunService.startRun}. */
export type StartRunInput = {
  prompt: string;
  /** Raw manifest; validated with `parseAgentManifest` before use. */
  manifest: unknown;
  /** Existing session to run in. When omitted or blank a fresh id is generated. */
  sessionId?: string;
  manifestPath?: string;
  topologyHint?: string;
  /** Copied into the engine's initial state flags. */
  initialFlags?: Record<string, boolean>;
  /** Spread last into the engine's runtime context, so these win over the defaults. */
  runtimeContextOverrides?: Record<string, string | number | boolean>;
  /** Mock mode only: node ids whose first attempt reports `validation_fail`. */
  simulateValidationNodeIds?: string[];
  /** Defaults to `"mock"`. */
  executionMode?: RunExecutionMode;
  /** Defaults to `"codex"`. */
  provider?: RunProvider;
};

/** Snapshot of one run, kept in memory and persisted to the run meta file. */
export type RunRecord = {
  runId: string;
  sessionId: string;
  status: RunStatus;
  /** ISO-8601 timestamp taken when the run was registered. */
  startedAt: string;
  /** ISO-8601 timestamp taken when the run reached a terminal status. */
  endedAt?: string;
  manifestPath?: string;
  topologyHint?: string;
  executionMode: RunExecutionMode;
  provider: RunProvider;
  /** Message of the error that ended the run, when it did not succeed. */
  error?: string;
};

/** Collapses the pipeline aggregate status to a terminal run status: anything but `"success"` is a failure. */
function toRunStatus(status: PipelineAggregateStatus): Extract<RunStatus, "success" | "failure"> {
  return status === "success" ? "success" : "failure";
}

/** Bookkeeping for a run whose background task has not finished yet. */
type ActiveRun = {
  controller: AbortController;
  record: RunRecord;
  promise: Promise<void>;
};

/** Generates a session id of the form `ui-session-<base36 time>-<8 uuid chars>`. */
function toSessionId(): string {
  return `ui-session-${Date.now().toString(36)}-${randomUUID().slice(0, 8)}`;
}

/** Trims every value, drops empties, and removes duplicates while keeping first-seen order. */
function dedupeStrings(values: readonly string[]): string[] {
  const output: string[] = [];
  const seen = new Set<string>();
  for (const value of values) {
    const normalized = value.trim();
    if (!normalized || seen.has(normalized)) {
      continue;
    }
    seen.add(normalized);
    output.push(normalized);
  }
  return output;
}

/** Normalises an abort signal's reason to an `Error`, falling back to "Run aborted.". */
function toAbortReason(signal: AbortSignal): Error {
  return signal.reason instanceof Error
    ? signal.reason
    : new Error(String(signal.reason ?? "Run aborted."));
}

/**
 * Resolves after `ms` milliseconds, or rejects as soon as `signal` aborts.
 *
 * @throws The signal's reason (wrapped in an `Error` if needed) when the signal
 *   is already aborted or aborts during the wait.
 */
async function waitWithSignal(ms: number, signal: AbortSignal): Promise<void> {
  if (signal.aborted) {
    throw toAbortReason(signal);
  }

  await new Promise<void>((resolveWait, rejectWait) => {
    const timeout = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolveWait();
    }, ms);

    function onAbort(): void {
      clearTimeout(timeout);
      signal.removeEventListener("abort", onAbort);
      rejectWait(toAbortReason(signal));
    }

    signal.addEventListener("abort", onAbort, { once: true });
  });
}

/**
 * Produces deterministic, synthetic usage numbers for mock runs.
 *
 * Input tokens are estimated as one per four prompt characters (minimum 1),
 * output tokens as 70% of input (minimum 16); duration and cost are derived
 * from those counts and the number of tools.
 */
function estimateUsage(
  prompt: string,
  toolCount: number,
): {
  tokenInput: number;
  tokenOutput: number;
  durationMs: number;
  costUsd: number;
} {
  const tokenInput = Math.max(1, Math.ceil(prompt.length / 4));
  const tokenOutput = Math.max(16, Math.ceil(tokenInput * 0.7));
  const durationMs = 220 + (tokenInput % 11) * 35 + toolCount * 20;
  const tokenTotal = tokenInput + tokenOutput;
  const costUsd = Number((tokenTotal * 0.000002).toFixed(6));
  return {
    tokenInput,
    tokenOutput,
    durationMs,
    costUsd,
  };
}

/**
 * Derives up to three mock "subtasks" from a prompt.
 *
 * Splits on `.`, `!`, `?` and newlines and keeps the first three non-empty
 * pieces; if there are none, falls back to the first three words joined into
 * a single entry, or an empty list for a blank prompt.
 */
function extractSubtasks(prompt: string): string[] {
  const sentences = prompt
    .split(/[.!?\n]/)
    .map((part) => part.trim())
    .filter((part) => part.length > 0)
    .slice(0, 3);
  if (sentences.length > 0) {
    return sentences;
  }

  const words = prompt
    .split(/\s+/)
    .filter((word) => word.length > 0)
    .slice(0, 3);
  return words.length > 0 ? [words.join(" ")] : [];
}

/**
 * Builds simulated executors for every actor referenced by the manifest.
 *
 * All actors share one executor that waits for the estimated duration (capped
 * at 900 ms, abortable via the actor's signal) and then reports success. For
 * node ids listed in `simulateValidationNodeIds`, the first attempt on that
 * node reports a soft `validation_fail` instead; later attempts succeed.
 */
function createMockActorExecutors(
  manifest: AgentManifest,
  input: {
    prompt: string;
    topologyHint?: string;
    simulateValidationNodeIds: Set<string>;
  },
): Record<string, ActorExecutor> {
  // Attempt counter per node id, shared by every actor's executor.
  const attemptsByNode = new Map<string, number>();

  const execute: ActorExecutor = async (actorInput) => {
    const nodeId = actorInput.node.id;
    const attempt = (attemptsByNode.get(nodeId) ?? 0) + 1;
    attemptsByNode.set(nodeId, attempt);

    const shouldValidationFail = attempt === 1 && input.simulateValidationNodeIds.has(nodeId);
    const toolCount = actorInput.executionContext.allowedTools.length;
    const usage = estimateUsage(actorInput.prompt, toolCount);

    await waitWithSignal(Math.min(usage.durationMs, 900), actorInput.signal);

    // Identical for both outcomes, so it is assembled once.
    const stateMetadata = {
      usage: {
        ...usage,
        tokenTotal: usage.tokenInput + usage.tokenOutput,
        toolCalls: toolCount,
      },
      topologyHint: input.topologyHint ?? "manifest-default",
    };

    if (shouldValidationFail) {
      const failure: ActorExecutionResult = {
        status: "validation_fail",
        payload: {
          summary: `Node ${nodeId} requires remediation on first pass.`,
          subtasks: extractSubtasks(input.prompt),
          security_violation: false,
        },
        stateMetadata,
        failureKind: "soft",
        failureCode: "ui_mock_validation_required",
      };
      return failure;
    }

    return {
      status: "success",
      payload: {
        summary: `Node ${nodeId} completed in mock mode.`,
        prompt: input.prompt,
        subtasks: extractSubtasks(input.prompt),
      },
      stateMetadata,
      stateFlags: {
        [`${nodeId}_completed`]: true,
      },
    };
  };

  return createSingleExecutorMap(manifest, execute);
}

/** Maps every distinct (trimmed, non-empty) actor id in the manifest's pipeline nodes to the same executor. */
function createSingleExecutorMap(
  manifest: AgentManifest,
  executor: ActorExecutor,
): Record<string, ActorExecutor> {
  const uniqueActorIds = dedupeStrings(manifest.pipeline.nodes.map((node) => node.actorId));
  const executors: Record<string, ActorExecutor> = {};
  for (const actorId of uniqueActorIds) {
    executors[actorId] = executor;
  }
  return executors;
}

/** Returns the message of an `Error`, or the string form of any other thrown value. */
function toAbortErrorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  return String(error);
}

/**
 * Heuristically detects abort errors: an `Error` named `AbortError` or whose
 * message contains "abort" (case-insensitive). Non-`Error` values never match.
 */
function isAbort(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false;
  }
  return error.name === "AbortError" || error.message.toLowerCase().includes("abort");
}

/** Loads the app config from `process.env` overlaid with the values parsed from the env file. */
async function loadRuntimeConfig(envPath: string): Promise<Readonly<AppConfig>> {
  const parsed = await parseEnvFile(envPath);
  return loadConfig({
    ...process.env,
    ...parsed.values,
  });
}

/** Resolves the configured state and worktree directories against the workspace root. */
function resolveRuntimePaths(input: { workspaceRoot: string; config: Readonly<AppConfig> }): {
  stateRoot: string;
  worktreeRoot: string;
} {
  return {
    stateRoot: resolve(input.workspaceRoot, input.config.orchestration.stateRoot),
    worktreeRoot: resolve(
      input.workspaceRoot,
      input.config.provisioning.gitWorktree.rootDirectory,
    ),
  };
}

/**
 * Persists `run` as the session's run meta file, creating the session
 * directory if needed and replacing any previously stored record.
 */
async function writeRunMeta(input: {
  stateRoot: string;
  sessionId: string;
  run: RunRecord;
}): Promise<void> {
  const sessionDirectory = resolve(input.stateRoot, input.sessionId);
  await mkdir(sessionDirectory, { recursive: true });
  const path = resolve(sessionDirectory, RUN_META_FILE_NAME);
  const db = await JSONFilePreset<RunRecord>(path, input.run);
  // Assign explicitly so the new record is written even when the file already existed.
  db.data = input.run;
  await db.write();
}

/**
 * Reads and normalises the persisted run record of a session.
 *
 * @returns The record, or `undefined` when the file does not exist, is not a
 *   JSON object, or lacks a string `runId`, `sessionId`, `status` or
 *   `startedAt`. An unrecognised `status` is normalised to `"failure"`, an
 *   unrecognised `executionMode` to `"mock"`, and an unrecognised `provider`
 *   to `"codex"`.
 * @throws Any read or parse error other than `ENOENT`.
 */
export async function readRunMetaBySession(input: {
  stateRoot: string;
  sessionId: string;
}): Promise<RunRecord | undefined> {
  const path = resolve(input.stateRoot, input.sessionId, RUN_META_FILE_NAME);
  try {
    const content = await readFile(path, "utf8");
    const parsed = JSON.parse(content) as unknown;
    if (!parsed || typeof parsed !== "object") {
      return undefined;
    }

    const record = parsed as Partial<RunRecord>;
    if (
      typeof record.runId !== "string" ||
      typeof record.sessionId !== "string" ||
      typeof record.status !== "string" ||
      typeof record.startedAt !== "string"
    ) {
      return undefined;
    }

    const normalized: RunRecord = {
      runId: record.runId,
      sessionId: record.sessionId,
      status:
        record.status === "running" ||
        record.status === "success" ||
        record.status === "failure" ||
        record.status === "cancelled"
          ? record.status
          : "failure",
      startedAt: record.startedAt,
      executionMode:
        record.executionMode === "provider" || record.executionMode === "mock"
          ? record.executionMode
          : "mock",
      provider:
        record.provider === "claude" || record.provider === "codex" ? record.provider : "codex",
      ...(typeof record.endedAt === "string" ? { endedAt: record.endedAt } : {}),
      ...(typeof record.manifestPath === "string" ? { manifestPath: record.manifestPath } : {}),
      ...(typeof record.topologyHint === "string" ? { topologyHint: record.topologyHint } : {}),
      ...(typeof record.error === "string" ? { error: record.error } : {}),
    };
    return normalized;
  } catch (error) {
    if ((error as NodeJS.ErrnoException).code === "ENOENT") {
      return undefined;
    }
    throw error;
  }
}

/**
 * Session and run management for the UI.
 *
 * Runs execute in the background; their records are kept in an in-memory
 * history (not reloaded from disk by this class) and mirrored to the session's
 * run meta file. Configuration is re-read from the env file on every operation.
 */
export class UiRunService {
  private readonly workspaceRoot: string;
  private readonly envFilePath: string;
  /** Runs whose background task is still in flight, keyed by run id. */
  private readonly activeRuns = new Map<string, ActiveRun>();
  /** Latest record of every run started by this instance, keyed by run id. */
  private readonly runHistory = new Map<string, RunRecord>();

  /**
   * @param input.workspaceRoot Root that all relative paths are resolved against.
   * @param input.envFilePath Env file path relative to the workspace root; defaults to `.env`.
   */
  constructor(input: { workspaceRoot: string; envFilePath?: string }) {
    this.workspaceRoot = resolve(input.workspaceRoot);
    this.envFilePath = resolve(this.workspaceRoot, input.envFilePath ?? ".env");
  }

  /** Loads the current config and builds the session store and worktree manager from it. */
  private async loadRuntime(): Promise<{
    config: Readonly<AppConfig>;
    stateRoot: string;
    sessionStore: FileSystemSessionMetadataStore;
    worktreeManager: SessionWorktreeManager;
  }> {
    const config = await loadRuntimeConfig(this.envFilePath);
    const paths = resolveRuntimePaths({
      workspaceRoot: this.workspaceRoot,
      config,
    });

    return {
      config,
      stateRoot: paths.stateRoot,
      sessionStore: new FileSystemSessionMetadataStore({
        stateRoot: paths.stateRoot,
      }),
      worktreeManager: new SessionWorktreeManager({
        worktreeRoot: paths.worktreeRoot,
        baseRef: config.provisioning.gitWorktree.baseRef,
        targetPath: config.provisioning.gitWorktree.targetPath,
      }),
    };
  }

  /**
   * Registers a session for `projectPath` and initialises its base workspace.
   * A blank or missing `sessionId` is replaced by a generated one.
   */
  async createSession(input: { projectPath: string; sessionId?: string }): Promise<SessionMetadata> {
    const runtime = await this.loadRuntime();
    const sessionId = input.sessionId?.trim() || toSessionId();
    const baseWorkspacePath = runtime.worktreeManager.resolveBaseWorkspacePath(sessionId);
    const session = await runtime.sessionStore.createSession({
      sessionId,
      projectPath: resolve(input.projectPath),
      baseWorkspacePath,
    });

    await runtime.worktreeManager.initializeSessionBaseWorkspace({
      sessionId: session.sessionId,
      projectPath: session.projectPath,
      baseWorkspacePath: session.baseWorkspacePath,
    });

    return session;
  }

  /** Lists the sessions known to the session store. */
  async listSessions(): Promise<SessionMetadata[]> {
    const runtime = await this.loadRuntime();
    return runtime.sessionStore.listSessions();
  }

  /** Reads one session from the session store. */
  async readSession(sessionId: string): Promise<SessionMetadata | undefined> {
    const runtime = await this.loadRuntime();
    return runtime.sessionStore.readSession(sessionId);
  }

  /**
   * Closes a session through the worktree manager and records the outcome.
   *
   * @param input.mergeToProject Only an explicit `true` requests merging the base workspace into the project.
   * @returns The session updated to `closed`, or `closed_with_conflicts` when the close reported a conflict.
   * @throws If the session does not exist or the close reports a fatal error.
   */
  async closeSession(input: {
    sessionId: string;
    mergeToProject?: boolean;
  }): Promise<SessionMetadata> {
    const runtime = await this.loadRuntime();
    const session = await runtime.sessionStore.readSession(input.sessionId);
    if (!session) {
      throw new Error(`Session \"${input.sessionId}\" does not exist.`);
    }

    const sessionProjectContextStore = new FileSystemProjectContextStore({
      filePath: runtime.sessionStore.getSessionProjectContextPath(session.sessionId),
    });
    const projectContext = await sessionProjectContextStore.readState();
    // Only tasks that actually recorded a non-blank worktree path are passed on.
    const taskWorktreePaths = projectContext.taskQueue
      .map((task) => task.worktreePath)
      .filter((path): path is string => typeof path === "string" && path.trim().length > 0);

    const outcome = await runtime.worktreeManager.closeSession({
      session,
      taskWorktreePaths,
      mergeBaseIntoProject: input.mergeToProject === true,
    });

    if (outcome.kind === "fatal_error") {
      throw new Error(`Session close failed: ${outcome.error}`);
    }

    if (outcome.kind === "conflict") {
      return runtime.sessionStore.updateSession(session.sessionId, {
        sessionStatus: "closed_with_conflicts",
      });
    }

    return runtime.sessionStore.updateSession(session.sessionId, {
      sessionStatus: "closed",
    });
  }

  /** Returns the in-memory run records, most recently started first. */
  listRuns(): RunRecord[] {
    return [...this.runHistory.values()].sort((left, right) =>
      right.startedAt.localeCompare(left.startedAt),
    );
  }

  /** Returns the in-memory record of a run, if this instance started it. */
  getRun(runId: string): RunRecord | undefined {
    return this.runHistory.get(runId);
  }

  /**
   * Validates the input, registers a run, and starts executing it in the background.
   *
   * @returns The initial record (status `"running"`); poll {@link getRun} for the final state.
   * @throws If the manifest is rejected by `parseAgentManifest`, or a `sessionId`
   *   was given that does not exist or refers to a closed session.
   */
  async startRun(input: StartRunInput): Promise<RunRecord> {
    const runtime = await this.loadRuntime();
    const config = runtime.config;
    const manifest = parseAgentManifest(input.manifest);
    const executionMode = input.executionMode ?? "mock";
    const provider = input.provider ?? "codex";

    const requestedSessionId = input.sessionId?.trim();
    const sessionId = requestedSessionId || toSessionId();
    const session = requestedSessionId
      ? await runtime.sessionStore.readSession(sessionId)
      : undefined;
    if (requestedSessionId && !session) {
      throw new Error(`Session \"${sessionId}\" does not exist.`);
    }
    if (
      session &&
      (session.sessionStatus === "closed" || session.sessionStatus === "closed_with_conflicts")
    ) {
      throw new Error(`Session \"${sessionId}\" is closed and cannot run new tasks.`);
    }

    const runId = randomUUID();
    const controller = new AbortController();
    const record: RunRecord = {
      runId,
      sessionId,
      status: "running",
      startedAt: new Date().toISOString(),
      executionMode,
      provider,
      ...(input.manifestPath ? { manifestPath: input.manifestPath } : {}),
      ...(input.topologyHint ? { topologyHint: input.topologyHint } : {}),
    };
    this.runHistory.set(runId, record);

    const persist = (run: RunRecord): Promise<void> =>
      writeRunMeta({
        stateRoot: runtime.stateRoot,
        sessionId,
        run,
      });

    // The background task must never reject: nothing awaits it unless the run
    // is cancelled, so a rejection would surface as an unhandled rejection.
    const runPromise = (async (): Promise<void> => {
      let providerRuntime: Awaited<ReturnType<typeof createProviderRunRuntime>> | undefined;
      try {
        if (executionMode === "provider") {
          providerRuntime = await createProviderRunRuntime({
            provider,
            config,
            observabilityRootPath: this.workspaceRoot,
            baseEnv: process.env,
          });
        }

        const actorExecutors =
          executionMode === "provider" && providerRuntime
            ? createSingleExecutorMap(manifest, createProviderActorExecutor(providerRuntime))
            : createMockActorExecutors(manifest, {
                prompt: input.prompt,
                topologyHint: input.topologyHint,
                simulateValidationNodeIds: new Set(input.simulateValidationNodeIds ?? []),
              });

        const engine = new SchemaDrivenExecutionEngine({
          manifest,
          actorExecutors,
          settings: {
            workspaceRoot: this.workspaceRoot,
            stateRoot: runtime.stateRoot,
            projectContextPath: session
              ? runtime.sessionStore.getSessionProjectContextPath(sessionId)
              : resolve(this.workspaceRoot, config.orchestration.projectContextPath),
            runtimeContext: {
              ui_mode: executionMode,
              run_provider: provider,
              ...(session
                ? {
                    session_id: sessionId,
                    project_path: session.projectPath,
                    base_workspace_path: session.baseWorkspacePath,
                  }
                : {}),
              ...(input.runtimeContextOverrides ?? {}),
            },
          },
          config,
        });

        await persist(record);

        const summary = await engine.runSession({
          sessionId,
          initialPayload: {
            prompt: input.prompt,
          },
          initialState: {
            flags: {
              ...(input.initialFlags ?? {}),
            },
          },
          signal: controller.signal,
          ...(session ? { sessionMetadata: session } : {}),
        });

        const completedRecord = this.runHistory.get(runId);
        if (!completedRecord) {
          return;
        }

        const next: RunRecord = {
          ...completedRecord,
          status: toRunStatus(summary.status),
          endedAt: new Date().toISOString(),
        };
        this.runHistory.set(runId, next);
        // A failure to persist here falls through to the catch below and marks the run as failed.
        await persist(next);
      } catch (error) {
        const current = this.runHistory.get(runId);
        if (!current) {
          return;
        }

        const cancelled = controller.signal.aborted || isAbort(error);
        const message = toAbortErrorMessage(error);
        const next: RunRecord = {
          ...current,
          status: cancelled ? "cancelled" : "failure",
          endedAt: new Date().toISOString(),
          error: message,
        };
        this.runHistory.set(runId, next);

        try {
          await persist(next);
        } catch (persistError) {
          // The in-memory record stays authoritative; note the secondary failure on it
          // instead of letting it reject the background task.
          this.runHistory.set(runId, {
            ...next,
            error: `${message} (run metadata could not be persisted: ${toAbortErrorMessage(persistError)})`,
          });
        }
      } finally {
        try {
          if (providerRuntime) {
            await providerRuntime.close();
          }
        } catch (closeError) {
          console.warn(
            `Failed to close provider runtime for run "${runId}": ${toAbortErrorMessage(closeError)}`,
          );
        } finally {
          // Always unregister, even when closing the provider runtime failed.
          this.activeRuns.delete(runId);
        }
      }
    })();

    this.activeRuns.set(runId, {
      controller,
      record,
      promise: runPromise,
    });

    return record;
  }

  /**
   * Aborts an in-flight run and waits for its background task to settle.
   *
   * @returns The final record; for a run that is no longer active, its existing record unchanged.
   * @throws If the run id is unknown, or no record exists after cancellation.
   */
  async cancelRun(runId: string): Promise<RunRecord> {
    const active = this.activeRuns.get(runId);
    if (!active) {
      const existing = this.runHistory.get(runId);
      if (!existing) {
        throw new Error(`Run \"${runId}\" does not exist.`);
      }
      return existing;
    }

    active.controller.abort(new Error("Cancelled by operator from UI kill switch."));
    await active.promise;

    const finalRecord = this.runHistory.get(runId);
    if (!finalRecord) {
      throw new Error(`Run \"${runId}\" cancellation did not produce a final record.`);
    }

    return finalRecord;
  }
}