Refactor UI modules and harden run/API behavior
This commit is contained in:
487
src/runs/run-service.ts
Normal file
487
src/runs/run-service.ts
Normal file
@@ -0,0 +1,487 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { mkdir, readFile, writeFile } from "node:fs/promises";
|
||||
import { resolve } from "node:path";
|
||||
import { SchemaDrivenExecutionEngine } from "../agents/orchestration.js";
|
||||
import { parseAgentManifest, type AgentManifest } from "../agents/manifest.js";
|
||||
import type {
|
||||
ActorExecutionResult,
|
||||
ActorExecutor,
|
||||
PipelineAggregateStatus,
|
||||
} from "../agents/pipeline.js";
|
||||
import { loadConfig, type AppConfig } from "../config.js";
|
||||
import { parseEnvFile } from "../store/env-store.js";
|
||||
import {
|
||||
createProviderActorExecutor,
|
||||
createProviderRunRuntime,
|
||||
type RunProvider,
|
||||
} from "../agents/provider-executor.js";
|
||||
|
||||
const RUN_META_FILE_NAME = "ui-run-meta.json";
|
||||
|
||||
export type RunStatus = "running" | "success" | "failure" | "cancelled";
|
||||
export type RunExecutionMode = "mock" | "provider";
|
||||
|
||||
export type StartRunInput = {
|
||||
prompt: string;
|
||||
manifest: unknown;
|
||||
sessionId?: string;
|
||||
manifestPath?: string;
|
||||
topologyHint?: string;
|
||||
initialFlags?: Record<string, boolean>;
|
||||
runtimeContextOverrides?: Record<string, string | number | boolean>;
|
||||
simulateValidationNodeIds?: string[];
|
||||
executionMode?: RunExecutionMode;
|
||||
provider?: RunProvider;
|
||||
};
|
||||
|
||||
export type RunRecord = {
|
||||
runId: string;
|
||||
sessionId: string;
|
||||
status: RunStatus;
|
||||
startedAt: string;
|
||||
endedAt?: string;
|
||||
manifestPath?: string;
|
||||
topologyHint?: string;
|
||||
executionMode: RunExecutionMode;
|
||||
provider: RunProvider;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
function toRunStatus(status: PipelineAggregateStatus): Extract<RunStatus, "success" | "failure"> {
|
||||
return status === "success" ? "success" : "failure";
|
||||
}
|
||||
|
||||
type ActiveRun = {
|
||||
controller: AbortController;
|
||||
record: RunRecord;
|
||||
promise: Promise<void>;
|
||||
};
|
||||
|
||||
function toSessionId(): string {
|
||||
return `ui-session-${Date.now().toString(36)}-${randomUUID().slice(0, 8)}`;
|
||||
}
|
||||
|
||||
function dedupeStrings(values: readonly string[]): string[] {
|
||||
const output: string[] = [];
|
||||
const seen = new Set<string>();
|
||||
for (const value of values) {
|
||||
const normalized = value.trim();
|
||||
if (!normalized || seen.has(normalized)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(normalized);
|
||||
output.push(normalized);
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
async function waitWithSignal(ms: number, signal: AbortSignal): Promise<void> {
|
||||
if (signal.aborted) {
|
||||
throw signal.reason instanceof Error
|
||||
? signal.reason
|
||||
: new Error(String(signal.reason ?? "Run aborted."));
|
||||
}
|
||||
|
||||
await new Promise<void>((resolveWait, rejectWait) => {
|
||||
const timeout = setTimeout(() => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
resolveWait();
|
||||
}, ms);
|
||||
|
||||
function onAbort(): void {
|
||||
clearTimeout(timeout);
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
rejectWait(
|
||||
signal.reason instanceof Error
|
||||
? signal.reason
|
||||
: new Error(String(signal.reason ?? "Run aborted.")),
|
||||
);
|
||||
}
|
||||
|
||||
signal.addEventListener("abort", onAbort, { once: true });
|
||||
});
|
||||
}
|
||||
|
||||
function estimateUsage(prompt: string, toolCount: number): {
|
||||
tokenInput: number;
|
||||
tokenOutput: number;
|
||||
durationMs: number;
|
||||
costUsd: number;
|
||||
} {
|
||||
const tokenInput = Math.max(1, Math.ceil(prompt.length / 4));
|
||||
const tokenOutput = Math.max(16, Math.ceil(tokenInput * 0.7));
|
||||
const durationMs = 220 + (tokenInput % 11) * 35 + toolCount * 20;
|
||||
const tokenTotal = tokenInput + tokenOutput;
|
||||
const costUsd = Number((tokenTotal * 0.000002).toFixed(6));
|
||||
|
||||
return {
|
||||
tokenInput,
|
||||
tokenOutput,
|
||||
durationMs,
|
||||
costUsd,
|
||||
};
|
||||
}
|
||||
|
||||
function extractSubtasks(prompt: string): string[] {
|
||||
const sentences = prompt
|
||||
.split(/[.!?\n]/)
|
||||
.map((part) => part.trim())
|
||||
.filter((part) => part.length > 0)
|
||||
.slice(0, 3);
|
||||
|
||||
if (sentences.length > 0) {
|
||||
return sentences;
|
||||
}
|
||||
|
||||
const words = prompt.split(/\s+/).filter((word) => word.length > 0).slice(0, 3);
|
||||
return words.length > 0 ? [words.join(" ")] : [];
|
||||
}
|
||||
|
||||
function createMockActorExecutors(
|
||||
manifest: AgentManifest,
|
||||
input: {
|
||||
prompt: string;
|
||||
topologyHint?: string;
|
||||
simulateValidationNodeIds: Set<string>;
|
||||
},
|
||||
): Record<string, ActorExecutor> {
|
||||
const attemptsByNode = new Map<string, number>();
|
||||
const uniqueActorIds = dedupeStrings(manifest.pipeline.nodes.map((node) => node.actorId));
|
||||
|
||||
const execute: ActorExecutor = async (actorInput) => {
|
||||
const attempt = (attemptsByNode.get(actorInput.node.id) ?? 0) + 1;
|
||||
attemptsByNode.set(actorInput.node.id, attempt);
|
||||
|
||||
const shouldValidationFail =
|
||||
attempt === 1 && input.simulateValidationNodeIds.has(actorInput.node.id);
|
||||
|
||||
const usage = estimateUsage(actorInput.prompt, actorInput.executionContext.allowedTools.length);
|
||||
await waitWithSignal(Math.min(usage.durationMs, 900), actorInput.signal);
|
||||
|
||||
if (shouldValidationFail) {
|
||||
const failure: ActorExecutionResult = {
|
||||
status: "validation_fail",
|
||||
payload: {
|
||||
summary: `Node ${actorInput.node.id} requires remediation on first pass.`,
|
||||
subtasks: extractSubtasks(input.prompt),
|
||||
security_violation: false,
|
||||
},
|
||||
stateMetadata: {
|
||||
usage: {
|
||||
...usage,
|
||||
tokenTotal: usage.tokenInput + usage.tokenOutput,
|
||||
toolCalls: actorInput.executionContext.allowedTools.length,
|
||||
},
|
||||
topologyHint: input.topologyHint ?? "manifest-default",
|
||||
},
|
||||
failureKind: "soft",
|
||||
failureCode: "ui_mock_validation_required",
|
||||
};
|
||||
|
||||
return failure;
|
||||
}
|
||||
|
||||
return {
|
||||
status: "success",
|
||||
payload: {
|
||||
summary: `Node ${actorInput.node.id} completed in mock mode.`,
|
||||
prompt: input.prompt,
|
||||
subtasks: extractSubtasks(input.prompt),
|
||||
},
|
||||
stateMetadata: {
|
||||
usage: {
|
||||
...usage,
|
||||
tokenTotal: usage.tokenInput + usage.tokenOutput,
|
||||
toolCalls: actorInput.executionContext.allowedTools.length,
|
||||
},
|
||||
topologyHint: input.topologyHint ?? "manifest-default",
|
||||
},
|
||||
stateFlags: {
|
||||
[`${actorInput.node.id}_completed`]: true,
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
const executors: Record<string, ActorExecutor> = {};
|
||||
for (const actorId of uniqueActorIds) {
|
||||
executors[actorId] = execute;
|
||||
}
|
||||
return executors;
|
||||
}
|
||||
|
||||
function createSingleExecutorMap(manifest: AgentManifest, executor: ActorExecutor): Record<string, ActorExecutor> {
|
||||
const uniqueActorIds = dedupeStrings(manifest.pipeline.nodes.map((node) => node.actorId));
|
||||
const executors: Record<string, ActorExecutor> = {};
|
||||
for (const actorId of uniqueActorIds) {
|
||||
executors[actorId] = executor;
|
||||
}
|
||||
return executors;
|
||||
}
|
||||
|
||||
function toAbortErrorMessage(error: unknown): string {
|
||||
if (error instanceof Error) {
|
||||
return error.message;
|
||||
}
|
||||
return String(error);
|
||||
}
|
||||
|
||||
function isAbort(error: unknown): boolean {
|
||||
if (!(error instanceof Error)) {
|
||||
return false;
|
||||
}
|
||||
return error.name === "AbortError" || error.message.toLowerCase().includes("abort");
|
||||
}
|
||||
|
||||
async function loadRuntimeConfig(envPath: string): Promise<Readonly<AppConfig>> {
|
||||
const parsed = await parseEnvFile(envPath);
|
||||
return loadConfig({
|
||||
...process.env,
|
||||
...parsed.values,
|
||||
});
|
||||
}
|
||||
|
||||
import { JSONFilePreset } from "lowdb/node";
|
||||
|
||||
async function writeRunMeta(input: {
|
||||
stateRoot: string;
|
||||
sessionId: string;
|
||||
run: RunRecord;
|
||||
}): Promise<void> {
|
||||
const sessionDirectory = resolve(input.stateRoot, input.sessionId);
|
||||
await mkdir(sessionDirectory, { recursive: true });
|
||||
const path = resolve(sessionDirectory, RUN_META_FILE_NAME);
|
||||
const db = await JSONFilePreset<RunRecord>(path, input.run);
|
||||
db.data = input.run;
|
||||
await db.write();
|
||||
}
|
||||
|
||||
export async function readRunMetaBySession(input: {
|
||||
stateRoot: string;
|
||||
sessionId: string;
|
||||
}): Promise<RunRecord | undefined> {
|
||||
const path = resolve(input.stateRoot, input.sessionId, RUN_META_FILE_NAME);
|
||||
|
||||
try {
|
||||
const content = await readFile(path, "utf8");
|
||||
const parsed = JSON.parse(content) as unknown;
|
||||
if (!parsed || typeof parsed !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const record = parsed as Partial<RunRecord>;
|
||||
if (
|
||||
typeof record.runId !== "string" ||
|
||||
typeof record.sessionId !== "string" ||
|
||||
typeof record.status !== "string" ||
|
||||
typeof record.startedAt !== "string"
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const normalized: RunRecord = {
|
||||
runId: record.runId,
|
||||
sessionId: record.sessionId,
|
||||
status:
|
||||
record.status === "running" ||
|
||||
record.status === "success" ||
|
||||
record.status === "failure" ||
|
||||
record.status === "cancelled"
|
||||
? record.status
|
||||
: "failure",
|
||||
startedAt: record.startedAt,
|
||||
executionMode:
|
||||
record.executionMode === "provider" || record.executionMode === "mock"
|
||||
? record.executionMode
|
||||
: "mock",
|
||||
provider: record.provider === "claude" || record.provider === "codex" ? record.provider : "codex",
|
||||
...(typeof record.endedAt === "string" ? { endedAt: record.endedAt } : {}),
|
||||
...(typeof record.manifestPath === "string" ? { manifestPath: record.manifestPath } : {}),
|
||||
...(typeof record.topologyHint === "string" ? { topologyHint: record.topologyHint } : {}),
|
||||
...(typeof record.error === "string" ? { error: record.error } : {}),
|
||||
};
|
||||
|
||||
return normalized;
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
|
||||
return undefined;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
export class UiRunService {
|
||||
private readonly workspaceRoot: string;
|
||||
private readonly envFilePath: string;
|
||||
private readonly activeRuns = new Map<string, ActiveRun>();
|
||||
private readonly runHistory = new Map<string, RunRecord>();
|
||||
|
||||
constructor(input: {
|
||||
workspaceRoot: string;
|
||||
envFilePath?: string;
|
||||
}) {
|
||||
this.workspaceRoot = resolve(input.workspaceRoot);
|
||||
this.envFilePath = resolve(this.workspaceRoot, input.envFilePath ?? ".env");
|
||||
}
|
||||
|
||||
listRuns(): RunRecord[] {
|
||||
const output = [...this.runHistory.values()].sort((left, right) => {
|
||||
return right.startedAt.localeCompare(left.startedAt);
|
||||
});
|
||||
return output;
|
||||
}
|
||||
|
||||
getRun(runId: string): RunRecord | undefined {
|
||||
return this.runHistory.get(runId);
|
||||
}
|
||||
|
||||
async startRun(input: StartRunInput): Promise<RunRecord> {
|
||||
const config = await loadRuntimeConfig(this.envFilePath);
|
||||
const manifest = parseAgentManifest(input.manifest);
|
||||
const executionMode = input.executionMode ?? "mock";
|
||||
const provider = input.provider ?? "codex";
|
||||
const sessionId = input.sessionId?.trim() || toSessionId();
|
||||
const runId = randomUUID();
|
||||
const controller = new AbortController();
|
||||
|
||||
const record: RunRecord = {
|
||||
runId,
|
||||
sessionId,
|
||||
status: "running",
|
||||
startedAt: new Date().toISOString(),
|
||||
executionMode,
|
||||
provider,
|
||||
...(input.manifestPath ? { manifestPath: input.manifestPath } : {}),
|
||||
...(input.topologyHint ? { topologyHint: input.topologyHint } : {}),
|
||||
};
|
||||
this.runHistory.set(runId, record);
|
||||
|
||||
const runPromise = (async () => {
|
||||
let providerRuntime: Awaited<ReturnType<typeof createProviderRunRuntime>> | undefined;
|
||||
try {
|
||||
if (executionMode === "provider") {
|
||||
providerRuntime = await createProviderRunRuntime({
|
||||
provider,
|
||||
initialPrompt: input.prompt,
|
||||
config,
|
||||
});
|
||||
}
|
||||
|
||||
const actorExecutors =
|
||||
executionMode === "provider" && providerRuntime
|
||||
? createSingleExecutorMap(manifest, createProviderActorExecutor(providerRuntime))
|
||||
: createMockActorExecutors(manifest, {
|
||||
prompt: input.prompt,
|
||||
topologyHint: input.topologyHint,
|
||||
simulateValidationNodeIds: new Set(input.simulateValidationNodeIds ?? []),
|
||||
});
|
||||
|
||||
const engine = new SchemaDrivenExecutionEngine({
|
||||
manifest,
|
||||
actorExecutors,
|
||||
settings: {
|
||||
workspaceRoot: this.workspaceRoot,
|
||||
stateRoot: config.orchestration.stateRoot,
|
||||
projectContextPath: config.orchestration.projectContextPath,
|
||||
runtimeContext: {
|
||||
ui_mode: executionMode,
|
||||
run_provider: provider,
|
||||
...(input.runtimeContextOverrides ?? {}),
|
||||
},
|
||||
},
|
||||
config,
|
||||
});
|
||||
|
||||
await writeRunMeta({
|
||||
stateRoot: config.orchestration.stateRoot,
|
||||
sessionId,
|
||||
run: record,
|
||||
});
|
||||
|
||||
const summary = await engine.runSession({
|
||||
sessionId,
|
||||
initialPayload: {
|
||||
prompt: input.prompt,
|
||||
},
|
||||
initialState: {
|
||||
flags: {
|
||||
...(input.initialFlags ?? {}),
|
||||
},
|
||||
},
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
const completedRecord = this.runHistory.get(runId);
|
||||
if (!completedRecord) {
|
||||
return;
|
||||
}
|
||||
|
||||
const next: RunRecord = {
|
||||
...completedRecord,
|
||||
status: toRunStatus(summary.status),
|
||||
endedAt: new Date().toISOString(),
|
||||
};
|
||||
this.runHistory.set(runId, next);
|
||||
|
||||
await writeRunMeta({
|
||||
stateRoot: config.orchestration.stateRoot,
|
||||
sessionId,
|
||||
run: next,
|
||||
});
|
||||
} catch (error) {
|
||||
const current = this.runHistory.get(runId);
|
||||
if (!current) {
|
||||
return;
|
||||
}
|
||||
|
||||
const cancelled = controller.signal.aborted || isAbort(error);
|
||||
const next: RunRecord = {
|
||||
...current,
|
||||
status: cancelled ? "cancelled" : "failure",
|
||||
endedAt: new Date().toISOString(),
|
||||
error: toAbortErrorMessage(error),
|
||||
};
|
||||
this.runHistory.set(runId, next);
|
||||
|
||||
await writeRunMeta({
|
||||
stateRoot: config.orchestration.stateRoot,
|
||||
sessionId,
|
||||
run: next,
|
||||
});
|
||||
} finally {
|
||||
if (providerRuntime) {
|
||||
await providerRuntime.close();
|
||||
}
|
||||
this.activeRuns.delete(runId);
|
||||
}
|
||||
})();
|
||||
|
||||
this.activeRuns.set(runId, {
|
||||
controller,
|
||||
record,
|
||||
promise: runPromise,
|
||||
});
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
async cancelRun(runId: string): Promise<RunRecord> {
|
||||
const active = this.activeRuns.get(runId);
|
||||
if (!active) {
|
||||
const existing = this.runHistory.get(runId);
|
||||
if (!existing) {
|
||||
throw new Error(`Run \"${runId}\" does not exist.`);
|
||||
}
|
||||
return existing;
|
||||
}
|
||||
|
||||
active.controller.abort(new Error("Cancelled by operator from UI kill switch."));
|
||||
await active.promise;
|
||||
|
||||
const finalRecord = this.runHistory.get(runId);
|
||||
if (!finalRecord) {
|
||||
throw new Error(`Run \"${runId}\" cancellation did not produce a final record.`);
|
||||
}
|
||||
|
||||
return finalRecord;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user