From 9f032d9b1456b513eb079a3de4247bbe81c29a61 Mon Sep 17 00:00:00 2001 From: Josh Rzemien Date: Tue, 24 Feb 2026 10:29:06 -0500 Subject: [PATCH] Handle merge conflicts as orchestration events --- .env.example | 1 + AGENTS.md | 1 + README.md | 4 +- docs/orchestration-engine.md | 8 +- src/agents/domain-events.ts | 11 +- src/agents/lifecycle-observer.ts | 8 +- src/agents/orchestration.ts | 280 ++++++++++++++++++++++- src/agents/pipeline.ts | 49 +++- src/agents/project-context.ts | 4 + src/agents/session-lifecycle.ts | 381 +++++++++++++++++++++++++++---- src/config.ts | 8 + src/ui/provider-executor.ts | 2 +- src/ui/public/app.js | 4 + src/ui/run-service.ts | 17 +- tests/config.test.ts | 8 + tests/project-context.test.ts | 29 +++ tests/run-service.test.ts | 51 +++++ tests/session-lifecycle.test.ts | 67 +++++- 18 files changed, 863 insertions(+), 70 deletions(-) diff --git a/.env.example b/.env.example index f3d50f3..45b1c75 100644 --- a/.env.example +++ b/.env.example @@ -28,6 +28,7 @@ AGENT_PROJECT_CONTEXT_PATH=.ai_ops/project-context.json AGENT_TOPOLOGY_MAX_DEPTH=4 AGENT_TOPOLOGY_MAX_RETRIES=2 AGENT_RELATIONSHIP_MAX_CHILDREN=4 +AGENT_MERGE_CONFLICT_MAX_ATTEMPTS=2 # Resource provisioning (hard + soft constraints) AGENT_WORKTREE_ROOT=.ai_ops/worktrees diff --git a/AGENTS.md b/AGENTS.md index 0d6f3ec..3ce1edf 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -29,6 +29,7 @@ - `AGENT_TOPOLOGY_MAX_DEPTH` - `AGENT_TOPOLOGY_MAX_RETRIES` - `AGENT_RELATIONSHIP_MAX_CHILDREN` + - `AGENT_MERGE_CONFLICT_MAX_ATTEMPTS` - Provisioning/resource controls: - `AGENT_WORKTREE_ROOT` - `AGENT_WORKTREE_BASE_REF` diff --git a/README.md b/README.md index c95faf0..45c2198 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ TypeScript runtime for deterministic multi-agent execution with: - `artifactPointers` - `taskQueue` - each task record stores `taskId`, status, and optional `worktreePath` for task-scoped workspace ownership + - conflict-aware statuses are supported (`conflict`, `resolving_conflict`) ## Deep Dives @@ -132,7 +133,7 @@ Domain events are typed and can trigger edges directly: - planning: `requirements_defined`, `tasks_planned` - execution: `code_committed`, `task_ready_for_review`, `task_blocked` - validation: `validation_passed`, `validation_failed` -- integration: `branch_merged` +- integration: `branch_merged`, `merge_conflict_detected`, `merge_conflict_resolved`, `merge_conflict_unresolved`, `merge_retry_started` Actors can emit events in `ActorExecutionResult.events`. Pipeline status also emits default validation/execution events. @@ -272,6 +273,7 @@ jq -c 'select(.severity=="critical")' .ai_ops/events/runtime-events.ndjson - `AGENT_TOPOLOGY_MAX_DEPTH` - `AGENT_TOPOLOGY_MAX_RETRIES` - `AGENT_RELATIONSHIP_MAX_CHILDREN` +- `AGENT_MERGE_CONFLICT_MAX_ATTEMPTS` ### Provisioning / Resource Controls diff --git a/docs/orchestration-engine.md b/docs/orchestration-engine.md index 20b27f4..ea2bc2b 100644 --- a/docs/orchestration-engine.md +++ b/docs/orchestration-engine.md @@ -52,10 +52,16 @@ This keeps orchestration policy resolution separate from executor enforcement. E - planning: `requirements_defined`, `tasks_planned` - execution: `code_committed`, `task_blocked` - validation: `validation_passed`, `validation_failed` - - integration: `branch_merged` + - integration: `branch_merged`, `merge_conflict_detected`, `merge_conflict_resolved`, `merge_conflict_unresolved`, `merge_retry_started` - Pipeline edges can trigger on domain events (`edge.event`) in addition to legacy status triggers (`edge.on`). - `history_has_event` route conditions evaluate persisted domain event history entries (`validation_failed`, `task_blocked`, etc.). +## Merge conflict orchestration + +- Task merge/close merge operations return structured outcomes (`success`, `conflict`, `fatal_error`) instead of throwing for conflicts. +- Task state supports conflict workflows (`conflict`, `resolving_conflict`) and conflict metadata is persisted under `task.metadata.mergeConflict`. +- Conflict retries are bounded by `AGENT_MERGE_CONFLICT_MAX_ATTEMPTS`; exhaustion emits `merge_conflict_unresolved` and the session continues without crashing. + ## Security note Security enforcement now lives in `src/security`: diff --git a/src/agents/domain-events.ts b/src/agents/domain-events.ts index 34b4c87..9b815ae 100644 --- a/src/agents/domain-events.ts +++ b/src/agents/domain-events.ts @@ -4,7 +4,12 @@ import type { JsonObject } from "./types.js"; export type PlanningDomainEventType = "requirements_defined" | "tasks_planned"; export type ExecutionDomainEventType = "code_committed" | "task_blocked" | "task_ready_for_review"; export type ValidationDomainEventType = "validation_passed" | "validation_failed"; -export type IntegrationDomainEventType = "branch_merged"; +export type IntegrationDomainEventType = + | "branch_merged" + | "merge_conflict_detected" + | "merge_conflict_resolved" + | "merge_conflict_unresolved" + | "merge_retry_started"; export type DomainEventType = | PlanningDomainEventType @@ -50,6 +55,10 @@ const DOMAIN_EVENT_TYPES = new Set([ "validation_passed", "validation_failed", "branch_merged", + "merge_conflict_detected", + "merge_conflict_resolved", + "merge_conflict_unresolved", + "merge_retry_started", ]); export function isDomainEventType(value: string): value is DomainEventType { diff --git a/src/agents/lifecycle-observer.ts b/src/agents/lifecycle-observer.ts index 40b86af..67fa2a7 100644 --- a/src/agents/lifecycle-observer.ts +++ b/src/agents/lifecycle-observer.ts @@ -50,10 +50,14 @@ function toNodeAttemptSeverity(status: ActorResultStatus): RuntimeEventSeverity } function toDomainEventSeverity(type: DomainEventType): RuntimeEventSeverity { - if (type === "task_blocked") { + if (type === "task_blocked" || type === "merge_conflict_unresolved") { return "critical"; } - if (type === "validation_failed") { + if ( + type === "validation_failed" || + type === "merge_conflict_detected" || + type === "merge_retry_started" + ) { return "warning"; } return "info"; diff --git a/src/agents/orchestration.ts b/src/agents/orchestration.ts index b6fcf61..140762b 100644 --- a/src/agents/orchestration.ts +++ b/src/agents/orchestration.ts @@ -2,6 +2,7 @@ import { resolve } from "node:path"; import { getConfig, loadConfig, type AppConfig } from "../config.js"; import { createDefaultMcpRegistry, loadMcpConfigFromEnv, McpRegistry } from "../mcp.js"; import { parseAgentManifest, type AgentManifest } from "./manifest.js"; +import type { DomainEventEmission } from "./domain-events.js"; import { AgentManager } from "./manager.js"; import { PersonaRegistry, @@ -44,6 +45,7 @@ export type OrchestrationSettings = { maxDepth: number; maxRetries: number; maxChildren: number; + mergeConflictMaxAttempts: number; securityViolationHandling: "hard_abort" | "validation_fail"; runtimeContext: Record; }; @@ -62,6 +64,7 @@ export function loadOrchestrationSettingsFromEnv( maxDepth: config.orchestration.maxDepth, maxRetries: config.orchestration.maxRetries, maxChildren: config.orchestration.maxChildren, + mergeConflictMaxAttempts: config.orchestration.mergeConflictMaxAttempts, securityViolationHandling: config.security.violationHandling, }; } @@ -241,21 +244,43 @@ function readTaskIdFromPayload(payload: JsonObject, fallback: string): string { return fallback; } -function toTaskStatusForFailure(resultStatus: "validation_fail" | "failure"): ProjectTaskStatus { +function toTaskStatusForFailure( + resultStatus: "validation_fail" | "failure", + statusAtStart: string, +): ProjectTaskStatus { if (resultStatus === "failure") { return "failed"; } + if (statusAtStart === "conflict" || statusAtStart === "resolving_conflict") { + return "conflict"; + } return "in_progress"; } function shouldMergeFromStatus(statusAtStart: string): boolean { - return statusAtStart === "review"; + return statusAtStart === "review" || statusAtStart === "resolving_conflict"; } function toTaskIdLabel(task: ProjectTask): string { return task.taskId || task.id || "task"; } +function toJsonObject(value: unknown): JsonObject | undefined { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return undefined; + } + return value as JsonObject; +} + +function readMergeConflictAttempts(metadata: JsonObject | undefined): number { + const record = toJsonObject(metadata?.mergeConflict); + const attempts = record?.attempts; + if (typeof attempts === "number" && Number.isInteger(attempts) && attempts >= 0) { + return attempts; + } + return 0; +} + export class SchemaDrivenExecutionEngine { private readonly manifest: AgentManifest; private readonly personaRegistry = new PersonaRegistry(); @@ -296,6 +321,8 @@ export class SchemaDrivenExecutionEngine { maxDepth: input.settings?.maxDepth ?? config.orchestration.maxDepth, maxRetries: input.settings?.maxRetries ?? config.orchestration.maxRetries, maxChildren: input.settings?.maxChildren ?? config.orchestration.maxChildren, + mergeConflictMaxAttempts: + input.settings?.mergeConflictMaxAttempts ?? config.orchestration.mergeConflictMaxAttempts, securityViolationHandling: input.settings?.securityViolationHandling ?? config.security.violationHandling, runtimeContext: { @@ -480,7 +507,11 @@ export class SchemaDrivenExecutionEngine { }); const statusAtStart: ProjectTaskStatus = - existing?.status === "review" ? "review" : "in_progress"; + existing?.status === "review" || + existing?.status === "conflict" || + existing?.status === "resolving_conflict" + ? existing.status + : "in_progress"; await input.projectContextStore.patchState({ upsertTasks: [ @@ -490,6 +521,7 @@ export class SchemaDrivenExecutionEngine { status: statusAtStart, worktreePath: ensured.taskWorktreePath, ...(existing?.title ? { title: existing.title } : { title: taskId }), + ...(existing?.metadata ? { metadata: existing.metadata } : {}), }, ], }); @@ -498,44 +530,267 @@ export class SchemaDrivenExecutionEngine { taskId, worktreePath: ensured.taskWorktreePath, statusAtStart, + ...(existing?.metadata ? { metadata: existing.metadata } : {}), }; }, - finalizeTaskExecution: async ({ task, result }) => { + finalizeTaskExecution: async ({ task, result, domainEvents }) => { + const emittedTypes = new Set(domainEvents.map((event) => event.type)); + const additionalEvents: DomainEventEmission[] = []; + const emitEvent = ( + type: DomainEventEmission["type"], + payload?: DomainEventEmission["payload"], + ): void => { + if (emittedTypes.has(type)) { + return; + } + emittedTypes.add(type); + additionalEvents.push(payload ? { type, payload } : { type }); + }; + if (result.status === "failure" || result.status === "validation_fail") { await input.projectContextStore.patchState({ upsertTasks: [ { taskId: task.taskId, id: task.taskId, - status: toTaskStatusForFailure(result.status), + status: toTaskStatusForFailure(result.status, task.statusAtStart), worktreePath: task.worktreePath, title: task.taskId, + ...(task.metadata ? { metadata: task.metadata } : {}), }, ], }); return; } + if (task.statusAtStart === "conflict") { + const attempts = readMergeConflictAttempts(task.metadata); + const metadata: JsonObject = { + ...(task.metadata ?? {}), + mergeConflict: { + attempts, + maxAttempts: this.settings.mergeConflictMaxAttempts, + status: "resolved", + resolvedAt: new Date().toISOString(), + }, + }; + await input.projectContextStore.patchState({ + upsertTasks: [ + { + taskId: task.taskId, + id: task.taskId, + status: "resolving_conflict", + worktreePath: task.worktreePath, + title: task.taskId, + metadata, + }, + ], + }); + + emitEvent("merge_conflict_resolved", { + summary: `Merge conflicts resolved for task "${task.taskId}".`, + details: { + taskId: task.taskId, + worktreePath: task.worktreePath, + attempts, + }, + }); + + return { + additionalEvents, + handoffPayloadPatch: { + taskId: task.taskId, + worktreePath: task.worktreePath, + mergeConflictStatus: "resolved", + mergeConflictAttempts: attempts, + } as JsonObject, + }; + } + if (shouldMergeFromStatus(task.statusAtStart)) { - await this.sessionWorktreeManager.mergeTaskIntoBase({ + const attemptsBeforeMerge = readMergeConflictAttempts(task.metadata); + if (task.statusAtStart === "resolving_conflict") { + emitEvent("merge_retry_started", { + summary: `Retrying merge for task "${task.taskId}".`, + details: { + taskId: task.taskId, + worktreePath: task.worktreePath, + nextAttempt: attemptsBeforeMerge + 1, + maxAttempts: this.settings.mergeConflictMaxAttempts, + }, + }); + } + + const mergeOutcome = await this.sessionWorktreeManager.mergeTaskIntoBase({ taskId: task.taskId, baseWorkspacePath: input.session.baseWorkspacePath, taskWorktreePath: task.worktreePath, }); + if (mergeOutcome.kind === "success") { + await input.projectContextStore.patchState({ + upsertTasks: [ + { + taskId: task.taskId, + id: task.taskId, + status: "merged", + title: task.taskId, + metadata: { + ...(task.metadata ?? {}), + mergeConflict: { + attempts: attemptsBeforeMerge, + maxAttempts: this.settings.mergeConflictMaxAttempts, + status: "merged", + mergedAt: new Date().toISOString(), + }, + }, + }, + ], + }); + + emitEvent("branch_merged", { + summary: `Task "${task.taskId}" merged into session base branch.`, + details: { + taskId: task.taskId, + worktreePath: task.worktreePath, + }, + }); + + return { + additionalEvents, + handoffPayloadPatch: { + taskId: task.taskId, + mergeStatus: "merged", + } as JsonObject, + }; + } + + if (mergeOutcome.kind === "conflict") { + const attempts = attemptsBeforeMerge + 1; + const exhausted = attempts >= this.settings.mergeConflictMaxAttempts; + const metadata: JsonObject = { + ...(task.metadata ?? {}), + mergeConflict: { + attempts, + maxAttempts: this.settings.mergeConflictMaxAttempts, + status: exhausted ? "unresolved" : "conflict", + conflictFiles: mergeOutcome.conflictFiles, + worktreePath: mergeOutcome.worktreePath, + detectedAt: new Date().toISOString(), + ...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}), + }, + }; + + await input.projectContextStore.patchState({ + upsertTasks: [ + { + taskId: task.taskId, + id: task.taskId, + status: "conflict", + worktreePath: task.worktreePath, + title: task.taskId, + metadata, + }, + ], + }); + + emitEvent("merge_conflict_detected", { + summary: `Merge conflict detected for task "${task.taskId}".`, + details: { + taskId: task.taskId, + worktreePath: mergeOutcome.worktreePath, + conflictFiles: mergeOutcome.conflictFiles, + attempts, + maxAttempts: this.settings.mergeConflictMaxAttempts, + ...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}), + }, + }); + + if (exhausted) { + emitEvent("merge_conflict_unresolved", { + summary: + `Merge conflict attempts exhausted for task "${task.taskId}" ` + + `(${String(attempts)}/${String(this.settings.mergeConflictMaxAttempts)}).`, + details: { + taskId: task.taskId, + worktreePath: mergeOutcome.worktreePath, + conflictFiles: mergeOutcome.conflictFiles, + attempts, + maxAttempts: this.settings.mergeConflictMaxAttempts, + }, + }); + } + + return { + additionalEvents, + handoffPayloadPatch: { + taskId: task.taskId, + worktreePath: task.worktreePath, + mergeConflictStatus: exhausted ? "unresolved" : "conflict", + mergeConflictAttempts: attempts, + mergeConflictMaxAttempts: this.settings.mergeConflictMaxAttempts, + mergeConflictFiles: mergeOutcome.conflictFiles, + ...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}), + } as JsonObject, + }; + } + await input.projectContextStore.patchState({ upsertTasks: [ { taskId: task.taskId, id: task.taskId, - status: "merged", + status: "failed", + worktreePath: task.worktreePath, title: task.taskId, + metadata: { + ...(task.metadata ?? {}), + mergeConflict: { + attempts: attemptsBeforeMerge, + maxAttempts: this.settings.mergeConflictMaxAttempts, + status: "fatal_error", + error: mergeOutcome.error, + ...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}), + }, + }, }, ], }); - return; + + emitEvent("merge_conflict_unresolved", { + summary: `Fatal merge error for task "${task.taskId}".`, + details: { + taskId: task.taskId, + worktreePath: mergeOutcome.worktreePath, + error: mergeOutcome.error, + ...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}), + }, + }); + emitEvent("task_blocked", { + summary: `Task "${task.taskId}" blocked due to fatal merge error.`, + details: { + taskId: task.taskId, + error: mergeOutcome.error, + }, + }); + + return { + additionalEvents, + handoffPayloadPatch: { + taskId: task.taskId, + worktreePath: task.worktreePath, + mergeStatus: "fatal_error", + mergeError: mergeOutcome.error, + } as JsonObject, + }; } + const nextMetadata = task.metadata + ? { + ...task.metadata, + } + : undefined; + await input.projectContextStore.patchState({ upsertTasks: [ { @@ -544,9 +799,18 @@ export class SchemaDrivenExecutionEngine { status: "review", worktreePath: task.worktreePath, title: task.taskId, + ...(nextMetadata ? { metadata: nextMetadata } : {}), }, ], }); + + if (additionalEvents.length > 0) { + return { + additionalEvents, + }; + } + + return; }, }; } diff --git a/src/agents/pipeline.ts b/src/agents/pipeline.ts index 6da7074..ad2d71f 100644 --- a/src/agents/pipeline.ts +++ b/src/agents/pipeline.ts @@ -171,6 +171,7 @@ export type TaskExecutionResolution = { taskId: string; worktreePath: string; statusAtStart: string; + metadata?: JsonObject; }; export type TaskExecutionLifecycle = { @@ -185,7 +186,13 @@ export type TaskExecutionLifecycle = { task: TaskExecutionResolution; result: ActorExecutionResult; domainEvents: DomainEvent[]; - }) => Promise; + }) => Promise< + | void + | { + additionalEvents?: DomainEventEmission[]; + handoffPayloadPatch?: JsonObject; + } + >; }; type QueueItem = { @@ -921,7 +928,7 @@ export class PipelineExecutor { customEvents: result.events, }); const topologyKind: NodeTopologyKind = node.topology?.kind ?? "sequential"; - const payloadForNext = { + let payloadForNext: JsonObject = { ...context.handoff.payload, ...(result.payload ?? {}), ...(taskResolution @@ -936,6 +943,34 @@ export class PipelineExecutor { this.shouldRetryValidation(node) && attempt <= maxRetriesForNode; + if (taskResolution && this.options.taskLifecycle) { + const finalization = await this.options.taskLifecycle.finalizeTaskExecution({ + sessionId, + node, + task: taskResolution, + result, + domainEvents, + }); + for (const eventEmission of finalization?.additionalEvents ?? []) { + domainEvents.push( + createDomainEvent({ + type: eventEmission.type, + source: "pipeline", + sessionId, + nodeId: node.id, + attempt, + payload: eventEmission.payload, + }), + ); + } + if (finalization?.handoffPayloadPatch) { + payloadForNext = { + ...payloadForNext, + ...finalization.handoffPayloadPatch, + }; + } + } + await this.lifecycleObserver.onNodeAttempt({ sessionId, node, @@ -948,16 +983,6 @@ export class PipelineExecutor { topologyKind, }); - if (taskResolution && this.options.taskLifecycle) { - await this.options.taskLifecycle.finalizeTaskExecution({ - sessionId, - node, - task: taskResolution, - result, - domainEvents, - }); - } - const emittedEventTypes = domainEvents.map((event) => event.type); nodeRecords.push({ nodeId: node.id, diff --git a/src/agents/project-context.ts b/src/agents/project-context.ts index 3544bd5..b907a03 100644 --- a/src/agents/project-context.ts +++ b/src/agents/project-context.ts @@ -9,6 +9,8 @@ export type ProjectTaskStatus = | "pending" | "in_progress" | "review" + | "conflict" + | "resolving_conflict" | "merged" | "failed" | "blocked" @@ -65,6 +67,8 @@ function toTaskStatus(value: unknown, label: string): ProjectTaskStatus { value === "pending" || value === "in_progress" || value === "review" || + value === "conflict" || + value === "resolving_conflict" || value === "merged" || value === "failed" || value === "blocked" || diff --git a/src/agents/session-lifecycle.ts b/src/agents/session-lifecycle.ts index bf2b764..6a60cc2 100644 --- a/src/agents/session-lifecycle.ts +++ b/src/agents/session-lifecycle.ts @@ -9,7 +9,7 @@ const execFileAsync = promisify(execFile); const SESSION_METADATA_FILE_NAME = "session-metadata.json"; -export type SessionStatus = "active" | "suspended" | "closed"; +export type SessionStatus = "active" | "suspended" | "closed" | "closed_with_conflicts"; export type SessionMetadata = { sessionId: string; @@ -24,6 +24,58 @@ export type CreateSessionRequest = { projectPath: string; }; +export type MergeTaskIntoBaseOutcome = + | { + kind: "success"; + taskId: string; + worktreePath: string; + baseWorkspacePath: string; + } + | { + kind: "conflict"; + taskId: string; + worktreePath: string; + baseWorkspacePath: string; + conflictFiles: string[]; + mergeBase?: string; + } + | { + kind: "fatal_error"; + taskId: string; + worktreePath: string; + baseWorkspacePath: string; + error: string; + mergeBase?: string; + }; + +export type CloseSessionOutcome = + | { + kind: "success"; + sessionId: string; + mergedToProject: boolean; + } + | { + kind: "conflict"; + sessionId: string; + worktreePath: string; + conflictFiles: string[]; + mergeBase?: string; + baseBranch?: string; + } + | { + kind: "fatal_error"; + sessionId: string; + error: string; + baseBranch?: string; + mergeBase?: string; + }; + +type GitExecutionResult = { + exitCode: number; + stdout: string; + stderr: string; +}; + function toErrorMessage(error: unknown): string { if (error instanceof Error) { return error.message; @@ -46,7 +98,12 @@ function assertNonEmptyString(value: unknown, label: string): string { } function toSessionStatus(value: unknown): SessionStatus { - if (value === "active" || value === "suspended" || value === "closed") { + if ( + value === "active" || + value === "suspended" || + value === "closed" || + value === "closed_with_conflicts" + ) { return value; } throw new Error(`Session status "${String(value)}" is not supported.`); @@ -73,12 +130,36 @@ function toSessionMetadata(value: unknown): SessionMetadata { } async function runGit(args: string[]): Promise { + const result = await runGitWithResult(args); + if (result.exitCode !== 0) { + throw new Error(`git ${args.join(" ")} failed: ${result.stderr || result.stdout || "unknown git error"}`); + } + return result.stdout.trim(); +} + +async function runGitWithResult(args: string[]): Promise { try { - const { stdout } = await execFileAsync("git", args, { + const { stdout, stderr } = await execFileAsync("git", args, { encoding: "utf8", }); - return stdout.trim(); + return { + exitCode: 0, + stdout: stdout.trim(), + stderr: stderr.trim(), + }; } catch (error) { + const failure = error as { + code?: number | string; + stdout?: string; + stderr?: string; + }; + if (typeof failure.code === "number") { + return { + exitCode: failure.code, + stdout: String(failure.stdout ?? "").trim(), + stderr: String(failure.stderr ?? "").trim(), + }; + } throw new Error(`git ${args.join(" ")} failed: ${toErrorMessage(error)}`); } } @@ -105,6 +186,18 @@ function sanitizeSegment(value: string, fallback: string): string { return normalized || fallback; } +function toGitFailureMessage(result: GitExecutionResult): string { + const details = result.stderr || result.stdout || "unknown git error"; + return `git command failed with exit code ${String(result.exitCode)}: ${details}`; +} + +function toStringLines(value: string): string[] { + return value + .split("\n") + .map((line) => line.trim()) + .filter((line) => line.length > 0); +} + export class FileSystemSessionMetadataStore { private readonly stateRoot: string; @@ -306,58 +399,227 @@ export class SessionWorktreeManager { taskId: string; baseWorkspacePath: string; taskWorktreePath: string; - }): Promise { + }): Promise { const baseWorkspacePath = assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath"); const taskWorktreePath = assertAbsolutePath(input.taskWorktreePath, "taskWorktreePath"); + const taskId = input.taskId; - await runGit(["-C", taskWorktreePath, "add", "-A"]); - const hasPending = await this.hasStagedChanges(taskWorktreePath); - if (hasPending) { - await runGit([ - "-C", - taskWorktreePath, - "commit", - "-m", - `ai_ops: finalize task ${input.taskId}`, - ]); + if (!(await pathExists(baseWorkspacePath))) { + throw new Error(`Base workspace "${baseWorkspacePath}" does not exist.`); + } + if (!(await pathExists(taskWorktreePath))) { + throw new Error(`Task worktree "${taskWorktreePath}" does not exist.`); } - const branchName = await runGit(["-C", taskWorktreePath, "rev-parse", "--abbrev-ref", "HEAD"]); - await runGit(["-C", baseWorkspacePath, "merge", "--no-ff", "--no-edit", branchName]); - await this.removeWorktree({ - repoPath: baseWorkspacePath, - worktreePath: taskWorktreePath, - }); - } + let mergeBase: string | undefined; + try { + await runGit(["-C", taskWorktreePath, "add", "-A"]); + const hasPending = await this.hasStagedChanges(taskWorktreePath); + if (hasPending) { + await runGit(["-C", taskWorktreePath, "commit", "-m", `ai_ops: finalize task ${taskId}`]); + } - async closeSession(input: { - session: SessionMetadata; - taskWorktreePaths: string[]; - mergeBaseIntoProject?: boolean; - }): Promise { - const projectPath = assertAbsolutePath(input.session.projectPath, "projectPath"); - const baseWorkspacePath = assertAbsolutePath(input.session.baseWorkspacePath, "baseWorkspacePath"); + const branchName = await runGit(["-C", taskWorktreePath, "rev-parse", "--abbrev-ref", "HEAD"]); + const baseBranch = await runGit(["-C", baseWorkspacePath, "rev-parse", "--abbrev-ref", "HEAD"]); + mergeBase = await this.tryReadMergeBase(baseWorkspacePath, baseBranch, branchName); - for (const taskWorktreePath of input.taskWorktreePaths) { - if (!taskWorktreePath.trim()) { - continue; + if (await this.hasOngoingMerge(taskWorktreePath)) { + return { + kind: "conflict", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + conflictFiles: await this.readConflictFiles(taskWorktreePath), + ...(mergeBase ? { mergeBase } : {}), + }; + } + + const syncTaskBranch = await runGitWithResult([ + "-C", + taskWorktreePath, + "merge", + "--no-ff", + "--no-edit", + baseBranch, + ]); + + if (syncTaskBranch.exitCode === 1) { + return { + kind: "conflict", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + conflictFiles: await this.readConflictFiles(taskWorktreePath), + ...(mergeBase ? { mergeBase } : {}), + }; + } + if (syncTaskBranch.exitCode !== 0) { + return { + kind: "fatal_error", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + error: toGitFailureMessage(syncTaskBranch), + ...(mergeBase ? { mergeBase } : {}), + }; + } + + if (await this.hasOngoingMerge(baseWorkspacePath)) { + return { + kind: "conflict", + taskId, + worktreePath: baseWorkspacePath, + baseWorkspacePath, + conflictFiles: await this.readConflictFiles(baseWorkspacePath), + ...(mergeBase ? { mergeBase } : {}), + }; + } + + const mergeIntoBase = await runGitWithResult([ + "-C", + baseWorkspacePath, + "merge", + "--no-ff", + "--no-edit", + branchName, + ]); + + if (mergeIntoBase.exitCode === 1) { + return { + kind: "conflict", + taskId, + worktreePath: baseWorkspacePath, + baseWorkspacePath, + conflictFiles: await this.readConflictFiles(baseWorkspacePath), + ...(mergeBase ? { mergeBase } : {}), + }; + } + if (mergeIntoBase.exitCode !== 0) { + return { + kind: "fatal_error", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + error: toGitFailureMessage(mergeIntoBase), + ...(mergeBase ? { mergeBase } : {}), + }; } await this.removeWorktree({ repoPath: baseWorkspacePath, worktreePath: taskWorktreePath, }); + + return { + kind: "success", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + }; + } catch (error) { + return { + kind: "fatal_error", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + error: toErrorMessage(error), + ...(mergeBase ? { mergeBase } : {}), + }; + } + } + + async closeSession(input: { + session: SessionMetadata; + taskWorktreePaths: string[]; + mergeBaseIntoProject?: boolean; + }): Promise { + const projectPath = assertAbsolutePath(input.session.projectPath, "projectPath"); + const baseWorkspacePath = assertAbsolutePath(input.session.baseWorkspacePath, "baseWorkspacePath"); + if (!(await pathExists(projectPath))) { + throw new Error(`Project path "${projectPath}" does not exist.`); + } + if (!(await pathExists(baseWorkspacePath))) { + throw new Error(`Base workspace "${baseWorkspacePath}" does not exist.`); } - if (input.mergeBaseIntoProject) { - const baseBranch = await runGit(["-C", baseWorkspacePath, "rev-parse", "--abbrev-ref", "HEAD"]); - await runGit(["-C", projectPath, "merge", "--no-ff", "--no-edit", baseBranch]); - } + let baseBranch: string | undefined; + let mergeBase: string | undefined; - await this.removeWorktree({ - repoPath: projectPath, - worktreePath: baseWorkspacePath, - }); + try { + for (const taskWorktreePath of input.taskWorktreePaths) { + if (!taskWorktreePath.trim()) { + continue; + } + + await this.removeWorktree({ + repoPath: baseWorkspacePath, + worktreePath: taskWorktreePath, + }); + } + + if (input.mergeBaseIntoProject) { + baseBranch = await runGit(["-C", baseWorkspacePath, "rev-parse", "--abbrev-ref", "HEAD"]); + mergeBase = await this.tryReadMergeBase(projectPath, "HEAD", baseBranch); + + if (await this.hasOngoingMerge(projectPath)) { + return { + kind: "conflict", + sessionId: input.session.sessionId, + worktreePath: projectPath, + conflictFiles: await this.readConflictFiles(projectPath), + ...(baseBranch ? { baseBranch } : {}), + ...(mergeBase ? { mergeBase } : {}), + }; + } + + const mergeResult = await runGitWithResult([ + "-C", + projectPath, + "merge", + "--no-ff", + "--no-edit", + baseBranch, + ]); + if (mergeResult.exitCode === 1) { + return { + kind: "conflict", + sessionId: input.session.sessionId, + worktreePath: projectPath, + conflictFiles: await this.readConflictFiles(projectPath), + ...(baseBranch ? { baseBranch } : {}), + ...(mergeBase ? { mergeBase } : {}), + }; + } + if (mergeResult.exitCode !== 0) { + return { + kind: "fatal_error", + sessionId: input.session.sessionId, + error: toGitFailureMessage(mergeResult), + ...(baseBranch ? { baseBranch } : {}), + ...(mergeBase ? { mergeBase } : {}), + }; + } + } + + await this.removeWorktree({ + repoPath: projectPath, + worktreePath: baseWorkspacePath, + }); + + return { + kind: "success", + sessionId: input.session.sessionId, + mergedToProject: input.mergeBaseIntoProject === true, + }; + } catch (error) { + return { + kind: "fatal_error", + sessionId: input.session.sessionId, + error: toErrorMessage(error), + ...(baseBranch ? { baseBranch } : {}), + ...(mergeBase ? { mergeBase } : {}), + }; + } } private async removeWorktree(input: { @@ -386,4 +648,43 @@ export class SessionWorktreeManager { throw new Error(`Unable to inspect staged changes: ${toErrorMessage(error)}`); } } + + private async hasOngoingMerge(worktreePath: string): Promise { + const result = await runGitWithResult([ + "-C", + worktreePath, + "rev-parse", + "-q", + "--verify", + "MERGE_HEAD", + ]); + return result.exitCode === 0; + } + + private async readConflictFiles(worktreePath: string): Promise { + const result = await runGitWithResult([ + "-C", + worktreePath, + "diff", + "--name-only", + "--diff-filter=U", + ]); + if (result.exitCode !== 0) { + return []; + } + return toStringLines(result.stdout); + } + + private async tryReadMergeBase( + repoPath: string, + leftRef: string, + rightRef: string, + ): Promise { + const result = await runGitWithResult(["-C", repoPath, "merge-base", leftRef, rightRef]); + if (result.exitCode !== 0) { + return undefined; + } + const mergeBase = result.stdout.trim(); + return mergeBase || undefined; + } } diff --git a/src/config.ts b/src/config.ts index a914e7b..1c383b9 100644 --- a/src/config.ts +++ b/src/config.ts @@ -30,6 +30,7 @@ export type OrchestrationRuntimeConfig = { maxDepth: number; maxRetries: number; maxChildren: number; + mergeConflictMaxAttempts: number; }; export type DiscoveryRuntimeConfig = { @@ -77,6 +78,7 @@ const DEFAULT_ORCHESTRATION: OrchestrationRuntimeConfig = { maxDepth: 4, maxRetries: 2, maxChildren: 4, + mergeConflictMaxAttempts: 2, }; const DEFAULT_PROVISIONING: BuiltInProvisioningConfig = { @@ -411,6 +413,12 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): Readonly task.worktreePath) .filter((path): path is string => typeof path === "string" && path.trim().length > 0); - await runtime.worktreeManager.closeSession({ + const outcome = await runtime.worktreeManager.closeSession({ session, taskWorktreePaths, mergeBaseIntoProject: input.mergeToProject === true, }); + if (outcome.kind === "fatal_error") { + throw new Error(`Session close failed: ${outcome.error}`); + } + + if (outcome.kind === "conflict") { + return runtime.sessionStore.updateSession(session.sessionId, { + sessionStatus: "closed_with_conflicts", + }); + } + return runtime.sessionStore.updateSession(session.sessionId, { sessionStatus: "closed", }); @@ -448,7 +458,10 @@ export class UiRunService { if (input.sessionId?.trim() && !session) { throw new Error(`Session \"${sessionId}\" does not exist.`); } - if (session && session.sessionStatus === "closed") { + if ( + session && + (session.sessionStatus === "closed" || session.sessionStatus === "closed_with_conflicts") + ) { throw new Error(`Session \"${sessionId}\" is closed and cannot run new tasks.`); } const runId = randomUUID(); diff --git a/tests/config.test.ts b/tests/config.test.ts index fc74afa..1de9677 100644 --- a/tests/config.test.ts +++ b/tests/config.test.ts @@ -12,6 +12,7 @@ test("loads defaults and freezes config", () => { assert.equal(config.agentManager.maxConcurrentAgents, 4); assert.equal(config.orchestration.maxDepth, 4); + assert.equal(config.orchestration.mergeConflictMaxAttempts, 2); assert.equal(config.provisioning.portRange.basePort, 36000); assert.equal(config.discovery.fileRelativePath, ".agent-context/resources.json"); assert.equal(config.security.violationHandling, "hard_abort"); @@ -127,3 +128,10 @@ test("validates AGENT_WORKTREE_TARGET_PATH against parent traversal", () => { /must not contain "\.\." path segments/, ); }); + +test("validates AGENT_MERGE_CONFLICT_MAX_ATTEMPTS bounds", () => { + assert.throws( + () => loadConfig({ AGENT_MERGE_CONFLICT_MAX_ATTEMPTS: "0" }), + /AGENT_MERGE_CONFLICT_MAX_ATTEMPTS must be an integer >= 1/, + ); +}); diff --git a/tests/project-context.test.ts b/tests/project-context.test.ts index 3754e2b..c6445fc 100644 --- a/tests/project-context.test.ts +++ b/tests/project-context.test.ts @@ -62,6 +62,35 @@ test("project context store reads defaults and applies domain patches", async () assert.equal(updated.schemaVersion, 1); }); +test("project context accepts conflict-aware task statuses", async () => { + const root = await mkdtemp(resolve(tmpdir(), "ai-ops-project-context-conflict-")); + const store = new FileSystemProjectContextStore({ + filePath: resolve(root, "project-context.json"), + }); + + const updated = await store.patchState({ + upsertTasks: [ + { + taskId: "task-conflict", + id: "task-conflict", + title: "Resolve merge conflict", + status: "conflict", + }, + { + taskId: "task-resolving", + id: "task-resolving", + title: "Retry merge", + status: "resolving_conflict", + }, + ], + }); + + assert.deepEqual( + updated.taskQueue.map((task) => `${task.taskId}:${task.status}`), + ["task-conflict:conflict", "task-resolving:resolving_conflict"], + ); +}); + test("project context parser merges missing root keys with defaults", async () => { const root = await mkdtemp(resolve(tmpdir(), "ai-ops-project-context-")); const filePath = resolve(root, "project-context.json"); diff --git a/tests/run-service.test.ts b/tests/run-service.test.ts index 52e38bd..6635db3 100644 --- a/tests/run-service.test.ts +++ b/tests/run-service.test.ts @@ -184,3 +184,54 @@ test("run service creates, runs, and closes explicit sessions", async () => { code: "ENOENT", }); }); + +test("run service marks session closed_with_conflicts when close merge conflicts", async () => { + const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-run-service-close-conflict-")); + const stateRoot = resolve(workspaceRoot, "state"); + const envPath = resolve(workspaceRoot, ".env"); + const projectPath = resolve(workspaceRoot, "project"); + + await mkdir(projectPath, { recursive: true }); + await execFileAsync("git", ["init", projectPath], { encoding: "utf8" }); + await execFileAsync("git", ["-C", projectPath, "config", "user.name", "AI Ops"], { encoding: "utf8" }); + await execFileAsync("git", ["-C", projectPath, "config", "user.email", "ai-ops@example.local"], { encoding: "utf8" }); + await writeFile(resolve(projectPath, "README.md"), "base\n", "utf8"); + await execFileAsync("git", ["-C", projectPath, "add", "README.md"], { encoding: "utf8" }); + await execFileAsync("git", ["-C", projectPath, "commit", "-m", "initial"], { encoding: "utf8" }); + + await writeFile( + envPath, + [ + `AGENT_STATE_ROOT=${stateRoot}`, + "AGENT_WORKTREE_ROOT=.ai_ops/worktrees", + "AGENT_WORKTREE_BASE_REF=HEAD", + ].join("\n"), + "utf8", + ); + + const runService = new UiRunService({ + workspaceRoot, + envFilePath: ".env", + }); + + const createdSession = await runService.createSession({ + projectPath, + }); + + await writeFile(resolve(createdSession.baseWorkspacePath, "README.md"), "base branch update\n", "utf8"); + await execFileAsync("git", ["-C", createdSession.baseWorkspacePath, "add", "README.md"], { encoding: "utf8" }); + await execFileAsync("git", ["-C", createdSession.baseWorkspacePath, "commit", "-m", "base update"], { encoding: "utf8" }); + + await writeFile(resolve(projectPath, "README.md"), "project branch update\n", "utf8"); + await execFileAsync("git", ["-C", projectPath, "add", "README.md"], { encoding: "utf8" }); + await execFileAsync("git", ["-C", projectPath, "commit", "-m", "project update"], { encoding: "utf8" }); + + const closed = await runService.closeSession({ + sessionId: createdSession.sessionId, + mergeToProject: true, + }); + + assert.equal(closed.sessionStatus, "closed_with_conflicts"); + const baseWorkspaceStats = await stat(createdSession.baseWorkspacePath); + assert.equal(baseWorkspaceStats.isDirectory(), true); +}); diff --git a/tests/session-lifecycle.test.ts b/tests/session-lifecycle.test.ts index 7f253fb..737a8f6 100644 --- a/tests/session-lifecycle.test.ts +++ b/tests/session-lifecycle.test.ts @@ -44,6 +44,11 @@ test("session metadata store persists and updates session metadata", async () => const readBack = await store.readSession("session-abc"); assert.equal(readBack?.sessionStatus, "closed"); + + const closedWithConflicts = await store.updateSession("session-abc", { + sessionStatus: "closed_with_conflicts", + }); + assert.equal(closedWithConflicts.sessionStatus, "closed_with_conflicts"); }); test("session worktree manager provisions and merges task worktrees", async () => { @@ -86,11 +91,12 @@ test("session worktree manager provisions and merges task worktrees", async () = await writeFile(resolve(taskWorktreePath, "feature.txt"), "task output\n", "utf8"); - await manager.mergeTaskIntoBase({ + const mergeOutcome = await manager.mergeTaskIntoBase({ taskId: "task-1", baseWorkspacePath, taskWorktreePath, }); + assert.equal(mergeOutcome.kind, "success"); const mergedFile = await readFile(resolve(baseWorkspacePath, "feature.txt"), "utf8"); assert.equal(mergedFile, "task output\n"); @@ -104,13 +110,70 @@ test("session worktree manager provisions and merges task worktrees", async () = updatedAt: new Date().toISOString(), }; - await manager.closeSession({ + const closeOutcome = await manager.closeSession({ session, taskWorktreePaths: [], mergeBaseIntoProject: false, }); + assert.equal(closeOutcome.kind, "success"); await assert.rejects(() => stat(baseWorkspacePath), { code: "ENOENT", }); }); + +test("session worktree manager returns conflict outcome instead of throwing", async () => { + const root = await mkdtemp(resolve(tmpdir(), "ai-ops-session-worktree-conflict-")); + const projectPath = resolve(root, "project"); + const worktreeRoot = resolve(root, "worktrees"); + + await mkdir(projectPath, { recursive: true }); + await git(["init", projectPath]); + await git(["-C", projectPath, "config", "user.name", "AI Ops"]); + await git(["-C", projectPath, "config", "user.email", "ai-ops@example.local"]); + await writeFile(resolve(projectPath, "README.md"), "base\n", "utf8"); + await git(["-C", projectPath, "add", "README.md"]); + await git(["-C", projectPath, "commit", "-m", "initial commit"]); + + const manager = new SessionWorktreeManager({ + worktreeRoot, + baseRef: "HEAD", + }); + + const sessionId = "session-conflict-1"; + const baseWorkspacePath = manager.resolveBaseWorkspacePath(sessionId); + + await manager.initializeSessionBaseWorkspace({ + sessionId, + projectPath, + baseWorkspacePath, + }); + + const taskWorktreePath = ( + await manager.ensureTaskWorktree({ + sessionId, + taskId: "task-conflict", + baseWorkspacePath, + }) + ).taskWorktreePath; + + await writeFile(resolve(baseWorkspacePath, "README.md"), "base branch change\n", "utf8"); + await git(["-C", baseWorkspacePath, "add", "README.md"]); + await git(["-C", baseWorkspacePath, "commit", "-m", "base update"]); + + await writeFile(resolve(taskWorktreePath, "README.md"), "task branch change\n", "utf8"); + + const mergeOutcome = await manager.mergeTaskIntoBase({ + taskId: "task-conflict", + baseWorkspacePath, + taskWorktreePath, + }); + + assert.equal(mergeOutcome.kind, "conflict"); + if (mergeOutcome.kind !== "conflict") { + throw new Error("Expected merge conflict outcome."); + } + assert.equal(mergeOutcome.taskId, "task-conflict"); + assert.equal(mergeOutcome.worktreePath, taskWorktreePath); + assert.ok(mergeOutcome.conflictFiles.includes("README.md")); +});