Implement explicit session lifecycle and task-scoped worktrees
This commit is contained in:
@@ -2,7 +2,7 @@ import { randomUUID } from "node:crypto";
|
||||
import type { JsonObject } from "./types.js";
|
||||
|
||||
export type PlanningDomainEventType = "requirements_defined" | "tasks_planned";
|
||||
export type ExecutionDomainEventType = "code_committed" | "task_blocked";
|
||||
export type ExecutionDomainEventType = "code_committed" | "task_blocked" | "task_ready_for_review";
|
||||
export type ValidationDomainEventType = "validation_passed" | "validation_failed";
|
||||
export type IntegrationDomainEventType = "branch_merged";
|
||||
|
||||
@@ -46,6 +46,7 @@ const DOMAIN_EVENT_TYPES = new Set<DomainEventType>([
|
||||
"tasks_planned",
|
||||
"code_committed",
|
||||
"task_blocked",
|
||||
"task_ready_for_review",
|
||||
"validation_passed",
|
||||
"validation_failed",
|
||||
"branch_merged",
|
||||
|
||||
@@ -13,10 +13,16 @@ import {
|
||||
type ActorExecutionSecurityContext,
|
||||
type ActorExecutor,
|
||||
type PipelineRunSummary,
|
||||
type TaskExecutionLifecycle,
|
||||
} from "./pipeline.js";
|
||||
import { FileSystemProjectContextStore } from "./project-context.js";
|
||||
import {
|
||||
FileSystemProjectContextStore,
|
||||
type ProjectTask,
|
||||
type ProjectTaskStatus,
|
||||
} from "./project-context.js";
|
||||
import { FileSystemStateContextManager, type StoredSessionState } from "./state-context.js";
|
||||
import type { JsonObject } from "./types.js";
|
||||
import { SessionWorktreeManager, type SessionMetadata } from "./session-lifecycle.js";
|
||||
import {
|
||||
SecureCommandExecutor,
|
||||
type SecurityAuditEvent,
|
||||
@@ -221,6 +227,35 @@ function createActorSecurityContext(input: {
|
||||
};
|
||||
}
|
||||
|
||||
function resolveSessionProjectContextPath(stateRoot: string, sessionId: string): string {
|
||||
return resolve(stateRoot, sessionId, "project-context.json");
|
||||
}
|
||||
|
||||
function readTaskIdFromPayload(payload: JsonObject, fallback: string): string {
|
||||
const candidates = [payload.taskId, payload.task_id, payload.task];
|
||||
for (const candidate of candidates) {
|
||||
if (typeof candidate === "string" && candidate.trim().length > 0) {
|
||||
return candidate.trim();
|
||||
}
|
||||
}
|
||||
return fallback;
|
||||
}
|
||||
|
||||
function toTaskStatusForFailure(resultStatus: "validation_fail" | "failure"): ProjectTaskStatus {
|
||||
if (resultStatus === "failure") {
|
||||
return "failed";
|
||||
}
|
||||
return "in_progress";
|
||||
}
|
||||
|
||||
function shouldMergeFromStatus(statusAtStart: string): boolean {
|
||||
return statusAtStart === "review";
|
||||
}
|
||||
|
||||
function toTaskIdLabel(task: ProjectTask): string {
|
||||
return task.taskId || task.id || "task";
|
||||
}
|
||||
|
||||
export class SchemaDrivenExecutionEngine {
|
||||
private readonly manifest: AgentManifest;
|
||||
private readonly personaRegistry = new PersonaRegistry();
|
||||
@@ -234,6 +269,7 @@ export class SchemaDrivenExecutionEngine {
|
||||
private readonly mcpRegistry: McpRegistry;
|
||||
private readonly runtimeEventPublisher: RuntimeEventPublisher;
|
||||
private readonly securityContext: ActorExecutionSecurityContext;
|
||||
private readonly sessionWorktreeManager: SessionWorktreeManager;
|
||||
|
||||
constructor(input: {
|
||||
manifest: AgentManifest | unknown;
|
||||
@@ -273,6 +309,10 @@ export class SchemaDrivenExecutionEngine {
|
||||
this.projectContextStore = new FileSystemProjectContextStore({
|
||||
filePath: this.settings.projectContextPath,
|
||||
});
|
||||
this.sessionWorktreeManager = new SessionWorktreeManager({
|
||||
worktreeRoot: resolve(this.settings.workspaceRoot, this.config.provisioning.gitWorktree.rootDirectory),
|
||||
baseRef: this.config.provisioning.gitWorktree.baseRef,
|
||||
});
|
||||
|
||||
this.actorExecutors = toExecutorMap(input.actorExecutors);
|
||||
this.manager =
|
||||
@@ -352,9 +392,22 @@ export class SchemaDrivenExecutionEngine {
|
||||
initialPayload: JsonObject;
|
||||
initialState?: Partial<StoredSessionState>;
|
||||
signal?: AbortSignal;
|
||||
sessionMetadata?: SessionMetadata;
|
||||
}): Promise<PipelineRunSummary> {
|
||||
const managerSessionId = `${input.sessionId}__pipeline`;
|
||||
const managerSession = this.manager.createSession(managerSessionId);
|
||||
const workspaceRoot = input.sessionMetadata?.baseWorkspacePath ?? this.settings.workspaceRoot;
|
||||
const projectContextStore = input.sessionMetadata
|
||||
? new FileSystemProjectContextStore({
|
||||
filePath: resolveSessionProjectContextPath(this.settings.stateRoot, input.sessionId),
|
||||
})
|
||||
: this.projectContextStore;
|
||||
const taskLifecycle = input.sessionMetadata
|
||||
? this.createTaskExecutionLifecycle({
|
||||
session: input.sessionMetadata,
|
||||
projectContextStore,
|
||||
})
|
||||
: undefined;
|
||||
|
||||
const executor = new PipelineExecutor(
|
||||
this.manifest,
|
||||
@@ -362,25 +415,26 @@ export class SchemaDrivenExecutionEngine {
|
||||
this.stateManager,
|
||||
this.actorExecutors,
|
||||
{
|
||||
workspaceRoot: this.settings.workspaceRoot,
|
||||
workspaceRoot,
|
||||
runtimeContext: this.settings.runtimeContext,
|
||||
defaultModelConstraint: this.config.provider.claudeModel,
|
||||
resolvedExecutionSecurityConstraints: {
|
||||
dropUid: this.config.security.dropUid !== undefined,
|
||||
dropGid: this.config.security.dropGid !== undefined,
|
||||
worktreePath: this.settings.workspaceRoot,
|
||||
worktreePath: workspaceRoot,
|
||||
violationMode: this.settings.securityViolationHandling,
|
||||
},
|
||||
maxDepth: Math.min(this.settings.maxDepth, this.manifest.topologyConstraints.maxDepth),
|
||||
maxRetries: Math.min(this.settings.maxRetries, this.manifest.topologyConstraints.maxRetries),
|
||||
manager: this.manager,
|
||||
managerSessionId,
|
||||
projectContextStore: this.projectContextStore,
|
||||
resolveMcpConfig: ({ providerHint, prompt, toolClearance }) =>
|
||||
projectContextStore,
|
||||
resolveMcpConfig: ({ providerHint, prompt, toolClearance, workingDirectory }) =>
|
||||
loadMcpConfigFromEnv(
|
||||
{
|
||||
providerHint,
|
||||
prompt,
|
||||
...(workingDirectory ? { workingDirectory } : {}),
|
||||
},
|
||||
{
|
||||
config: this.config,
|
||||
@@ -391,6 +445,7 @@ export class SchemaDrivenExecutionEngine {
|
||||
securityViolationHandling: this.settings.securityViolationHandling,
|
||||
securityContext: this.securityContext,
|
||||
runtimeEventPublisher: this.runtimeEventPublisher,
|
||||
...(taskLifecycle ? { taskLifecycle } : {}),
|
||||
},
|
||||
);
|
||||
try {
|
||||
@@ -405,6 +460,97 @@ export class SchemaDrivenExecutionEngine {
|
||||
}
|
||||
}
|
||||
|
||||
private createTaskExecutionLifecycle(input: {
|
||||
session: SessionMetadata;
|
||||
projectContextStore: FileSystemProjectContextStore;
|
||||
}): TaskExecutionLifecycle {
|
||||
return {
|
||||
prepareTaskExecution: async ({ node, context }) => {
|
||||
const taskId = readTaskIdFromPayload(context.handoff.payload, node.id);
|
||||
const projectContext = await input.projectContextStore.readState();
|
||||
const existing = projectContext.taskQueue.find(
|
||||
(task) => toTaskIdLabel(task) === taskId,
|
||||
);
|
||||
|
||||
const ensured = await this.sessionWorktreeManager.ensureTaskWorktree({
|
||||
sessionId: input.session.sessionId,
|
||||
taskId,
|
||||
baseWorkspacePath: input.session.baseWorkspacePath,
|
||||
...(existing?.worktreePath ? { existingWorktreePath: existing.worktreePath } : {}),
|
||||
});
|
||||
|
||||
const statusAtStart: ProjectTaskStatus =
|
||||
existing?.status === "review" ? "review" : "in_progress";
|
||||
|
||||
await input.projectContextStore.patchState({
|
||||
upsertTasks: [
|
||||
{
|
||||
taskId,
|
||||
id: taskId,
|
||||
status: statusAtStart,
|
||||
worktreePath: ensured.taskWorktreePath,
|
||||
...(existing?.title ? { title: existing.title } : { title: taskId }),
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
return {
|
||||
taskId,
|
||||
worktreePath: ensured.taskWorktreePath,
|
||||
statusAtStart,
|
||||
};
|
||||
},
|
||||
finalizeTaskExecution: async ({ task, result }) => {
|
||||
if (result.status === "failure" || result.status === "validation_fail") {
|
||||
await input.projectContextStore.patchState({
|
||||
upsertTasks: [
|
||||
{
|
||||
taskId: task.taskId,
|
||||
id: task.taskId,
|
||||
status: toTaskStatusForFailure(result.status),
|
||||
worktreePath: task.worktreePath,
|
||||
title: task.taskId,
|
||||
},
|
||||
],
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (shouldMergeFromStatus(task.statusAtStart)) {
|
||||
await this.sessionWorktreeManager.mergeTaskIntoBase({
|
||||
taskId: task.taskId,
|
||||
baseWorkspacePath: input.session.baseWorkspacePath,
|
||||
taskWorktreePath: task.worktreePath,
|
||||
});
|
||||
|
||||
await input.projectContextStore.patchState({
|
||||
upsertTasks: [
|
||||
{
|
||||
taskId: task.taskId,
|
||||
id: task.taskId,
|
||||
status: "merged",
|
||||
title: task.taskId,
|
||||
},
|
||||
],
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
await input.projectContextStore.patchState({
|
||||
upsertTasks: [
|
||||
{
|
||||
taskId: task.taskId,
|
||||
id: task.taskId,
|
||||
status: "review",
|
||||
worktreePath: task.worktreePath,
|
||||
title: task.taskId,
|
||||
},
|
||||
],
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private assertRelationshipConstraints(): void {
|
||||
for (const [parent, edges] of this.childrenByParent.entries()) {
|
||||
if (edges.length > this.settings.maxChildren) {
|
||||
|
||||
@@ -153,6 +153,7 @@ export type PipelineExecutorOptions = {
|
||||
securityViolationHandling?: SecurityViolationHandling;
|
||||
securityContext?: ActorExecutionSecurityContext;
|
||||
runtimeEventPublisher?: RuntimeEventPublisher;
|
||||
taskLifecycle?: TaskExecutionLifecycle;
|
||||
};
|
||||
|
||||
export type ActorExecutionSecurityContext = {
|
||||
@@ -166,6 +167,27 @@ export type ActorExecutionSecurityContext = {
|
||||
}) => SecureCommandExecutor;
|
||||
};
|
||||
|
||||
export type TaskExecutionResolution = {
|
||||
taskId: string;
|
||||
worktreePath: string;
|
||||
statusAtStart: string;
|
||||
};
|
||||
|
||||
export type TaskExecutionLifecycle = {
|
||||
prepareTaskExecution: (input: {
|
||||
sessionId: string;
|
||||
node: PipelineNode;
|
||||
context: NodeExecutionContext;
|
||||
}) => Promise<TaskExecutionResolution>;
|
||||
finalizeTaskExecution: (input: {
|
||||
sessionId: string;
|
||||
node: PipelineNode;
|
||||
task: TaskExecutionResolution;
|
||||
result: ActorExecutionResult;
|
||||
domainEvents: DomainEvent[];
|
||||
}) => Promise<void>;
|
||||
};
|
||||
|
||||
type QueueItem = {
|
||||
nodeId: string;
|
||||
depth: number;
|
||||
@@ -580,9 +602,11 @@ export class PipelineExecutor {
|
||||
globalFlags: { ...projectContext.globalFlags },
|
||||
artifactPointers: { ...projectContext.artifactPointers },
|
||||
taskQueue: projectContext.taskQueue.map((task) => ({
|
||||
id: task.id,
|
||||
title: task.title,
|
||||
taskId: task.taskId,
|
||||
id: task.id ?? task.taskId,
|
||||
...(task.title ? { title: task.title } : {}),
|
||||
status: task.status,
|
||||
...(task.worktreePath ? { worktreePath: task.worktreePath } : {}),
|
||||
...(task.assignee ? { assignee: task.assignee } : {}),
|
||||
...(task.metadata ? { metadata: task.metadata } : {}),
|
||||
})),
|
||||
@@ -854,6 +878,13 @@ export class PipelineExecutor {
|
||||
})();
|
||||
|
||||
const context = await this.stateManager.buildFreshNodeContext(sessionId, node.id);
|
||||
const taskResolution = this.options.taskLifecycle
|
||||
? await this.options.taskLifecycle.prepareTaskExecution({
|
||||
sessionId,
|
||||
node,
|
||||
context,
|
||||
})
|
||||
: undefined;
|
||||
const prompt = this.personaRegistry.renderSystemPrompt({
|
||||
personaId: node.personaId,
|
||||
runtimeContext: {
|
||||
@@ -869,6 +900,7 @@ export class PipelineExecutor {
|
||||
node,
|
||||
toolClearance,
|
||||
prompt,
|
||||
worktreePathOverride: taskResolution?.worktreePath,
|
||||
});
|
||||
|
||||
const result = await this.invokeActorExecutor({
|
||||
@@ -889,7 +921,16 @@ export class PipelineExecutor {
|
||||
customEvents: result.events,
|
||||
});
|
||||
const topologyKind: NodeTopologyKind = node.topology?.kind ?? "sequential";
|
||||
const payloadForNext = result.payload ?? context.handoff.payload;
|
||||
const payloadForNext = {
|
||||
...context.handoff.payload,
|
||||
...(result.payload ?? {}),
|
||||
...(taskResolution
|
||||
? {
|
||||
taskId: taskResolution.taskId,
|
||||
worktreePath: taskResolution.worktreePath,
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
const shouldRetry =
|
||||
result.status === "validation_fail" &&
|
||||
this.shouldRetryValidation(node) &&
|
||||
@@ -907,6 +948,16 @@ export class PipelineExecutor {
|
||||
topologyKind,
|
||||
});
|
||||
|
||||
if (taskResolution && this.options.taskLifecycle) {
|
||||
await this.options.taskLifecycle.finalizeTaskExecution({
|
||||
sessionId,
|
||||
node,
|
||||
task: taskResolution,
|
||||
result,
|
||||
domainEvents,
|
||||
});
|
||||
}
|
||||
|
||||
const emittedEventTypes = domainEvents.map((event) => event.type);
|
||||
nodeRecords.push({
|
||||
nodeId: node.id,
|
||||
@@ -1006,7 +1057,10 @@ export class PipelineExecutor {
|
||||
context: input.context,
|
||||
signal: input.signal,
|
||||
executionContext: input.executionContext,
|
||||
mcp: this.buildActorMcpContext(input.executionContext, input.prompt),
|
||||
mcp: this.buildActorMcpContext({
|
||||
executionContext: input.executionContext,
|
||||
prompt: input.prompt,
|
||||
}),
|
||||
security: this.securityContext,
|
||||
});
|
||||
} catch (error) {
|
||||
@@ -1047,9 +1101,15 @@ export class PipelineExecutor {
|
||||
node: PipelineNode;
|
||||
toolClearance: ToolClearancePolicy;
|
||||
prompt: string;
|
||||
worktreePathOverride?: string;
|
||||
}): ResolvedExecutionContext {
|
||||
const normalizedToolClearance = parseToolClearancePolicy(input.toolClearance);
|
||||
const toolUniverse = this.resolveAvailableToolsForAttempt(normalizedToolClearance, input.prompt);
|
||||
const worktreePath = input.worktreePathOverride ?? this.options.resolvedExecutionSecurityConstraints.worktreePath;
|
||||
const toolUniverse = this.resolveAvailableToolsForAttempt({
|
||||
toolClearance: normalizedToolClearance,
|
||||
prompt: input.prompt,
|
||||
worktreePath,
|
||||
});
|
||||
const allowedTools = this.resolveAllowedToolsForAttempt({
|
||||
toolClearance: normalizedToolClearance,
|
||||
toolUniverse,
|
||||
@@ -1065,6 +1125,7 @@ export class PipelineExecutor {
|
||||
allowedTools,
|
||||
security: {
|
||||
...this.options.resolvedExecutionSecurityConstraints,
|
||||
worktreePath,
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -1087,15 +1148,20 @@ export class PipelineExecutor {
|
||||
return [];
|
||||
}
|
||||
|
||||
private resolveAvailableToolsForAttempt(toolClearance: ToolClearancePolicy, prompt: string): string[] {
|
||||
private resolveAvailableToolsForAttempt(input: {
|
||||
toolClearance: ToolClearancePolicy;
|
||||
prompt: string;
|
||||
worktreePath: string;
|
||||
}): string[] {
|
||||
if (!this.options.resolveMcpConfig) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const resolved = this.options.resolveMcpConfig({
|
||||
providerHint: "codex",
|
||||
prompt,
|
||||
toolClearance,
|
||||
prompt: input.prompt,
|
||||
workingDirectory: input.worktreePath,
|
||||
toolClearance: input.toolClearance,
|
||||
});
|
||||
|
||||
const rawServers = resolved.codexConfig?.mcp_servers;
|
||||
@@ -1115,10 +1181,11 @@ export class PipelineExecutor {
|
||||
return dedupeStrings(tools);
|
||||
}
|
||||
|
||||
private buildActorMcpContext(
|
||||
executionContext: ResolvedExecutionContext,
|
||||
prompt: string,
|
||||
): ActorExecutionMcpContext {
|
||||
private buildActorMcpContext(input: {
|
||||
executionContext: ResolvedExecutionContext;
|
||||
prompt: string;
|
||||
}): ActorExecutionMcpContext {
|
||||
const { executionContext, prompt } = input;
|
||||
const toolPolicy = toAllowedToolPolicy(executionContext.allowedTools);
|
||||
const filterToolsForProvider = (tools: string[]): string[] => {
|
||||
const deduped = dedupeStrings(tools);
|
||||
@@ -1129,6 +1196,7 @@ export class PipelineExecutor {
|
||||
? this.options.resolveMcpConfig({
|
||||
providerHint: "both",
|
||||
prompt,
|
||||
workingDirectory: executionContext.security.worktreePath,
|
||||
toolClearance: toolPolicy,
|
||||
})
|
||||
: {};
|
||||
@@ -1137,7 +1205,12 @@ export class PipelineExecutor {
|
||||
executionContext.allowedTools,
|
||||
);
|
||||
const resolveConfig = (context: McpLoadContext = {}): LoadedMcpConfig => {
|
||||
if (context.providerHint === "codex") {
|
||||
const withWorkingDirectory: McpLoadContext = {
|
||||
...context,
|
||||
...(context.workingDirectory ? {} : { workingDirectory: executionContext.security.worktreePath }),
|
||||
};
|
||||
|
||||
if (withWorkingDirectory.providerHint === "codex") {
|
||||
return {
|
||||
...(resolvedConfig.codexConfig ? { codexConfig: cloneMcpConfig(resolvedConfig).codexConfig } : {}),
|
||||
...(resolvedConfig.sourcePath ? { sourcePath: resolvedConfig.sourcePath } : {}),
|
||||
@@ -1147,7 +1220,7 @@ export class PipelineExecutor {
|
||||
};
|
||||
}
|
||||
|
||||
if (context.providerHint === "claude") {
|
||||
if (withWorkingDirectory.providerHint === "claude") {
|
||||
return {
|
||||
...(resolvedConfig.claudeMcpServers
|
||||
? { claudeMcpServers: cloneMcpConfig(resolvedConfig).claudeMcpServers }
|
||||
|
||||
@@ -5,12 +5,21 @@ import { deepCloneJson, isRecord, type JsonObject, type JsonValue } from "./type
|
||||
|
||||
export const PROJECT_CONTEXT_SCHEMA_VERSION = 1;
|
||||
|
||||
export type ProjectTaskStatus = "pending" | "in_progress" | "blocked" | "done";
|
||||
export type ProjectTaskStatus =
|
||||
| "pending"
|
||||
| "in_progress"
|
||||
| "review"
|
||||
| "merged"
|
||||
| "failed"
|
||||
| "blocked"
|
||||
| "done";
|
||||
|
||||
export type ProjectTask = {
|
||||
id: string;
|
||||
title: string;
|
||||
taskId: string;
|
||||
id?: string;
|
||||
title?: string;
|
||||
status: ProjectTaskStatus;
|
||||
worktreePath?: string;
|
||||
assignee?: string;
|
||||
metadata?: JsonObject;
|
||||
};
|
||||
@@ -52,7 +61,15 @@ function toJsonObject(value: unknown, label: string): JsonObject {
|
||||
}
|
||||
|
||||
function toTaskStatus(value: unknown, label: string): ProjectTaskStatus {
|
||||
if (value === "pending" || value === "in_progress" || value === "blocked" || value === "done") {
|
||||
if (
|
||||
value === "pending" ||
|
||||
value === "in_progress" ||
|
||||
value === "review" ||
|
||||
value === "merged" ||
|
||||
value === "failed" ||
|
||||
value === "blocked" ||
|
||||
value === "done"
|
||||
) {
|
||||
return value;
|
||||
}
|
||||
throw new Error(`${label} has unsupported status "${String(value)}".`);
|
||||
@@ -68,10 +85,28 @@ function toProjectTask(value: unknown, label: string): ProjectTask {
|
||||
throw new Error(`${label}.assignee must be a non-empty string when provided.`);
|
||||
}
|
||||
|
||||
const taskIdCandidate = value.taskId ?? value.id;
|
||||
const taskId = assertNonEmptyString(taskIdCandidate, `${label}.taskId`);
|
||||
|
||||
const titleRaw = value.title;
|
||||
if (titleRaw !== undefined && (typeof titleRaw !== "string" || titleRaw.trim().length === 0)) {
|
||||
throw new Error(`${label}.title must be a non-empty string when provided.`);
|
||||
}
|
||||
|
||||
const worktreePathRaw = value.worktreePath;
|
||||
if (
|
||||
worktreePathRaw !== undefined &&
|
||||
(typeof worktreePathRaw !== "string" || worktreePathRaw.trim().length === 0)
|
||||
) {
|
||||
throw new Error(`${label}.worktreePath must be a non-empty string when provided.`);
|
||||
}
|
||||
|
||||
return {
|
||||
id: assertNonEmptyString(value.id, `${label}.id`),
|
||||
title: assertNonEmptyString(value.title, `${label}.title`),
|
||||
taskId,
|
||||
id: taskId,
|
||||
...(typeof titleRaw === "string" ? { title: titleRaw.trim() } : {}),
|
||||
status: toTaskStatus(value.status, `${label}.status`),
|
||||
...(typeof worktreePathRaw === "string" ? { worktreePath: worktreePathRaw.trim() } : {}),
|
||||
...(typeof assignee === "string" ? { assignee: assignee.trim() } : {}),
|
||||
...(value.metadata !== undefined
|
||||
? { metadata: toJsonObject(value.metadata, `${label}.metadata`) }
|
||||
@@ -157,10 +192,10 @@ function mergeUpsertTasks(current: ProjectTask[], upserts: ProjectTask[]): Proje
|
||||
|
||||
const byId = new Map<string, ProjectTask>();
|
||||
for (const task of current) {
|
||||
byId.set(task.id, task);
|
||||
byId.set(task.taskId, task);
|
||||
}
|
||||
for (const task of upserts) {
|
||||
byId.set(task.id, task);
|
||||
byId.set(task.taskId, task);
|
||||
}
|
||||
|
||||
return [...byId.values()];
|
||||
|
||||
@@ -197,9 +197,9 @@ export class ResourceProvisioningOrchestrator {
|
||||
async provisionSession(input: {
|
||||
sessionId: string;
|
||||
resources: ResourceRequest[];
|
||||
workspaceRoot?: string;
|
||||
workspaceRoot: string;
|
||||
}): Promise<ProvisionedResources> {
|
||||
const workspaceRoot = resolve(input.workspaceRoot ?? process.cwd());
|
||||
const workspaceRoot = resolve(input.workspaceRoot);
|
||||
const hardConstraints: ProvisionedResourcesState["hardConstraints"] = [];
|
||||
const releases: ProvisionedResourcesState["releases"] = [];
|
||||
const env: Record<string, string> = {};
|
||||
|
||||
389
src/agents/session-lifecycle.ts
Normal file
389
src/agents/session-lifecycle.ts
Normal file
@@ -0,0 +1,389 @@
|
||||
import { execFile } from "node:child_process";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { mkdir, readFile, readdir, stat } from "node:fs/promises";
|
||||
import { dirname, isAbsolute, resolve } from "node:path";
|
||||
import { promisify } from "node:util";
|
||||
import { withFileLock, writeUtf8FileAtomic } from "./file-persistence.js";
|
||||
|
||||
const execFileAsync = promisify(execFile);
|
||||
|
||||
const SESSION_METADATA_FILE_NAME = "session-metadata.json";
|
||||
|
||||
export type SessionStatus = "active" | "suspended" | "closed";
|
||||
|
||||
export type SessionMetadata = {
|
||||
sessionId: string;
|
||||
projectPath: string;
|
||||
sessionStatus: SessionStatus;
|
||||
baseWorkspacePath: string;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
};
|
||||
|
||||
export type CreateSessionRequest = {
|
||||
projectPath: string;
|
||||
};
|
||||
|
||||
function toErrorMessage(error: unknown): string {
|
||||
if (error instanceof Error) {
|
||||
return error.message;
|
||||
}
|
||||
return String(error);
|
||||
}
|
||||
|
||||
function assertAbsolutePath(path: string, label: string): string {
|
||||
if (!isAbsolute(path)) {
|
||||
throw new Error(`${label} must be an absolute path.`);
|
||||
}
|
||||
return resolve(path);
|
||||
}
|
||||
|
||||
function assertNonEmptyString(value: unknown, label: string): string {
|
||||
if (typeof value !== "string" || value.trim().length === 0) {
|
||||
throw new Error(`${label} must be a non-empty string.`);
|
||||
}
|
||||
return value.trim();
|
||||
}
|
||||
|
||||
function toSessionStatus(value: unknown): SessionStatus {
|
||||
if (value === "active" || value === "suspended" || value === "closed") {
|
||||
return value;
|
||||
}
|
||||
throw new Error(`Session status "${String(value)}" is not supported.`);
|
||||
}
|
||||
|
||||
function toSessionMetadata(value: unknown): SessionMetadata {
|
||||
if (!value || typeof value !== "object" || Array.isArray(value)) {
|
||||
throw new Error("Session metadata file is malformed.");
|
||||
}
|
||||
|
||||
const raw = value as Record<string, unknown>;
|
||||
|
||||
return {
|
||||
sessionId: assertNonEmptyString(raw.sessionId, "sessionId"),
|
||||
projectPath: assertAbsolutePath(assertNonEmptyString(raw.projectPath, "projectPath"), "projectPath"),
|
||||
baseWorkspacePath: assertAbsolutePath(
|
||||
assertNonEmptyString(raw.baseWorkspacePath, "baseWorkspacePath"),
|
||||
"baseWorkspacePath",
|
||||
),
|
||||
sessionStatus: toSessionStatus(raw.sessionStatus),
|
||||
createdAt: assertNonEmptyString(raw.createdAt, "createdAt"),
|
||||
updatedAt: assertNonEmptyString(raw.updatedAt, "updatedAt"),
|
||||
};
|
||||
}
|
||||
|
||||
async function runGit(args: string[]): Promise<string> {
|
||||
try {
|
||||
const { stdout } = await execFileAsync("git", args, {
|
||||
encoding: "utf8",
|
||||
});
|
||||
return stdout.trim();
|
||||
} catch (error) {
|
||||
throw new Error(`git ${args.join(" ")} failed: ${toErrorMessage(error)}`);
|
||||
}
|
||||
}
|
||||
|
||||
async function pathExists(path: string): Promise<boolean> {
|
||||
try {
|
||||
await stat(path);
|
||||
return true;
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
|
||||
return false;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
function sanitizeSegment(value: string, fallback: string): string {
|
||||
const normalized = value
|
||||
.trim()
|
||||
.replace(/[^a-zA-Z0-9_-]/g, "-")
|
||||
.replace(/-+/g, "-")
|
||||
.replace(/^-+/, "")
|
||||
.replace(/-+$/, "");
|
||||
return normalized || fallback;
|
||||
}
|
||||
|
||||
export class FileSystemSessionMetadataStore {
|
||||
private readonly stateRoot: string;
|
||||
|
||||
constructor(input: { stateRoot: string }) {
|
||||
this.stateRoot = resolve(input.stateRoot);
|
||||
}
|
||||
|
||||
getStateRoot(): string {
|
||||
return this.stateRoot;
|
||||
}
|
||||
|
||||
getSessionDirectory(sessionId: string): string {
|
||||
return resolve(this.stateRoot, sessionId);
|
||||
}
|
||||
|
||||
getSessionMetadataPath(sessionId: string): string {
|
||||
return resolve(this.getSessionDirectory(sessionId), SESSION_METADATA_FILE_NAME);
|
||||
}
|
||||
|
||||
getSessionProjectContextPath(sessionId: string): string {
|
||||
return resolve(this.getSessionDirectory(sessionId), "project-context.json");
|
||||
}
|
||||
|
||||
async createSession(input: {
|
||||
projectPath: string;
|
||||
baseWorkspacePath: string;
|
||||
sessionId?: string;
|
||||
}): Promise<SessionMetadata> {
|
||||
const sessionId = input.sessionId?.trim() || randomUUID();
|
||||
const now = new Date().toISOString();
|
||||
const metadata: SessionMetadata = {
|
||||
sessionId,
|
||||
projectPath: assertAbsolutePath(input.projectPath, "projectPath"),
|
||||
baseWorkspacePath: assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath"),
|
||||
sessionStatus: "active",
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
};
|
||||
|
||||
const sessionDirectory = this.getSessionDirectory(sessionId);
|
||||
await mkdir(sessionDirectory, { recursive: true });
|
||||
await this.writeSessionMetadata(metadata);
|
||||
|
||||
return metadata;
|
||||
}
|
||||
|
||||
async readSession(sessionId: string): Promise<SessionMetadata | undefined> {
|
||||
const metadataPath = this.getSessionMetadataPath(sessionId);
|
||||
|
||||
try {
|
||||
const content = await readFile(metadataPath, "utf8");
|
||||
return toSessionMetadata(JSON.parse(content) as unknown);
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
|
||||
return undefined;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async listSessions(): Promise<SessionMetadata[]> {
|
||||
try {
|
||||
const entries = await readdir(this.stateRoot, { withFileTypes: true });
|
||||
const sessions: SessionMetadata[] = [];
|
||||
|
||||
for (const entry of entries) {
|
||||
if (!entry.isDirectory()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const metadata = await this.readSession(entry.name);
|
||||
if (metadata) {
|
||||
sessions.push(metadata);
|
||||
}
|
||||
}
|
||||
|
||||
sessions.sort((left, right) => right.createdAt.localeCompare(left.createdAt));
|
||||
return sessions;
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
|
||||
return [];
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async updateSession(
|
||||
sessionId: string,
|
||||
patch: Partial<Pick<SessionMetadata, "projectPath" | "baseWorkspacePath" | "sessionStatus">>,
|
||||
): Promise<SessionMetadata> {
|
||||
const current = await this.readSession(sessionId);
|
||||
if (!current) {
|
||||
throw new Error(`Session "${sessionId}" does not exist.`);
|
||||
}
|
||||
|
||||
const next: SessionMetadata = {
|
||||
...current,
|
||||
...(patch.projectPath ? { projectPath: assertAbsolutePath(patch.projectPath, "projectPath") } : {}),
|
||||
...(patch.baseWorkspacePath
|
||||
? { baseWorkspacePath: assertAbsolutePath(patch.baseWorkspacePath, "baseWorkspacePath") }
|
||||
: {}),
|
||||
...(patch.sessionStatus ? { sessionStatus: patch.sessionStatus } : {}),
|
||||
updatedAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await this.writeSessionMetadata(next);
|
||||
return next;
|
||||
}
|
||||
|
||||
private async writeSessionMetadata(metadata: SessionMetadata): Promise<void> {
|
||||
const metadataPath = this.getSessionMetadataPath(metadata.sessionId);
|
||||
await mkdir(dirname(metadataPath), { recursive: true });
|
||||
await withFileLock(`${metadataPath}.lock`, async () => {
|
||||
await writeUtf8FileAtomic(metadataPath, `${JSON.stringify(metadata, null, 2)}\n`);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export class SessionWorktreeManager {
|
||||
private readonly worktreeRoot: string;
|
||||
private readonly baseRef: string;
|
||||
|
||||
constructor(input: {
|
||||
worktreeRoot: string;
|
||||
baseRef: string;
|
||||
}) {
|
||||
this.worktreeRoot = assertAbsolutePath(input.worktreeRoot, "worktreeRoot");
|
||||
this.baseRef = assertNonEmptyString(input.baseRef, "baseRef");
|
||||
}
|
||||
|
||||
resolveBaseWorkspacePath(sessionId: string): string {
|
||||
const scoped = sanitizeSegment(sessionId, "session");
|
||||
return resolve(this.worktreeRoot, scoped, "base");
|
||||
}
|
||||
|
||||
resolveTaskWorktreePath(sessionId: string, taskId: string): string {
|
||||
const scopedSession = sanitizeSegment(sessionId, "session");
|
||||
const scopedTask = sanitizeSegment(taskId, "task");
|
||||
return resolve(this.worktreeRoot, scopedSession, "tasks", scopedTask);
|
||||
}
|
||||
|
||||
private resolveBaseBranchName(sessionId: string): string {
|
||||
const scoped = sanitizeSegment(sessionId, "session");
|
||||
return `ai-ops/${scoped}/base`;
|
||||
}
|
||||
|
||||
private resolveTaskBranchName(sessionId: string, taskId: string): string {
|
||||
const scopedSession = sanitizeSegment(sessionId, "session");
|
||||
const scopedTask = sanitizeSegment(taskId, "task");
|
||||
return `ai-ops/${scopedSession}/task/${scopedTask}`;
|
||||
}
|
||||
|
||||
async initializeSessionBaseWorkspace(input: {
|
||||
sessionId: string;
|
||||
projectPath: string;
|
||||
baseWorkspacePath: string;
|
||||
}): Promise<void> {
|
||||
const projectPath = assertAbsolutePath(input.projectPath, "projectPath");
|
||||
const baseWorkspacePath = assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath");
|
||||
|
||||
await mkdir(dirname(baseWorkspacePath), { recursive: true });
|
||||
|
||||
const alreadyExists = await pathExists(baseWorkspacePath);
|
||||
if (alreadyExists) {
|
||||
return;
|
||||
}
|
||||
|
||||
const repoRoot = await runGit(["-C", projectPath, "rev-parse", "--show-toplevel"]);
|
||||
const branchName = this.resolveBaseBranchName(input.sessionId);
|
||||
await runGit(["-C", repoRoot, "worktree", "add", "-B", branchName, baseWorkspacePath, this.baseRef]);
|
||||
}
|
||||
|
||||
async ensureTaskWorktree(input: {
|
||||
sessionId: string;
|
||||
taskId: string;
|
||||
baseWorkspacePath: string;
|
||||
existingWorktreePath?: string;
|
||||
}): Promise<{
|
||||
taskWorktreePath: string;
|
||||
}> {
|
||||
const baseWorkspacePath = assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath");
|
||||
const maybeExisting = input.existingWorktreePath?.trim();
|
||||
const worktreePath = maybeExisting
|
||||
? assertAbsolutePath(maybeExisting, "existingWorktreePath")
|
||||
: this.resolveTaskWorktreePath(input.sessionId, input.taskId);
|
||||
|
||||
if (!(await pathExists(worktreePath))) {
|
||||
await mkdir(dirname(worktreePath), { recursive: true });
|
||||
const branchName = this.resolveTaskBranchName(input.sessionId, input.taskId);
|
||||
await runGit(["-C", baseWorkspacePath, "worktree", "add", "-B", branchName, worktreePath, "HEAD"]);
|
||||
}
|
||||
|
||||
return {
|
||||
taskWorktreePath: worktreePath,
|
||||
};
|
||||
}
|
||||
|
||||
async mergeTaskIntoBase(input: {
|
||||
taskId: string;
|
||||
baseWorkspacePath: string;
|
||||
taskWorktreePath: string;
|
||||
}): Promise<void> {
|
||||
const baseWorkspacePath = assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath");
|
||||
const taskWorktreePath = assertAbsolutePath(input.taskWorktreePath, "taskWorktreePath");
|
||||
|
||||
await runGit(["-C", taskWorktreePath, "add", "-A"]);
|
||||
const hasPending = await this.hasStagedChanges(taskWorktreePath);
|
||||
if (hasPending) {
|
||||
await runGit([
|
||||
"-C",
|
||||
taskWorktreePath,
|
||||
"commit",
|
||||
"-m",
|
||||
`ai_ops: finalize task ${input.taskId}`,
|
||||
]);
|
||||
}
|
||||
|
||||
const branchName = await runGit(["-C", taskWorktreePath, "rev-parse", "--abbrev-ref", "HEAD"]);
|
||||
await runGit(["-C", baseWorkspacePath, "merge", "--no-ff", "--no-edit", branchName]);
|
||||
await this.removeWorktree({
|
||||
repoPath: baseWorkspacePath,
|
||||
worktreePath: taskWorktreePath,
|
||||
});
|
||||
}
|
||||
|
||||
async closeSession(input: {
|
||||
session: SessionMetadata;
|
||||
taskWorktreePaths: string[];
|
||||
mergeBaseIntoProject?: boolean;
|
||||
}): Promise<void> {
|
||||
const projectPath = assertAbsolutePath(input.session.projectPath, "projectPath");
|
||||
const baseWorkspacePath = assertAbsolutePath(input.session.baseWorkspacePath, "baseWorkspacePath");
|
||||
|
||||
for (const taskWorktreePath of input.taskWorktreePaths) {
|
||||
if (!taskWorktreePath.trim()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await this.removeWorktree({
|
||||
repoPath: baseWorkspacePath,
|
||||
worktreePath: taskWorktreePath,
|
||||
});
|
||||
}
|
||||
|
||||
if (input.mergeBaseIntoProject) {
|
||||
const baseBranch = await runGit(["-C", baseWorkspacePath, "rev-parse", "--abbrev-ref", "HEAD"]);
|
||||
await runGit(["-C", projectPath, "merge", "--no-ff", "--no-edit", baseBranch]);
|
||||
}
|
||||
|
||||
await this.removeWorktree({
|
||||
repoPath: projectPath,
|
||||
worktreePath: baseWorkspacePath,
|
||||
});
|
||||
}
|
||||
|
||||
private async removeWorktree(input: {
|
||||
repoPath: string;
|
||||
worktreePath: string;
|
||||
}): Promise<void> {
|
||||
if (!(await pathExists(input.worktreePath))) {
|
||||
return;
|
||||
}
|
||||
|
||||
await runGit(["-C", input.repoPath, "worktree", "remove", "--force", input.worktreePath]);
|
||||
await runGit(["-C", input.repoPath, "worktree", "prune"]);
|
||||
}
|
||||
|
||||
private async hasStagedChanges(worktreePath: string): Promise<boolean> {
|
||||
try {
|
||||
await execFileAsync("git", ["-C", worktreePath, "diff", "--cached", "--quiet"], {
|
||||
encoding: "utf8",
|
||||
});
|
||||
return false;
|
||||
} catch (error) {
|
||||
const exitCode = (error as { code?: number }).code;
|
||||
if (exitCode === 1) {
|
||||
return true;
|
||||
}
|
||||
throw new Error(`Unable to inspect staged changes: ${toErrorMessage(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user